開けゴマ:玄関のドアに私の顔を認識させる方法

新しいインターホンの設置を知らせるノックがドアに響いたとき、金曜日のリモートワークはすでに終わりかけていました。新しいインターホンには、家にいなくても呼び出しに応答できるモバイルアプリがあると知って興味を持ち、すぐにスマートフォンにダウンロードしました。ログインしてみると、このアプリには興味深い機能がありました。部屋への呼び出しがなくても、いつでもインターホンのカメラを覗いてドアを開けられるのです。「これは玄関ドアのオンライン API じゃないか!」——頭の中でカチッと音がしました。こうして、次の週末の運命は決まりました。





記事の最後にデモ動画があります。





映画「フィフス・エレメント」より
映画「フィフス・エレメント」より

免責事項

. , - — .





API

, , . - — , . lkit — , http(s) Android .





— Android- Certificate authority , . , Android 7 .





root , Android, Android Studio. ADB , Certificate pinning .





HTTP Toolkit への正常な接続
HTTP Toolkit

, — , .





ドアオープンリクエスト

:





  1. : POST /rest/v1/places/{place_id}/accesscontrols/{control_id}/actions



    JSON- {"name": "accessControlOpen"}







  2. () : GET /rest/v1/places/{place_id}/accesscontrols/{control_id}/videosnapshots







  3. : GET /rest/v1/forpost/cameras/{camera_id}/video?LightStream=0







HTTP Authorization — , . Advanced REST Client, , Authorization API , , .





Python requests



, :





# Credentials and endpoints of the intercom REST API.
# Every "###" is a placeholder that must be replaced with real values.
HEADERS = {"Authorization": "Bearer ###"}
# Base URL of a single access control. It deliberately has no trailing
# slash: the request helpers append "/actions" and "/videosnapshots"
# themselves, and a trailing slash here would produce "//" in the path.
ACTION_URL = "https://###.ru/rest/v1/places/###/accesscontrols/###"
VIDEO_URL = "https://###.ru/rest/v1/forpost/cameras/###/video?LightStream=0"

def get_image(timeout=10):
    """Fetch the current snapshot from the intercom camera.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The raw response body (the encoded image) on HTTP 200,
        otherwise ``None``.
    """
    try:
        result = requests.get(
            f'{ACTION_URL}/videosnapshots', headers=HEADERS, timeout=timeout)
    except requests.RequestException as exc:
        # Without a timeout a stalled connection would block the caller
        # forever; network failures are reported like any other failure.
        logging.error(f"Failed to get an image: {exc}")
        return None
    if result.status_code != 200:
        logging.error(f"Failed to get an image with status code {result.status_code}")
        return None
    logging.warning(f"Image received successfully in {result.elapsed.total_seconds()}sec")
    return result.content

