This commit is contained in:
2025-11-18 16:59:51 +08:00
parent dcecf87fd0
commit 29c66a752c
2 changed files with 107 additions and 27 deletions

Binary file not shown.

View File

@@ -10,6 +10,11 @@
#include <opencv4/opencv2/opencv.hpp>
#include <mqtt/async_client.h>
#include <nlohmann/json.hpp>
#include <mutex>
#include <vector>
#include <queue>
#include <condition_variable>
#include <atomic>
using namespace std;
using namespace cv;
@@ -24,7 +29,21 @@ const string Topic = "/video/PersonData";
const int Qos = 0;
mqtt::async_client client(mqtt_url, clientId);
// 函数声明
// 保存检测结果
struct Dection
{
int x, y, w, h;
double distance;
};
mutex detMutex; // 保护latestDection的互斥锁
vector<Dection> latestDection; // 保存最新接收到的检测结果
mutex alertMutex; // 保护alertQueue的互斥锁
condition_variable alertcv; // 通知报警线程有新任务
queue<nlohmann::json> alertQueue; // 存放解析后的数据
std::atomic<bool> alertWorkerRunning{false}; // 工作线程运行标志
// mqtt初始化
void MqttInit();
// 摄像头管道初始化
@@ -87,6 +106,44 @@ void MqttInit()
client.connect()->wait();
client.subscribe(Topic, Qos)->wait();
// 开启报警线程
thread([]()
{
while (alertWorkerRunning.load())
{
unique_lock<mutex> lk(alertMutex);
// 等待条件,队列非空或者收到停止信号
alertcv.wait(lk,[](){
return !alertQueue.empty() || !alertWorkerRunning.load();});
//如果线程应退出切队列为空,退出循环
if(alertQueue.empty() && !alertWorkerRunning.load())
break;
//处理队列中的所有信号
while(!alertQueue.empty())
{
auto job = move(alertQueue.front());
alertQueue.pop(); //释放空指针
//解锁,以便于增加队列任务
lk.unlock();
//报警处理函数
try
{
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
lk.lock(); //重新上锁,以继续处理下个任务
}
} })
.detach();
}
// mqtt接收订阅消息的回调
@@ -98,19 +155,35 @@ void getMsgCallback(mqtt::const_message_ptr msg)
try {
// TODO 处理接收到到的位置数据
auto json = nlohmann::json::parse(payload);
//绘制 接收到的坐标 的方框
thread([json](){
for(auto & ii :json)
{
int x = static_cast<int>(ii["x"]);
int y = static_cast<int>(ii["y"]);
int w = static_cast<int>(ii["w"]);
int h = static_cast<int>(ii["h"]);
double distance = static_cast<double>(ii["distance"]);
drawRect(x,y,w,h,distance);
}
}).detach();
vector<Dection> dets;
dets.reserve(json.size()); //预分配,减少多次分配
for(const auto&ii:json)
{
Dection d;
// 使用静态转换确保类型为整数/浮点
d.x = static_cast<int>(ii.value("x", 0));
d.y = static_cast<int>(ii.value("y", 0));
d.w = static_cast<int>(ii.value("w", 0));
d.h = static_cast<int>(ii.value("h", 0));
d.distance = static_cast<double>(ii.value("distance", 0.0));
dets.push_back(d);
}
//原子替换最新检测合集
{
lock_guard<mutex> lk(detMutex);
latestDection = move(dets);
}
{
lock_guard<mutex> lk(alertMutex);
alertQueue.push(move(json)); //push后通知工作线程
}
//唤醒任务线程
alertcv.notify_one();
}
catch (const nlohmann::json::parse_error &e) {
cerr << "JSON 解析错误: " << e.what() << "\n原始 payload: " << payload << "\n";
@@ -180,22 +253,29 @@ bool processFrame(VideoCapture &cap, FILE *pipe, Mat &frame, int64 &count, chron
return false;
}
// 拷贝视频帧
handleFrame = frame;
// FPS计算与显示
++count;
auto now = chrono::steady_clock::now();
if (chrono::duration_cast<chrono::milliseconds>(now - t0).count() / 1000.0 > 1.0)
{
string fps = "FPS:" + to_string(count);
count = 0;
t0 = now;
}
// 写入管道
fwrite(frame.data, 1, frame.total() * frame.elemSize(), pipe);
fflush(pipe);
// 拷贝视频帧
handleFrame = frame;
// 读取最新检测:短锁获取并将结果拷贝到本地变量
vector<Dection> destCopy;
{
lock_guard<mutex> lk(detMutex);
destCopy = latestDection; // 复制到本地后释放锁
}
// 在主线程上进行绘制
for (const auto &ii : destCopy)
{
drawRect(ii.x, ii.y, ii.w, ii.h, ii.distance);
}
latestDection.clear();
// 可选:显示窗口
// imshow("测试画面", frame);