#本程序用于启用fastApi,用于与摄像头的数据交互建立连接 from fastapi import FastAPI, File, UploadFile, Form, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse import os, shutil, subprocess, json, time, threading, asyncio app = FastAPI() VIDEO_SAVE_PATH = "/mnt/save/video" IMAGE_SAVE_PATH = "/mnt/save/warning" MODBUS_BIN_PATH = "/home/orangepi/RKApp/GPIOSignal/bin/sendGpioSignal" GPIO_CONFIG_FILE = "/home/orangepi/RKApp/InitAuth/conf/.env" Camera_Config_File = "/opt/rknn-yolov11/.env" os.makedirs(VIDEO_SAVE_PATH, exist_ok=True) os.makedirs(IMAGE_SAVE_PATH, exist_ok=True) # ============================== GPIO 状态控制器 ============================== # # 全局状态变量 current_gpio_state = 'HIGH' # 当前GPIO状态 ('HIGH' 或 'LOW') last_person_time = 0.0 # 最后一次检测到人的时间 gpio_state_lock = threading.Lock() # 线程锁 GPIO_DELAY_SECONDS = 2.0 # 人离开后的延迟时间(秒) def update_gpio_config(value: str): """ 更新GPIO配置文件中的 outPutMode 值 Args: value: 'true' 或 'false' """ try: with open(GPIO_CONFIG_FILE, 'r', encoding='utf-8') as f: lines = f.readlines() modified = False for i, line in enumerate(lines): if line.strip().startswith('outPutMode'): lines[i] = f"outPutMode:{value}\n" modified = True break if not modified: lines.append(f"outPutMode:{value}\n") with open(GPIO_CONFIG_FILE, 'w', encoding='utf-8') as f: f.writelines(lines) print(f"[GPIO] 配置文件已更新: outPutMode={value}") return True except Exception as e: print(f"[GPIO ERROR] 更新配置文件失败: {e}") return False def call_gpio_program(): """调用GPIO控制程序""" try: signal = "echo 'orangepi' | sudo " + MODBUS_BIN_PATH result = subprocess.run([signal], shell=True, capture_output=True, text=True, timeout=5) if result.returncode == 0: print(f"[GPIO] GPIO程序调用成功") return True else: print(f"[GPIO ERROR] GPIO程序调用失败: {result.stderr}") return False except Exception as e: print(f"[GPIO ERROR] 调用GPIO程序时发生异常: {e}") return False def set_gpio_low(): """设置GPIO为低电平""" global current_gpio_state if update_gpio_config('false') and call_gpio_program(): current_gpio_state = 'LOW' print(f"[GPIO] ✅ 已切换到低电平 (检测到人)") return True return False def set_gpio_high(): """设置GPIO为高电平""" global current_gpio_state if update_gpio_config('true') and call_gpio_program(): current_gpio_state = 'HIGH' print(f"[GPIO] ✅ 已恢复高电平 (延迟{GPIO_DELAY_SECONDS}秒)") return True return False def gpio_monitor_task(): """ GPIO监控后台任务 定期检查是否需要恢复高电平 """ global current_gpio_state, last_person_time print("[GPIO] 🚀 GPIO监控线程已启动") while True: time.sleep(0.5) # 每0.5秒检查一次 with gpio_state_lock: current_time = time.time() time_since_last_person = current_time - last_person_time # 如果当前是低电平,且距离上次检测到人已超过延迟时间 if (current_gpio_state == 'LOW' and last_person_time > 0 and time_since_last_person >= GPIO_DELAY_SECONDS): print(f"[GPIO] ⏰ 距上次检测: {time_since_last_person:.1f}秒,准备恢复高电平") set_gpio_high() # 启动GPIO监控线程 gpio_monitor_thread = threading.Thread(target=gpio_monitor_task, daemon=True) gpio_monitor_thread.start() @app.websocket("/ws/distance") @app.websocket("/ws/distance/") async def websocket_distance(websocket: WebSocket): global current_gpio_state, last_person_time await websocket.accept() print("✅ WebSocket 客户端已连接") try: while True: data = await websocket.receive_text() try: msg = json.loads(data) distance = msg.get("distance") ts = msg.get("ts") print(f"📨 收到距离: {distance}m, 时间戳: {ts}") # # 写入日志,单独try,避免异常影响后续流程 # try: # now = datetime.now() # log_line = f"{now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} distance={distance} ts={ts}\n" # with open("/home/orangepi/Opencv/time.txt", "a") as f: # f.write(log_line) # except Exception as log_e: # print(f"日志写入失败: {log_e}") # ⭐ GPIO状态控制逻辑 with gpio_state_lock: current_time = time.time() # 读取配置文件中的危险距离阈值 danger_distance = 3.0 # 默认值 try: with open(GPIO_CONFIG_FILE, 'r', encoding='utf-8') as f: lines = f.readlines() for line in lines: if line.strip().startswith('MAX_DISTANCE'): danger_distance = float(line.strip().split(':')[1]) break except Exception as e: print(f"[Config] ⚠️ 读取危险距离失败,使用默认值3.0m: {e}") print(f"[Config] 📏 危险距离阈值: {danger_distance}m, 当前距离: {distance}m") # 判断是否在危险区域内 if distance is not None and distance < danger_distance: # 人在危险区域内,更新时间戳 last_person_time = current_time print(f"[GPIO] ⚠️ 危险!距离 {distance:.2f}m < {danger_distance:.2f}m") # 如果当前是高电平,切换到低电平 if current_gpio_state == 'HIGH': print(f"[GPIO] 🔻 当前为高电平,准备切换到低电平") set_gpio_low() else: print(f"[GPIO] ⚡ 当前已是低电平,保持状态") else: # 人在安全区域,不触发GPIO print(f"[GPIO] ✅ 安全距离 {distance:.2f}m >= {danger_distance:.2f}m,不触发GPIO") # # 发送响应给客户端 # await websocket.send_json({ # "status": "success", # "distance": distance, # "gpio_state": current_gpio_state, # "message": f"已检测到人,GPIO状态: {current_gpio_state}" # }) except json.JSONDecodeError: await websocket.send_text("invalid JSON") except Exception as e: await websocket.send_text(f"server error: {e}") print(f"❌ 处理消息时出错: {e}") except WebSocketDisconnect: print("⚠️ WebSocket 客户端断开连接") except Exception as e: print("❌ WebSocket 处理出错:", e) @app.post("/upload_video/") async def upload_video( video: UploadFile = File(..., description="上传的视频(.mp4)") ): if not video.filename.lower().endswith('.mp4'): return JSONResponse(status_code=400, content={"error": "视频必须为.mp4格式"}) video_path = os.path.join(VIDEO_SAVE_PATH, video.filename) with open(video_path, "wb") as vid_file: shutil.copyfileobj(video.file, vid_file) return {"video_saved_to": video_path} @app.post("/upload_image/") async def upload_image( image: UploadFile = File(..., description="上传的图片(.jpg)") ): if not image.filename.lower().endswith('.jpg'): return JSONResponse(status_code=400, content={"error": "图片必须为.jpg格式"}) image_path = os.path.join(IMAGE_SAVE_PATH, image.filename) with open(image_path, "wb") as img_file: shutil.copyfileobj(image.file, img_file) return {"image_saved_to": image_path} @app.post("/upload_distance/") async def upload_distance( distance: float = Form(..., description="距离") ): # 调用本地 modbus 程序,将距离作为参数传递 try: #写入日志精确到毫秒 # now = datetime.now() # log_line = f"{now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} distance={distance} ts={ts}\n" # with open("/home/orangepi/Opencv/time.txt", "a") as f: # f.write(log_line) result = subprocess.run( [MODBUS_BIN_PATH, str(int(distance))], capture_output=True, text=True, timeout=5 ) if result.returncode != 0: return JSONResponse(status_code=500, content={"error": f"modbus程序执行失败: {result.stderr}"}) return { "distance": distance, "modbus_output": result.stdout.strip() } except Exception as e: return JSONResponse(status_code=500, content={"error": str(e)})