246 lines
9.5 KiB
Python
246 lines
9.5 KiB
Python
#本程序用于启用fastApi,用于与摄像头的数据交互建立连接
|
||
|
||
from fastapi import FastAPI, File, UploadFile, Form, WebSocket, WebSocketDisconnect
|
||
from fastapi.responses import JSONResponse
|
||
import os, shutil, subprocess, json, time, threading, asyncio
|
||
|
||
app = FastAPI()
|
||
|
||
VIDEO_SAVE_PATH = "/mnt/save/video"
|
||
IMAGE_SAVE_PATH = "/mnt/save/warning"
|
||
MODBUS_BIN_PATH = "/home/orangepi/RKApp/GPIOSignal/bin/sendGpioSignal"
|
||
GPIO_CONFIG_FILE = "/home/orangepi/RKApp/InitAuth/conf/.env"
|
||
Camera_Config_File = "/opt/rknn-yolov11/.env"
|
||
|
||
os.makedirs(VIDEO_SAVE_PATH, exist_ok=True)
|
||
os.makedirs(IMAGE_SAVE_PATH, exist_ok=True)
|
||
|
||
# ============================== GPIO 状态控制器 ============================== #
|
||
|
||
# 全局状态变量
|
||
current_gpio_state = 'HIGH' # 当前GPIO状态 ('HIGH' 或 'LOW')
|
||
last_person_time = 0.0 # 最后一次检测到人的时间
|
||
gpio_state_lock = threading.Lock() # 线程锁
|
||
GPIO_DELAY_SECONDS = 2.0 # 人离开后的延迟时间(秒)
|
||
|
||
def update_gpio_config(value: str):
|
||
"""
|
||
更新GPIO配置文件中的 outPutMode 值
|
||
|
||
Args:
|
||
value: 'true' 或 'false'
|
||
"""
|
||
try:
|
||
with open(GPIO_CONFIG_FILE, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
|
||
modified = False
|
||
for i, line in enumerate(lines):
|
||
if line.strip().startswith('outPutMode'):
|
||
lines[i] = f"outPutMode:{value}\n"
|
||
modified = True
|
||
break
|
||
|
||
if not modified:
|
||
lines.append(f"outPutMode:{value}\n")
|
||
|
||
with open(GPIO_CONFIG_FILE, 'w', encoding='utf-8') as f:
|
||
f.writelines(lines)
|
||
|
||
print(f"[GPIO] 配置文件已更新: outPutMode={value}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"[GPIO ERROR] 更新配置文件失败: {e}")
|
||
return False
|
||
|
||
def call_gpio_program():
|
||
"""调用GPIO控制程序"""
|
||
try:
|
||
signal = "echo 'orangepi' | sudo " + MODBUS_BIN_PATH
|
||
result = subprocess.run([signal], shell=True, capture_output=True, text=True, timeout=5)
|
||
if result.returncode == 0:
|
||
print(f"[GPIO] GPIO程序调用成功")
|
||
return True
|
||
else:
|
||
print(f"[GPIO ERROR] GPIO程序调用失败: {result.stderr}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"[GPIO ERROR] 调用GPIO程序时发生异常: {e}")
|
||
return False
|
||
|
||
def set_gpio_low():
|
||
"""设置GPIO为低电平"""
|
||
global current_gpio_state
|
||
if update_gpio_config('false') and call_gpio_program():
|
||
current_gpio_state = 'LOW'
|
||
print(f"[GPIO] ✅ 已切换到低电平 (检测到人)")
|
||
return True
|
||
return False
|
||
|
||
def set_gpio_high():
|
||
"""设置GPIO为高电平"""
|
||
global current_gpio_state
|
||
if update_gpio_config('true') and call_gpio_program():
|
||
current_gpio_state = 'HIGH'
|
||
print(f"[GPIO] ✅ 已恢复高电平 (延迟{GPIO_DELAY_SECONDS}秒)")
|
||
return True
|
||
return False
|
||
|
||
def gpio_monitor_task():
|
||
"""
|
||
GPIO监控后台任务
|
||
定期检查是否需要恢复高电平
|
||
"""
|
||
global current_gpio_state, last_person_time
|
||
|
||
print("[GPIO] 🚀 GPIO监控线程已启动")
|
||
|
||
while True:
|
||
time.sleep(0.5) # 每0.5秒检查一次
|
||
|
||
with gpio_state_lock:
|
||
current_time = time.time()
|
||
time_since_last_person = current_time - last_person_time
|
||
|
||
# 如果当前是低电平,且距离上次检测到人已超过延迟时间
|
||
if (current_gpio_state == 'LOW' and
|
||
last_person_time > 0 and
|
||
time_since_last_person >= GPIO_DELAY_SECONDS):
|
||
|
||
print(f"[GPIO] ⏰ 距上次检测: {time_since_last_person:.1f}秒,准备恢复高电平")
|
||
set_gpio_high()
|
||
|
||
# 启动GPIO监控线程
|
||
gpio_monitor_thread = threading.Thread(target=gpio_monitor_task, daemon=True)
|
||
gpio_monitor_thread.start()
|
||
|
||
|
||
@app.websocket("/ws/distance")
|
||
@app.websocket("/ws/distance/")
|
||
async def websocket_distance(websocket: WebSocket):
|
||
global current_gpio_state, last_person_time
|
||
|
||
await websocket.accept()
|
||
print("✅ WebSocket 客户端已连接")
|
||
try:
|
||
while True:
|
||
data = await websocket.receive_text()
|
||
try:
|
||
msg = json.loads(data)
|
||
distance = msg.get("distance")
|
||
ts = msg.get("ts")
|
||
print(f"📨 收到距离: {distance}m, 时间戳: {ts}")
|
||
|
||
# # 写入日志,单独try,避免异常影响后续流程
|
||
# try:
|
||
# now = datetime.now()
|
||
# log_line = f"{now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} distance={distance} ts={ts}\n"
|
||
# with open("/home/orangepi/Opencv/time.txt", "a") as f:
|
||
# f.write(log_line)
|
||
# except Exception as log_e:
|
||
# print(f"日志写入失败: {log_e}")
|
||
|
||
# ⭐ GPIO状态控制逻辑
|
||
with gpio_state_lock:
|
||
current_time = time.time()
|
||
|
||
# 读取配置文件中的危险距离阈值
|
||
danger_distance = 3.0 # 默认值
|
||
try:
|
||
with open(GPIO_CONFIG_FILE, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
for line in lines:
|
||
if line.strip().startswith('MAX_DISTANCE'):
|
||
danger_distance = float(line.strip().split(':')[1])
|
||
break
|
||
except Exception as e:
|
||
print(f"[Config] ⚠️ 读取危险距离失败,使用默认值3.0m: {e}")
|
||
|
||
print(f"[Config] 📏 危险距离阈值: {danger_distance}m, 当前距离: {distance}m")
|
||
|
||
# 判断是否在危险区域内
|
||
if distance is not None and distance < danger_distance:
|
||
# 人在危险区域内,更新时间戳
|
||
last_person_time = current_time
|
||
print(f"[GPIO] ⚠️ 危险!距离 {distance:.2f}m < {danger_distance:.2f}m")
|
||
|
||
# 如果当前是高电平,切换到低电平
|
||
if current_gpio_state == 'HIGH':
|
||
print(f"[GPIO] 🔻 当前为高电平,准备切换到低电平")
|
||
set_gpio_low()
|
||
else:
|
||
print(f"[GPIO] ⚡ 当前已是低电平,保持状态")
|
||
else:
|
||
# 人在安全区域,不触发GPIO
|
||
print(f"[GPIO] ✅ 安全距离 {distance:.2f}m >= {danger_distance:.2f}m,不触发GPIO")
|
||
|
||
# # 发送响应给客户端
|
||
# await websocket.send_json({
|
||
# "status": "success",
|
||
# "distance": distance,
|
||
# "gpio_state": current_gpio_state,
|
||
# "message": f"已检测到人,GPIO状态: {current_gpio_state}"
|
||
# })
|
||
|
||
except json.JSONDecodeError:
|
||
await websocket.send_text("invalid JSON")
|
||
except Exception as e:
|
||
await websocket.send_text(f"server error: {e}")
|
||
print(f"❌ 处理消息时出错: {e}")
|
||
|
||
except WebSocketDisconnect:
|
||
print("⚠️ WebSocket 客户端断开连接")
|
||
except Exception as e:
|
||
print("❌ WebSocket 处理出错:", e)
|
||
|
||
@app.post("/upload_video/")
|
||
async def upload_video(
|
||
video: UploadFile = File(..., description="上传的视频(.mp4)")
|
||
):
|
||
if not video.filename.lower().endswith('.mp4'):
|
||
return JSONResponse(status_code=400, content={"error": "视频必须为.mp4格式"})
|
||
video_path = os.path.join(VIDEO_SAVE_PATH, video.filename)
|
||
with open(video_path, "wb") as vid_file:
|
||
shutil.copyfileobj(video.file, vid_file)
|
||
return {"video_saved_to": video_path}
|
||
|
||
|
||
@app.post("/upload_image/")
|
||
async def upload_image(
|
||
image: UploadFile = File(..., description="上传的图片(.jpg)")
|
||
):
|
||
if not image.filename.lower().endswith('.jpg'):
|
||
return JSONResponse(status_code=400, content={"error": "图片必须为.jpg格式"})
|
||
image_path = os.path.join(IMAGE_SAVE_PATH, image.filename)
|
||
with open(image_path, "wb") as img_file:
|
||
shutil.copyfileobj(image.file, img_file)
|
||
return {"image_saved_to": image_path}
|
||
|
||
|
||
@app.post("/upload_distance/")
|
||
async def upload_distance(
|
||
distance: float = Form(..., description="距离")
|
||
):
|
||
# 调用本地 modbus 程序,将距离作为参数传递
|
||
try:
|
||
#写入日志精确到毫秒
|
||
# now = datetime.now()
|
||
# log_line = f"{now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} distance={distance} ts={ts}\n"
|
||
# with open("/home/orangepi/Opencv/time.txt", "a") as f:
|
||
# f.write(log_line)
|
||
result = subprocess.run(
|
||
[MODBUS_BIN_PATH, str(int(distance))],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=5
|
||
)
|
||
if result.returncode != 0:
|
||
return JSONResponse(status_code=500, content={"error": f"modbus程序执行失败: {result.stderr}"})
|
||
return {
|
||
"distance": distance,
|
||
"modbus_output": result.stdout.strip()
|
||
}
|
||
except Exception as e:
|
||
return JSONResponse(status_code=500, content={"error": str(e)})
|
||
|