Make action detection run in background all the time

This commit is contained in:
sosokker 2024-05-13 15:23:55 +07:00
parent c3554dad3c
commit eeb29dfd90

View File

@ -1,6 +1,7 @@
import os import os
import cv2 import cv2
import time import time
from fastapi.websockets import WebSocketState
import httpx import httpx
from cv2 import VideoCapture, imencode from cv2 import VideoCapture, imencode
@ -9,7 +10,7 @@ from datetime import datetime
from threading import Thread from threading import Thread
from database import SessionLocal from database import SessionLocal
from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi import APIRouter, Depends, FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
@ -18,6 +19,7 @@ from database import minio_client
from config import TEMP_VIDEO_FILE, VIDEO_BUCKET, LINE_NOTIFY_TOKEN from config import TEMP_VIDEO_FILE, VIDEO_BUCKET, LINE_NOTIFY_TOKEN
from scheme import Camera from scheme import Camera
from utils import save_to_config, read_cameras_from_config from utils import save_to_config, read_cameras_from_config
from models import ActionData
from analytic.action.action_model import ActionModel from analytic.action.action_model import ActionModel
@ -27,18 +29,38 @@ jobstores = {
} }
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Asia/Bangkok') scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Asia/Bangkok')
@asynccontextmanager #Dependency
async def lifespan(application: FastAPI): def get_db():
scheduler.start() db = SessionLocal()
yield try :
scheduler.shutdown() yield db
finally:
router = APIRouter() db.close()
cameras: list[Camera] = read_cameras_from_config('config.json') cameras: list[Camera] = read_cameras_from_config('config.json')
# ------ Action Model ------
action_model = ActionModel() action_model = ActionModel()
def run_action_model_continuously():
while True:
for camera in cameras:
if camera.status:
action_model.run_action_model(camera.link)
time.sleep(30)
action_thread = Thread(target=run_action_model_continuously, daemon=True)
@asynccontextmanager
async def lifespan(application: FastAPI):
scheduler.start()
action_thread.start()
yield
scheduler.shutdown()
action_thread.join()
router = APIRouter()
# --- UTILITY FUNCTIONS --- # --- UTILITY FUNCTIONS ---
def generate_camera_id() -> int: def generate_camera_id() -> int:
@ -89,14 +111,32 @@ def check_camera_status():
cap.release() cap.release()
save_to_config(key="cameras", value=cameras) save_to_config(key="cameras", value=cameras)
@scheduler.scheduled_job('interval', seconds=10)
def add_action_to_database():
"""Add action data to the database"""
global cameras
for camera in cameras:
if camera.status:
try:
for i in range(10):
action = ActionData(action=action_model.ACTION_LIST.pop(i), timestamp=datetime.now())
db = SessionLocal()
db.add(action)
db.commit()
db.close()
action_model.ACTION_LIST.clear()
except Exception as e:
print(f"Failed to add action data to database: {e}")
continue
@scheduler.scheduled_job('interval', seconds=10) @scheduler.scheduled_job('interval', seconds=10)
def check_falldown_action(): def check_falldown_action():
"""If action Fall Down is detected, notify the user""" """If action Fall Down is detected, notify the user"""
if "Fall Down" in action_model.ACTION_LIST: if action_model.IS_FALL_DOWN:
# send post to https://notify-api.line.me/api/notify # send post to https://notify-api.line.me/api/notify
# Authorization: Bearer use LINE_NOTIFY_TOKEN (ChannelAccessToken) # Authorization: Bearer use LINE_NOTIFY_TOKEN (ChannelAccessToken)
httpx.post("https://notify-api.line.me/api/notify", headers={"Authorization": f"Bearer {LINE_NOTIFY_TOKEN}"}, data={"message": "Fall Down detected!"}) httpx.post("https://notify-api.line.me/api/notify", headers={"Authorization": f"Bearer {LINE_NOTIFY_TOKEN}"}, data={"message": "Fall Down detected!"})
action_model.IS_FALL_DOWN = False
# --- ROUTER ENDPOINTS --- # --- ROUTER ENDPOINTS ---
@ -192,14 +232,14 @@ async def websocket_endpoint(camera_id: int, websocket: WebSocket):
while cap.isOpened(): while cap.isOpened():
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
await websocket.close(code=1000)
break break
ret, buffer = imencode('.png', frame) ret, buffer = imencode('.png', frame)
if not ret: if not ret:
await websocket.close(code=1000)
break break
await websocket.send_bytes(buffer.tobytes())
if websocket.client_state != WebSocketState.DISCONNECTED:
await websocket.send_bytes(buffer.tobytes())
except WebSocketDisconnect: except WebSocketDisconnect:
print("WebSocket disconnected") print("WebSocket disconnected")
except Exception as e: except Exception as e: