Files
RkApp/FastApi/fastApi.py
2025-09-28 16:03:54 +08:00

117 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# This program starts the FastAPI service used to establish the data-exchange connection with the camera.
import asyncio
import json
import os
import shutil
import subprocess
import threading

from fastapi import FastAPI, File, UploadFile, Form, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
# Filesystem locations used by the handlers below.
VIDEO_SAVE_PATH = "/mnt/save/video"      # destination for uploaded .mp4 files
IMAGE_SAVE_PATH = "/mnt/save/warning"    # destination for uploaded .jpg files
MODBUS_BIN_PATH = "/home/orangepi/RKApp/ModBus/modbus"  # executable invoked per distance reading

# The ASGI application object that the route decorators attach to.
app = FastAPI()

# Create the upload directories at import time; an existing directory is not an error.
os.makedirs(VIDEO_SAVE_PATH, exist_ok=True)
os.makedirs(IMAGE_SAVE_PATH, exist_ok=True)
# The original blocking call implicitly ran one Modbus invocation at a time
# (it stalled the whole event loop).  Now that the call runs in worker threads,
# this lock keeps invocations from different connections from overlapping.
_MODBUS_LOCK = threading.Lock()


def _run_modbus(distance_arg):
    """Run the Modbus binary with one argument; blocking, meant for a worker thread."""
    with _MODBUS_LOCK:
        return subprocess.run(
            [MODBUS_BIN_PATH, distance_arg],
            capture_output=True, text=True, timeout=5
        )


@app.websocket("/ws/distance")
@app.websocket("/ws/distance/")
async def websocket_distance(websocket: WebSocket):
    """Receive distance readings over a WebSocket and pass each to the Modbus binary.

    Every text frame is parsed as JSON and must be an object with a
    ``distance`` field (``ts`` is read only for logging).  The distance is
    converted with ``int()`` and handed to ``MODBUS_BIN_PATH`` as its single
    command-line argument.

    Reply sent for each frame:
      * JSON ``{"distance": ..., "modbus_output": ...}`` when the binary exits 0;
      * JSON ``{"error": ...}`` carrying stderr when it exits non-zero;
      * text ``"invalid JSON"`` when the frame cannot be parsed;
      * text ``"server error: ..."`` for any other failure (bad or missing
        distance, subprocess timeout, ...).

    The loop runs until the client disconnects or an unexpected error escapes.
    """
    await websocket.accept()
    print("✅ WebSocket 客户端已连接")
    loop = asyncio.get_running_loop()
    try:
        while True:
            data = await websocket.receive_text()
            try:
                msg = json.loads(data)
                if not isinstance(msg, dict):
                    # Valid JSON that is not an object (e.g. a list or number).
                    raise ValueError("message must be a JSON object")
                distance = msg.get("distance")
                ts = msg.get("ts")
                print(f"收到距离: {distance}, 时间戳: {ts}")
                if distance is None:
                    raise ValueError("missing 'distance' field")
                # NOTE: a per-message timing log to a file used to be written
                # here; it is disabled.

                # Invoke the Modbus executable.  int() is evaluated here so a
                # non-numeric distance is reported through the handlers below;
                # the blocking run itself happens off the event loop.
                result = await loop.run_in_executor(
                    None, _run_modbus, str(int(distance))
                )
                if result.returncode != 0:
                    # Report the failure to the client and wait for the next frame.
                    await websocket.send_json({
                        "error": f"modbus 程序执行失败: {result.stderr.strip()}"
                    })
                    continue
                # Normal reply.
                await websocket.send_json({
                    "distance": distance,
                    "modbus_output": result.stdout.strip()
                })
            except WebSocketDisconnect:
                # If a send fails because the client went away, do not try to
                # send an error on the dead socket; let the outer handler log it.
                raise
            except json.JSONDecodeError:
                await websocket.send_text("invalid JSON")
            except Exception as e:
                await websocket.send_text(f"server error: {e}")
    except WebSocketDisconnect:
        print("⚠️ WebSocket 客户端断开连接")
    except Exception as e:
        print("❌ WebSocket 处理出错:", e)
@app.post("/upload_video/")
async def upload_video(
    video: UploadFile = File(..., description="上传的视频(.mp4")
):
    """Save an uploaded ``.mp4`` file into ``VIDEO_SAVE_PATH``.

    Returns a 400 ``JSONResponse`` when the upload has no filename or the name
    does not end in ``.mp4`` (case-insensitive).  Otherwise the file is written
    (overwriting any existing file of the same name) and
    ``{"video_saved_to": <path>}`` is returned.
    """
    # The filename is supplied by the client.  Keep only its last path
    # component so names like "../../x.mp4" or "/abs/x.mp4" cannot place the
    # file outside the save directory; backslashes are normalised first so
    # Windows-style paths are reduced the same way.
    filename = os.path.basename((video.filename or "").replace("\\", "/"))
    if not filename.lower().endswith('.mp4'):
        return JSONResponse(status_code=400, content={"error": "视频必须为.mp4格式"})
    video_path = os.path.join(VIDEO_SAVE_PATH, filename)
    with open(video_path, "wb") as vid_file:
        # Copy in a worker thread so a large upload does not stall the event loop.
        await asyncio.get_running_loop().run_in_executor(
            None, shutil.copyfileobj, video.file, vid_file
        )
    return {"video_saved_to": video_path}
@app.post("/upload_image/")
async def upload_image(
    image: UploadFile = File(..., description="上传的图片(.jpg")
):
    """Save an uploaded ``.jpg`` file into ``IMAGE_SAVE_PATH``.

    Returns a 400 ``JSONResponse`` when the upload has no filename or the name
    does not end in ``.jpg`` (case-insensitive).  Otherwise the file is written
    (overwriting any existing file of the same name) and
    ``{"image_saved_to": <path>}`` is returned.
    """
    # The filename is supplied by the client.  Keep only its last path
    # component so names like "../../x.jpg" or "/abs/x.jpg" cannot place the
    # file outside the save directory; backslashes are normalised first so
    # Windows-style paths are reduced the same way.
    filename = os.path.basename((image.filename or "").replace("\\", "/"))
    if not filename.lower().endswith('.jpg'):
        return JSONResponse(status_code=400, content={"error": "图片必须为.jpg格式"})
    image_path = os.path.join(IMAGE_SAVE_PATH, filename)
    with open(image_path, "wb") as img_file:
        # Copy in a worker thread so a large upload does not stall the event loop.
        await asyncio.get_running_loop().run_in_executor(
            None, shutil.copyfileobj, image.file, img_file
        )
    return {"image_saved_to": image_path}
# @app.post("/upload_distance/")
# async def upload_distance(
# distance: float = Form(..., description="距离")
# ):
# # Call the local modbus program, passing the distance as its argument
# try:
# # Write a log entry with millisecond precision
# # now = datetime.now()
# # log_line = f"{now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} distance={distance} ts={ts}\n"
# # with open("/home/orangepi/Opencv/time.txt", "a") as f:
# # f.write(log_line)
# result = subprocess.run(
# [MODBUS_BIN_PATH, str(int(distance))],
# capture_output=True,
# text=True,
# timeout=5
# )
# if result.returncode != 0:
# return JSONResponse(status_code=500, content={"error": f"modbus程序执行失败: {result.stderr}"})
# return {
# "distance": distance,
# "modbus_output": result.stdout.strip()
# }
# except Exception as e:
# return JSONResponse(status_code=500, content={"error": str(e)})