Files
RkApp/FastApi/fastApi.py
2025-10-31 13:46:24 +08:00

246 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# This program starts the FastAPI service that sets up the data exchange with the camera.
from fastapi import FastAPI, File, UploadFile, Form, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
import os, shutil, subprocess, json, time, threading, asyncio
# FastAPI application instance that the route decorators in this file attach to.
app = FastAPI()
# Directories where uploaded videos and images are stored.
VIDEO_SAVE_PATH = "/mnt/save/video"
IMAGE_SAVE_PATH = "/mnt/save/warning"
# External binary invoked to drive the GPIO / forward a distance value.
MODBUS_BIN_PATH = "/home/orangepi/RKApp/GPIOSignal/bin/sendGpioSignal"
# Config file holding the `outPutMode` and `MAX_DISTANCE` entries read/written below.
GPIO_CONFIG_FILE = "/home/orangepi/RKApp/InitAuth/conf/.env"
# NOTE(review): not referenced anywhere in the visible part of this file.
Camera_Config_File = "/opt/rknn-yolov11/.env"
# Create the upload directories at import time if they do not exist yet.
os.makedirs(VIDEO_SAVE_PATH, exist_ok=True)
os.makedirs(IMAGE_SAVE_PATH, exist_ok=True)
# ============================== GPIO state controller ============================== #
# Module-level state shared by the WebSocket handler and the monitor thread
current_gpio_state = 'HIGH' # current GPIO state ('HIGH' or 'LOW')
last_person_time = 0.0 # time.time() of the last detection inside the danger distance
gpio_state_lock = threading.Lock() # lock held while the state above is read or changed
GPIO_DELAY_SECONDS = 2.0 # delay (seconds) after the person leaves before going back to HIGH
def update_gpio_config(value: str) -> bool:
    """Set the ``outPutMode`` entry of the GPIO config file to *value*.

    The first line whose stripped text starts with ``outPutMode`` is replaced
    by ``outPutMode:<value>``. If no such line exists the entry is appended.

    Args:
        value: Text written after ``outPutMode:``; callers in this file pass
            ``'true'`` or ``'false'``.

    Returns:
        True if the file was rewritten, False if reading or writing failed
        (the error is printed, not raised).
    """
    entry = f"outPutMode:{value}\n"
    try:
        with open(GPIO_CONFIG_FILE, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        for i, line in enumerate(lines):
            if line.strip().startswith('outPutMode'):
                lines[i] = entry
                break
        else:
            # A last line without a trailing newline would otherwise be glued
            # to the appended entry (e.g. "MAX_DISTANCE:3.0outPutMode:true").
            if lines and not lines[-1].endswith('\n'):
                lines[-1] += '\n'
            lines.append(entry)
        with open(GPIO_CONFIG_FILE, 'w', encoding='utf-8') as f:
            f.writelines(lines)
        print(f"[GPIO] 配置文件已更新: outPutMode={value}")
        return True
    except Exception as e:
        # Best-effort: callers only look at the boolean result.
        print(f"[GPIO ERROR] 更新配置文件失败: {e}")
        return False
def call_gpio_program() -> bool:
    """Run the GPIO control binary (``MODBUS_BIN_PATH``) through sudo.

    Returns:
        True if the program exited with status 0, False on a non-zero exit
        status, a timeout (5 s) or any other failure (the error is printed,
        not raised).
    """
    try:
        # sudo only reads the password from stdin when given -S; without it the
        # piped password is ignored. `-p ""` suppresses the prompt so stderr
        # only carries real errors. An argv list avoids going through a shell.
        # NOTE(review): the sudo password is hard-coded here; prefer a NOPASSWD
        # sudoers rule for this binary and drop the password entirely.
        result = subprocess.run(
            ["sudo", "-S", "-p", "", MODBUS_BIN_PATH],
            input="orangepi\n",
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0:
            print("[GPIO] GPIO程序调用成功")
            return True
        print(f"[GPIO ERROR] GPIO程序调用失败: {result.stderr}")
        return False
    except Exception as e:
        # Covers TimeoutExpired and a missing sudo/binary; callers only need the bool.
        print(f"[GPIO ERROR] 调用GPIO程序时发生异常: {e}")
        return False
def set_gpio_low():
    """Switch the GPIO to low level.

    Writes ``outPutMode:false`` to the config file and then runs the GPIO
    program. ``current_gpio_state`` becomes ``'LOW'`` only if both steps
    succeed.

    Returns:
        True on success, False if either step failed.
    """
    global current_gpio_state
    # Guard clauses: the program is not run when the config write failed.
    if not update_gpio_config('false'):
        return False
    if not call_gpio_program():
        return False
    current_gpio_state = 'LOW'
    print("[GPIO] ✅ 已切换到低电平 (检测到人)")
    return True
def set_gpio_high():
    """Switch the GPIO back to high level.

    Writes ``outPutMode:true`` to the config file and then runs the GPIO
    program. ``current_gpio_state`` becomes ``'HIGH'`` only if both steps
    succeed.

    Returns:
        True on success, False if either step failed.
    """
    global current_gpio_state
    # Guard clauses: the program is not run when the config write failed.
    if not update_gpio_config('true'):
        return False
    if not call_gpio_program():
        return False
    current_gpio_state = 'HIGH'
    print(f"[GPIO] ✅ 已恢复高电平 (延迟{GPIO_DELAY_SECONDS}秒)")
    return True
def gpio_monitor_task():
    """Background loop that restores the GPIO to high level.

    Every 0.5 s, while holding ``gpio_state_lock``, it checks whether the
    state is ``'LOW'``, a detection has been recorded (``last_person_time``
    > 0) and at least ``GPIO_DELAY_SECONDS`` have passed since then; if so
    it calls ``set_gpio_high()``. The loop never returns.
    """
    global current_gpio_state, last_person_time
    print("[GPIO] 🚀 GPIO监控线程已启动")
    while True:
        time.sleep(0.5)  # polling interval
        with gpio_state_lock:
            elapsed = time.time() - last_person_time
            # Nothing to do unless the GPIO is currently held low.
            if current_gpio_state != 'LOW':
                continue
            # Still within the hold-off window, or no detection recorded yet.
            if not (last_person_time > 0 and elapsed >= GPIO_DELAY_SECONDS):
                continue
            print(f"[GPIO] ⏰ 距上次检测: {elapsed:.1f}秒,准备恢复高电平")
            set_gpio_high()
# Start the GPIO monitor thread when this module is imported.
# daemon=True: the thread does not keep the interpreter alive on shutdown.
gpio_monitor_thread = threading.Thread(target=gpio_monitor_task, daemon=True)
gpio_monitor_thread.start()
def _read_danger_distance() -> float:
    """Return the ``MAX_DISTANCE`` threshold from the GPIO config file (default 3.0)."""
    try:
        with open(GPIO_CONFIG_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                if stripped.startswith('MAX_DISTANCE'):
                    # Entries have the form "MAX_DISTANCE:<number>".
                    return float(stripped.split(':')[1])
    except Exception as e:
        print(f"[Config] ⚠️ 读取危险距离失败使用默认值3.0m: {e}")
    return 3.0


def _apply_distance_to_gpio(distance: float) -> None:
    """Apply one distance sample to the GPIO state (blocking; call off the event loop)."""
    global last_person_time
    with gpio_state_lock:
        current_time = time.time()
        # Read the threshold while holding the lock: update_gpio_config()
        # rewrites the same file and is also only called under this lock.
        danger_distance = _read_danger_distance()
        print(f"[Config] 📏 危险距离阈值: {danger_distance}m, 当前距离: {distance}m")
        if distance < danger_distance:
            # Inside the danger zone: refresh the timestamp the monitor thread checks.
            last_person_time = current_time
            print(f"[GPIO] ⚠️ 危险!距离 {distance:.2f}m < {danger_distance:.2f}m")
            if current_gpio_state == 'HIGH':
                print(f"[GPIO] 🔻 当前为高电平,准备切换到低电平")
                set_gpio_low()
            else:
                print(f"[GPIO] ⚡ 当前已是低电平,保持状态")
        else:
            # Safe distance: the GPIO is left alone.
            print(f"[GPIO] ✅ 安全距离 {distance:.2f}m >= {danger_distance:.2f}m不触发GPIO")


@app.websocket("/ws/distance")
@app.websocket("/ws/distance/")
async def websocket_distance(websocket: WebSocket):
    """Receive JSON distance messages and drive the GPIO state from them.

    Each text frame is expected to be a JSON object with a numeric
    ``distance`` and an optional ``ts``. A distance below the configured
    ``MAX_DISTANCE`` refreshes ``last_person_time`` and switches the GPIO low
    if it is currently high. Nothing is sent back on success; the client
    receives ``"invalid JSON"``, ``"invalid distance"`` or
    ``"server error: ..."`` on failure.
    """
    await websocket.accept()
    print("✅ WebSocket 客户端已连接")
    loop = asyncio.get_running_loop()
    try:
        while True:
            data = await websocket.receive_text()
            try:
                msg = json.loads(data)
                distance = msg.get("distance")
                ts = msg.get("ts")
                print(f"📨 收到距离: {distance}m, 时间戳: {ts}")
                # A missing or non-numeric distance used to crash in the
                # comparison / "{:.2f}" formatting; reject it explicitly.
                if isinstance(distance, bool) or not isinstance(distance, (int, float)):
                    print(f"[GPIO] ⚠️ invalid distance ignored: {distance!r}")
                    await websocket.send_text("invalid distance")
                    continue
                # The GPIO update takes a threading lock, reads a file and may
                # run a subprocess (up to 5 s): keep that off the event loop.
                await loop.run_in_executor(None, _apply_distance_to_gpio, distance)
            except json.JSONDecodeError:
                await websocket.send_text("invalid JSON")
            except Exception as e:
                await websocket.send_text(f"server error: {e}")
                print(f"❌ 处理消息时出错: {e}")
    except WebSocketDisconnect:
        print("⚠️ WebSocket 客户端断开连接")
    except Exception as e:
        print("❌ WebSocket 处理出错:", e)
@app.post("/upload_video/")
async def upload_video(
    video: UploadFile = File(..., description="上传的视频(.mp4")
):
    """Save an uploaded ``.mp4`` file under ``VIDEO_SAVE_PATH``.

    Returns:
        ``{"video_saved_to": <path>}`` on success, or a 400 JSON response
        when the file name is missing or does not end in ``.mp4``.
    """
    # The file name is client-controlled: keep only its last path component so
    # "../x.mp4" or "/abs/x.mp4" cannot escape the save directory. Backslashes
    # are normalised first so Windows-style paths are reduced as well.
    filename = os.path.basename((video.filename or "").replace("\\", "/"))
    if not filename.lower().endswith('.mp4'):
        return JSONResponse(status_code=400, content={"error": "视频必须为.mp4格式"})
    video_path = os.path.join(VIDEO_SAVE_PATH, filename)
    with open(video_path, "wb") as vid_file:
        shutil.copyfileobj(video.file, vid_file)
    return {"video_saved_to": video_path}
@app.post("/upload_image/")
async def upload_image(
    image: UploadFile = File(..., description="上传的图片(.jpg")
):
    """Save an uploaded ``.jpg`` file under ``IMAGE_SAVE_PATH``.

    Returns:
        ``{"image_saved_to": <path>}`` on success, or a 400 JSON response
        when the file name is missing or does not end in ``.jpg``.
    """
    # The file name is client-controlled: keep only its last path component so
    # "../x.jpg" or "/abs/x.jpg" cannot escape the save directory. Backslashes
    # are normalised first so Windows-style paths are reduced as well.
    filename = os.path.basename((image.filename or "").replace("\\", "/"))
    if not filename.lower().endswith('.jpg'):
        return JSONResponse(status_code=400, content={"error": "图片必须为.jpg格式"})
    image_path = os.path.join(IMAGE_SAVE_PATH, filename)
    with open(image_path, "wb") as img_file:
        shutil.copyfileobj(image.file, img_file)
    return {"image_saved_to": image_path}
@app.post("/upload_distance/")
async def upload_distance(
    distance: float = Form(..., description="距离")
):
    """Pass a distance value to the local program at ``MODBUS_BIN_PATH``.

    The distance is truncated to an integer and passed as the program's only
    argument.

    Returns:
        ``{"distance": ..., "modbus_output": ...}`` on success; a 400 JSON
        response for a NaN/infinite distance; a 500 JSON response when the
        program exits non-zero, times out (5 s) or cannot be started.
    """
    import math  # local import: only this handler needs it

    # int() raises on NaN/inf; that is a client input error, not a server fault.
    if not math.isfinite(distance):
        return JSONResponse(status_code=400, content={"error": "distance must be a finite number"})
    try:
        result = subprocess.run(
            [MODBUS_BIN_PATH, str(int(distance))],
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode != 0:
            return JSONResponse(status_code=500, content={"error": f"modbus程序执行失败: {result.stderr}"})
        return {
            "distance": distance,
            "modbus_output": result.stdout.strip()
        }
    except Exception as e:
        # Covers TimeoutExpired and a missing/non-executable binary.
        return JSONResponse(status_code=500, content={"error": str(e)})