// Source: RkApp/VideoProsessing/src/main.cpp (exported 2026-01-09 13:59:10 +08:00)
/*
1. Streams video over the native UDP/RTSP pipeline; frames are processed together with a YOLO model.
2. Consumes bounding-box coordinates and depth data published by the YOLO process (via MQTT).
3. Draws detection boxes and safety zones on each frame.
4. Raises a GPIO alarm and saves a snapshot plus a pre-recorded clip when a person enters the danger zone.
5. Display and streaming each run on their own thread via double buffers.
*/
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <deque>
#include <filesystem>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <X11/Xlib.h>
#include <boost/process.hpp>
#include <mqtt/async_client.h>
#include <nlohmann/json.hpp>
#include <opencv4/opencv2/opencv.hpp>

#include "Netra.hpp"
using namespace std;
using namespace QCL;
using namespace cv;
using namespace chrono_literals;
namespace bp = boost::process;
// Paths and endpoints
const string mqtt_url = "tcp://127.0.0.1:1883";
const string clientId = "video_subData";
const string Topic = "/video/PersonData";
const string filePath = "/home/orangepi/RKApp/InitAuth/conf/.env";
const string warningPath = "/mnt/save/warning/";
const string videoPath = "/mnt/save/video/";
// One detection result received from the YOLO model.
// x/y/w/h may be normalized ([0,1]) or pixel coordinates (see drawRect).
struct Dection
{
    double x, y, w, h;
    double distance; // depth reading; 0.0 means "no depth data available"
};
// Alarm distance thresholds (loaded from the config file).
struct dangerDistance
{
    int danger;
    int warn;
    int safe;
} dis;
// A 2-D point in normalized ([0,1]) coordinates.
struct Point2N
{
    double x, y;
};
// A named quadrilateral zone (safe/warn/danger); vertices are normalized.
struct ZoneBox
{
    std::string name;
    std::array<Point2N, 4> vertices;
};
ZoneBox g_safe, g_warn, g_dang;
// Global state and shared objects
VideoCapture cap;
Mat handleFrame; // most recently processed frame (also read by the alarm thread)
const int Qos = 0;
mqtt::async_client client(mqtt_url, clientId);
mutex detMutex;                // guards latestDection
vector<Dection> latestDection; // newest detections, consumed for drawing
mutex alertMutex;
condition_variable alertcv;
queue<nlohmann::json> alertQueue;
std::atomic<bool> alertWorkerRunning{false};
atomic<bool> outPutMode = false;
bool mainRunning = true; // main-loop flag (also written from the display thread / signal handler)
// Video parameters
const int FPS = 30;
const int PRE_RECORD_SECONDS = 10;
const int MAX_BUFFER_SIZE = FPS * PRE_RECORD_SECONDS;
mutex bufferMutex; // guards g_ringBuffer
atomic<bool> isRecording{false};
atomic<bool> mediaMirror{false};
atomic<bool> mediaFlip{false};
// Latest detection snapshot handed to the alarm thread
static std::mutex latestAlertMutex;
static std::condition_variable latestAlertCv;
static std::optional<std::vector<Dection>> latestAlertDets;
static std::atomic<uint64_t> latestAlertSeq{0};
// Streaming thread state
static std::mutex g_pipeMutex;
static std::condition_variable g_pipeCv;
// Double buffer: the main thread writes one slot while the writer thread
// reads the other, so a frame is never overwritten mid-write.
static cv::Mat g_pipeBuf[2];
static std::atomic<int> g_pipeWriteIdx{0}; // slot the main thread writes next
static std::atomic<bool> g_pipeRunning{false};
static std::atomic<uint64_t> g_pipeSeq{0};
// =====================================================================
// Pre-allocated ring buffer: removes the per-frame clone() malloc/free
// churn by reusing a fixed pool of Mats.
// Not thread-safe on its own — callers serialize access via bufferMutex.
// =====================================================================
struct PreallocRingBuffer
{
    vector<Mat> pool; // fixed-size frame pool, allocated once in init()
    int head = 0;     // index of the next slot to overwrite
    int count = 0;    // number of valid frames stored (<= capacity)
    int capacity = 0;
    // Allocate cap_ frames of the given geometry up front.
    void init(int cap_, int rows, int cols, int type)
    {
        capacity = cap_;
        pool.resize(cap_);
        for (auto &m : pool)
            m.create(rows, cols, type);
        head = 0;
        count = 0;
    }
    // Copy src into the current slot, overwriting the oldest frame when full.
    void push(const Mat &src)
    {
        src.copyTo(pool[head]);
        head = (head + 1) % capacity;
        if (count < capacity)
            count++;
    }
    // Deep-copy all buffered frames in chronological order (oldest first).
    deque<Mat> snapshot() const
    {
        deque<Mat> result;
        result.resize(count);
        int start = (count < capacity) ? 0 : head; // oldest valid slot
        for (int i = 0; i < count; ++i)
        {
            int idx = (start + i) % capacity;
            pool[idx].copyTo(result[i]);
        }
        return result;
    }
    bool empty() const { return count == 0; }
    // Most recently pushed frame; precondition: !empty().
    const Mat &back() const
    {
        int idx = (head - 1 + capacity) % capacity;
        return pool[idx];
    }
};
static PreallocRingBuffer g_ringBuffer;
// In-memory cache of thresholds and output mode so hot paths never re-read
// the config file per frame.
struct RuntimeConfig
{
    std::atomic<int> danger{0};
    std::atomic<int> warn{0};
    std::atomic<int> safe{0};
    std::atomic<bool> outPutMode{false};
};
static RuntimeConfig g_cfg;
// =====================================================================
// Double buffer dedicated to the display thread.
// The main thread writes a finished frame and notifies; the display thread
// owns imshow/waitKey, so the main thread never blocks on the UI.
// =====================================================================
static std::mutex g_dispMutex;
static std::condition_variable g_dispCv;
static cv::Mat g_dispBuf[2];               // double buffer
static std::atomic<int> g_dispWriteIdx{0}; // slot the main thread writes next
static std::atomic<uint64_t> g_dispSeq{0};
static std::atomic<bool> g_dispRunning{false};
// 函数声明
2026-01-09 13:59:10 +08:00
void MqttInit();
bool videoInit(VideoCapture &cap);
FILE *pipeInit();
bool processFrame(VideoCapture &cap, FILE *pipe, Mat &frame,
int64 &count, chrono::steady_clock::time_point &t0);
2026-01-09 13:59:10 +08:00
void cleanup(FILE *pipe, VideoCapture &cap);
void mainLoop(VideoCapture &cap, FILE *pipe);
void getMsgCallback(mqtt::const_message_ptr msg);
void drawRect(Mat &frame, double x, double y, double w, double h,
double distance, int dangerTh, int warnTh); // ⚡ 不再访问全局
2026-01-09 13:59:10 +08:00
void warnThread();
void setGPIOLevel(int level);
string getCurrentTimeStr();
void saveAlarmImage(const cv::Mat &frame);
void saveAlarmVideo(std::deque<cv::Mat> bufferSnapshot);
bool LoadZonesFromEnv();
void drawZones(Mat &img);
bool bottomTouchesDanger(const Dection &d, const ZoneBox &dangerBox,
int frameW, int frameH); // ⚡ 不再访问全局 handleFrame
void loadMirrerSet();
void SetMirror(Mat &frame);
void ReloadConfigIfChanged();
void Exit(int sig);
2026-01-09 13:59:10 +08:00
// =====================================================================
// 推流线程:双缓冲读,彻底消除主线程覆盖数据的竞争
// =====================================================================
static void pipeWriterThread(FILE *pipe)
{
uint64_t seen = g_pipeSeq.load(std::memory_order_relaxed);
2026-01-09 13:59:10 +08:00
while (g_pipeRunning.load())
{
int readIdx;
{
std::unique_lock<std::mutex> lk(g_pipeMutex);
g_pipeCv.wait(lk, [&]
{ return !g_pipeRunning.load() ||
g_pipeSeq.load(std::memory_order_relaxed) != seen; });
if (!g_pipeRunning.load())
break;
2026-01-09 13:59:10 +08:00
seen = g_pipeSeq.load(std::memory_order_relaxed);
// ⚡ 读另一个 slot主线程刚写完的 slot 的前一个)
readIdx = 1 - g_pipeWriteIdx.load(std::memory_order_relaxed);
}
2026-01-09 13:59:10 +08:00
const cv::Mat &local = g_pipeBuf[readIdx];
if (!pipe || local.empty())
continue;
2026-01-09 13:59:10 +08:00
const size_t bytes = local.total() * local.elemSize();
(void)fwrite(local.data, 1, bytes, pipe);
}
}
// =====================================================================
// ⚡ 显示线程imshow 和 waitKey 移出主线程,主线程只管采集处理
// =====================================================================
static void displayThread(int width, int height)
{
uint64_t seen = g_dispSeq.load(std::memory_order_relaxed);
// ⚡ 预分配显示帧resize 复用此内存
Mat scaled(height, width, CV_8UC3);
// ⚡ 预分配 RGB 帧cvtColor 复用此内存,不每帧 malloc
static Mat rgbFrame(height, width, CV_8UC3);
while (g_dispRunning.load())
{
int readIdx;
{
std::unique_lock<std::mutex> lk(g_dispMutex);
// 最多等 33ms约一帧超时也刷新一次防止 imshow 窗口冻结)
g_dispCv.wait_for(lk, std::chrono::milliseconds(33), [&] {
return !g_dispRunning.load() ||
g_dispSeq.load(std::memory_order_relaxed) != seen;
});
if (!g_dispRunning.load())
break;
seen = g_dispSeq.load(std::memory_order_relaxed);
readIdx = 1 - g_dispWriteIdx.load(std::memory_order_relaxed);
}
const cv::Mat &src = g_dispBuf[readIdx];
if (src.empty())
continue;
// ⚡ resize 到预分配内存,无 malloc
resize(src, scaled, Size(width, height), 0, 0, INTER_NEAREST);
// ⚡ BGR → RGB 转换,复用预分配内存,无 malloc
cv::cvtColor(scaled, rgbFrame, cv::COLOR_BGR2RGB);
// 显示 RGB 格式画面
imshow("处理后的画面", rgbFrame); // ✅ 只保留这一个
// waitKey 在显示线程中调用,不阻塞主线程
int key = cv::waitKey(1);
if (key == 'q' || key == 27)
{
cout << "用户请求退出" << endl;
mainRunning = false;
alertWorkerRunning = false;
break;
}
}
}
2026-01-09 13:59:10 +08:00
int main()
{
this_thread::sleep_for(5s);
if (!videoInit(cap))
return -1;
FILE *pipe = pipeInit();
if (!pipe)
return -1;
// ⚡ 推流双缓冲预分配
g_pipeBuf[0].create(480, 640, CV_8UC3);
g_pipeBuf[1].create(480, 640, CV_8UC3);
// 推流线程启动
g_pipeRunning = true;
std::thread(pipeWriterThread, pipe).detach();
// ⚡ 预分配环形缓冲区
g_ringBuffer.init(MAX_BUFFER_SIZE, 480, 640, CV_8UC3);
2026-01-09 13:59:10 +08:00
LoadZonesFromEnv();
2026-01-09 13:59:10 +08:00
signal(SIGINT, Exit);
MqttInit();
mainLoop(cap, pipe);
cleanup(pipe, cap);
return 0;
}
// =====================================================================
// SIGINT handler: request shutdown of all worker loops.
// Only flag stores and notifications here — iostream is not
// async-signal-safe, so the old "Exiting....." log line was removed.
// =====================================================================
void Exit(int sig)
{
    (void)sig;
    mainRunning = false; // NOTE(review): plain bool written from a handler — atomic<bool> would be strictly correct
    alertWorkerRunning = false;
    g_dispRunning = false;
    // NOTE(review): condition_variable::notify_all is not formally
    // async-signal-safe; tolerated here because every waiter also polls
    // its flag with a timeout, so a lost wakeup only delays shutdown.
    latestAlertCv.notify_all();
    alertcv.notify_all();
    g_dispCv.notify_all();
}
// =====================================================================
// 只在 .env 文件发生修改后才重新加载 zones 和镜像配置
// =====================================================================
2026-01-09 13:59:10 +08:00
void ReloadConfigIfChanged()
{
static std::filesystem::file_time_type lastEnvWriteTime{};
static bool first = true;
std::error_code ec;
auto curWriteTime = std::filesystem::last_write_time(filePath, ec);
if (ec)
return;
if (first)
{
first = false;
lastEnvWriteTime = curWriteTime;
LoadZonesFromEnv();
loadMirrerSet();
return;
}
if (curWriteTime <= lastEnvWriteTime)
return;
lastEnvWriteTime = curWriteTime;
LoadZonesFromEnv();
loadMirrerSet();
}
// =====================================================================
// 镜像/翻转
// =====================================================================
2026-01-09 13:59:10 +08:00
void SetMirror(Mat &frame)
{
bool mirror = mediaMirror.load(std::memory_order_relaxed);
bool flipV = mediaFlip.load(std::memory_order_relaxed);
2026-01-09 13:59:10 +08:00
if (mirror && flipV)
cv::flip(frame, frame, -1);
else if (mirror)
cv::flip(frame, frame, 1);
else if (flipV)
cv::flip(frame, frame, 0);
}
// =====================================================================
// 读取配置:翻转/镜像设置
// =====================================================================
2026-01-09 13:59:10 +08:00
void loadMirrerSet()
{
ReadFile rf(filePath);
if (!rf.Open())
{
cerr << "文件打开失败" << endl;
return;
}
auto lines = rf.ReadLines();
rf.Close();
2026-01-09 13:59:10 +08:00
auto getBool = [&](const string &key, bool &out)
{
out = false;
for (auto &line : lines)
{
if (line.rfind(key + "=", 0) == 0)
{
auto val = line.substr(key.size() + 1);
for (auto &c : val)
c = ::tolower(c);
val.erase(remove_if(val.begin(), val.end(), ::isspace), val.end());
if (val == "true")
out = true;
return out;
}
}
return out;
};
2026-01-09 13:59:10 +08:00
bool mirror = false, flip = false;
getBool("MEDIA_MIRROR", mirror);
getBool("MEDIA_FLIP", flip);
mediaMirror.store(mirror);
mediaFlip.store(flip);
2026-01-09 13:59:10 +08:00
}
// =====================================================================
// 检测框底边是否接触 danger 多边形
// ⚡ 不再直接访问全局 handleFrame改为传入尺寸参数消除数据竞争
// =====================================================================
bool bottomTouchesDanger(const Dection &d, const ZoneBox &dangerBox,
int frameW, int frameH)
2026-01-09 13:59:10 +08:00
{
vector<Point> poly;
poly.reserve(4);
for (auto &p : dangerBox.vertices)
poly.emplace_back(static_cast<int>(p.x * frameW),
static_cast<int>(p.y * frameH));
2026-01-09 13:59:10 +08:00
auto toPixX = [&](double v) -> int
{
return (v <= 1.0) ? static_cast<int>(v * frameW) : static_cast<int>(v);
2026-01-09 13:59:10 +08:00
};
auto toPixY = [&](double v) -> int
{
return (v <= 1.0) ? static_cast<int>(v * frameH) : static_cast<int>(v);
2026-01-09 13:59:10 +08:00
};
int x0 = toPixX(d.x);
int y0 = toPixY(d.y);
int wpx = toPixX(d.w);
int hpx = toPixY(d.h);
int x1 = x0 + wpx;
int yb = y0 + hpx;
int samples = max(5, wpx / 20);
for (int i = 0; i <= samples; ++i)
{
int x = x0 + (i * (x1 - x0)) / samples;
double res = pointPolygonTest(poly, Point(x, yb), false);
2026-01-09 13:59:10 +08:00
if (res >= 0)
return true;
}
return false;
}
// =====================================================================
// 从配置文件读取 zones
// =====================================================================
2026-01-09 13:59:10 +08:00
bool LoadZonesFromEnv()
{
ReadFile rf(filePath);
if (!rf.Open())
{
cerr << "文件打开失败: " << filePath << endl;
return false;
}
auto lines = rf.ReadLines();
rf.Close();
auto getDouble = [&](const string &key, double &out)
{
for (auto &line : lines)
{
if (line.rfind(key + "=", 0) == 0)
{
try
{
out = stod(line.substr(key.size() + 1));
return true;
}
catch (...)
{
return false;
}
}
}
return false;
};
auto loadBox = [&](ZoneBox &box, const string &prefix)
{
box.name = prefix;
for (int i = 0; i < 4; ++i)
{
double x = 0.0, y = 0.0;
getDouble(QCL::format("{}_{}_X", prefix, i + 1), x);
getDouble(QCL::format("{}_{}_Y", prefix, i + 1), y);
box.vertices[i] = {x, y};
2026-01-09 13:59:10 +08:00
}
};
loadBox(g_safe, "SAFE");
loadBox(g_warn, "WARN");
loadBox(g_dang, "DANG");
return true;
}
// =====================================================================
// 绘制 zones 多边形
// =====================================================================
2026-01-09 13:59:10 +08:00
void drawZones(Mat &img)
{
auto drawPoly = [&](const ZoneBox &box, const Scalar &color)
{
vector<Point> pts;
pts.reserve(4);
for (auto &p : box.vertices)
pts.emplace_back(static_cast<int>(p.x * img.cols),
static_cast<int>(p.y * img.rows));
2026-01-09 13:59:10 +08:00
polylines(img, pts, true, color, 2);
};
drawPoly(g_safe, Scalar(0, 255, 0));
drawPoly(g_warn, Scalar(0, 255, 255));
drawPoly(g_dang, Scalar(0, 0, 255));
2026-01-09 13:59:10 +08:00
}
// =====================================================================
2026-01-09 13:59:10 +08:00
// 保存图片
// =====================================================================
2026-01-09 13:59:10 +08:00
void saveAlarmImage(const Mat &frame)
{
if (frame.empty())
{
cerr << "报警图片保存跳过: 帧为空" << endl;
return;
}
string fileName = warningPath + "alarm_" + getCurrentTimeStr() + ".jpg";
cout << "imgpath = " << fileName << endl;
if (!imwrite(fileName, frame))
cerr << "图片保存失败" << endl;
}
// =====================================================================
2026-01-09 13:59:10 +08:00
// 保存视频
// ⚡ 改为移动语义传入,避免 deque 再次拷贝
// =====================================================================
2026-01-09 13:59:10 +08:00
void saveAlarmVideo(deque<Mat> bufferSnapshot)
{
if (bufferSnapshot.empty() || bufferSnapshot.front().empty())
{
cerr << "报警视频保存跳过: 缓冲为空" << endl;
return;
}
// ⚡ move 进 lambda彻底避免 deque 拷贝
thread([buf = std::move(bufferSnapshot)]() mutable
2026-01-09 13:59:10 +08:00
{
string fileName = videoPath + "alarm_" + getCurrentTimeStr() + ".mp4";
VideoWriter write;
int codec = write.fourcc('H', '2', '6', '4');
Size size = buf.front().size();
bool color = buf.front().channels() == 3;
if (!write.open(fileName, codec, FPS, size, color))
{
cerr << "视频文件打开失败: " << fileName << endl;
return;
}
for (auto &ii : buf)
if (!ii.empty()) write.write(ii);
write.release(); })
2026-01-09 13:59:10 +08:00
.detach();
}
// =====================================================================
// Current local time formatted as "YYYYMMDD_HHMMSS" (used in alarm file
// names). Thread-safe: this runs on the main, alarm, and video-save
// threads concurrently, so it must not use localtime()'s shared buffer.
// =====================================================================
string getCurrentTimeStr()
{
    auto now = std::chrono::system_clock::now();
    std::time_t time_t_now = std::chrono::system_clock::to_time_t(now);
    // BUG FIX: localtime() returns a process-wide static tm — replaced with
    // the reentrant localtime_r writing into a local buffer.
    std::tm tmBuf{};
    localtime_r(&time_t_now, &tmBuf);
    std::stringstream ss;
    ss << std::put_time(&tmBuf, "%Y%m%d_%H%M%S");
    return ss.str();
}
// =====================================================================
// 调用 GPIO 报警输出程序
// =====================================================================
2026-01-09 13:59:10 +08:00
void setGPIOLevel(int level)
{
string cmd = "echo 'orangepi' | sudo -S /home/orangepi/RKApp/GPIOSignal/bin/sendGpioSignal " + to_string(level);
system(cmd.c_str());
}
// =====================================================================
// 低频刷新距离阈值配置到内存缓存
// =====================================================================
2026-01-09 13:59:10 +08:00
static bool RefreshDistanceConfig()
{
ReadFile rf(filePath);
if (!rf.Open())
return false;
auto lines = rf.ReadLines();
rf.Close();
int danger = g_cfg.danger.load();
int warn = g_cfg.warn.load();
int safe = g_cfg.safe.load();
bool opm = g_cfg.outPutMode.load();
for (auto &line : lines)
{
if (line.find("NEAR_THRESHOLD=") != string::npos)
danger = stoi(line.substr(sizeof("NEAR_THRESHOLD=") - 1));
else if (line.find("MID_THRESHOLD=") != string::npos)
warn = stoi(line.substr(sizeof("MID_THRESHOLD=") - 1));
else if (line.find("MAX_DISTANCE=") != string::npos)
safe = stoi(line.substr(sizeof("MAX_DISTANCE=") - 1));
else if (line.find("outPutMode:") != string::npos)
{
string val = line.substr(sizeof("outPutMode:"));
opm = (val == "true");
}
}
g_cfg.danger.store(danger);
g_cfg.warn.store(warn);
g_cfg.safe.store(safe);
g_cfg.outPutMode.store(opm);
return true;
}
// =====================================================================
2026-01-09 13:59:10 +08:00
// 报警线程
// =====================================================================
2026-01-09 13:59:10 +08:00
void warnThread()
{
thread([]()
{
bool isAlarming = false;
2026-01-09 13:59:10 +08:00
auto lastDangerTime = chrono::steady_clock::now();
RefreshDistanceConfig();
int normalLevel = g_cfg.outPutMode.load() ? 0 : 1;
int alarmLevel = g_cfg.outPutMode.load() ? 1 : 0;
setGPIOLevel(normalLevel);
auto lastCfgRefresh = chrono::steady_clock::now();
uint64_t seenSeq = latestAlertSeq.load();
2026-01-09 13:59:10 +08:00
// ⚡ 记录报警时的帧尺寸(避免访问全局 handleFrame
int savedFrameW = 640, savedFrameH = 480;
2026-01-09 13:59:10 +08:00
while (alertWorkerRunning.load())
{
std::unique_lock<std::mutex> lk(latestAlertMutex);
bool gotNew = latestAlertCv.wait_for(
lk, std::chrono::milliseconds(50), [&] {
return !alertWorkerRunning.load() ||
latestAlertSeq.load() != seenSeq;
});
2026-01-09 13:59:10 +08:00
if (!alertWorkerRunning.load()) break;
2026-01-09 13:59:10 +08:00
auto now = chrono::steady_clock::now();
if (now - lastCfgRefresh >= chrono::seconds(1))
{
RefreshDistanceConfig();
lastCfgRefresh = now;
normalLevel = g_cfg.outPutMode.load() ? 0 : 1;
alarmLevel = g_cfg.outPutMode.load() ? 1 : 0;
2026-01-09 13:59:10 +08:00
}
if (!gotNew)
{
if (isAlarming)
{
auto dur = chrono::duration_cast<chrono::milliseconds>(
chrono::steady_clock::now() - lastDangerTime).count();
if (dur >= 2000)
{
isAlarming = false;
setGPIOLevel(normalLevel);
}
}
continue;
}
seenSeq = latestAlertSeq.load();
2026-01-09 13:59:10 +08:00
auto detsOpt = latestAlertDets;
lk.unlock();
if (!detsOpt.has_value()) continue;
2026-01-09 13:59:10 +08:00
bool currentFrameHasDanger = false;
const int dangerTh = g_cfg.danger.load();
2026-01-09 13:59:10 +08:00
// ⚡ 传入尺寸而非访问全局 handleFrame消除数据竞争
2026-01-09 13:59:10 +08:00
for (const auto &d : detsOpt.value())
{
if (d.distance > 0.0 && d.distance <= dangerTh)
{
currentFrameHasDanger = true;
break;
}
if (d.distance == 0.0 &&
bottomTouchesDanger(d, g_dang, savedFrameW, savedFrameH))
2026-01-09 13:59:10 +08:00
{
currentFrameHasDanger = true;
break;
2026-01-09 13:59:10 +08:00
}
}
if (currentFrameHasDanger)
{
lastDangerTime = chrono::steady_clock::now();
if (!isAlarming)
{
isAlarming = true;
setGPIOLevel(alarmLevel);
// ⚡ 报警时才 snapshot且用 move 传入 saveAlarmVideo
2026-01-09 13:59:10 +08:00
{
lock_guard<mutex> lk2(bufferMutex);
Mat framToSave;
2026-01-09 13:59:10 +08:00
deque<Mat> bufferToSave;
if (!g_ringBuffer.empty())
2026-01-09 13:59:10 +08:00
{
framToSave = g_ringBuffer.back().clone();
bufferToSave = g_ringBuffer.snapshot();
2026-01-09 13:59:10 +08:00
}
else if (!handleFrame.empty())
{
framToSave = handleFrame.clone();
}
if (!framToSave.empty())
saveAlarmImage(framToSave);
if (!bufferToSave.empty())
saveAlarmVideo(std::move(bufferToSave)); // ⚡ move
2026-01-09 13:59:10 +08:00
}
}
}
else
{
if (isAlarming)
{
auto dur = chrono::duration_cast<chrono::milliseconds>(
chrono::steady_clock::now() - lastDangerTime).count();
if (dur >= 2000)
{
isAlarming = false;
setGPIOLevel(normalLevel);
}
}
}
} })
.detach();
}
// =====================================================================
2026-01-09 13:59:10 +08:00
// 绘制矩形方框和深度信息
// ⚡ 改为直接接收帧引用和阈值参数,不再每次读全局 dis / 调 GetDistance()
// =====================================================================
void drawRect(Mat &frame, double x, double y, double w, double h,
double distance, int dangerTh, int warnTh)
2026-01-09 13:59:10 +08:00
{
const int W = frame.cols;
const int H = frame.rows;
2026-01-09 13:59:10 +08:00
auto toPixX = [&](double v) -> int
{
return (v <= 1.0) ? static_cast<int>(v * W) : static_cast<int>(v);
2026-01-09 13:59:10 +08:00
};
auto toPixY = [&](double v) -> int
{
return (v <= 1.0) ? static_cast<int>(v * H) : static_cast<int>(v);
2026-01-09 13:59:10 +08:00
};
int px = toPixX(x), py = toPixY(y);
int pw = toPixX(w), ph = toPixY(h);
2026-01-09 13:59:10 +08:00
px = std::max(0, std::min(px, W - 1));
py = std::max(0, std::min(py, H - 1));
pw = std::max(1, std::min(pw, W - px));
ph = std::max(1, std::min(ph, H - py));
2026-01-09 13:59:10 +08:00
Rect r(px, py, pw, ph);
Scalar sca(0, 255, 0);
if (distance > 0.0)
2026-01-09 13:59:10 +08:00
{
if (distance <= dangerTh)
sca = Scalar(0, 0, 255);
else if (distance <= warnTh)
sca = Scalar(0, 255, 255);
2026-01-09 13:59:10 +08:00
}
rectangle(frame, r, sca, 2);
putText(frame, to_string(distance), Point(px, py),
FONT_HERSHEY_SIMPLEX, 0.35, Scalar(0, 0, 0));
2026-01-09 13:59:10 +08:00
}
// =====================================================================
// mqtt 初始化
// =====================================================================
2026-01-09 13:59:10 +08:00
void MqttInit()
{
client.set_connected_handler([](const string &)
2026-01-09 13:59:10 +08:00
{ cout << "连接成功" << endl; });
client.set_message_callback(getMsgCallback);
client.connect()->wait();
client.subscribe(Topic, Qos)->wait();
alertWorkerRunning = true;
warnThread();
}
// =====================================================================
// mqtt 消息回调
// =====================================================================
2026-01-09 13:59:10 +08:00
void getMsgCallback(mqtt::const_message_ptr msg)
{
const std::string payload = msg->to_string();
try
{
auto json = nlohmann::json::parse(payload);
std::vector<Dection> dets;
dets.reserve(json.size());
for (const auto &ii : json)
{
Dection d;
d.x = static_cast<double>(ii.value("x", 0.0));
d.y = static_cast<double>(ii.value("y", 0.0));
d.w = static_cast<double>(ii.value("w", 0.0));
d.h = static_cast<double>(ii.value("h", 0.0));
d.distance = static_cast<double>(ii.value("distance", 0.0));
dets.push_back(d);
}
// ⚡ 绘制用数据也改为 movelatestAlertDets 需要独立副本)
2026-01-09 13:59:10 +08:00
{
lock_guard<mutex> lk(detMutex);
latestDection = dets; // 绘制用保留拷贝
2026-01-09 13:59:10 +08:00
}
{
std::lock_guard<std::mutex> lk(latestAlertMutex);
latestAlertDets = std::move(dets); // 报警用 move
2026-01-09 13:59:10 +08:00
latestAlertSeq.fetch_add(1, std::memory_order_relaxed);
}
latestAlertCv.notify_one();
}
catch (const nlohmann::json::parse_error &e)
{
cerr << "JSON 解析错误: " << e.what() << "\n原始 payload: " << payload << "\n";
}
catch (const std::exception &e)
{
cerr << "处理消息异常: " << e.what() << "\n";
}
}
// =====================================================================
2026-01-09 13:59:10 +08:00
// 摄像头初始化
// =====================================================================
2026-01-09 13:59:10 +08:00
bool videoInit(VideoCapture &cap)
{
if (cap.isOpened())
cap.release();
2026-01-09 13:59:10 +08:00
if (!cap.open("/dev/video10", cv::CAP_V4L2))
{
cerr << "摄像头打开失败:/dev/video10" << endl;
return false;
}
cap.set(CAP_PROP_FRAME_WIDTH, 640);
cap.set(CAP_PROP_FRAME_HEIGHT, 480);
cap.set(CAP_PROP_FPS, 30);
cap.set(CAP_PROP_BUFFERSIZE, 1); // ⚡ 驱动侧只缓存1帧最小化延迟
2026-01-09 13:59:10 +08:00
cap.set(CAP_PROP_FOURCC, VideoWriter::fourcc('M', 'J', 'P', 'G'));
2026-01-09 13:59:10 +08:00
double fccv = cap.get(CAP_PROP_FOURCC);
char fcc[5] = {
(char)((int)fccv & 0xFF),
(char)(((int)fccv >> 8) & 0xFF),
(char)(((int)fccv >> 16) & 0xFF),
(char)(((int)fccv >> 24) & 0xFF),
0};
2026-01-09 13:59:10 +08:00
cout << "摄像头初始化成功 分辨率=" << cap.get(CAP_PROP_FRAME_WIDTH)
<< "x" << cap.get(CAP_PROP_FRAME_HEIGHT)
<< " FPS=" << cap.get(CAP_PROP_FPS)
<< " FOURCC=" << fcc << endl;
return true;
}
// =====================================================================
// FFmpeg 管道初始化
// =====================================================================
2026-01-09 13:59:10 +08:00
FILE *pipeInit()
{
FILE *pipe = popen(
"ffmpeg "
"-nostats -hide_banner -loglevel error "
"-f rawvideo -pixel_format bgr24 -video_size 640x480 -framerate 30 -i - "
"-c:v h264_rkmpp -rc_mode 2 -qp_init 32 -profile:v baseline -g 1 -bf 0 "
"-fflags nobuffer -flags low_delay "
"-rtsp_transport tcp -f rtsp rtsp://127.0.0.1:8554/stream",
"w");
if (!pipe)
{
cerr << "FFmpeg管道打开失败" << endl;
return nullptr;
}
setvbuf(pipe, NULL, _IONBF, 0);
cout << "FFmpeg管道初始化成功" << endl;
return pipe;
}
// =====================================================================
// Process one frame: capture, annotate with detections and zones, then
// hand copies to the streaming thread, display thread, and ring buffer.
// Returns true to keep the main loop running (read failures just retry).
// =====================================================================
bool processFrame(VideoCapture &cap, FILE *pipe, Mat &frame,
                  int64 &count, chrono::steady_clock::time_point &t0)
{
    // One grab drops the oldest buffered frame; retrieve fetches the newest.
    cap.grab();
    if (!cap.retrieve(frame) || frame.empty())
    {
        cerr << "读取帧失败,重试中..." << endl;
        this_thread::sleep_for(10ms);
        return true;
    }
    // YUYV arrives as a 2-channel Mat and needs conversion; MJPG decodes to BGR.
    if (frame.channels() == 2)
        cv::cvtColor(frame, frame, cv::COLOR_YUV2BGR_YUYV);
    handleFrame = frame; // shallow copy — shares pixel data, no malloc
    // Read thresholds once per frame rather than once per detection box.
    const int dangerTh = g_cfg.danger.load(std::memory_order_relaxed);
    const int warnTh = g_cfg.warn.load(std::memory_order_relaxed);
    // Move the detection list out under the lock instead of copying it.
    vector<Dection> destCopy;
    {
        lock_guard<mutex> lk(detMutex);
        destCopy = std::move(latestDection);
        latestDection.clear();
    }
    for (const auto &ii : destCopy)
        drawRect(handleFrame, ii.x, ii.y, ii.w, ii.h, ii.distance, dangerTh, warnTh);
    // Check the config file for changes every 5 seconds.
    static auto lastZonesRefresh = std::chrono::steady_clock::now();
    auto now = std::chrono::steady_clock::now();
    if (now - lastZonesRefresh >= std::chrono::seconds(5))
    {
        ReloadConfigIfChanged();
        lastZonesRefresh = now;
    }
    // handleFrame still shares data with frame, and flip would mutate the
    // original pixels — clone first so frame stays usable next cycle.
    handleFrame = frame.clone();
    SetMirror(handleFrame);
    drawZones(handleFrame);
    // Streaming: fill the current write slot, flip slots, notify the writer.
    if (pipe && g_pipeRunning.load())
    {
        int writeIdx = g_pipeWriteIdx.load(std::memory_order_relaxed);
        Mat &pipeDst = g_pipeBuf[writeIdx];
        if (handleFrame.cols != 640 || handleFrame.rows != 480)
            resize(handleFrame, pipeDst, Size(640, 480));
        else
            handleFrame.copyTo(pipeDst);
        g_pipeWriteIdx.store(1 - writeIdx, std::memory_order_relaxed);
        g_pipeSeq.fetch_add(1, std::memory_order_relaxed);
        g_pipeCv.notify_one();
    }
    // Display: same double-buffer hand-off for the display thread.
    {
        int writeIdx = g_dispWriteIdx.load(std::memory_order_relaxed);
        handleFrame.copyTo(g_dispBuf[writeIdx]);
        g_dispWriteIdx.store(1 - writeIdx, std::memory_order_relaxed);
        g_dispSeq.fetch_add(1, std::memory_order_relaxed);
        g_dispCv.notify_one();
    }
    // Append to the pre-alarm ring buffer.
    {
        lock_guard<mutex> lk(bufferMutex);
        g_ringBuffer.push(handleFrame);
    }
    return true;
}
// =====================================================================
// 主处理循环(优化版)
// ⚡ 主线程只负责采集+处理imshow/waitKey 移至独立显示线程
// =====================================================================
2026-01-09 13:59:10 +08:00
void mainLoop(VideoCapture &cap, FILE *pipe)
{
int64 count = 0;
auto t0 = chrono::steady_clock::now();
Mat frame;
cout << "开始视频处理循环..." << endl;
// 创建全屏窗口(必须在主线程/OpenCV线程中调用
2026-01-09 13:59:10 +08:00
namedWindow("处理后的画面", WINDOW_NORMAL);
setWindowProperty("处理后的画面", WND_PROP_FULLSCREEN, WINDOW_FULLSCREEN);
// 获取屏幕分辨率,用完立即释放
2026-01-09 13:59:10 +08:00
Display *display = XOpenDisplay(nullptr);
int screen = DefaultScreen(display);
int width = DisplayWidth(display, screen);
int height = DisplayHeight(display, screen);
XCloseDisplay(display);
2026-01-09 13:59:10 +08:00
// ⚡ 显示双缓冲预分配
g_dispBuf[0].create(480, 640, CV_8UC3);
g_dispBuf[1].create(480, 640, CV_8UC3);
2026-01-09 13:59:10 +08:00
// ⚡ 启动显示线程imshow/waitKey 不再占用主线程
g_dispRunning = true;
std::thread(displayThread, width, height).detach();
// ⚡ 主线程纯采集+处理,不等待显示,帧率上限由摄像头和处理速度决定
2026-01-09 13:59:10 +08:00
while (mainRunning)
{
if (!processFrame(cap, pipe, frame, count, t0))
break;
}
}
// =====================================================================
2026-01-09 13:59:10 +08:00
// 资源清理
// =====================================================================
2026-01-09 13:59:10 +08:00
void cleanup(FILE *pipe, VideoCapture &cap)
{
g_pipeRunning = false;
g_pipeCv.notify_all();
g_dispRunning = false;
g_dispCv.notify_all();
2026-01-09 13:59:10 +08:00
alertWorkerRunning = false;
latestAlertCv.notify_all();
alertcv.notify_all();
// 给各线程一点时间正常退出
this_thread::sleep_for(100ms);
2026-01-09 13:59:10 +08:00
try
{
client.disconnect()->wait();
}
catch (const std::exception &e)
{
std::cerr << e.what() << '\n';
}
if (pipe)
{
pclose(pipe);
cout << "FFmpeg管道已关闭" << endl;
}
if (cap.isOpened())
{
cap.release();
cout << "摄像头已释放" << endl;
}
destroyAllWindows();
cout << "所有资源已清理完毕" << endl;
}