diff --git a/GPIOSignal/bin/sendGpioSignal b/GPIOSignal/bin/sendGpioSignal index a0d5e33..ac15ff5 100755 Binary files a/GPIOSignal/bin/sendGpioSignal and b/GPIOSignal/bin/sendGpioSignal differ diff --git a/GPIOSignal/src/makefile b/GPIOSignal/src/makefile index 95db1cf..83a452d 100644 --- a/GPIOSignal/src/makefile +++ b/GPIOSignal/src/makefile @@ -1,7 +1,7 @@ all:sendGpioSignal sendGpioSignal: - g++ -g -o sendGpioSignal sendGpioSignal.cpp //home/orangepi/RKApp/softWareInit/NetraLib/src/Netra.cpp -I/home/orangepi/RKApp/softWareInit/NetraLib/include -lwiringPi + g++ -g -o sendGpioSignal sendGpioSignal.cpp -lwiringPi mv sendGpioSignal ../bin diff --git a/GPIOSignal/src/sendGpioSignal.cpp b/GPIOSignal/src/sendGpioSignal.cpp index 0553e74..fcac4d6 100644 --- a/GPIOSignal/src/sendGpioSignal.cpp +++ b/GPIOSignal/src/sendGpioSignal.cpp @@ -6,12 +6,7 @@ #include #include -#include "/home/orangepi/RKApp/softWareInit/NetraLib/include/Netra.hpp" - using namespace std; -using namespace QCL; - -const string SetFile = "/home/orangepi/RKApp/InitAuth/conf/.env"; // 初始化GPIO引脚 int InitGpio(int GPIO_Pin1, int GPIO_Pin2); @@ -19,9 +14,6 @@ int InitGpio(int GPIO_Pin1, int GPIO_Pin2); // 写入GPIO引脚 void WriteGpio(int GPIO_Pin, int value); -// 获取输出模式 -bool GetOutValue(int &value); - int main(int argc, char *argv[]) { int GPIO_Pin1 = 7; @@ -44,70 +36,6 @@ int main(int argc, char *argv[]) WriteGpio(GPIO_Pin2, stoi(argv[1])); return 0; - - // int value = 0; - // bool useArg = false; - - // // 修改点:如果传入了参数,直接使用参数作为电平值 (1 或 0) - // if (argc > 1) - // { - // value = atoi(argv[1]); - // useArg = true; - // cout << "[sendGpioSignal] 使用命令行参数: " << value << endl; - // } - // else - // { - // // 原有的读取文件逻辑 - // cout << "[sendGpioSignal] 启动,读取配置: " << SetFile << endl; - // if (GetOutValue(value) == false) - // { - // cerr << "[sendGpioSignal] 未读取到 outPutMode,程序退出" << endl; - // return -1; - // } - // cout << "[sendGpioSignal] 读取到 outPutMode=" << (value == 1 ? "true" : "false") << endl; - // } - - // // 初始化GPIO引脚 - // if (InitGpio(GPIO_Pin1, GPIO_Pin2) != 0) - // { - // cout << "Error: Failed to initialize GPIO pin " << endl; - // return 1; - // } - // // 写入GPIO引脚 - // cout << "[sendGpioSignal] 设置 GPIO(" << GPIO_Pin1 << "," << GPIO_Pin2 << ") 为 " - // << (value == 1 ? "HIGH" : "LOW") << endl; - // WriteGpio(GPIO_Pin1, value); - // WriteGpio(GPIO_Pin2, value); - // cout << "[sendGpioSignal] 完成" << endl; - - // this_thread::sleep_for(chrono::milliseconds(100)); - - return 0; -} - -// 获取输出模式 -bool GetOutValue(int &value) -{ - bool flag = true; - // 读取文件 - ReadFile rf(SetFile); - if (rf.Open() == false) - { - cerr << "读取文件失败" << endl; - flag = false; - } - - auto str = rf.ReadLines(); - for (auto &ii : str) - { - if (ii.find("outPutMode") != string::npos) - { - value = (ii.substr(string("outPutMode:").size()) == "true" ? 1 : 0); - } - } - - rf.Close(); - return flag; } // 初始化GPIO引脚 diff --git a/GPIOSignal/src/yolov11_stereo_distance_ws.py b/GPIOSignal/src/yolov11_stereo_distance_ws.py deleted file mode 100644 index a3a332b..0000000 --- a/GPIOSignal/src/yolov11_stereo_distance_ws.py +++ /dev/null @@ -1,2013 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -YOLOv11 + Depth Demo on RK3588 3-NPU 并行版 (v2.0) -==================================================== - -项目描述: - 基于 RK3588 芯片的高性能目标检测系统,集成深度信息和距离报警功能。 - 充分利用 RK3588 的三颗 NPU 核心,实现并行推理以最大化帧率性能。 - -核心特性: - 1. 多 NPU 并行推理 - 每颗 NPU 核心独立运行 RKNN 模型实例 - 2. 多线程流水线架构 - 图像采集、模型推理、结果渲染分离 - 3. 深度距离检测 - 结合彩色图像和深度信息计算目标距离 - 4. 实时报警系统 - 根据距离阈值触发报警并保存关键帧 - 5. WebSocket 数据上报 - 实时推送检测结果到远程服务器 - 6. 视频录制功能 - 同时保存原始视频和标注视频 - -性能指标: - - 单 NPU: ~18 FPS - - 三 NPU 并行: ~48-52 FPS - - CPU/GPU 占用率: <60% - -作者: xisnoDy -创建时间: 2025年 -最后更新: 2025年7月9日 -版本: v2.0 -""" - -import os -import sys -import time -import math -import threading -import queue -import json -import datetime -from typing import Tuple - -import cv2 -import numpy as np -from openni import openni2 -from dotenv import load_dotenv - -from rknn_executor import RKNN_model_container -from dataset_utils import COCO_test_helper - -import subprocess - -# ============================== 配置文件加载 ============================== # - -def load_config(): - """ - 从 .env 文件加载配置参数 - - 该函数会查找并加载 .env 配置文件,将环境变量转换为相应的数据类型。 - 支持的数据类型包括:字符串、整数、浮点数、布尔值、列表和集合。 - - Returns: - dict: 包含所有配置参数的字典 - - Raises: - FileNotFoundError: 当 .env 文件不存在时 - ValueError: 当配置参数格式错误时 - """ - # 获取项目根目录下的 .env 文件路径 - project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - env_path = os.path.join(project_root, '.env') - - # 检查 .env 文件是否存在 - if not os.path.exists(env_path): - raise FileNotFoundError(f"配置文件不存在: {env_path}") - - # 加载 .env 文件 - load_dotenv(env_path) - print(f"[INFO] 已加载配置文件: {env_path}") - - # 辅助函数:安全获取环境变量 - def get_env_str(key, default=None): - """获取字符串类型的环境变量""" - value = os.getenv(key, default) - if value is None: - raise ValueError(f"缺少必需的配置参数: {key}") - return value - - def get_env_int(key, default=None): - """获取整数类型的环境变量""" - value = os.getenv(key, default) - if value is None: - raise ValueError(f"缺少必需的配置参数: {key}") - try: - return int(value) - except ValueError: - raise ValueError(f"配置参数 {key} 必须是整数: {value}") - - def get_env_float(key, default=None): - """获取浮点数类型的环境变量""" - value = os.getenv(key, default) - if value is None: - raise ValueError(f"缺少必需的配置参数: {key}") - try: - return float(value) - except ValueError: - raise ValueError(f"配置参数 {key} 必须是浮点数: {value}") - - def get_env_bool(key, default=None): - """获取布尔类型的环境变量""" - value = os.getenv(key, default) - if value is None: - raise ValueError(f"缺少必需的配置参数: {key}") - return value.lower() in ('true', '1', 'yes', 'on') - - def get_env_list(key, dtype=int, default=None): - """获取列表类型的环境变量""" - value = os.getenv(key, default) - if value is None: - raise ValueError(f"缺少必需的配置参数: {key}") - try: - return [dtype(x.strip()) for x in value.split(',') if x.strip()] - except ValueError: - raise ValueError(f"配置参数 {key} 格式错误: {value}") - - def get_env_set(key, dtype=int, default=None): - """获取集合类型的环境变量""" - return set(get_env_list(key, dtype, default)) - - def get_env_tuple(key, dtype=int, default=None): - """获取元组类型的环境变量""" - return tuple(get_env_list(key, dtype, default)) - - # 构建配置字典 - config = { - # 硬件路径配置 - 'OPENNI_LIB_PATH': get_env_str('OPENNI_LIB_PATH'), - 'MODEL_PATH': get_env_str('MODEL_PATH'), - - # NPU 配置参数 - 'NPU_CORES': get_env_list('NPU_CORES', int), - 'CPU_THREADS': get_env_int('CPU_THREADS'), - 'CPU_AFFINITY': get_env_set('CPU_AFFINITY', int), - - # 模型推理配置参数 - 'OBJ_THRESH': get_env_float('OBJ_THRESH'), - 'NMS_THRESH': get_env_float('NMS_THRESH'), - 'IMG_SIZE': get_env_tuple('IMG_SIZE', int), - 'PERSON_CLASS_ID': get_env_int('PERSON_CLASS_ID'), - - # 队列配置参数 - 'CAP_QUEUE_SIZE': get_env_int('CAP_QUEUE_SIZE'), - 'RES_QUEUE_SIZE': get_env_int('RES_QUEUE_SIZE'), - - # 摄像头遮挡检测配置参数 - 'BRIGHTNESS_THRESHOLD': get_env_int('BRIGHTNESS_THRESHOLD'), - 'VARIANCE_THRESHOLD': get_env_int('VARIANCE_THRESHOLD'), - - # 深度信息处理配置参数 - 'DEPTH_SAMPLE_RADIUS': get_env_int('DEPTH_SAMPLE_RADIUS'), - 'DEPTH_FILTER_SIZE': get_env_int('DEPTH_FILTER_SIZE'), - - # 距离分区和报警配置参数 - 'NEAR_THRESHOLD': get_env_float('NEAR_THRESHOLD'), - 'MID_THRESHOLD': get_env_float('MID_THRESHOLD'), - 'MAX_DISTANCE': get_env_float('MAX_DISTANCE'), - - # 文件保存配置参数 - 'WARNING_IMAGE_DIR': get_env_str('WARNING_IMAGE_DIR'), - 'WARNING_IMAGE_NAME_FORMAT': get_env_str('WARNING_IMAGE_NAME_FORMAT'), - 'VIDEO_SAVE_DIR': get_env_str('VIDEO_SAVE_DIR'), - 'RAW_VIDEO_NAME_FORMAT': get_env_str('RAW_VIDEO_NAME_FORMAT'), - 'MARK_VIDEO_NAME_FORMAT': get_env_str('MARK_VIDEO_NAME_FORMAT'), - 'WARNING_SNAPSHOT_INTERVAL': get_env_float('WARNING_SNAPSHOT_INTERVAL'), - - # WebSocket 上报配置参数 - 'WS_URL': get_env_str('WS_URL'), - 'WS_REPORT_INTERVAL': get_env_float('WS_REPORT_INTERVAL'), - - # 显示和调试配置参数 - 'ENABLE_GUI': get_env_bool('ENABLE_GUI'), - 'ENABLE_VIDEO_RECORDING': get_env_bool('ENABLE_VIDEO_RECORDING'), - 'FPS_STATS_ENABLED': get_env_bool('FPS_STATS_ENABLED'), - - # 画面翻转配置参数 - 'FLIP_HORIZONTAL': get_env_bool('FLIP_HORIZONTAL'), - 'FLIP_VERTICAL': get_env_bool('FLIP_VERTICAL'), - - # 时区配置 - 'TIMEZONE_OFFSET': get_env_int('TIMEZONE_OFFSET'), - } - - # 打印加载的关键配置信息(用于调试) - print("[INFO] 主要配置参数:") - print(f" - 模型路径: {config['MODEL_PATH']}") - print(f" - NPU 核心: {config['NPU_CORES']}") - print(f" - 图像尺寸: {config['IMG_SIZE']}") - print(f" - 置信度阈值: {config['OBJ_THRESH']}") - print(f" - GUI 模式: {config['ENABLE_GUI']}") - print(f" - 视频录制: {config['ENABLE_VIDEO_RECORDING']}") - print(f" - 水平翻转: {config['FLIP_HORIZONTAL']}") - print(f" - 垂直翻转: {config['FLIP_VERTICAL']}") - - return config - - -# 加载全局配置 -try: - CONFIG = load_config() - print("[INFO] 配置文件加载成功") -except Exception as e: - print(f"[ERROR] 配置文件加载失败: {e}") - sys.exit(1) - -# ============================== 全局配置参数 ============================== # -# 注意: 以下变量从 .env 文件动态加载,用于保持代码兼容性 - -# 硬件路径配置 -OPENNI_LIB_PATH = CONFIG['OPENNI_LIB_PATH'] # OpenNI 库路径 -MODEL_PATH = CONFIG['MODEL_PATH'] # RKNN 模型文件路径 - -# NPU 配置参数 -NPU_CORES = CONFIG['NPU_CORES'] # 指定参与运算的 NPU 核心序号 -CPU_THREADS = CONFIG['CPU_THREADS'] # OpenCV CPU 线程数 -CPU_AFFINITY = CONFIG['CPU_AFFINITY'] # CPU 亲和性设置,绑定到特定核心 - -# 模型推理配置参数 -OBJ_THRESH = CONFIG['OBJ_THRESH'] # 目标检测置信度阈值 -NMS_THRESH = CONFIG['NMS_THRESH'] # 非极大值抑制阈值 -IMG_SIZE = CONFIG['IMG_SIZE'] # 模型输入尺寸 (height, width) -PERSON_CLASS_ID = CONFIG['PERSON_CLASS_ID'] # 人体类别ID (COCO dataset) - -# 队列配置参数 -CAP_QUEUE_SIZE = CONFIG['CAP_QUEUE_SIZE'] # 采集→推理 队列深度 -RES_QUEUE_SIZE = CONFIG['RES_QUEUE_SIZE'] # 推理→渲染 队列深度 - -# 摄像头遮挡检测配置参数 -BRIGHTNESS_THRESHOLD = CONFIG['BRIGHTNESS_THRESHOLD'] # 亮度阈值,低于此值认为可能被遮挡 -VARIANCE_THRESHOLD = CONFIG['VARIANCE_THRESHOLD'] # 方差阈值,低于此值认为画面过于均匀 - -# 深度信息处理配置参数 -DEPTH_SAMPLE_RADIUS = CONFIG['DEPTH_SAMPLE_RADIUS'] # 深度采样半径(像素) -DEPTH_FILTER_SIZE = CONFIG['DEPTH_FILTER_SIZE'] # 深度中值滤波核大小 - -# 距离分区和报警配置参数 -NEAR_THRESHOLD = CONFIG['NEAR_THRESHOLD'] # 近距离阈值 (米) -MID_THRESHOLD = CONFIG['MID_THRESHOLD'] # 中距离阈值 (米) -MAX_DISTANCE = CONFIG['MAX_DISTANCE'] # 报警判定的最大距离 (米),超出则视为安全 - -# 文件保存配置参数 -WARNING_IMAGE_DIR = CONFIG['WARNING_IMAGE_DIR'] # 报警截图保存目录 -WARNING_IMAGE_NAME_FORMAT = CONFIG['WARNING_IMAGE_NAME_FORMAT'] # 报警截图文件名格式 -VIDEO_SAVE_DIR = CONFIG['VIDEO_SAVE_DIR'] # 视频保存目录 -RAW_VIDEO_NAME_FORMAT = CONFIG['RAW_VIDEO_NAME_FORMAT'] # 原始视频文件名格式 -MARK_VIDEO_NAME_FORMAT = CONFIG['MARK_VIDEO_NAME_FORMAT'] # 标注视频文件名格式 -WARNING_SNAPSHOT_INTERVAL = CONFIG['WARNING_SNAPSHOT_INTERVAL'] # 报警截图最小间隔 (秒) - -# WebSocket 上报配置参数 -WS_URL = CONFIG['WS_URL'] # WebSocket 上报目标 URL -WS_REPORT_INTERVAL = CONFIG['WS_REPORT_INTERVAL'] # WebSocket 上报间隔 (秒) - -# 显示和调试配置参数 -ENABLE_GUI = CONFIG['ENABLE_GUI'] # 是否启用 GUI 显示 (False: 无头模式) -ENABLE_VIDEO_RECORDING = CONFIG['ENABLE_VIDEO_RECORDING'] # 是否启用视频录制功能 -FPS_STATS_ENABLED = CONFIG['FPS_STATS_ENABLED'] # 是否启用 FPS 统计功能 - -# 画面翻转配置参数 -FLIP_HORIZONTAL = CONFIG['FLIP_HORIZONTAL'] # 是否水平镜像翻转画面 -FLIP_VERTICAL = CONFIG['FLIP_VERTICAL'] # 是否垂直上下翻转画面 - -# 时区配置 (北京时间 UTC+8) -TIMEZONE_OFFSET = CONFIG['TIMEZONE_OFFSET'] # 北京时间偏移量(小时) - -# GPIO 控制配置参数 -GPIO_CMD_PATH = "/home/orangepi/RKApp/GPIOSignal/bin/sendGpioSignal" # GPIO 控制程序路径 -GPIO_CONFIG_FILE = "/home/orangepi/InitAuth/conf/.env" # GPIO 配置文件路径 -GPIO_DELAY_SECONDS = 2.0 - -# ========================================================================= # - -# ========================= OpenCV 性能优化配置 ========================= # -cv2.setUseOptimized(True) # 启用 OpenCV 优化 -cv2.setNumThreads(CPU_THREADS) # 设置 OpenCV CPU 线程数 - -# 尝试设置 CPU 亲和性,将当前进程绑定到特定 CPU 核心 -try: - os.sched_setaffinity(0, CPU_AFFINITY) # 绑定到 CPU 核心 4-7 - print(f"[INFO] CPU 亲和性已设置: {CPU_AFFINITY}") -except (AttributeError, PermissionError) as e: - print(f"[WARN] 无法设置 CPU 亲和性: {e}") - pass - -# ============================== 时间工具函数 ============================== # - -def get_beijing_time(): - """ - 获取北京时间 (UTC+8) - - Returns: - datetime: 北京时间的 datetime 对象 - """ - import datetime - utc_now = datetime.datetime.utcnow() - beijing_time = utc_now + datetime.timedelta(hours=TIMEZONE_OFFSET) - return beijing_time - - -def format_beijing_timestamp(fmt="%Y-%m-%d %H:%M:%S"): - """ - 格式化北京时间戳 - - Args: - fmt (str): 时间格式化字符串 - - Returns: - str: 格式化后的北京时间字符串 - """ - return get_beijing_time().strftime(fmt) - - -# ============================== FPS 统计类 ============================== # - -class FPSStatistics: - """ - FPS 统计工具类 - 用于记录和计算帧率的最大值、最小值和平均值 - """ - - def __init__(self): - """初始化 FPS 统计数据""" - self.fps_history = [] # FPS 历史记录 - self.max_fps = 0.0 # 最高 FPS - self.min_fps = float('inf') # 最低 FPS - self.total_frames = 0 # 总帧数 - self.start_time = time.perf_counter() # 开始时间 - - def update(self, current_fps): - """ - 更新 FPS 统计数据 - - Args: - current_fps (float): 当前帧率 - """ - if current_fps > 0: # 只记录有效的 FPS 值 - self.fps_history.append(current_fps) - self.max_fps = max(self.max_fps, current_fps) - self.min_fps = min(self.min_fps, current_fps) - self.total_frames += 1 - - def get_average_fps(self): - """ - 计算平均 FPS - - Returns: - float: 平均帧率 - """ - if len(self.fps_history) == 0: - return 0.0 - return sum(self.fps_history) / len(self.fps_history) - - def get_overall_fps(self): - """ - 计算总体 FPS(基于总时间和总帧数) - - Returns: - float: 总体帧率 - """ - elapsed_time = time.perf_counter() - self.start_time - if elapsed_time > 0: - return self.total_frames / elapsed_time - return 0.0 - - def get_statistics(self): - """ - 获取完整的 FPS 统计信息 - - Returns: - dict: 包含所有统计数据的字典 - """ - return { - 'max_fps': self.max_fps if self.max_fps > 0 else 0.0, - 'min_fps': self.min_fps if self.min_fps != float('inf') else 0.0, - 'avg_fps': self.get_average_fps(), - 'overall_fps': self.get_overall_fps(), - 'total_frames': self.total_frames, - 'samples': len(self.fps_history) - } - - def print_statistics(self): - """打印详细的 FPS 统计信息""" - stats = self.get_statistics() - print("\n" + "="*50) - print(" FPS 统计报告") - print("="*50) - print(f"最高 FPS: {stats['max_fps']:.2f}") - print(f"最低 FPS: {stats['min_fps']:.2f}") - print(f"平均 FPS: {stats['avg_fps']:.2f}") - print(f"总体 FPS: {stats['overall_fps']:.2f}") - print(f"总处理帧数: {stats['total_frames']}") - print(f"统计样本数: {stats['samples']}") - print(f"运行时间: {time.perf_counter() - self.start_time:.2f} 秒") - print("="*50) - - -# ============================== 后处理工具函数 ============================== # - -def filter_boxes(boxes, box_confidences, box_class_probs): - """ - 过滤目标检测结果,根据置信度阈值筛选有效边界框 - - 该函数实现了基于置信度的边界框过滤机制,只保留高于设定阈值的检测结果。 - 置信度计算公式: final_confidence = objectness_score * class_probability - - Args: - boxes (np.ndarray): 边界框坐标数组,形状为 [N, 4],格式为 [x1, y1, x2, y2] - box_confidences (np.ndarray): 目标存在置信度数组,形状为 [N, 1] - box_class_probs (np.ndarray): 类别概率分布数组,形状为 [N, num_classes] - - Returns: - tuple: (filtered_boxes, filtered_classes, filtered_scores) - - filtered_boxes: 过滤后的边界框坐标 - - filtered_classes: 过滤后的类别索引 - - filtered_scores: 过滤后的最终置信度分数 - """ - conf = box_confidences.reshape(-1) # 展平置信度数组为一维 - cls_max = np.max(box_class_probs, axis=-1) # 获取每个框的最大类别概率 - classes = np.argmax(box_class_probs, axis=-1) # 获取每个框的类别索引 - final_scores = cls_max * conf # 计算最终置信度分数 - keep = np.where(final_scores >= OBJ_THRESH) # 保留高于阈值的框的索引 - - return boxes[keep], classes[keep], final_scores[keep] - - -def dfl(position): - """ - Distribution Focal Loss (DFL) 回归解码算法 - - DFL 是一种将连续值回归问题转换为分类问题的技术,通过学习距离分布来提高 - 边界框回归的精度。该函数将网络输出的分布形式预测转换为具体的坐标偏移量。 - - 算法流程: - 1. 将位置预测重塑为分组形式 (每个坐标分量一组) - 2. 应用 softmax 激活函数得到概率分布 - 3. 计算加权期望值作为最终的坐标偏移量 - - Args: - position (np.ndarray): 位置预测张量,形状为 [batch, channels, height, width] - channels 必须能被 4 整除 (对应 x1, y1, x2, y2) - - Returns: - np.ndarray: 解码后的位置偏移量,形状为 [batch, 4, height, width] - """ - n, c, h, w = position.shape - p_num = 4 # 边界框有4个坐标分量 (x1, y1, x2, y2) - mc = c // p_num # 每个坐标分量的通道数 - - # 重塑张量为分组形式: [batch, 4, mc, height, width] - y = position.reshape(n, p_num, mc, h, w) - - # 应用 softmax 激活函数 (数值稳定化版本) - y = np.exp(y - np.max(y, axis=2, keepdims=True)) # 减去最大值防止溢出 - y /= y.sum(axis=2, keepdims=True) # 归一化为概率分布 - - # 创建累积权重矩阵 [0, 1, 2, ..., mc-1] - acc = np.arange(mc, dtype=np.float32).reshape(1, 1, mc, 1, 1) - - # 计算加权期望值 (即分布的期望) - return (y * acc).sum(axis=2) - - -def box_process(position): - """ - 处理边界框回归预测,将网络输出转换为实际坐标 - - 该函数实现了 YOLOv11 的边界框解码逻辑,将网络输出的相对偏移量转换为 - 绝对坐标。处理流程包括: - 1. 创建特征图网格坐标 - 2. 计算缩放比例 (特征图 → 输入图像) - 3. 应用 DFL 解码 - 4. 转换为绝对坐标系 - - Args: - position (np.ndarray): DFL 位置预测张量,形状为 [1, C, H, W] - - Returns: - np.ndarray: 转换后的边界框坐标,形状为 [1, 4, H, W] - 格式为 [x1, y1, x2, y2],坐标系为输入图像坐标系 - """ - grid_h, grid_w = position.shape[2:4] # 获取特征图尺寸 - - # 创建网格坐标矩阵 - col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h)) # 列坐标和行坐标 - grid = np.stack((col, row), axis=0).reshape(1, 2, grid_h, grid_w) # 堆叠为网格张量 [1, 2, H, W] - - # 计算特征图到输入图像的缩放比例 - stride = np.array([ - IMG_SIZE[1] // grid_w, # x 方向缩放比例 - IMG_SIZE[0] // grid_h # y 方向缩放比例 - ]).reshape(1, 2, 1, 1) - - # 应用 DFL 解码得到相对偏移量 - position = dfl(position) # [1, 4, H, W] - - # 计算边界框坐标 (相对于网格中心) - box_xy = grid + 0.5 - position[:, 0:2] # 左上角坐标 (x1, y1) - box_xy2 = grid + 0.5 + position[:, 2:4] # 右下角坐标 (x2, y2) - - # 转换为输入图像坐标系 - box_xy_abs = box_xy * stride # 左上角绝对坐标 - box_xy2_abs = box_xy2 * stride # 右下角绝对坐标 - - return np.concatenate((box_xy_abs, box_xy2_abs), axis=1) - - -def post_process(outputs): - """ - YOLOv11 后处理主函数,整合多尺度预测结果 - - 该函数实现了完整的 YOLOv11 后处理流程,包括: - 1. 多尺度特征融合 - 2. 边界框解码 - 3. 置信度过滤 - 4. 非极大值抑制 (NMS) - - YOLOv11 网络结构说明: - - 3个检测头,对应不同尺度的特征图 - - 每个检测头输出2个张量: 边界框回归 + 类别置信度 - - 边界框使用 DFL (Distribution Focal Loss) 编码 - - Args: - outputs (list): 网络输出列表,包含 6 个张量 (3个尺度 × 2个输出) - [bbox_reg_large, cls_conf_large, bbox_reg_medium, cls_conf_medium, - bbox_reg_small, cls_conf_small] - - Returns: - tuple: (final_boxes, final_classes, final_scores) - - final_boxes: 最终的边界框坐标 [N, 4],经过 NMS 处理 - - final_classes: 最终的类别索引 [N] - - final_scores: 最终的置信度分数 [N] - 如果没有检测到目标,返回 (None, None, None) - """ - branches = 3 # YOLOv11 有3个检测分支 (不同尺度) - pair = len(outputs) // branches # 每个分支包含的输出数量 (应该为2) - - # 分别收集各分支的边界框回归、类别置信度和目标性得分 - boxes_list, cls_conf_list, ones_list = [], [], [] - - # 处理每个尺度分支的输出 - for i in range(branches): - bbox_output = outputs[pair * i] # 边界框回归输出 - cls_output = outputs[pair * i + 1] # 类别置信度输出 - - # 处理边界框回归 - boxes_list.append(box_process(bbox_output)) - - # 收集类别置信度 - cls_conf_list.append(cls_output) - - # 创建目标性得分 (YOLOv11中固定为1,表示该位置存在目标的概率) - ones_list.append(np.ones_like(cls_output[:, :1])) - - def flatten_feature_map(feature_tensor): - """ - 将特征图展平为二维矩阵 - - Args: - feature_tensor (np.ndarray): 输入特征张量 [N, C, H, W] - - Returns: - np.ndarray: 展平后的特征矩阵 [N*H*W, C] - """ - # 转换维度顺序: [N, C, H, W] → [N, H, W, C] - transposed = feature_tensor.transpose(0, 2, 3, 1) - # 展平空间维度: [N, H, W, C] → [N*H*W, C] - return transposed.reshape(-1, transposed.shape[-1]) - - # 合并所有尺度的预测结果 - all_boxes = np.concatenate([flatten_feature_map(b) for b in boxes_list]) # 合并边界框 - all_cls_conf = np.concatenate([flatten_feature_map(c) for c in cls_conf_list]) # 合并类别置信度 - all_ones = np.concatenate([flatten_feature_map(o) for o in ones_list]) # 合并目标性得分 - - # 第一步过滤: 基于置信度阈值过滤边界框 - filtered_boxes, filtered_classes, filtered_scores = filter_boxes( - all_boxes, all_ones, all_cls_conf - ) - - # 如果没有满足条件的检测框,直接返回 - if filtered_boxes.size == 0: - return None, None, None - - # 第二步过滤: 非极大值抑制 (NMS) 去除重叠框 - # NMS 参数说明: - # - boxes: 边界框列表 - # - scores: 对应的置信度分数 - # - score_threshold: 分数阈值 (已在第一步过滤中处理) - # - nms_threshold: IoU 阈值,控制重叠程度 - nms_indices = cv2.dnn.NMSBoxes( - filtered_boxes.tolist(), - filtered_scores.tolist(), - OBJ_THRESH, # 置信度阈值 - NMS_THRESH # NMS IoU 阈值 - ) - - # 检查 NMS 结果 - if len(nms_indices) == 0: - return None, None, None - - # 提取最终结果 - final_indices = nms_indices.flatten() # 展平索引数组 - final_boxes = filtered_boxes[final_indices] - final_classes = filtered_classes[final_indices] - final_scores = filtered_scores[final_indices] - - return final_boxes, final_classes, final_scores - -# ============================== 辅助处理函数 ============================== # - -# ============================== GPIO 控制器类 ============================== # - -class GPIOController: - """ - GPIO 信号控制器 - - 管理 GPIO 输出状态,实现以下逻辑: - - 检测到人时立即输出低电平 - - 人离开后延迟指定时间再恢复高电平 - - Attributes: - gpio_cmd (str): GPIO 控制程序路径 - config_file (str): 配置文件路径 - delay_seconds (float): 延迟时间(秒) - current_state (str): 当前GPIO状态 ('HIGH' 或 'LOW') - last_person_time (float): 最后一次检测到人的时间戳 - """ - - def __init__(self, gpio_cmd: str, config_file: str, delay_seconds: float = 2.0): - """ - 初始化 GPIO 控制器 - - Args: - gpio_cmd (str): GPIO 控制程序路径 - config_file (str): 配置文件路径 - delay_seconds (float): 人离开后的延迟时间(秒) - """ - self.gpio_cmd = gpio_cmd - self.config_file = config_file - self.delay_seconds = delay_seconds - self.current_state = 'HIGH' # 初始状态为高电平 - self.last_person_time = 0.0 # 最后检测到人的时间 - self._lock = threading.Lock() # 线程安全锁 - - print(f"[INFO] GPIO 控制器已初始化") - print(f" - GPIO 程序: {self.gpio_cmd}") - print(f" - 配置文件: {self.config_file}") - print(f" - 延迟时间: {self.delay_seconds}秒") - - def update(self, person_detected: bool): - """ - 更新 GPIO 状态 - - Args: - person_detected (bool): 是否检测到人 - """ - with self._lock: - current_time = time.time() - - if person_detected: - # 检测到人,更新时间戳 - self.last_person_time = current_time - - # 如果当前是高电平,切换到低电平 - if self.current_state == 'HIGH': - self._set_gpio_low() - else: - # 未检测到人,检查是否应该恢复高电平 - time_since_last_person = current_time - self.last_person_time - - if (self.current_state == 'LOW' and - time_since_last_person >= self.delay_seconds): - # 超过延迟时间且当前是低电平,恢复高电平 - self._set_gpio_high() - - def _set_gpio_low(self): - """设置 GPIO 为低电平""" - try: - # 修改配置文件 - self._update_config('false') - - # 调用 GPIO 程序 - result = subprocess.run( - [self.gpio_cmd], - capture_output=True, - text=True, - timeout=5 - ) - - if result.returncode == 0: - self.current_state = 'LOW' - print(f"[GPIO] 已切换到低电平 (检测到人)") - else: - print(f"[GPIO ERROR] 设置低电平失败: {result.stderr}") - - except Exception as e: - print(f"[GPIO ERROR] 设置低电平时发生异常: {e}") - - def _set_gpio_high(self): - """设置 GPIO 为高电平""" - try: - # 修改配置文件 - self._update_config('true') - - # 调用 GPIO 程序 - result = subprocess.run( - [self.gpio_cmd], - capture_output=True, - text=True, - timeout=5 - ) - - if result.returncode == 0: - self.current_state = 'HIGH' - print(f"[GPIO] 已恢复高电平 (延迟{self.delay_seconds}秒)") - else: - print(f"[GPIO ERROR] 设置高电平失败: {result.stderr}") - - except Exception as e: - print(f"[GPIO ERROR] 设置高电平时发生异常: {e}") - - def _update_config(self, value: str): - """ - 更新配置文件中的 outPutMode 值 - - Args: - value (str): 'true' 或 'false' - """ - try: - # 读取配置文件 - with open(self.config_file, 'r', encoding='utf-8') as f: - lines = f.readlines() - - # 修改 outPutMode 行 - modified = False - for i, line in enumerate(lines): - if line.strip().startswith('outPutMode'): - lines[i] = f"outPutMode:{value}\n" - modified = True - break - - # 如果没找到,追加一行 - if not modified: - lines.append(f"outPutMode:{value}\n") - - # 写回配置文件 - with open(self.config_file, 'w', encoding='utf-8') as f: - f.writelines(lines) - - print(f"[GPIO] 配置文件已更新: outPutMode={value}") - - except Exception as e: - print(f"[GPIO ERROR] 更新配置文件失败: {e}") - - def get_status(self) -> dict: - """ - 获取当前状态信息 - - Returns: - dict: 包含状态信息的字典 - """ - with self._lock: - return { - 'current_state': self.current_state, - 'last_person_time': self.last_person_time, - 'time_since_last_person': time.time() - self.last_person_time - } - - - -def process_single_detection(box, score, color_frame, depth_data, color_width, color_height, depth_width, depth_height): - """ - 处理单个检测目标,计算其距离并绘制可视化信息 - - Args: - box (np.ndarray): 边界框坐标 [x1, y1, x2, y2] - score (float): 检测置信度 - color_frame (np.ndarray): 彩色图像帧 - depth_data (np.ndarray): 深度数据 - color_width, color_height (int): 彩色图像尺寸 - depth_width, depth_height (int): 深度图像尺寸 - - Returns: - float: 计算得到的距离值 (米),如果无效则返回 NaN - """ - x1, y1, x2, y2 = map(int, box) # 边界框坐标 - center_x, center_y = (x1 + x2) // 2, (y1 + y2) // 2 # 中心点坐标 - - # 将彩色图像坐标映射到深度图像坐标 - depth_x = int(center_x / color_width * depth_width) - depth_y = int(center_y / color_height * depth_height) - - # 在中心点周围采样深度值 (提高准确性) - sample_x1 = max(depth_x - DEPTH_SAMPLE_RADIUS, 0) - sample_x2 = min(depth_x + DEPTH_SAMPLE_RADIUS + 1, depth_width) - sample_y1 = max(depth_y - DEPTH_SAMPLE_RADIUS, 0) - sample_y2 = min(depth_y + DEPTH_SAMPLE_RADIUS + 1, depth_height) - - depth_patch = depth_data[sample_y1:sample_y2, sample_x1:sample_x2] - - # 对深度数据进行中值滤波 (去除噪声) - if (depth_patch == 0).any(): # 如果有无效深度值 - depth_patch = cv2.medianBlur(depth_patch, DEPTH_FILTER_SIZE) - - # 计算有效深度值的平均距离 - valid_depths = depth_patch[depth_patch > 0].astype(np.float32) - if valid_depths.size > 0: - distance_meters = float(np.mean(valid_depths)) / 1000.0 # 转换为米 - else: - distance_meters = float('nan') # 无有效深度值 - - # 根据距离选择绘制颜色 - if math.isnan(distance_meters): - color = (0, 255, 255) # 未知距离: 黄色 - else: - if distance_meters < NEAR_THRESHOLD: - color = (0, 0, 255) # 近距离: 红色 - elif distance_meters < MID_THRESHOLD: - color = (0, 165, 255) # 中距离: 橙色 - elif distance_meters < MAX_DISTANCE: - color = (0, 255, 255) # 远距离: 黄色 - else: - color = (0, 255, 0) # 安全距离: 绿色 - - # 绘制边界框 - cv2.rectangle(color_frame, (x1, y1), (x2, y2), color, 2) - - # 绘制标签 (包含置信度和距离信息) - if not math.isnan(distance_meters): - label = f"person {score:.2f} {distance_meters:.2f}m" - else: - label = f"person {score:.2f} --m" - - # 计算标签位置 (避免超出图像边界) - label_y = max(y1 - 10, 20) - cv2.putText( - color_frame, - label, - (x1, label_y), - cv2.FONT_HERSHEY_SIMPLEX, - 0.6, - color, - 2 - ) - - return distance_meters - - -def calculate_zone_counts(distances): - """ - 计算各距离区域的检测框数量 - - Args: - distances (list): 检测到的距离列表 - - Returns: - list: [近距离数量, 中距离数量, 远距离数量] - """ - zone_counts = [0, 0, 0] # [近距离, 中距离, 远距离] - - for distance in distances: - if distance is None or not np.isfinite(distance) or distance >= MAX_DISTANCE: - continue # 跳过无效或安全距离 - - if distance < NEAR_THRESHOLD: - zone_counts[0] += 1 # 近距离区域 - elif distance < MID_THRESHOLD: - zone_counts[1] += 1 # 中距离区域 - else: - zone_counts[2] += 1 # 远距离区域 - - return zone_counts - - -def check_warning_conditions(current_counts, prev_counts): - """ - 检查是否需要触发报警保存 - - 报警条件: - 1. 任何区域的检测框数量增加 - 2. 检测框在不同区域间移动 (总数不变但分布变化) - - Args: - current_counts (list): 当前各区域检测框数量 - prev_counts (list): 上次各区域检测框数量 - - Returns: - bool: 是否需要保存报警截图 - """ - # 检查是否有区域检测框数量增加 - for i in range(3): - if current_counts[i] > prev_counts[i]: - return True - - # 检查是否有区域间移动 (总数相同但分布不同) - total_current = sum(current_counts) - total_prev = sum(prev_counts) - - if (total_current == total_prev and - total_current > 0 and - current_counts != prev_counts): - return True - - return False - - -def save_warning_snapshot(frame, start_time, current_counts, prev_counts, distances): - """ - 保存报警截图,包含详细的时间和检测信息 - - Args: - frame (np.ndarray): 当前帧图像 - start_time (datetime): 帧处理开始时间 (北京时间) - current_counts (list): 当前各区域检测框数量 - prev_counts (list): 上次各区域检测框数量 - distances (list): 检测到的距离列表 - """ - snapshot = frame.copy() - h_img, w_img = snapshot.shape[:2] - - # 计算处理结束时间和耗时 - end_time = get_beijing_time() - processing_ms = (end_time - start_time).total_seconds() * 1000.0 - - # 文本绘制基准位置 - base_y = h_img - 15 - line_spacing = 24 - text_color = (255, 255, 255) # 白色文字 - font = cv2.FONT_HERSHEY_SIMPLEX - font_scale = 0.6 - thickness = 2 - - # 绘制时间信息 (三行) - time_texts = [ - f"Start: {start_time.strftime('%H:%M:%S.%f')[:-3]}", - f"End: {end_time.strftime('%H:%M:%S.%f')[:-3]}", - f"Δt: {processing_ms:.1f} ms" - ] - - for i, text in enumerate(time_texts): - y_pos = base_y - (2 - i) * line_spacing - cv2.putText(snapshot, text, (10, y_pos), font, font_scale, text_color, thickness, cv2.LINE_AA) - - # 绘制检测框数量信息 - zone_info = [ - f"0-{NEAR_THRESHOLD:g}m: {current_counts[0]}", - f"{NEAR_THRESHOLD:g}-{MID_THRESHOLD:g}m: {current_counts[1]}", - f"{MID_THRESHOLD:g}-{MAX_DISTANCE:g}m: {current_counts[2]}" - ] - counts_text = f"Counts: {' | '.join(zone_info)}" - cv2.putText(snapshot, counts_text, (10, base_y + 30), font, font_scale, (0, 255, 255), thickness, cv2.LINE_AA) - - # 绘制区域变化信息 - change_info = [] - for i in range(3): - if current_counts[i] > prev_counts[i]: - change_info.append(f"+{current_counts[i] - prev_counts[i]} in zone {i}") - elif current_counts[i] < prev_counts[i]: - change_info.append(f"-{prev_counts[i] - current_counts[i]} in zone {i}") - - if change_info: - change_text = f"Changes: {', '.join(change_info)}" - cv2.putText(snapshot, change_text, (10, base_y + 60), font, font_scale, (0, 165, 255), thickness, cv2.LINE_AA) - - # 生成文件名并保存 - filename = start_time.strftime(WARNING_IMAGE_NAME_FORMAT) - file_path = os.path.join(WARNING_IMAGE_DIR, filename) - - # 应用画面翻转(用于保存) - save_snapshot = apply_frame_flip(snapshot) - - try: - cv2.imwrite(file_path, save_snapshot) - print(f"[INFO] 报警截图已保存: {filename}") - except Exception as e: - print(f"[ERROR] 保存报警截图失败: {e}") - - -def handle_video_recording(raw_writer, mark_writer, frame, raw_filename, mark_filename, fourcc): - """ - 处理视频录制,包括延迟初始化和帧写入 - - Args: - raw_writer, mark_writer: 视频写入器对象 - frame (np.ndarray): 当前帧 - raw_filename, mark_filename (str): 视频文件名 - fourcc: 视频编码器 - - Returns: - tuple: 更新后的 (raw_writer, mark_writer) - """ - if not ENABLE_VIDEO_RECORDING: - return raw_writer, mark_writer - - # 延迟初始化视频写入器 (在获得第一帧尺寸后) - if raw_writer is None: - height, width = frame.shape[:2] - - try: - raw_writer = cv2.VideoWriter( - os.path.join(VIDEO_SAVE_DIR, raw_filename), - fourcc, 30, (width, height) - ) - mark_writer = cv2.VideoWriter( - os.path.join(VIDEO_SAVE_DIR, mark_filename), - fourcc, 30, (width, height) - ) - print(f"[INFO] 视频录制已开始: {raw_filename}, {mark_filename}") - except Exception as e: - print(f"[ERROR] 视频写入器初始化失败: {e}") - return None, None - - # 写入帧数据 - if raw_writer and mark_writer: - try: - # 应用画面翻转(用于视频保存) - save_frame = apply_frame_flip(frame) - - # 注意: 此时 frame 已经包含检测标记,因此原始视频和标记视频是相同的 - # 如果需要真正的原始视频,应该在检测标记之前保存原始帧 - raw_writer.write(save_frame) # 原始视频 (实际包含标记) - mark_writer.write(save_frame) # 标记视频 - except Exception as e: - print(f"[ERROR] 视频帧写入失败: {e}") - - return raw_writer, mark_writer - - -def apply_frame_flip(frame): - """ - 根据配置应用画面翻转 - - Args: - frame (np.ndarray): 输入图像 - - Returns: - np.ndarray: 翻转后的图像 - """ - result_frame = frame.copy() - - # 应用水平翻转(镜像) - if FLIP_HORIZONTAL: - result_frame = cv2.flip(result_frame, 1) - - # 应用垂直翻转(上下翻转) - if FLIP_VERTICAL: - result_frame = cv2.flip(result_frame, 0) - - return result_frame - - -def draw_fps_info(frame, fps): - """ - 在图像上绘制 FPS 信息 - - Args: - frame (np.ndarray): 输入图像 - fps (float): 当前帧率 - """ - # 绘制 FPS 信息 - fps_text = f"FPS: {fps:.2f}" - cv2.putText( - frame, - fps_text, - (10, 30), - cv2.FONT_HERSHEY_SIMPLEX, - 1.0, - (255, 255, 0), # 黄色 - 2 - ) - - -def draw_warning_overlay(frame, distances): - """ - 根据检测到的距离绘制警告覆盖层 - - Args: - frame (np.ndarray): 输入图像 - distances (list): 检测到的距离列表 - """ - # 检查是否需要显示警告 - alert_needed = False - min_distance = float('inf') - - for distance in distances: - if np.isfinite(distance) and distance < MAX_DISTANCE: - alert_needed = True - if distance < min_distance: - min_distance = distance - - if alert_needed: - # 根据最近距离选择警告文本颜色 - if min_distance < NEAR_THRESHOLD: - warning_color = (0, 0, 255) # 红色 - 高危 - elif min_distance < MID_THRESHOLD: - warning_color = (0, 165, 255) # 橙色 - 中危 - else: - warning_color = (0, 255, 255) # 黄色 - 低危 - - # 绘制警告文本 - h_img, w_img = frame.shape[:2] - warning_text = "WARNING!" - - # 计算文本尺寸以居中显示 - font = cv2.FONT_HERSHEY_DUPLEX - font_scale = 2.0 - thickness = 6 - - (text_width, text_height), _ = cv2.getTextSize(warning_text, font, font_scale, thickness) - text_x = (w_img - text_width) // 2 - text_y = h_img // 2 - - cv2.putText(frame, warning_text, (text_x, text_y), font, font_scale, warning_color, thickness) - -def normalize_boxes(boxes: np.ndarray, w: int, h: int) -> np.ndarray: - """ - 将像素坐标的 boxes 归一化为 [0,1] 区间,格式仍为 [x1, y1, x2, y2] - """ - if boxes is None or boxes.size == 0: - return boxes - scale = np.array([w, h, w, h], dtype=np.float32) - return (boxes.astype(np.float32) / scale).clip(0.0, 1.0) - -def resize_cover(img, tw, th): - """ - 将 img 放大以覆盖目标尺寸 (tw, th),再居中裁剪到精确尺寸。 - 不保留边框(无 letterbox)。 - """ - ih, iw = img.shape[:2] - if iw == 0 or ih == 0: - return img - scale = max(tw / float(iw), th / float(ih)) - nw, nh = int(round(iw * scale)), int(round(ih * scale)) - resized = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR) - x0 = max((nw - tw) // 2, 0) - y0 = max((nh - th) // 2, 0) - return resized[y0:y0 + th, x0:x0 + tw] - - -# ============================== 多线程类定义 ============================== # - -class CaptureThread(threading.Thread): - """ - 图像采集线程类 - - 负责从彩色摄像头和深度传感器连续采集图像数据,并将其放入任务队列 - 供推理线程处理。该线程实现了生产者模式,是整个流水线的数据源。 - - 主要功能: - 1. 连续采集彩色帧和深度帧 - 2. 摄像头遮挡检测 (基于亮度和方差分析) - 3. 图像镜像处理 (对齐彩色和深度流) - 4. 队列管理 (防止队列溢出,保持实时性) - - Attributes: - cap: OpenCV 摄像头对象 - depth_stream: OpenNI 深度流对象 - depth_width, depth_height: 深度图像尺寸 - task_queue: 任务队列,用于传递采集的帧数据 - running: 线程运行状态控制标志 - """ - - def __init__(self, cap, depth_stream, depth_size, task_queue: queue.Queue): - """ - 初始化图像采集线程 - - Args: - cap: OpenCV 摄像头对象,用于采集彩色图像 - depth_stream: OpenNI 深度流对象,用于采集深度数据 - depth_size (tuple): 深度图像尺寸,格式为 (width, height) - task_queue (queue.Queue): 任务队列,用于传递采集的帧数据到推理线程 - """ - super().__init__(daemon=True) # 设置为守护线程,主程序退出时自动结束 - self.cap = cap - self.depth_stream = depth_stream - self.depth_width, self.depth_height = depth_size - self.task_queue = task_queue - self.running = True # 线程运行状态标志 - print("[INFO] 图像采集线程已初始化") - - def run(self): - """ - 线程主循环,持续采集图像数据 - - 执行流程: - 1. 读取彩色帧并进行镜像翻转 - 2. 执行摄像头遮挡检测 - 3. 读取深度帧数据 - 4. 管理任务队列 (防止溢出) - 5. 将帧数据放入队列供推理线程处理 - """ - while self.running: - # 读取彩色帧 - ret, color_frame = self.cap.read() - if not ret: - print("[WARN] 无法读取彩色帧,跳过当前循环...") - continue - - # 水平翻转以对齐深度流 (原始彩色流与深度流为镜像画面) - color_frame = cv2.flip(color_frame, 1) - - # 摄像头遮挡检测 (基于图像质量分析) - self._detect_camera_occlusion(color_frame) - - # 读取深度帧数据 - depth_frame = self.depth_stream.read_frame() - depth_data = np.ndarray( - (self.depth_height, self.depth_width), - dtype=np.uint16, - buffer=depth_frame.get_buffer_as_uint16() - ) - - # 队列管理: 如果队列已满,移除最旧的帧以保持实时性 - if self.task_queue.full(): - try: - self.task_queue.get_nowait() # 非阻塞获取并丢弃最旧的帧 - print("[DEBUG] 任务队列已满,丢弃旧帧以保持实时性") - except queue.Empty: - pass - - # 将新采集的帧数据放入队列 - self.task_queue.put((color_frame, depth_data)) - - def _detect_camera_occlusion(self, frame): - """ - 检测摄像头是否被遮挡 - - 通过分析图像的亮度和方差来判断摄像头是否被遮挡或存在异常。 - 检测原理: - - 亮度过低: 可能被不透明物体遮挡 - - 方差过小: 画面过于均匀,可能被单色物体遮 - - Args: - frame (np.ndarray): 输入的彩色图像帧 - """ - # 转换为灰度图以简化计算 - gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - - # 计算图像质量指标 - mean_brightness = gray_frame.mean() # 平均亮度 - std_dev = gray_frame.std() # 标准差 (反映图像对比度) - - # 检测遮挡条件 - if mean_brightness < BRIGHTNESS_THRESHOLD or std_dev < VARIANCE_THRESHOLD: - print(f"[WARN] 检测到摄像头画面可能被遮挡!") - print(f" 亮度值: {mean_brightness:.1f} (阈值: {BRIGHTNESS_THRESHOLD})") - print(f" 方差值: {std_dev:.1f} (阈值: {VARIANCE_THRESHOLD})") - - def stop(self): - """停止图像采集线程""" - self.running = False - print("[INFO] 图像采集线程停止信号已发送") - - -class InferenceThread(threading.Thread): - """ - 推理线程类 - - 每个推理线程绑定一颗 NPU 核心,从任务队列获取帧数据进行 YOLOv11 模型推理。 - 该线程实现了消费者-生产者模式中的处理环节,是整个系统的核心计算单元。 - - 主要功能: - 1. NPU 模型加载和初始化 - 2. 图像预处理 (letterbox 缩放、颜色空间转换) - 3. 模型推理执行 - 4. 后处理和结果传递 - - Attributes: - core_id: 绑定的 NPU 核心 ID (0, 1, 2) - task_queue: 任务队列,获取待推理的帧数据 - result_queue: 结果队列,存放推理结果 - helper: COCO 数据集辅助工具 - model: RKNN 模型容器 - input_blob: 预分配的输入缓冲区 - running: 线程运行状态控制标志 - """ - - def __init__(self, npu_core: int, task_queue: queue.Queue, result_queue: queue.Queue, helper: COCO_test_helper): - """ - 初始化推理线程 - - Args: - npu_core (int): 绑定的 NPU 核心ID (0, 1, 2) - task_queue (queue.Queue): 任务队列,获取待推理的帧数据 - result_queue (queue.Queue): 结果队列,存放推理结果 - helper (COCO_test_helper): COCO 数据集辅助工具,用于图像预处理 - """ - super().__init__(daemon=True) # 设置为守护线程 - self.core_id = npu_core - self.task_queue = task_queue - self.result_queue = result_queue - self.helper = helper - self.running = True - - print(f"[INFO] NPU-{self.core_id}: 正在加载 RKNN 模型...") - - # 为当前 NPU 核心加载 RKNN 模型 - try: - self.model = RKNN_model_container(MODEL_PATH, self.core_id) - print(f"[INFO] NPU-{self.core_id}: 模型加载成功") - except Exception as e: - print(f"[ERROR] NPU-{self.core_id}: 模型加载失败 - {e}") - raise - - # 预分配输入缓冲区以避免重复内存分配 (性能优化) - self.input_blob = np.zeros((1, IMG_SIZE[0], IMG_SIZE[1], 3), dtype=np.uint8) - - def run(self): - """ - 线程主循环,持续进行模型推理 - - 执行流程: - 1. 从任务队列获取帧数据 - 2. 图像预处理 (letterbox 缩放、颜色空间转换) - 3. 执行 NPU 模型推理 - 4. 后处理解析网络输出 - 5. 将结果放入结果队列 - """ - while self.running: - try: - # 从任务队列获取帧数据 (超时 0.1 秒避免死锁) - color_frame, depth_data = self.task_queue.get(timeout=0.1) - except queue.Empty: - continue # 队列为空时继续等待 - - # 执行推理流程 - try: - # 步骤1: 图像预处理 - processed_image = self._preprocess_image(color_frame) - - # 步骤2: 执行模型推理 - model_outputs = self.model.run([processed_image]) - - # 步骤3: 后处理解析结果 - boxes, classes, scores = post_process(model_outputs) - - # 步骤4: 将结果放入结果队列 - result_data = (color_frame, depth_data, boxes, classes, scores) - self.result_queue.put(result_data) - - except Exception as e: - print(f"[ERROR] NPU-{self.core_id}: 推理过程失败 - {e}") - continue - - def _preprocess_image(self, frame): - """ - 图像预处理函数 - - 执行YOLOv11模型所需的图像预处理步骤: - 1. letterbox 缩放到模型输入尺寸 (保持宽高比) - 2. 颜色空间转换 BGR → RGB - 3. 数据类型转换和内存布局调整 - - Args: - frame (np.ndarray): 输入的彩色图像帧 (BGR格式) - - Returns: - np.ndarray: 预处理后的图像数据,准备输入模型 - """ - # letterbox 缩放: 保持宽高比的情况下缩放到目标尺寸,不足部分用黑色填充 - letterbox_image = self.helper.letter_box( - frame, - new_shape=(IMG_SIZE[1], IMG_SIZE[0]), # (width, height) - pad_color=(0, 0, 0) # 黑色填充 - ) - - # 颜色空间转换: OpenCV默认是BGR,模型需要RGB - rgb_image = cv2.cvtColor(letterbox_image, cv2.COLOR_BGR2RGB) - - # 将处理后的图像放入预分配的缓冲区 - self.input_blob[0] = rgb_image - - return self.input_blob - - def stop(self): - """停止推理线程""" - self.running = False - print(f"[INFO] NPU-{self.core_id}: 推理线程停止信号已发送") - -# ============================== WebSocket 距离上报线程 ============================== # - -# WebSocket 依赖库检查 -try: - import websocket # websocket-client 库 -except ImportError: - raise ImportError("请先安装 websocket‑client 库:pip install websocket-client") - - -class DistanceUploaderWS(threading.Thread): - """ - WebSocket 距离上报线程类 - - 负责维护与远程服务器的 WebSocket 连接,并按固定间隔上报检测到的最近距离数据。 - 该线程采用后台运行模式,不会阻塞主程序的正常执行。 - - 主要功能: - 1. 自动建立和维护 WebSocket 连接 - 2. 连接断开时自动重连 - 3. 距离数据的队列化管理和节流控制 - 4. JSON 格式数据传输 - 5. 错误处理和异常恢复 - - Attributes: - url (str): WebSocket 服务器地址 - distance_queue (queue.Queue): 距离数据队列 - _stop_event (threading.Event): 线程停止事件 - ws (websocket.WebSocket): WebSocket 连接对象 - last_sent_time (float): 上次发送数据的时间戳 - """ - - def __init__(self, url: str = WS_URL): - """ - 初始化 WebSocket 上报线程 - - Args: - url (str): WebSocket 服务器地址,默认使用全局配置中的 WS_URL - """ - super().__init__(name="DistanceUploaderWS", daemon=True) - self.url = url - self.distance_queue: "queue.Queue[float]" = queue.Queue() # 距离数据队列 - self._stop_event = threading.Event() # 线程停止控制事件 - self.ws = None # WebSocket 连接对象 - self.last_sent_time = 0.0 # 上次发送时间戳 (用于节流控制) - - print(f"[INFO] WebSocket 上报线程已初始化,目标地址: {self.url}") - - def send_distance(self, distance: float): - """ - 发送距离数据到队列 - - 该方法线程安全,可以从任意线程调用。只接受有限的浮点数值, - 会自动过滤无效数据 (NaN, 无穷大等)。 - - Args: - distance (float): 距离值 (米),必须是有限的浮点数 - """ - if distance is not None and np.isfinite(distance): - try: - self.distance_queue.put_nowait(distance) # 非阻塞放入队列 - except queue.Full: - # 队列满时丢弃新数据,保持实时性 - print("[DEBUG] 距离数据队列已满,丢弃新数据") - - def run(self): - """ - 线程主循环,维护 WebSocket 连接并发送距离数据 - - 执行流程: - 1. 检查并建立/维护 WebSocket 连接 - 2. 从队列获取距离数据 - 3. 执行节流控制 (避免发送过于频繁) - 4. 构造 JSON 数据包并发送 - 5. 异常处理和自动重连 - """ - while not self._stop_event.is_set(): - try: - # 步骤1: 确保 WebSocket 连接可用 - self._ensure_connection() - - # 步骤2: 获取距离数据 (超时 0.1 秒) - try: - distance = self.distance_queue.get(timeout=0.1) - except queue.Empty: - continue - - # 步骤3: 节流控制 - current_time = time.time() - if current_time - self.last_sent_time < WS_REPORT_INTERVAL: - # 间隔太短,清空队列中的旧数据,只保留最新值 - self._flush_old_data() - continue - - # 步骤4: 构造并发送 JSON 数据包 - self._send_distance_data(distance) - self.last_sent_time = current_time - - except Exception as e: - # 任何异常都会触发连接重置,确保系统稳定性 - print(f"[ERROR] WebSocket 上报过程中发生异常: {e}") - self._reset_connection() - time.sleep(1) # 异常后短暂等待 - - def _ensure_connection(self): - """ - 确保 WebSocket 连接可用 - - 如果连接不存在或已断开,会尝试建立新连接。 - 连接建立失败时会抛出异常,由主循环处理。 - """ - if self.ws is None: - print(f"[INFO] 正在建立 WebSocket 连接: {self.url}") - self.ws = websocket.create_connection(self.url, timeout=5) - print("[INFO] WebSocket 连接建立成功") - - def _flush_old_data(self): - """ - 清空队列中的旧数据,只保留最新的距离值 - - 该方法用于实现节流控制,避免在短时间内发送过多数据。 - """ - latest_distance = None - while not self.distance_queue.empty(): - try: - latest_distance = self.distance_queue.get_nowait() - except queue.Empty: - break - - # 如果有最新数据,重新放回队列 - if latest_distance is not None: - try: - self.distance_queue.put_nowait(latest_distance) - except queue.Full: - pass # 队列满时直接丢弃 - - def _send_distance_data(self, distance: float): - """ - 发送距离数据到 WebSocket 服务器 - - Args: - distance (float): 距离值 (米) - - Raises: - Exception: WebSocket 发送失败时抛出异常 - """ - # 构造 JSON 数据包 - beijing_time = get_beijing_time() - payload = { - "distance": round(float(distance), 2), # 距离值,保留2位小数 - "timestamp": beijing_time.isoformat(), # 北京时间 ISO 格式 - "source": "yolov11_depth_detector" # 数据源标识 - } - - # 发送 JSON 数据 - json_data = json.dumps(payload, ensure_ascii=False) - self.ws.send(json_data) - - print(f"[DEBUG] 已发送距离数据: {distance:.2f}m @ {beijing_time.strftime('%H:%M:%S')}") - - def _reset_connection(self): - """ - 重置 WebSocket 连接 - - 安全地关闭现有连接并重置连接对象,为重连做准备。 - """ - if self.ws is not None: - try: - self.ws.close() - except Exception as e: - print(f"[DEBUG] 关闭 WebSocket 连接时出现异常: {e}") - finally: - self.ws = None - print("[INFO] WebSocket 连接已重置") - - def stop(self): - """ - 停止 WebSocket 上报线程 - - 设置停止事件并安全关闭 WebSocket 连接。 - 该方法是线程安全的,可以从任意线程调用。 - """ - print("[INFO] 正在停止 WebSocket 上报线程...") - self._stop_event.set() # 设置停止事件 - self._reset_connection() # 关闭连接 - print("[INFO] WebSocket 上报线程已停止") - -# ============================== 主程序入口 ============================== # - -def main(): - """ - 主函数:初始化设备、创建线程、运行检测循环 - """ - print("[INFO] 开始初始化 YOLOv11 + 深度检测系统...") - - # ==================== 配置验证 ==================== # - print("[INFO] 验证配置参数...") - - # 验证模型文件存在性 - if not os.path.exists(MODEL_PATH): - print(f"[ERROR] 模型文件不存在: {MODEL_PATH}") - print("[HINT] 请检查 .env 文件中的 MODEL_PATH 配置") - sys.exit(1) - - # 验证 OpenNI 库路径 - if not os.path.exists(OPENNI_LIB_PATH): - print(f"[ERROR] OpenNI 库路径不存在: {OPENNI_LIB_PATH}") - print("[HINT] 请检查 .env 文件中的 OPENNI_LIB_PATH 配置") - sys.exit(1) - - # 验证保存目录权限 - try: - os.makedirs(WARNING_IMAGE_DIR, exist_ok=True) - os.makedirs(VIDEO_SAVE_DIR, exist_ok=True) - print(f"[INFO] 保存目录已准备: {WARNING_IMAGE_DIR}, {VIDEO_SAVE_DIR}") - except PermissionError as e: - print(f"[ERROR] 无法创建保存目录: {e}") - print("[HINT] 请检查目录权限或修改 .env 文件中的保存路径") - sys.exit(1) - - # 验证 NPU 核心配置 - if not NPU_CORES: - print("[ERROR] NPU 核心配置为空") - print("[HINT] 请检查 .env 文件中的 NPU_CORES 配置") - sys.exit(1) - - # 验证阈值配置的合理性 - if not (0.0 < OBJ_THRESH < 1.0): - print(f"[ERROR] 置信度阈值配置错误: {OBJ_THRESH}") - print("[HINT] OBJ_THRESH 应该在 0.0 到 1.0 之间") - sys.exit(1) - - if not (0.0 < NMS_THRESH < 1.0): - print(f"[ERROR] NMS 阈值配置错误: {NMS_THRESH}") - print("[HINT] NMS_THRESH 应该在 0.0 到 1.0 之间") - sys.exit(1) - - print("[INFO] 配置验证通过") - - # ==================== 设备初始化 ==================== # - # 检查 OpenNI 库路径是否存在 - if not os.path.exists(OPENNI_LIB_PATH): - print(f"[ERROR] OpenNI 库路径不存在: {OPENNI_LIB_PATH}") - sys.exit(1) - - # 初始化 OpenNI 库 - print("[INFO] 正在初始化 OpenNI 库...") - openni2.initialize(OPENNI_LIB_PATH) - - # 打开深度摄像头设备 - print("[INFO] 正在打开深度摄像头设备...") - device = openni2.Device.open_any() - - # 创建并启动深度流 - depth_stream = device.create_depth_stream() - depth_stream.start() - depth_mode = depth_stream.get_video_mode() - depth_width = depth_mode.resolutionX - depth_height = depth_mode.resolutionY - print(f"[INFO] 深度流已启动,分辨率: {depth_width}x{depth_height}") - - # 初始化彩色摄像头 - print("[INFO] 正在初始化彩色摄像头...") - cap = cv2.VideoCapture(0, cv2.CAP_V4L2) # 使用 V4L2 后端 - cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) # 设置 MJPG 编码以提高性能 - - if not cap.isOpened(): - print("[ERROR] 无法打开彩色摄像头 /dev/video0") - sys.exit(1) - - # 获取彩色图像尺寸 - color_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - color_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - print(f"[INFO] 彩色摄像头已打开,分辨率: {color_width}x{color_height}") - - # ⭐ 准备报警和视频保存目录 - os.makedirs(WARNING_IMAGE_DIR, exist_ok=True) - os.makedirs(VIDEO_SAVE_DIR, exist_ok=True) - # 准备视频文件名 - ts = time.localtime() - raw_video_filename = time.strftime(RAW_VIDEO_NAME_FORMAT, ts) - mark_video_filename = time.strftime(MARK_VIDEO_NAME_FORMAT, ts) - raw_writer = None - mark_writer = None - fourcc = cv2.VideoWriter_fourcc(*"mp4v") - - # ==================== 队列和辅助工具初始化 ==================== # - # 创建任务队列和结果队列 - task_queue = queue.Queue(maxsize=CAP_QUEUE_SIZE) # 采集→推理队列 - result_queue = queue.Queue(maxsize=RES_QUEUE_SIZE) # 推理→渲染队列 - - # 初始化 COCO 数据集辅助工具 - helper = COCO_test_helper(enable_letter_box=True) - print("[INFO] COCO 辅助工具已初始化") - - # ==================== 线程创建与启动 ==================== # - # 创建并启动图像采集线程 - capture_thread = CaptureThread( - cap, - depth_stream, - (depth_width, depth_height), - task_queue - ) - capture_thread.start() - - # NPU 可用性检测和推理线程创建 - print("[INFO] 正在检测 NPU 可用性...") - valid_cores = [] - - for core in NPU_CORES: - try: - # 尝试创建模型容器以检测 NPU 可用性 - test_model = RKNN_model_container(MODEL_PATH, core) - valid_cores.append(core) - del test_model # 释放测试用的模型 - print(f"[INFO] NPU-{core}: 可用") - except Exception as e: - print(f"[WARN] NPU-{core}: 初始化失败,跳过 - {e}") - - # 创建推理线程(每个可用的 NPU 核心一个线程) - inference_workers = [] - for core in valid_cores: - worker = InferenceThread(core, task_queue, result_queue, helper) - worker.start() - inference_workers.append(worker) - - if not inference_workers: - print("[ERROR] 没有可用的 NPU 核心,程序退出") - sys.exit(1) - - print(f"[INFO] 成功启动 {len(inference_workers)} 个推理线程") - - # ⭐ 启动 WebSocket 上报线程 - uploader = DistanceUploaderWS(WS_URL) - uploader.start() - print("[INFO] WebSocket 上报线程已启动") - - - # ⭐ 新增:初始化 GPIO 控制器 - gpio_controller = GPIOController( - gpio_cmd=GPIO_CMD_PATH, - config_file=GPIO_CONFIG_FILE, - delay_seconds=GPIO_DELAY_SECONDS - ) - - print("[INFO] GPIO 控制器已启动") - - # ==================== 主渲染循环 ==================== # - # 创建显示窗口 - cv2.namedWindow("YOLOv11 Depth Demo", cv2.WINDOW_NORMAL) - cv2.setWindowProperty("YOLOv11 Depth Demo", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) - - # ⭐ 新增:获取全屏窗口可绘制区域尺寸(用来做 cover) - try: - _, _, win_w, win_h = cv2.getWindowImageRect(WIN_NAME) - except Exception: - # 少数平台若不支持该 API,可给一个兜底尺寸(如 1920x1080) - win_w, win_h = 1920, 1080 - - # 初始化 FPS 计算 - prev_time = time.perf_counter() - frame_count = 0 - - print("[INFO] 开始主渲染循环,按 'q' 键退出...") - - # 距离区域检测框数量管理初始化 - prev_zone_counts = [0, 0, 0] # 上一次各区域的检测框数量 - last_save_time = 0.0 # 上次保存报警截图的时间 - - # FPS 统计工具类实例 - fps_stats = FPSStatistics() if FPS_STATS_ENABLED else None - - try: - while True: - - frame_start_wall = get_beijing_time() - - try: - # 从结果队列获取推理结果(超时 50ms) - frame, depth_raw, boxes, classes, scores = result_queue.get(timeout=0.05) - except queue.Empty: - continue # 没有结果时继续等待 - - # ==================== 结果绘制 ==================== # - if boxes is not None: - # 将检测框从模型坐标系转换回原图坐标系 - boxes = helper.get_real_box(boxes) - - # 筛选出人体检测结果 - person_mask = classes == PERSON_CLASS_ID - person_boxes = boxes[person_mask] - person_scores = scores[person_mask] - - # ⭐ 新增:判断是否检测到人 - if len(person_boxes) > 0: - person_detected = True - - # ⭐ 新增:更新 GPIO 状态 - gpio_controller.update(person_detected) - - # ⭐ 新增:归一化坐标 [0,1](相对原始 color_width/height) - norm_person_boxes = normalize_boxes(person_boxes, color_width, color_height) - - # 准备距离收集列表 - distances = [] - - # 绘制每个检测到的人体 - for box, score in zip(person_boxes, person_scores): - distance_meters = process_single_detection(box, score, frame, depth_raw, color_width, color_height, depth_width, depth_height) - - # 收集距离用于报警和上报 - if not math.isnan(distance_meters): - distances.append(distance_meters) - - # ⭐ Send distance via WebSocket (最近距离) - if len(distances) > 0: - valid_dists = [d for d in distances if np.isfinite(d)] - if valid_dists: - uploader.send_distance(min(valid_dists)) - - # ⭐ 绘制警告覆盖层 - draw_warning_overlay(frame, distances) - - # ⭐ 统计当前各距离区域的检测框数量 - current_zone_counts = calculate_zone_counts(distances) - - # ⭐ 检查是否有区域检测框数量变化(增加或区域间移动) - save_needed = check_warning_conditions(current_zone_counts, prev_zone_counts) - - # ⭐ 保存报警图片(如果有变化) - now_time = time.time() - if save_needed and (now_time - last_save_time >= WARNING_SNAPSHOT_INTERVAL): - save_warning_snapshot(frame, frame_start_wall, current_zone_counts, prev_zone_counts, distances) - last_save_time = now_time - - # 更新检测框数量 - prev_zone_counts = current_zone_counts - - # ⭐ 保存原始帧和标记帧到视频 - raw_writer, mark_writer = handle_video_recording(raw_writer, mark_writer, frame, raw_video_filename, mark_video_filename, fourcc) - - # ==================== FPS 计算和显示 ==================== # - current_time = time.perf_counter() - fps = 1.0 / (current_time - prev_time) - prev_time = current_time - frame_count += 1 - - # 更新 FPS 统计 - if fps_stats: - fps_stats.update(fps) - - # 绘制 FPS 信息 - draw_fps_info(frame, fps) - - # 每100帧输出一次统计信息 - if frame_count % 100 == 0: - print(f"[INFO] 已处理 {frame_count} 帧,当前 FPS: {fps:.2f}") - - # 应用画面翻转(仅用于显示) - display_frame = apply_frame_flip(frame) - - # ⭐ 新增:定期刷新窗口尺寸(多屏/切分辨率时更稳) - if frame_count % 60 == 0: - try: - _, _, win_w, win_h = cv2.getWindowImageRect(WIN_NAME) - except Exception: - pass - - # ⭐ 关键一步:把显示帧按窗口全屏尺寸做 cover,去除白边 - display_frame = resize_cover(display_frame, win_w, win_h) - - # 显示图像 - cv2.imshow("YOLOv11 Depth Demo", display_frame) - - # 检查退出键 - if cv2.waitKey(1) & 0xFF == ord('q'): - print("[INFO] 检测到退出信号") - break - - except KeyboardInterrupt: - print("\n[INFO] 收到键盘中断信号") - - finally: - # ==================== 资源清理 ==================== # - print("[INFO] 正在清理资源...") - - - # ⭐ 新增:复位 GPIO 到高电平 - try: - gpio_controller._update_config('true') - subprocess.run([GPIO_CMD_PATH], timeout=5) - print("[INFO] GPIO 已复位到高电平") - except Exception as e: - print(f"[WARN] GPIO 复位失败: {e}") - -cv2.destroyAllWindows() - - # 停止所有线程 - capture_thread.stop() - capture_thread.join(timeout=2.0) - - for worker in inference_workers: - worker.stop() - worker.join(timeout=2.0) - - # 释放设备资源 - cap.release() - depth_stream.stop() - device.close() - openni2.unload() - - # 释放视频写入器 - if raw_writer is not None: - raw_writer.release() - mark_writer.release() - # 停止 WebSocket 上报线程 - uploader.stop() - uploader.join(timeout=1) - - cv2.destroyAllWindows() - # 打印 FPS 统计报告 - if FPS_STATS_ENABLED and fps_stats: - fps_stats.print_statistics() - print("[INFO] 程序已安全退出") - -def apply_command_line_args(args): - """ - 根据命令行参数修改全局配置 - - Args: - args: 命令行参数对象 - """ - global ENABLE_GUI, ENABLE_VIDEO_RECORDING, FPS_STATS_ENABLED - - if args.no_gui: - ENABLE_GUI = False - - if args.no_video: - ENABLE_VIDEO_RECORDING = False - - if args.no_fps_stats: - FPS_STATS_ENABLED = False - -if __name__ == "__main__": - """ - 程序入口点 - - 检查运行环境并启动主程序。支持命令行参数控制GUI显示模式。 - """ - import argparse - - # 创建命令行参数解析器 - parser = argparse.ArgumentParser( - description="YOLOv11 深度检测系统", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -示例用法: - python ob_yolo_rknn.py # 启动GUI模式 - python ob_yolo_rknn.py --no-gui # 启动无头模式 - python ob_yolo_rknn.py --no-video # 禁用视频录制 - python ob_yolo_rknn.py --no-fps-stats # 禁用FPS统计 - """ - ) - - # 添加命令行参数 - parser.add_argument( - '--no-gui', - action='store_true', - help='禁用GUI显示,以无头模式运行' - ) - parser.add_argument( - '--no-video', - action='store_true', - help='禁用视频录制功能' - ) - parser.add_argument( - '--no-fps-stats', - action='store_true', - help='禁用FPS统计功能' - ) - - # 解析命令行参数 - args = parser.parse_args() - - # 应用命令行参数配置 - apply_command_line_args(args) - - # 显示配置变更信息 - if args.no_gui: - print("[INFO] GUI 显示已禁用 (命令行参数)") - - if args.no_video: - print("[INFO] 视频录制已禁用 (命令行参数)") - - if args.no_fps_stats: - print("[INFO] FPS 统计已禁用 (命令行参数)") - - # 显示启动信息 - gui_status = "启用" if ENABLE_GUI else "禁用 (无头模式)" - video_status = "启用" if ENABLE_VIDEO_RECORDING else "禁用" - fps_status = "启用" if FPS_STATS_ENABLED else "禁用" - beijing_time_str = format_beijing_timestamp() - - print(f""" -======================================== - YOLOv11 深度检测系统 v2.0 -======================================== -配置信息: -- GUI 显示: {gui_status} -- 视频录制: {video_status} -- FPS 统计: {fps_status} -- NPU 核心: {NPU_CORES} -- 模型路径: {MODEL_PATH} -- 深度库路径: {OPENNI_LIB_PATH} -- 北京时间: {beijing_time_str} -======================================== - """) - - try: - # 启动主程序 - main() - except Exception as e: - print(f"\n[CRITICAL ERROR] 程序运行时发生严重错误:") - print(f"[ERROR] {type(e).__name__}: {e}") - print("\n[DEBUG] 请检查以下项目:") - print("1. 硬件连接 (摄像头、深度传感器)") - print("2. 库文件路径配置") - print("3. RKNN 模型文件") - print("4. 系统权限设置") - print("5. NPU 驱动状态") - sys.exit(1) diff --git a/VideoProsessing/bin/video b/VideoProsessing/bin/video index 848a6cb..4f5147d 100755 Binary files a/VideoProsessing/bin/video and b/VideoProsessing/bin/video differ diff --git a/VideoProsessing/src/main.cpp b/VideoProsessing/src/main.cpp index 6d0128c..68733e0 100644 --- a/VideoProsessing/src/main.cpp +++ b/VideoProsessing/src/main.cpp @@ -13,9 +13,10 @@ #include #include #include -#include +#include //队列 #include #include +#include //双端队列 #include "Netra.hpp" @@ -24,15 +25,13 @@ using namespace QCL; using namespace cv; using namespace chrono_literals; -// 全局变量 -VideoCapture cap(0); -Mat handleFrame; // 存放处理后的帧 +// 路径和接口 const string mqtt_url = "tcp://192.168.12.1:1883"; const string clientId = "video_subData"; const string Topic = "/video/PersonData"; const string filePath = "../../InitAuth/conf/.env"; // 配置保存路径 -const int Qos = 0; -mqtt::async_client client(mqtt_url, clientId); +const string warningPath = "/mnt/save/warning/"; // 报警图片保存路径 +const string videoPath = "/mnt/save/video/"; // 报警视频保存路径 // 保存检测结果 struct Dection @@ -49,15 +48,26 @@ struct dangerDistance int safe; } dis; -mutex detMutex; // 保护latestDection的互斥锁 -vector latestDection; // 保存最新接收到的检测结果 - +// 全局变量和对象 +VideoCapture cap(0); +Mat handleFrame; // 存放处理后的帧 +const int Qos = 0; +mqtt::async_client client(mqtt_url, clientId); +mutex detMutex; // 保护latestDection的互斥锁 +vector latestDection; // 保存最新接收到的检测结果 mutex alertMutex; // 保护alertQueue的互斥锁 condition_variable alertcv; // 通知报警线程有新任务 queue alertQueue; // 存放解析后的数据 std::atomic alertWorkerRunning{false}; // 工作线程运行标志 atomic outPutMode = false; // 保存报警输出false--电平 -// atomic currentFrameHasDanger{false}; // 是否进行了报警 + +// 视频相关 +const int FPS = 30; // 帧率 +const int PRE_RECORD_SECONDS = 10; // 预录制时长 +const int MAX_BUFFER_SIZE = FPS * PRE_RECORD_SECONDS; // 缓冲区最大帧数 +mutex bufferMutex; // 保护缓冲区的锁 +deque videoDeque; // 环形缓冲区,存储最近十秒的画面帧 +atomic isRecording{false}; // 是否正在写入文件 // mqtt初始化 void MqttInit(); @@ -81,8 +91,12 @@ void warnThread(); bool GetDistance(); // 调用报警输出程序 void setGPIOLevel(int level); -// // 状态机 -// void warnStatus(); +// 获取当前时间字符串做文件名 +string getCurrentTimeStr(); +// 保存图片 +void saveAlarmImage(); +// 保存视频 +void saveAlarmVideo(); int main() { @@ -111,6 +125,50 @@ int main() return 0; } +// 保存图片 +void saveAlarmImage(const Mat &frame) +{ + // 保存路径 + string fileName = warningPath + "alarm_" + getCurrentTimeStr() + ".jpg"; + imwrite(fileName, frame); +} +// 保存视频 +void saveAlarmVideo(deque bufferSnapshot) +{ + thread([bufferSnapshot]() + { + string fileName = videoPath + "alarm_" + getCurrentTimeStr() + ".mp4"; + // 初始化视频写入器 + VideoWriter write; + int codec = write.fourcc('H', '2', '6', '4'); + Size size(1920, 1080); + if (!bufferSnapshot.empty()) + { + size = bufferSnapshot.front().size(); + } + write.open(fileName, codec, FPS, size, true); + + // 写入缓冲区的所有帧 + for (auto &ii : bufferSnapshot) + { + write.write(ii); + } + + //退出清理 + write.release(); }) + .detach(); +} + +// 获取当前时间字符串做文件名 +string getCurrentTimeStr() +{ + auto now = chrono::system_clock::now(); + auto time_t_now = chrono::system_clock::to_time_t(now); + stringstream ss; + ss << put_time(localtime(&time_t_now), "%Y%m%d_%H%M%S"); + return ss.str(); +} + // 调用报警输出程序 void setGPIOLevel(int level) { @@ -118,19 +176,6 @@ void setGPIOLevel(int level) system(cmd.c_str()); } -// // 状态机 -// void warnStatus() -// { -// thread([]() -// { -// bool isAlarm = false; -// auto lastDangerTime = chrono::steady_clock::now(); -// while(alertWorkerRunning.load()) -// { - -// } }); -// } - // 报警线程 void warnThread() { @@ -201,13 +246,13 @@ void warnThread() } } + catch(const std::exception& e) { std::cerr << e.what() << '\n'; } - lk.lock(); //重新上锁,以继续处理下个任务 - + lk.lock(); //重新上锁,以继续处理下个任务 } //逻辑处理,状态机 @@ -446,6 +491,12 @@ bool processFrame(VideoCapture &cap, FILE *pipe, Mat &frame, int64 &count, chron latestDection.clear(); } + // 短锁进行保存 + { + lock_guard lk(bufferMutex); + videoDeque.push_back(handleFrame); + } + // 可选:显示窗口 // imshow("测试画面", frame);