This commit is contained in:
2025-10-31 13:32:51 +08:00
parent e4ae201289
commit b807b0f1b4
10 changed files with 4799 additions and 29 deletions

View File

@@ -2,21 +2,123 @@
from fastapi import FastAPI, File, UploadFile, Form, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
import os, shutil, subprocess, json
import os, shutil, subprocess, json, time, threading, asyncio
app = FastAPI()
VIDEO_SAVE_PATH = "/mnt/save/video"
IMAGE_SAVE_PATH = "/mnt/save/warning"
MODBUS_BIN_PATH = "/home/orangepi/RKApp/GPIOSignal/bin/sendGpioSignal"
GPIO_CONFIG_FILE = "/home/orangepi/RKApp/InitAuth/conf/.env"
os.makedirs(VIDEO_SAVE_PATH, exist_ok=True)
os.makedirs(IMAGE_SAVE_PATH, exist_ok=True)
# ============================== GPIO 状态控制器 ============================== #
# 全局状态变量
current_gpio_state = 'HIGH' # 当前GPIO状态 ('HIGH' 或 'LOW')
last_person_time = 0.0 # 最后一次检测到人的时间
gpio_state_lock = threading.Lock() # 线程锁
GPIO_DELAY_SECONDS = 2.0 # 人离开后的延迟时间(秒)
def update_gpio_config(value: str):
"""
更新GPIO配置文件中的 outPutMode 值
Args:
value: 'true''false'
"""
try:
with open(GPIO_CONFIG_FILE, 'r', encoding='utf-8') as f:
lines = f.readlines()
modified = False
for i, line in enumerate(lines):
if line.strip().startswith('outPutMode'):
lines[i] = f"outPutMode:{value}\n"
modified = True
break
if not modified:
lines.append(f"outPutMode:{value}\n")
with open(GPIO_CONFIG_FILE, 'w', encoding='utf-8') as f:
f.writelines(lines)
print(f"[GPIO] 配置文件已更新: outPutMode={value}")
return True
except Exception as e:
print(f"[GPIO ERROR] 更新配置文件失败: {e}")
return False
def call_gpio_program():
"""调用GPIO控制程序"""
try:
signal = "echo 'orangepi' | sudo " + MODBUS_BIN_PATH
result = subprocess.run([signal], shell=True, capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print(f"[GPIO] GPIO程序调用成功")
return True
else:
print(f"[GPIO ERROR] GPIO程序调用失败: {result.stderr}")
return False
except Exception as e:
print(f"[GPIO ERROR] 调用GPIO程序时发生异常: {e}")
return False
def set_gpio_low():
"""设置GPIO为低电平"""
global current_gpio_state
if update_gpio_config('false') and call_gpio_program():
current_gpio_state = 'LOW'
print(f"[GPIO] ✅ 已切换到低电平 (检测到人)")
return True
return False
def set_gpio_high():
"""设置GPIO为高电平"""
global current_gpio_state
if update_gpio_config('true') and call_gpio_program():
current_gpio_state = 'HIGH'
print(f"[GPIO] ✅ 已恢复高电平 (延迟{GPIO_DELAY_SECONDS}秒)")
return True
return False
def gpio_monitor_task():
"""
GPIO监控后台任务
定期检查是否需要恢复高电平
"""
global current_gpio_state, last_person_time
print("[GPIO] 🚀 GPIO监控线程已启动")
while True:
time.sleep(0.5) # 每0.5秒检查一次
with gpio_state_lock:
current_time = time.time()
time_since_last_person = current_time - last_person_time
# 如果当前是低电平,且距离上次检测到人已超过延迟时间
if (current_gpio_state == 'LOW' and
last_person_time > 0 and
time_since_last_person >= GPIO_DELAY_SECONDS):
print(f"[GPIO] ⏰ 距上次检测: {time_since_last_person:.1f}秒,准备恢复高电平")
set_gpio_high()
# 启动GPIO监控线程
gpio_monitor_thread = threading.Thread(target=gpio_monitor_task, daemon=True)
gpio_monitor_thread.start()
@app.websocket("/ws/distance")
@app.websocket("/ws/distance/")
async def websocket_distance(websocket: WebSocket):
global current_gpio_state, last_person_time
await websocket.accept()
print("✅ WebSocket 客户端已连接")
try:
@@ -26,7 +128,7 @@ async def websocket_distance(websocket: WebSocket):
msg = json.loads(data)
distance = msg.get("distance")
ts = msg.get("ts")
print(f"收到距离: {distance}, 时间戳: {ts}")
print(f"📨 收到距离: {distance}m, 时间戳: {ts}")
# # 写入日志单独try避免异常影响后续流程
# try:
@@ -36,33 +138,35 @@ async def websocket_distance(websocket: WebSocket):
# f.write(log_line)
# except Exception as log_e:
# print(f"日志写入失败: {log_e}")
#调用 Modbus 可执行文件
# print(f"调用 Modbus 程序,传递距离: {distance}")
signal = "echo 'orangepi' | sudo "+ MODBUS_BIN_PATH
result = subprocess.run(
[signal],
# shell=True,ocapture_output=True, text=True, timeout=0
shell=True
)
# print("signal:", signal)
# print(f"Modbus 程序返回: {result.stdout.strip()}, 错误: {result.stderr.strip()}")
if result.returncode != 0:
# 发送错误信息给客户端
await websocket.send_json({
"error": f"modbus 程序执行失败: {result.stderr.strip()}"
})
continue
# ⭐ GPIO状态控制逻辑
with gpio_state_lock:
current_time = time.time()
# 更新最后检测到人的时间
last_person_time = current_time
print(f"[GPIO] 🔄 更新最后检测时间")
# 如果当前是高电平,切换到低电平
if current_gpio_state == 'HIGH':
print(f"[GPIO] 🔻 当前为高电平,准备切换到低电平")
set_gpio_low()
else:
print(f"[GPIO] ⚡ 当前已是低电平,保持状态")
# 正常发送结果
# 发送响应给客户端
await websocket.send_json({
"status": "success",
"distance": distance,
"modbus_output": result.stdout.strip()
"gpio_state": current_gpio_state,
"message": f"已检测到人GPIO状态: {current_gpio_state}"
})
except json.JSONDecodeError:
await websocket.send_text("invalid JSON")
except Exception as e:
await websocket.send_text(f"server error: {e}")
print(f"❌ 处理消息时出错: {e}")
except WebSocketDisconnect:
print("⚠️ WebSocket 客户端断开连接")