Files
RkApp/PyApp/Pub.py

342 lines
11 KiB
Python
Raw Normal View History

2025-12-18 14:40:00 +08:00
import os # 文件系统操作
import base64 # Base64编码图片
import paho.mqtt.client as mqtt # MQTT通信
import pymysql # 数据库操作
import time # 时间处理
from datetime import datetime, timedelta # 时间处理
import serial # 串口通信
import re # 正则匹配GPS串口数据
import threading # 多线程上传图片
# ======== 配置区 ========
IMAGE_FOLDER = "/mnt/save/warning" # 图片目录
MQTT_BROKER = "116.147.36.110"
MQTT_PORT = 1883
MQTT_TOPIC = "images/topic"
DB_CONFIG = {
"host": "116.147.36.110",
"port": 13306,
"user": "root",
"password": "Wxit11335577",
"database": "mqtt"
}
SERIAL_PORT = "/dev/ttyUSB3"
SERIAL_BAUDRATE = 115200
ICCID_SERIAL_PORT = "/dev/ttyUSB2"
ICCID_BAUDRATE = 115200
ICCID_READ_INTERVAL_SEC = 30 # 周期性刷新 ICCID服务频繁读取
MAX_UPLOADED_SIZE = 100000 # 最大上传记录数
NOW_SIZE = 0 # 当前记录数
LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "upload.log")
# ======== 全局变量与线程锁 ========
last_lon, last_lat = None, None # 上次读取到的GPS
ser = None # GPS串口对象
iccid_ser = None # ICCID串口对象
serial_lock = threading.Lock()
iccid_lock = threading.Lock()
uploaded_lock = threading.Lock()
uploaded = {} # 保存已上传记录的文件及时间
current_iccid = None # 当前读取到的 SIM ICCID
# ======== 工具函数 ========
def write_log(message):
"""写入日志文件"""
with open(LOG_FILE, "a") as logf:
logf.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {message}\n")
# def extract_gps_data(serial_line):
# """
# 从串口数据中提取经纬度(只处理$GNGLL格式
# 示例数据: $GNGLL,3114.72543,N,12130.62735,E,...
# 只做方向转换,不转换度分格式
# 返回 (lon, lat)都为float带正负号
# """
# match = re.search(r'\$GNGLL,(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line)
# if match:
# lat = float(match.group(1))
# lat_dir = match.group(2)
# lon = float(match.group(3))
# lon_dir = match.group(4)
# if lat_dir == 'S':
# lat = -lat
# if lon_dir == 'W':
# lon = -lon
# return lon, lat
# return None, None
def read_iccid():
"""
/dev/ttyUSB2 读取 SIM ICCID发送 AT+QCCID带重试短超时
返回字符串 ICCID None
"""
global iccid_ser, current_iccid
with iccid_lock:
try:
if iccid_ser is None or not iccid_ser.is_open:
iccid_ser = serial.Serial(ICCID_SERIAL_PORT, ICCID_BAUDRATE, timeout=1)
# 清空缓冲
iccid_ser.reset_input_buffer()
iccid_ser.reset_output_buffer()
# 发送指令
iccid_ser.write(b"AT+QCCID\r\n")
iccid_ser.flush()
# 读取多次,拼接回应
resp = ""
for _ in range(10):
chunk = iccid_ser.readline().decode(errors="ignore")
if chunk:
resp += chunk
time.sleep(0.1)
# 解析 ICCID
# 常见返回:\r\n+QCCID: 8986....\r\nOK\r\n 或 +QCCID:xxxxxxxx
m = re.search(r"\+QCCID:\s*([0-9A-Za-z]+)", resp)
if m:
current_iccid = m.group(1).strip()
write_log(f"读取ICCID成功: {current_iccid}")
return current_iccid
# 兜底匹配19~22位数字
m2 = re.search(r"(\d{19,22})", resp)
if m2:
current_iccid = m2.group(1)
write_log(f"读取ICCID(兜底)成功: {current_iccid}")
return current_iccid
write_log(f"读取ICCID失败: 无有效回应 -> {resp.strip()}")
return None
except Exception as e:
write_log(f"读取ICCID异常: {e}")
return None
def iccid_refresh_worker():
"""
后台线程周期刷新 ICCID确保服务频繁读取
"""
write_log("ICCID刷新线程启动")
while True:
try:
read_iccid()
except Exception as e:
write_log(f"ICCID刷新异常: {e}")
time.sleep(ICCID_READ_INTERVAL_SEC)
def extract_gps_data(serial_line):
"""
从串口数据中提取经纬度支持$GNGLL$GNRMC$GNGGA格式
只做方向正负号转换不转换度分格式
返回 (lon, lat)float带正负号
"""
# 先匹配$GNGLL你之前的
match = re.search(r'\$GNGLL,(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line)
if match:
lat = float(match.group(1))
lat_dir = match.group(2)
lon = float(match.group(3))
lon_dir = match.group(4)
if lat_dir == 'S':
lat = -lat
if lon_dir == 'W':
lon = -lon
return lon, lat
# 尝试匹配$GNRMC
match = re.search(r'\$GNRMC,[^,]*,[AV],(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line)
if match:
lat = float(match.group(1))
lat_dir = match.group(2)
lon = float(match.group(3))
lon_dir = match.group(4)
if lat_dir == 'S':
lat = -lat
if lon_dir == 'W':
lon = -lon
return lon, lat
# 尝试匹配$GNGGA
match = re.search(r'\$GNGGA,[^,]*,(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line)
if match:
lat = float(match.group(1))
lat_dir = match.group(2)
lon = float(match.group(3))
lon_dir = match.group(4)
if lat_dir == 'S':
lat = -lat
if lon_dir == 'W':
lon = -lon
return lon, lat
return None, None
def read_gps_data():
global last_lon, last_lat, ser
with serial_lock:
try:
if ser is None or not ser.is_open:
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE, timeout=1)
line = ser.readline().decode('utf-8').strip()
if line:
# 只对特定句子类型尝试解析并写日志
if line.startswith(('$GNGLL', '$GNRMC', '$GNGGA')):
lon, lat = extract_gps_data(line)
if lon is not None and lat is not None:
last_lon, last_lat = lon, lat
return lon, lat
else:
write_log(f"无效的GPS数据: {line}")
else:
# 忽略其他NMEA语句
pass
except Exception as e:
write_log(f"串口读取失败: {e}")
return 1201, 3129
# def read_gps_data():
# """
# 从串口读取一行数据并尝试解析GPS经纬度。
# 不成功时返回 (None, None)
# """
# global last_lon, last_lat, ser
# with serial_lock:
# try:
# if ser is None or not ser.is_open:
# ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE, timeout=1)
# line = ser.readline().decode('utf-8').strip()
# if line:
# lon, lat = extract_gps_data(line)
# if lon is not None and lat is not None:
# last_lon, last_lat = lon, lat # 更新最新值
# return lon, lat
# else:
# write_log(f"无效的GPS数据: {line}")
# except Exception as e:
# write_log(f"串口读取失败: {e}")
# return 1201, 3129
def save_to_db(topic, payload, lon, lat, card_no, create_time):
"""
将图片经纬度ICCID和创建时间写入数据库card_no, create_time
"""
try:
db = pymysql.connect(**DB_CONFIG)
cursor = db.cursor()
sql = "INSERT INTO mqtt_messages (topic, payload, lon, lat, card_no, create_time) VALUES (%s, %s, %s, %s, %s, %s)"
cursor.execute(sql, (topic, payload, lon, lat, card_no, create_time))
db.commit()
write_log("数据库插入成功")
except Exception as e:
write_log(f"数据库插入失败: {e}")
finally:
try:
db.close()
except:
pass
def publish_image(image_path):
"""
发布图片到MQTT写入数据库并附带经纬度SIM ICCID创建时间
"""
with open(image_path, "rb") as f:
img_bytes = f.read()
img_b64 = base64.b64encode(img_bytes).decode("utf-8")
# MQTT发布
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT)
client.publish(MQTT_TOPIC, img_b64)
client.disconnect()
# GPS
lon, lat = read_gps_data()
if lon is None or lat is None:
write_log("GPS读取失败尝试使用上一次位置")
lon, lat = last_lon, last_lat
# ICCID若当前为空则尝试即时读取一次
card_no = current_iccid or read_iccid()
# 创建时间ISO格式或数据库期望格式
create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
write_log(f"准备入库: lon={lon}, lat={lat}, card_no={card_no}, create_time={create_time}")
save_to_db(MQTT_TOPIC, img_b64, lon, lat, card_no, create_time)
def upload_worker(img_path):
"""上传线程任务,处理一张图片上传"""
try:
publish_image(img_path)
write_log("上传线程结束: " + img_path + ":\t成功!")
except Exception as e:
write_log(f"上传图片 {img_path} 时出错: {e}")
def clean_uploaded():
"""
清理7天前的已上传图片记录避免内存泄漏
"""
with uploaded_lock:
expiration_time = datetime.now() - timedelta(days=7)
keys_to_remove = [key for key, value in uploaded.items() if value < expiration_time]
for key in keys_to_remove:
del uploaded[key]
if keys_to_remove:
write_log(f"清理已上传图片记录,移除 {len(keys_to_remove)} 条记录")
# ======== 主循环程序入口 ========
if __name__ == "__main__":
# 启动 ICCID 周期刷新线程
threading.Thread(target=iccid_refresh_worker, daemon=True).start()
last_clean_time = time.time()
try:
while True:
now = time.time()
for filename in os.listdir(IMAGE_FOLDER):
if filename.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".gif")):
img_path = os.path.join(IMAGE_FOLDER, filename)
mtime = os.path.getmtime(img_path)
# 最近10秒内新创建且未上传
if now - mtime <= 10:
with uploaded_lock:
if img_path in uploaded:
continue
write_log("开始上传图片: " + img_path)
t = threading.Thread(target=upload_worker, args=(img_path,))
t.start()
with uploaded_lock:
uploaded[img_path] = datetime.now()
# 每小时清理一次上传记录
if time.time() - last_clean_time > 3600:
clean_uploaded()
last_clean_time = time.time()
time.sleep(2)
except Exception as e:
write_log(f"主程序异常: {e}")
finally:
try:
if ser is not None and ser.is_open:
ser.close()
if iccid_ser is not None and iccid_ser.is_open:
iccid_ser.close()
except:
pass