#本程序用于启用fastApi,用于与摄像头的数据交互建立连接 from fastapi import FastAPI, File, UploadFile, Form, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse import os, shutil, subprocess, json app = FastAPI() VIDEO_SAVE_PATH = "/mnt/save/video" IMAGE_SAVE_PATH = "/mnt/save/warning" MODBUS_BIN_PATH = "/home/orangepi/RKApp/GPIOSignal/bin/sendGpioSignal" os.makedirs(VIDEO_SAVE_PATH, exist_ok=True) os.makedirs(IMAGE_SAVE_PATH, exist_ok=True) @app.websocket("/ws/distance") @app.websocket("/ws/distance/") async def websocket_distance(websocket: WebSocket): await websocket.accept() print("✅ WebSocket 客户端已连接") try: while True: data = await websocket.receive_text() try: msg = json.loads(data) distance = msg.get("distance") ts = msg.get("ts") print(f"收到距离: {distance}, 时间戳: {ts}") # # 写入日志,单独try,避免异常影响后续流程 # try: # now = datetime.now() # log_line = f"{now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} distance={distance} ts={ts}\n" # with open("/home/orangepi/Opencv/time.txt", "a") as f: # f.write(log_line) # except Exception as log_e: # print(f"日志写入失败: {log_e}") #调用 Modbus 可执行文件 # print(f"调用 Modbus 程序,传递距离: {distance}") signal = "echo 'orangepi' | sudo "+ MODBUS_BIN_PATH result = subprocess.run( [signal], # shell=True,ocapture_output=True, text=True, timeout=0 shell=True ) # print("signal:", signal) # print(f"Modbus 程序返回: {result.stdout.strip()}, 错误: {result.stderr.strip()}") if result.returncode != 0: # 发送错误信息给客户端 await websocket.send_json({ "error": f"modbus 程序执行失败: {result.stderr.strip()}" }) continue # 正常发送结果 await websocket.send_json({ "distance": distance, "modbus_output": result.stdout.strip() }) except json.JSONDecodeError: await websocket.send_text("invalid JSON") except Exception as e: await websocket.send_text(f"server error: {e}") except WebSocketDisconnect: print("⚠️ WebSocket 客户端断开连接") except Exception as e: print("❌ WebSocket 处理出错:", e) @app.post("/upload_video/") async def upload_video( video: UploadFile = File(..., description="上传的视频(.mp4)") ): if not video.filename.lower().endswith('.mp4'): return JSONResponse(status_code=400, content={"error": "视频必须为.mp4格式"}) video_path = os.path.join(VIDEO_SAVE_PATH, video.filename) with open(video_path, "wb") as vid_file: shutil.copyfileobj(video.file, vid_file) return {"video_saved_to": video_path} @app.post("/upload_image/") async def upload_image( image: UploadFile = File(..., description="上传的图片(.jpg)") ): if not image.filename.lower().endswith('.jpg'): return JSONResponse(status_code=400, content={"error": "图片必须为.jpg格式"}) image_path = os.path.join(IMAGE_SAVE_PATH, image.filename) with open(image_path, "wb") as img_file: shutil.copyfileobj(image.file, img_file) return {"image_saved_to": image_path} @app.post("/upload_distance/") async def upload_distance( distance: float = Form(..., description="距离") ): # 调用本地 modbus 程序,将距离作为参数传递 try: #写入日志精确到毫秒 # now = datetime.now() # log_line = f"{now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} distance={distance} ts={ts}\n" # with open("/home/orangepi/Opencv/time.txt", "a") as f: # f.write(log_line) result = subprocess.run( [MODBUS_BIN_PATH, str(int(distance))], capture_output=True, text=True, timeout=5 ) if result.returncode != 0: return JSONResponse(status_code=500, content={"error": f"modbus程序执行失败: {result.stderr}"}) return { "distance": distance, "modbus_output": result.stdout.strip() } except Exception as e: return JSONResponse(status_code=500, content={"error": str(e)})