def open_door(timeout=10):
    """Ask the intercom to open the door.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``True`` when the server answered with HTTP 200, ``False`` otherwise.
    """
    try:
        result = requests.post(
            f'{ACTION_URL}/actions',
            headers=HEADERS,
            json={"name": "accessControlOpen"},
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # A hung or refused connection must not take the caller down.
        logging.error(f"Failed to open the door: {exc}")
        return False
    if result.status_code != 200:
        logging.error(f"Failed to open the door with status code {result.status_code}")
        return False
    logging.warning(f"Door opened successfully in {result.elapsed.total_seconds()}sec")
    return True

def get_videostream_link(timeout=10):
    """Request the URL of the camera's video stream.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The value found at ``['data']['URL']`` in the JSON response, or
        ``False`` when the request fails or the response has another shape.
    """
    try:
        result = requests.get(VIDEO_URL, headers=HEADERS, timeout=timeout)
    except requests.RequestException as exc:
        logging.error(f"Failed to get stream link: {exc}")
        return False
    if result.status_code != 200:
        logging.error(f"Failed to get stream link with status code {result.status_code}")
        return False
    try:
        link = result.json()['data']['URL']
    except (ValueError, KeyError, TypeError) as exc:
        # A 200 response with an unexpected body is still a failure.
        logging.error(f"Failed to get stream link from response: {exc!r}")
        return False
    logging.warning(f"Stream link received successfully in {result.elapsed.total_seconds()}sec")
    return link

      
      



, — Intel(R) Xeon(R) CPU E5-2650L v3 @ 1.80GHz



, 1GB 0 GPU. , , .





, . OpenVINO Toolkit — Intel, CPU.





Interactive Face Recognition Demo — , . , - 2020.3, pip 2021.1. OpenVINO .





, . ( ), , , :





class ImageProcessor:
    """Thin wrapper that turns frame-processor detections into labels."""

    def __init__(self):
        self.frame_processor = FrameProcessor()

    def process(self, image):
        """Return the identity label of every detection found in ``image``.

        The frame processor's result is unpacked into parallel sequences
        of (roi, landmarks, identity); only the identity's ``id`` is used.
        """
        detections = self.frame_processor.process(image)
        return [
            self.frame_processor.face_identifier.get_identity_label(identity.id)
            for _roi, _landmarks, identity in zip(*detections)
        ]
      
      



. , get_image()



.





100 runs on an image with known face:
Total time: 7.356s
Time per frame: 0.007s
FPS: 135.944

100 runs on an image without faces:
Total time: 2.985s
Time per frame: 0.003s
FPS: 334.962
      
      



, .





1 FPS:

, , . , MVP get_image()



.





class ImageProcessor:
    # <...>
    def process_single_image(self, image):
        """Decode one encoded image and run recognition on it.

        Args:
            image: Bytes-like object holding an encoded image (for example
                the body returned by the snapshot request).

        Returns:
            The list of labels produced by ``self.process``; an empty list
            when the data cannot be decoded into an image.
        """
        # np.fromstring's binary mode is deprecated; np.frombuffer is the
        # documented replacement and does not copy the data.
        encoded = np.frombuffer(image, dtype=np.uint8)
        frame = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
        if frame is None:
            # imdecode yields no image for invalid data; nothing to recognise.
            return []
        return self.process(frame)

def snapshot_based_intercom_id():
    """Poll camera snapshots forever and open the door for a known face.

    Each iteration fetches one snapshot, runs recognition on it and, when
    any label contains "face", opens the door and stores the snapshot under
    ``images/``. The door is opened at most once every 5 seconds.
    """
    processor = ImageProcessor()

    last_open_door_time = time.time()
    while True:
        start_time = time.time()
        image = get_image()
        if image is None:
            # get_image() has already logged the failure. Skip this round
            # instead of passing None to the decoder, and back off briefly
            # so a broken connection does not become a busy loop.
            time.sleep(1)
            continue
        result = processor.process_single_image(image)
        logging.info(f'{result} in {time.time() - start_time}s')
        # Successful detections are "face{N}"
        if not any('face' in res for res in result):
            continue
        if start_time - last_open_door_time > 5:
            open_door()
            with open(f'images/{start_time}_OK.jfif', 'wb') as f:
                f.write(image)
            last_open_door_time = start_time
      
      



, , . , .. .





認識成功の瞬間、個別画像処理版
,

! , . , — , .. API . , 0.7 0.6 , .





30 FPS:

:





# Open the video stream located at `link`.
vcap = cv2.VideoCapture(link)
# Pull a single frame: `success` tells whether one was obtained,
# `frame` is the image itself.
success, frame = vcap.read()
      
      



, 30 FPS. : read()



. , , , . , , 30 — , .





: vcap.set(CV_CAP_PROP_BUFFERSIZE, 0);



. , OpenCV 3.4, - , . , StackOverflow — , ( , ).





ImageProcessor



3 :





class CameraBufferCleanerThread(threading.Thread):
    """Background reader that keeps only the most recent camera frame.

    The thread starts itself on construction and calls ``camera.read()`` in
    a loop, storing the latest frame in ``last_frame``. Use it as a context
    manager: leaving the ``with`` block stops the loop and joins the thread.

    Args:
        camera: Object whose ``read()`` returns a ``(flag, frame)`` pair.
        name: Name given to the thread.
    """

    def __init__(self, camera, name='camera-buffer-cleaner-thread'):
        super().__init__(name=name)
        self.camera = camera
        self.last_frame = None
        self.finished = False
        self.start()

    def run(self):
        """Read frames until ``finished`` is set, keeping only the newest."""
        while not self.finished:
            # The status flag returned by read() is not used.
            _, self.last_frame = self.camera.read()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Signal the reader loop to stop and wait for the thread to end."""
        self.finished = True
        self.join()

class ImageProcessor:
    # <...>
    def process_stream(self, link, interval=0.3):
        """Yield recognition results for the newest frames of a stream.

        Args:
            link: Source passed to ``cv2.VideoCapture``.
            interval: Seconds to sleep between processed frames
                (0.3 gives roughly 3 FPS).

        Yields:
            ``(labels, frame)`` for each processed frame, or ``(None, None)``
            while no frame is available yet.
        """
        vcap = cv2.VideoCapture(link)
        try:
            with CameraBufferCleanerThread(vcap) as cam_cleaner:
                while True:
                    frame = cam_cleaner.last_frame
                    if frame is not None:
                        yield (self.process(frame), frame)
                    else:
                        yield (None, None)
                    time.sleep(interval)
        finally:
            # Runs when the generator is closed or fails, after the reader
            # thread has been joined, so the capture is no longer in use.
            vcap.release()
      
      



snapshot_based_intercom_id



:





def stream_based_intercom_id():
    """Recognise faces on the video stream and open the door for known ones.

    Requests the stream link, then iterates over the results of
    ``ImageProcessor.process_stream``. When a label contains "face" the door
    is opened and the frame is saved under ``images/``; the door is opened
    at most once every 5 seconds. Returns early when no link is obtained.
    """
    processor = ImageProcessor()

    link = get_videostream_link()
    if not link:
        # get_videostream_link() returns False on failure and has already
        # logged the reason; there is nothing to open.
        logging.error('Stream link is not available, recognition is not started')
        return
    # To notify about delays
    last_time = time.time()
    last_open_door_time = time.time()
    for result, np_image in processor.process_stream(link):
        current_time = time.time()
        delta_time = current_time - last_time
        # Iterations slower than a second are logged at a higher level.
        if delta_time < 1:
            logging.info(f'{result} in {delta_time}')
        else:
            logging.warning(f'{result} in {delta_time}')
        last_time = current_time
        if result is None:
            continue
        # Greet the recognised label, not merely the first label in the
        # list, which may belong to an unrecognised face.
        known = [res for res in result if 'face' in res]
        if known and current_time - last_open_door_time > 5:
            logging.warning(
                f'Hey, I know you - {known[0]}! Opening the door...')
            last_open_door_time = current_time
            open_door()
            cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)
      
      



— , .





認識成功の瞬間、動画ストリーム処理ありバージョン
,

Telegram

/. .





python-telegram-bot



, callback / .





class TelegramInterface:
    """Telegram bot that lets whitelisted users start and stop the service.

    Args:
        login_whitelist: Usernames allowed to use /run and /stop.
        state_callback: Callable taking the requested state (``True`` to
            run, ``False`` to stop) and returning the text to reply with.
    """

    def __init__(self, login_whitelist, state_callback):
        self.state_callback = state_callback
        self.login_whitelist = login_whitelist
        self.updater = Updater(token="###", use_context=True)
        self.run()

    def run(self):
        """Register the command handlers and start polling for updates."""
        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", self.start))
        dispatcher.add_handler(CommandHandler("run", self.run_intercom))
        dispatcher.add_handler(CommandHandler("stop", self.stop_intercom))

        self.updater.start_polling()

    def _switch_intercom(self, update, is_running):
        """Apply the requested state for a whitelisted user and reply."""
        # effective_message / effective_user are used because
        # update.message is None when the command arrives as an edit.
        message = update.effective_message
        user = update.effective_user
        if user is not None and user.username in self.login_whitelist:
            text = self.state_callback(is_running)
        else:
            text = 'not allowed'
        message.reply_text(text, reply_to_message_id=message.message_id)

    def run_intercom(self, update: Update, context: CallbackContext):
        """Handle /run: request that the service starts."""
        self._switch_intercom(update, True)

    def stop_intercom(self, update: Update, context: CallbackContext):
        """Handle /stop: request that the service stops."""
        self._switch_intercom(update, False)

    def start(self, update: Update, context: CallbackContext) -> None:
        """Handle /start with a greeting."""
        update.effective_message.reply_text('Hi!')
        
        
class TelegramBotThreadWrapper(threading.Thread):
    """Thread that creates the Telegram bot; it starts itself on construction."""

    def __init__(self, state_callback, name='telegram-bot-wrapper'):
        super().__init__(name=name)
        self.state_callback = state_callback
        # Usernames allowed to control the service (placeholders).
        self.whitelist = ["###", "###"]
        self.start()

    def run(self):
        """Build the bot inside this thread and keep a reference to it."""
        self.bot = TelegramInterface(self.whitelist, self.state_callback)

      
      



intercom_id



, :





def stream_based_intercom_id_with_telegram():
    """Run stream-based recognition that can be started and stopped on demand.

    Creates a ``TelegramBotThreadWrapper`` with ``state_callback`` and then
    runs ``stream_processing_loop`` in the calling thread. That loop has no
    exit, so this function does not return normally.
    """
    processor = ImageProcessor()

    # One lock guards both flags below; both conditions are built on it.
    loop_state_lock = threading.Lock()

    # Requested state: written by state_callback, read by the loop.
    loop_should_run = False
    loop_should_change_state_cv = threading.Condition(loop_state_lock)

    # Actual state: written by the loop, awaited by state_callback.
    is_loop_finished = True
    loop_changed_state_cv = threading.Condition(loop_state_lock)

    def stream_processing_loop():
        """Wait for a start request, process the stream until asked to stop, repeat."""
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv

        while True:
            with loop_should_change_state_cv:
                # Block until a start has been requested, then report that
                # the loop is running to whoever waits for the change.
                loop_should_change_state_cv.wait_for(lambda: loop_should_run)
                is_loop_finished = False
                loop_changed_state_cv.notify_all()
                logging.warning(f'Loop is started')
            # A fresh stream link is requested on every (re)start.
            link = get_videostream_link()
            last_time = time.time()
            last_open_door_time = time.time()
            for result, np_image in processor.process_stream(link):
                with loop_should_change_state_cv:
                    # The stop request is polled once per yielded result,
                    # which is why stopping needs no notification.
                    if not loop_should_run:
                        is_loop_finished = True
                        loop_changed_state_cv.notify_all()
                        logging.warning(f'Loop is stopped')
                        break
                current_time = time.time()
                delta_time = current_time - last_time
                # Iterations slower than a second are logged at a higher level.
                if delta_time < 1:
                    logging.info(f'{result} in {delta_time}')
                else:
                    logging.warning(f'{result} in {delta_time}')
                last_time = current_time
                # process_stream yields None while no frame is available.
                if result is None:
                    continue
                if any(['face' in res for res in result]):
                    # Open the door at most once every 5 seconds.
                    if current_time - last_open_door_time > 5:
                        logging.warning(f'Hey, I know you - {result[0]}! Opening the door...')
                        last_open_door_time = current_time
                        open_door()
                        cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)

    def state_callback(is_running):
        """Request a state change and wait until the loop confirms it.

        Args:
            is_running: ``True`` to start the loop, ``False`` to stop it.

        Returns:
            A status message describing the outcome.
        """
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv

        with loop_should_change_state_cv:
            # Asking for the state that is already requested is a no-op.
            if is_running == loop_should_run:
                return "Intercom service state is not changed"
            loop_should_run = is_running
            if loop_should_run:
                # Wake the loop, then wait until it reports it is running.
                loop_should_change_state_cv.notify_all()
                loop_changed_state_cv.wait_for(lambda: not is_loop_finished)
                return "Intercom service is up"
            else:
                # The loop notices the cleared flag on its next iteration;
                # wait until it reports that it has finished.
                loop_changed_state_cv.wait_for(lambda: is_loop_finished)
                return "Intercom service is down"

    # NOTE(review): state_callback is presumably invoked from the bot's own
    # threads while the loop below runs in this one — confirm.
    telegram_bot = TelegramBotThreadWrapper(state_callback)
    logging.warning("Bot is ready")
    stream_processing_loop()
      
      



:





スマートインターホン技術は住民に多くの可能性をもたらしますが、その一方で、カメラとマイクを備えた何百(何千?)ものエントランスドア(そうです、たまたま取得したビデオストリームには音声も含まれていました!)は、プライバシー侵害の新たな機会を生み出しています。





ビデオストリームへのアクセスは部屋への呼び出し時にのみ提供され、違反行為を解明する手段として位置付けられている直近 3 日間の録画は、会社のサーバーではなくインターホン本体に直接保存され、リクエストに応じてのみアクセスできるようにしてほしいと思います。あるいは、そもそも録画しないでほしいものです。








All Articles