From eeb29dfd90e605998d5736e53cfb027bc975eb16 Mon Sep 17 00:00:00 2001 From: sosokker Date: Mon, 13 May 2024 15:23:55 +0700 Subject: [PATCH] Make action detection run in background all the time --- StreamServer/src/routers/video.py | 66 +++++++++++++++++++++++++------ 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/StreamServer/src/routers/video.py b/StreamServer/src/routers/video.py index 2daf074..3188934 100644 --- a/StreamServer/src/routers/video.py +++ b/StreamServer/src/routers/video.py @@ -1,6 +1,7 @@ import os import cv2 import time +from fastapi.websockets import WebSocketState import httpx from cv2 import VideoCapture, imencode @@ -9,7 +10,7 @@ from datetime import datetime from threading import Thread from database import SessionLocal -from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, Depends, FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.responses import StreamingResponse from apscheduler.schedulers.asyncio import AsyncIOScheduler @@ -18,6 +19,7 @@ from database import minio_client from config import TEMP_VIDEO_FILE, VIDEO_BUCKET, LINE_NOTIFY_TOKEN from scheme import Camera from utils import save_to_config, read_cameras_from_config +from models import ActionData from analytic.action.action_model import ActionModel @@ -27,18 +29,38 @@ jobstores = { } scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Asia/Bangkok') -@asynccontextmanager -async def lifespan(application: FastAPI): - scheduler.start() - yield - scheduler.shutdown() - -router = APIRouter() +#Dependency +def get_db(): + db = SessionLocal() + try : + yield db + finally: + db.close() cameras: list[Camera] = read_cameras_from_config('config.json') +# ------ Action Model ------ action_model = ActionModel() +def run_action_model_continuously(): + while True: + for camera in cameras: + if camera.status: + action_model.run_action_model(camera.link) + time.sleep(30) + +action_thread = Thread(target=run_action_model_continuously, daemon=True) + +@asynccontextmanager +async def lifespan(application: FastAPI): + scheduler.start() + action_thread.start() + yield + scheduler.shutdown() + action_thread.join() + +router = APIRouter() + # --- UTILITY FUNCTIONS --- def generate_camera_id() -> int: @@ -89,14 +111,32 @@ def check_camera_status(): cap.release() save_to_config(key="cameras", value=cameras) +@scheduler.scheduled_job('interval', seconds=10) +def add_action_to_database(): + """Add action data to the database""" + global cameras + for camera in cameras: + if camera.status: + try: + for i in range(10): + action = ActionData(action=action_model.ACTION_LIST.pop(i), timestamp=datetime.now()) + db = SessionLocal() + db.add(action) + db.commit() + db.close() + action_model.ACTION_LIST.clear() + except Exception as e: + print(f"Failed to add action data to database: {e}") + continue @scheduler.scheduled_job('interval', seconds=10) def check_falldown_action(): """If action Fall Down is detected, notify the user""" - if "Fall Down" in action_model.ACTION_LIST: + if action_model.IS_FALL_DOWN: # send post to https://notify-api.line.me/api/notify # Authorization: Bearer use LINE_NOTIFY_TOKEN (ChannelAccessToken) httpx.post("https://notify-api.line.me/api/notify", headers={"Authorization": f"Bearer {LINE_NOTIFY_TOKEN}"}, data={"message": "Fall Down detected!"}) + action_model.IS_FALL_DOWN = False # --- ROUTER ENDPOINTS --- @@ -192,14 +232,14 @@ async def websocket_endpoint(camera_id: int, websocket: WebSocket): while cap.isOpened(): ret, frame = cap.read() if not ret: - await websocket.close(code=1000) break - + ret, buffer = imencode('.png', frame) if not ret: - await websocket.close(code=1000) break - await websocket.send_bytes(buffer.tobytes()) + + if websocket.client_state != WebSocketState.DISCONNECTED: + await websocket.send_bytes(buffer.tobytes()) except WebSocketDisconnect: print("WebSocket disconnected") except Exception as e: