import os # 文件系统操作 import base64 # Base64编码图片 import paho.mqtt.client as mqtt # MQTT通信 import pymysql # 数据库操作 import time # 时间处理 from datetime import datetime, timedelta # 时间处理 import serial # 串口通信 import re # 正则匹配GPS串口数据 import threading # 多线程上传图片 # ======== 配置区 ======== IMAGE_FOLDER = "/mnt/save/warning" # 图片目录 MQTT_BROKER = "116.147.36.110" MQTT_PORT = 1883 MQTT_TOPIC = "images/topic" DB_CONFIG = { "host": "116.147.36.110", "port": 13306, "user": "root", "password": "Wxit11335577", "database": "mqtt" } SERIAL_PORT = "/dev/ttyUSB3" SERIAL_BAUDRATE = 115200 ICCID_SERIAL_PORT = "/dev/ttyUSB2" ICCID_BAUDRATE = 115200 ICCID_READ_INTERVAL_SEC = 30 # 周期性刷新 ICCID(服务频繁读取) MAX_UPLOADED_SIZE = 100000 # 最大上传记录数 NOW_SIZE = 0 # 当前记录数 LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "upload.log") # ======== 全局变量与线程锁 ======== last_lon, last_lat = None, None # 上次读取到的GPS ser = None # GPS串口对象 iccid_ser = None # ICCID串口对象 serial_lock = threading.Lock() iccid_lock = threading.Lock() uploaded_lock = threading.Lock() uploaded = {} # 保存已上传记录的文件及时间 current_iccid = None # 当前读取到的 SIM ICCID # ======== 工具函数 ======== def write_log(message): """写入日志文件""" with open(LOG_FILE, "a") as logf: logf.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {message}\n") # def extract_gps_data(serial_line): # """ # 从串口数据中提取经纬度(只处理$GNGLL格式) # 示例数据: $GNGLL,3114.72543,N,12130.62735,E,... # 只做方向转换,不转换度分格式 # 返回 (lon, lat),都为float,带正负号 # """ # match = re.search(r'\$GNGLL,(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line) # if match: # lat = float(match.group(1)) # lat_dir = match.group(2) # lon = float(match.group(3)) # lon_dir = match.group(4) # if lat_dir == 'S': # lat = -lat # if lon_dir == 'W': # lon = -lon # return lon, lat # return None, None def read_iccid(): """ 从 /dev/ttyUSB2 读取 SIM 卡 ICCID,发送 AT+QCCID,带重试、短超时。 返回字符串 ICCID 或 None。 """ global iccid_ser, current_iccid with iccid_lock: try: if iccid_ser is None or not iccid_ser.is_open: iccid_ser = serial.Serial(ICCID_SERIAL_PORT, ICCID_BAUDRATE, timeout=1) # 清空缓冲 iccid_ser.reset_input_buffer() iccid_ser.reset_output_buffer() # 发送指令 iccid_ser.write(b"AT+QCCID\r\n") iccid_ser.flush() # 读取多次,拼接回应 resp = "" for _ in range(10): chunk = iccid_ser.readline().decode(errors="ignore") if chunk: resp += chunk time.sleep(0.1) # 解析 ICCID # 常见返回:\r\n+QCCID: 8986....\r\nOK\r\n 或 +QCCID:xxxxxxxx m = re.search(r"\+QCCID:\s*([0-9A-Za-z]+)", resp) if m: current_iccid = m.group(1).strip() write_log(f"读取ICCID成功: {current_iccid}") return current_iccid # 兜底:匹配19~22位数字 m2 = re.search(r"(\d{19,22})", resp) if m2: current_iccid = m2.group(1) write_log(f"读取ICCID(兜底)成功: {current_iccid}") return current_iccid write_log(f"读取ICCID失败: 无有效回应 -> {resp.strip()}") return None except Exception as e: write_log(f"读取ICCID异常: {e}") return None def iccid_refresh_worker(): """ 后台线程:周期刷新 ICCID,确保服务频繁读取。 """ write_log("ICCID刷新线程启动") while True: try: read_iccid() except Exception as e: write_log(f"ICCID刷新异常: {e}") time.sleep(ICCID_READ_INTERVAL_SEC) def extract_gps_data(serial_line): """ 从串口数据中提取经纬度,支持$GNGLL、$GNRMC、$GNGGA格式 只做方向正负号转换,不转换度分格式 返回 (lon, lat),float,带正负号 """ # 先匹配$GNGLL(你之前的) match = re.search(r'\$GNGLL,(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line) if match: lat = float(match.group(1)) lat_dir = match.group(2) lon = float(match.group(3)) lon_dir = match.group(4) if lat_dir == 'S': lat = -lat if lon_dir == 'W': lon = -lon return lon, lat # 尝试匹配$GNRMC match = re.search(r'\$GNRMC,[^,]*,[AV],(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line) if match: lat = float(match.group(1)) lat_dir = match.group(2) lon = float(match.group(3)) lon_dir = match.group(4) if lat_dir == 'S': lat = -lat if lon_dir == 'W': lon = -lon return lon, lat # 尝试匹配$GNGGA match = re.search(r'\$GNGGA,[^,]*,(\d+\.\d+),(N|S),(\d+\.\d+),(E|W)', serial_line) if match: lat = float(match.group(1)) lat_dir = match.group(2) lon = float(match.group(3)) lon_dir = match.group(4) if lat_dir == 'S': lat = -lat if lon_dir == 'W': lon = -lon return lon, lat return None, None def read_gps_data(): global last_lon, last_lat, ser with serial_lock: try: if ser is None or not ser.is_open: ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE, timeout=1) line = ser.readline().decode('utf-8').strip() if line: # 只对特定句子类型尝试解析并写日志 if line.startswith(('$GNGLL', '$GNRMC', '$GNGGA')): lon, lat = extract_gps_data(line) if lon is not None and lat is not None: last_lon, last_lat = lon, lat return lon, lat else: write_log(f"无效的GPS数据: {line}") else: # 忽略其他NMEA语句 pass except Exception as e: write_log(f"串口读取失败: {e}") return 1201, 3129 # def read_gps_data(): # """ # 从串口读取一行数据并尝试解析GPS经纬度。 # 不成功时返回 (None, None) # """ # global last_lon, last_lat, ser # with serial_lock: # try: # if ser is None or not ser.is_open: # ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE, timeout=1) # line = ser.readline().decode('utf-8').strip() # if line: # lon, lat = extract_gps_data(line) # if lon is not None and lat is not None: # last_lon, last_lat = lon, lat # 更新最新值 # return lon, lat # else: # write_log(f"无效的GPS数据: {line}") # except Exception as e: # write_log(f"串口读取失败: {e}") # return 1201, 3129 def save_to_db(topic, payload, lon, lat, card_no, create_time): """ 将图片、经纬度、ICCID和创建时间写入数据库(card_no, create_time) """ try: db = pymysql.connect(**DB_CONFIG) cursor = db.cursor() sql = "INSERT INTO mqtt_messages (topic, payload, lon, lat, card_no, create_time) VALUES (%s, %s, %s, %s, %s, %s)" cursor.execute(sql, (topic, payload, lon, lat, card_no, create_time)) db.commit() write_log("数据库插入成功") except Exception as e: write_log(f"数据库插入失败: {e}") finally: try: db.close() except: pass def publish_image(image_path): """ 发布图片到MQTT、写入数据库,并附带经纬度、SIM ICCID、创建时间 """ with open(image_path, "rb") as f: img_bytes = f.read() img_b64 = base64.b64encode(img_bytes).decode("utf-8") # MQTT发布 client = mqtt.Client() client.connect(MQTT_BROKER, MQTT_PORT) client.publish(MQTT_TOPIC, img_b64) client.disconnect() # GPS lon, lat = read_gps_data() if lon is None or lat is None: write_log("GPS读取失败,尝试使用上一次位置") lon, lat = last_lon, last_lat # ICCID(若当前为空则尝试即时读取一次) card_no = current_iccid or read_iccid() # 创建时间(ISO格式或数据库期望格式) create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") write_log(f"准备入库: lon={lon}, lat={lat}, card_no={card_no}, create_time={create_time}") save_to_db(MQTT_TOPIC, img_b64, lon, lat, card_no, create_time) def upload_worker(img_path): """上传线程任务,处理一张图片上传""" try: publish_image(img_path) write_log("上传线程结束: " + img_path + ":\t成功!") except Exception as e: write_log(f"上传图片 {img_path} 时出错: {e}") def clean_uploaded(): """ 清理7天前的已上传图片记录,避免内存泄漏 """ with uploaded_lock: expiration_time = datetime.now() - timedelta(days=7) keys_to_remove = [key for key, value in uploaded.items() if value < expiration_time] for key in keys_to_remove: del uploaded[key] if keys_to_remove: write_log(f"清理已上传图片记录,移除 {len(keys_to_remove)} 条记录") # ======== 主循环程序入口 ======== if __name__ == "__main__": # 启动 ICCID 周期刷新线程 threading.Thread(target=iccid_refresh_worker, daemon=True).start() last_clean_time = time.time() try: while True: now = time.time() for filename in os.listdir(IMAGE_FOLDER): if filename.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".gif")): img_path = os.path.join(IMAGE_FOLDER, filename) mtime = os.path.getmtime(img_path) # 最近10秒内新创建,且未上传 if now - mtime <= 10: with uploaded_lock: if img_path in uploaded: continue write_log("开始上传图片: " + img_path) t = threading.Thread(target=upload_worker, args=(img_path,)) t.start() with uploaded_lock: uploaded[img_path] = datetime.now() # 每小时清理一次上传记录 if time.time() - last_clean_time > 3600: clean_uploaded() last_clean_time = time.time() time.sleep(2) except Exception as e: write_log(f"主程序异常: {e}") finally: try: if ser is not None and ser.is_open: ser.close() if iccid_ser is not None and iccid_ser.is_open: iccid_ser.close() except: pass