mqtt
This commit is contained in:
@@ -63,6 +63,19 @@ namespace QCL
|
||||
*/
|
||||
std::vector<int> getClientSockets();
|
||||
|
||||
/**
|
||||
* @brief 从服务器的客户端列表中移除并关闭一个客户端socket
|
||||
* @param clientSock 客户端Socket描述符
|
||||
*/
|
||||
void removeClient(int clientSock);
|
||||
|
||||
/**
|
||||
* @brief 非阻塞探测客户端是否已断开(不消耗数据)
|
||||
* @param clientSock 客户端Socket描述符
|
||||
* @return true 已断开或发生致命错误;false 仍然存活或暂无数据
|
||||
*/
|
||||
bool isClientDisconnected(int clientSock);
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief 监听并接受新的客户端连接(运行在独立线程中)
|
||||
@@ -196,6 +209,8 @@ namespace QCL
|
||||
*/
|
||||
bool overwriteAtPos(const std::string &content, size_t pos, size_t length);
|
||||
|
||||
void close();
|
||||
|
||||
private:
|
||||
std::string filePath_; ///< 文件路径
|
||||
std::mutex writeMutex_; ///< 线程锁,保证多线程写入安全
|
||||
|
||||
@@ -181,6 +181,35 @@ namespace QCL
|
||||
return clientSockets_;
|
||||
}
|
||||
|
||||
void TcpServer::removeClient(int clientSock)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(clientsMutex_);
|
||||
for (auto it = clientSockets_.begin(); it != clientSockets_.end(); ++it)
|
||||
{
|
||||
if (*it == clientSock)
|
||||
{
|
||||
close(*it);
|
||||
clientSockets_.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool TcpServer::isClientDisconnected(int clientSock)
|
||||
{
|
||||
char tmp;
|
||||
ssize_t n = recv(clientSock, &tmp, 1, MSG_PEEK | MSG_DONTWAIT);
|
||||
if (n == 0)
|
||||
return true; // 对端有序关闭
|
||||
if (n < 0)
|
||||
{
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
||||
return false; // 只是暂时无数据
|
||||
return true; // 其它错误视为断开
|
||||
}
|
||||
return false; // 有数据可读
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 获取连接客户端的IP和端口
|
||||
* @param clientSock 客户端Socket描述符
|
||||
|
||||
Binary file not shown.
@@ -10,6 +10,8 @@
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include <mqtt/async_client.h>
|
||||
|
||||
#include <boost/process.hpp>
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
@@ -34,7 +36,12 @@ string filepath = "/home/orangepi/InitAuth/conf/.env";
|
||||
string cameraPath = "/opt/rknn-yolov11/.env";
|
||||
string passwd = "/home/orangepi/InitAuth/pwd/.env";
|
||||
|
||||
string url = "http://116.147.36.110:8095/device/validateDevice";
|
||||
// 云端Web认证接口
|
||||
const string url = "http://116.147.36.110:8095/device/validateDevice";
|
||||
const string mqtt_url = "tcp://192.168.12.1:1883";
|
||||
const string clientId = "RK3588_SubTest";
|
||||
const string Topic = "/Test";
|
||||
const int Qos = 1;
|
||||
|
||||
std::atomic<bool> isRunning(true); // 全局运行标志
|
||||
std::atomic<bool> confirm(true); // 发送卡和ID
|
||||
@@ -54,6 +61,8 @@ struct OutSignal
|
||||
bool inPutMode; // 触发输入模式 true--高电平,false -- 低电平
|
||||
} algor{};
|
||||
|
||||
mqtt::async_client client(mqtt_url, clientId);
|
||||
|
||||
// 确认是否已经进行过认证
|
||||
bool ConfirmInit();
|
||||
|
||||
@@ -94,6 +103,12 @@ void StartNet();
|
||||
// 开启服务
|
||||
void StartService();
|
||||
|
||||
// mqtt初始化
|
||||
void mqttInit();
|
||||
|
||||
// 接收消息回调
|
||||
void getMsgCallback(mqtt::const_message_ptr msg);
|
||||
|
||||
/*
|
||||
parames:
|
||||
argv[1] - SSID of the WiFi network to connect to
|
||||
@@ -110,6 +125,14 @@ int main(int argc, char *argv[])
|
||||
blockAllSignals();
|
||||
signal(SIGINT, Exit); // 捕获Ctrl+C信号
|
||||
|
||||
// 初始化mqtt服务器
|
||||
mqttInit();
|
||||
|
||||
while (1)
|
||||
{
|
||||
this_thread::sleep_for(chrono::seconds(1));
|
||||
}
|
||||
|
||||
// 开启服务器
|
||||
MyServer = new TcpServer(8848);
|
||||
|
||||
@@ -149,6 +172,27 @@ int main(int argc, char *argv[])
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mqttInit()
|
||||
{
|
||||
|
||||
client.set_connected_handler([](const string &cause)
|
||||
{
|
||||
cout << cause << endl;
|
||||
cout << "Connected Successed!\n"; });
|
||||
client.set_message_callback(getMsgCallback);
|
||||
|
||||
// 连接服务器
|
||||
client.connect()->wait();
|
||||
client.subscribe(Topic, Qos)->wait();
|
||||
}
|
||||
|
||||
// 接收消息回调
|
||||
void getMsgCallback(mqtt::const_message_ptr msg)
|
||||
{
|
||||
cout << "收到消息" << endl;
|
||||
cout << "recv:" << msg->to_string() << endl;
|
||||
}
|
||||
|
||||
// 开启服务
|
||||
void StartService()
|
||||
{
|
||||
@@ -263,9 +307,11 @@ void ReceiveData()
|
||||
continue;
|
||||
}
|
||||
int index = client.size();
|
||||
for (int ii = 0; ii < index; ii++)
|
||||
// cout << "index: " << index << endl;
|
||||
for (int ii = 0; ii < index; ++ii) // 从新到旧,优先处理新连接
|
||||
{
|
||||
buffer = MyServer->receiveFromClient(client[ii], false); // 非阻塞模式接受数据
|
||||
// cout << buffer << endl;
|
||||
if (buffer.empty() == false)
|
||||
{
|
||||
cout << "已收到" << buffer << endl;
|
||||
@@ -313,14 +359,15 @@ void ReceiveData()
|
||||
double safe = toDouble(safe_json);
|
||||
|
||||
// 整文件读取,逐行替换,保持注释不变
|
||||
ReadFile rf(cameraPath);
|
||||
if (!rf.Open())
|
||||
ReadFile *rf = new ReadFile(cameraPath);
|
||||
if (!rf->Open())
|
||||
{
|
||||
cerr << "文件打开失败: " << cameraPath << "\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
auto lines = rf.ReadLines();
|
||||
auto lines = rf->ReadLines();
|
||||
delete rf;
|
||||
for (auto &line : lines)
|
||||
{
|
||||
if (line.rfind("NEAR_THRESHOLD=", 0) == 0)
|
||||
@@ -346,32 +393,90 @@ void ReceiveData()
|
||||
if (i + 1 < lines.size())
|
||||
out += "\n";
|
||||
}
|
||||
WriteFile wf(cameraPath);
|
||||
wf.overwriteText(out);
|
||||
WriteFile *wf = new WriteFile(cameraPath);
|
||||
wf->overwriteText(out);
|
||||
delete wf;
|
||||
}
|
||||
}
|
||||
else if (buffer.find("media") != string::npos)
|
||||
{
|
||||
// 写摄像头参数(与距离相同方式: 整体读取-逐行替换-整体写回)
|
||||
cout << buffer << endl;
|
||||
CalculateInfo(media, buffer);
|
||||
ReadFile *rf = new ReadFile(cameraPath);
|
||||
if (!rf->Open())
|
||||
{
|
||||
cerr << "文件打开失败: " << cameraPath << "\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
// 写摄像头画面设置
|
||||
string conf = "";
|
||||
cout << buffer << endl;
|
||||
if (buffer.find("media") != string::npos)
|
||||
{ // 写摄像头参数
|
||||
cout << buffer << endl;
|
||||
WriteFile cam(cameraPath);
|
||||
CalculateInfo(media, buffer);
|
||||
conf = format("\nMEDIA_MIRROR={}\nMEDIA_FLIP={}\nMEDIA_OCCLUSION={}\n", media.mirror == 1 ? "true" : "false", media.flip == 1 ? "true" : "false", media.occlusion == 1 ? "true" : "false");
|
||||
cout << conf;
|
||||
cam.writeAfterPatternOrAppend("***---***", conf);
|
||||
}
|
||||
if (buffer.find("algorithm") != string::npos)
|
||||
auto lines = rf->ReadLines();
|
||||
delete rf;
|
||||
for (auto &line : lines)
|
||||
{
|
||||
// 写输入输出参数
|
||||
WriteFile mode(filepath);
|
||||
int pos = mode.countBytesPattern("***---***", true);
|
||||
if (line.rfind("MEDIA_MIRROR=", 0) == 0)
|
||||
{
|
||||
line = format("MEDIA_MIRROR={}", media.mirror ? "true" : "false");
|
||||
}
|
||||
else if (line.rfind("MEDIA_FLIP=", 0) == 0)
|
||||
{
|
||||
line = format("MEDIA_FLIP={}", media.flip ? "true" : "false");
|
||||
}
|
||||
else if (line.rfind("MEDIA_OCCLUSION=", 0) == 0)
|
||||
{
|
||||
line = format("MEDIA_OCCLUSION={}", media.occlusion ? "true" : "false");
|
||||
}
|
||||
}
|
||||
|
||||
string out;
|
||||
out.reserve(4096);
|
||||
for (size_t i = 0; i < lines.size(); ++i)
|
||||
{
|
||||
out += lines[i];
|
||||
if (i + 1 < lines.size())
|
||||
out += "\n";
|
||||
}
|
||||
WriteFile *wf = new WriteFile(cameraPath);
|
||||
wf->overwriteText(out);
|
||||
delete wf;
|
||||
}
|
||||
}
|
||||
else if (buffer.find("algorithm") != string::npos)
|
||||
{
|
||||
// 写输入输出参数(与距离相同方式: 整体读取-逐行替换-整体写回)
|
||||
CalculateInfo(algor, buffer);
|
||||
conf = format("\noutPutMode:{}\ninPutMode:{}", algor.outPutMode == 1 ? "true" : "false", algor.inPutMode == 1 ? "true" : "false");
|
||||
mode.writeAfterPatternOrAppend("***---***", conf);
|
||||
ReadFile *rf2 = new ReadFile(filepath);
|
||||
if (!rf2->Open())
|
||||
{
|
||||
cerr << "文件打开失败: " << filepath << "\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
auto lines = rf2->ReadLines();
|
||||
delete rf2;
|
||||
for (auto &line : lines)
|
||||
{
|
||||
if (line.rfind("outPutMode:", 0) == 0)
|
||||
{
|
||||
line = format("outPutMode:{}", algor.outPutMode ? "true" : "false");
|
||||
}
|
||||
else if (line.rfind("inPutMode:", 0) == 0)
|
||||
{
|
||||
line = format("inPutMode:{}", algor.inPutMode ? "true" : "false");
|
||||
}
|
||||
}
|
||||
|
||||
string out;
|
||||
out.reserve(4096);
|
||||
for (size_t i = 0; i < lines.size(); ++i)
|
||||
{
|
||||
out += lines[i];
|
||||
if (i + 1 < lines.size())
|
||||
out += "\n";
|
||||
}
|
||||
WriteFile *wf2 = new WriteFile(filepath);
|
||||
wf2->overwriteText(out);
|
||||
delete wf2;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -390,7 +495,8 @@ void OpenRTSP()
|
||||
// 视频推流
|
||||
void VideoStream()
|
||||
{
|
||||
string commnd = "ffmpeg -f v4l2 -i /dev/video0 -c:v h264_rkmpp -rtsp_transport tcp -f rtsp rtsp://192.168.12.1:8554/stream";
|
||||
// 静音ffmpeg的统计输出,保留错误;避免污染日志
|
||||
string commnd = "ffmpeg -nostats -hide_banner -loglevel error -f v4l2 -i /dev/video0 -c:v h264_rkmpp -rtsp_transport tcp -f rtsp rtsp://192.168.12.1:8554/stream 2>/dev/null";
|
||||
video_proc = bp::child("/bin/bash", bp::args = {"-c", commnd});
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
all:wifi
|
||||
|
||||
wifi:main.cpp
|
||||
g++ -g -o wifi main.cpp /home/orangepi/RKApp/ApCreate/NetraLib/src/Netra.cpp /home/orangepi/RKApp/ApCreate/NetraLib/src/encrypt.cpp /home/orangepi/RKApp/ApCreate/NetraLib/src/NetRequest.cpp -lpthread -I/home/orangepi/RKApp/ApCreate/NetraLib/include
|
||||
g++ -g -o wifi main.cpp /home/orangepi/RKApp/ApCreate/NetraLib/src/Netra.cpp /home/orangepi/RKApp/ApCreate/NetraLib/src/encrypt.cpp /home/orangepi/RKApp/ApCreate/NetraLib/src/NetRequest.cpp -I/home/orangepi/RKApp/ApCreate/NetraLib/include -lpaho-mqttpp3 -lpaho-mqtt3a -lpthread
|
||||
mv ./wifi ../bin/wifi
|
||||
|
||||
clean:
|
||||
|
||||
Reference in New Issue
Block a user