diff --git a/VideoProsessing/bin/video b/VideoProsessing/bin/video index 0494fed..3eb45bc 100755 Binary files a/VideoProsessing/bin/video and b/VideoProsessing/bin/video differ diff --git a/VideoProsessing/src/main.cpp b/VideoProsessing/src/main.cpp index c4e3cc4..6899035 100644 --- a/VideoProsessing/src/main.cpp +++ b/VideoProsessing/src/main.cpp @@ -10,6 +10,11 @@ #include #include #include +#include +#include +#include +#include +#include using namespace std; using namespace cv; @@ -24,7 +29,21 @@ const string Topic = "/video/PersonData"; const int Qos = 0; mqtt::async_client client(mqtt_url, clientId); -// 函数声明 +// 保存检测结果 +struct Dection +{ + int x, y, w, h; + double distance; +}; + +mutex detMutex; // 保护latestDection的互斥锁 +vector latestDection; // 保存最新接收到的检测结果 + +mutex alertMutex; // 保护alertQueue的互斥锁 +condition_variable alertcv; // 通知报警线程有新任务 +queue alertQueue; // 存放解析后的数据 +std::atomic alertWorkerRunning{false}; // 工作线程运行标志 + // mqtt初始化 void MqttInit(); // 摄像头管道初始化 @@ -87,6 +106,44 @@ void MqttInit() client.connect()->wait(); client.subscribe(Topic, Qos)->wait(); + + // 开启报警线程 + thread([]() + { + while (alertWorkerRunning.load()) + { + unique_lock lk(alertMutex); + // 等待条件,队列非空或者收到停止信号 + alertcv.wait(lk,[](){ + return !alertQueue.empty() || !alertWorkerRunning.load();}); + + //如果线程应退出切队列为空,退出循环 + if(alertQueue.empty() && !alertWorkerRunning.load()) + break; + + //处理队列中的所有信号 + while(!alertQueue.empty()) + { + auto job = move(alertQueue.front()); + alertQueue.pop(); //释放空指针 + //解锁,以便于增加队列任务 + lk.unlock(); + + //报警处理函数 + try + { + + } + catch(const std::exception& e) + { + std::cerr << e.what() << '\n'; + } + + lk.lock(); //重新上锁,以继续处理下个任务 + + } + } }) + .detach(); } // mqtt接收订阅消息的回调 @@ -98,19 +155,35 @@ void getMsgCallback(mqtt::const_message_ptr msg) try { // TODO 处理接收到到的位置数据 auto json = nlohmann::json::parse(payload); - //绘制 接收到的坐标 的方框 - thread([json](){ - for(auto & ii :json) - { - int x = static_cast(ii["x"]); - int y = static_cast(ii["y"]); - int w = static_cast(ii["w"]); - int h = static_cast(ii["h"]); - double distance = static_cast(ii["distance"]); - drawRect(x,y,w,h,distance); - } - }).detach(); - + + vector dets; + dets.reserve(json.size()); //预分配,减少多次分配 + for(const auto&ii:json) + { + Dection d; + // 使用静态转换确保类型为整数/浮点 + d.x = static_cast(ii.value("x", 0)); + d.y = static_cast(ii.value("y", 0)); + d.w = static_cast(ii.value("w", 0)); + d.h = static_cast(ii.value("h", 0)); + d.distance = static_cast(ii.value("distance", 0.0)); + + dets.push_back(d); + } + + //原子替换最新检测合集 + { + lock_guard lk(detMutex); + latestDection = move(dets); + } + + { + lock_guard lk(alertMutex); + alertQueue.push(move(json)); //push后通知工作线程 + } + + //唤醒任务线程 + alertcv.notify_one(); } catch (const nlohmann::json::parse_error &e) { cerr << "JSON 解析错误: " << e.what() << "\n原始 payload: " << payload << "\n"; @@ -180,22 +253,29 @@ bool processFrame(VideoCapture &cap, FILE *pipe, Mat &frame, int64 &count, chron return false; } - // 拷贝视频帧 - handleFrame = frame; - - // FPS计算与显示 - ++count; - auto now = chrono::steady_clock::now(); - if (chrono::duration_cast(now - t0).count() / 1000.0 > 1.0) - { - string fps = "FPS:" + to_string(count); - count = 0; - t0 = now; - } - // 写入管道 fwrite(frame.data, 1, frame.total() * frame.elemSize(), pipe); fflush(pipe); + + // 拷贝视频帧 + handleFrame = frame; + + // 读取最新检测:短锁获取并将结果拷贝到本地变量 + vector destCopy; + + { + lock_guard lk(detMutex); + destCopy = latestDection; // 复制到本地后释放锁 + } + + // 在主线程上进行绘制 + for (const auto &ii : destCopy) + { + drawRect(ii.x, ii.y, ii.w, ii.h, ii.distance); + } + + latestDection.clear(); + // 可选:显示窗口 // imshow("测试画面", frame);