From b0dc21dec1794682402c89e11f14b7e47a81d828 Mon Sep 17 00:00:00 2001 From: xyj <10908227994@qq.com> Date: Sat, 9 Dec 2023 18:49:22 +0800 Subject: [PATCH] update --- MQTT.py | 1 - data_upload.py | 76 ++++++++++++++++++++++++++++--------------------- data_upload3.py | 43 ++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 33 deletions(-) mode change 100755 => 100644 data_upload.py create mode 100755 data_upload3.py diff --git a/MQTT.py b/MQTT.py index d9b1bf8..933e540 100644 --- a/MQTT.py +++ b/MQTT.py @@ -34,7 +34,6 @@ class MQTTClient: def on_connect(self, client, userdata, flags, rc): print("Connected with result code " + str(rc)) self.client.subscribe(self.topic) - client.subscribe('lot_mqtt') client.publish('success', payload='成功订阅lot_mqtt' + str(time.time()), qos=0) def on_disconnect(self, client, userdata, rc): diff --git a/data_upload.py b/data_upload.py old mode 100755 new mode 100644 index ea89a47..76804a0 --- a/data_upload.py +++ b/data_upload.py @@ -1,43 +1,55 @@ -import os -import subprocess +import json import time + import paho.mqtt.client as mqtt from device import device_name +from tool import * -def on_connect(client, userdata, flags, rc): - global times - if rc == 0: - print("连接成功,执行数据推送和本地存储") - client.publish('success', payload='连接成功,执行数据推送和本地存储', qos=0) - # subprocess.Popen(['/usr/bin/bash /home/pi/lot_manager/bash/start_data_upload.sh'], shell=True) - times = 3 +def valid(msg, client): + origin_data = json.loads(msg.payload.decode('utf-8')) + if 'msg' not in origin_data: + client.publish('error', payload='msg must be supplied', qos=0) + return False + if 'device_name' not in origin_data: + client.publish('error', payload='device_name must be supplied', qos=0) + return False + client.publish('error', payload=device_name, qos=0) + if device_name != origin_data['device_name']: + return False + return True -def on_disconnect(client, userdata, rc): - print("失败,执行本地存储") - # subprocess.Popen(['/usr/bin/bash /home/pi/lot_manager/bash/stop_data_upload.sh'], shell=True) +class DataUploadClient: + def __init__(self, broker, port, topic): + self.broker = broker + self.port = port + self.topic = topic + self.client = mqtt.Client() + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect -times = 3 + def on_connect(self, client, userdata, flags, rc): + self.client.publish('success', payload='连接成功,执行数据推送和本地存储' + str(time.time()), qos=0) + subprocess.Popen(['/usr/bin/bash /home/pi/lot_manager/bash/start_data_upload.sh'], shell=True) -client = mqtt.Client(client_id=device_name) -client.username_pw_set("demo", "123456") -# Specify callback function -client.on_connect = on_connect -client.on_disconnect = on_disconnect + def on_disconnect(self, client, userdata, rc): + print("失败,执行本地存储") + subprocess.Popen(['/usr/bin/bash /home/pi/lot_manager/bash/stop_data_upload.sh'], shell=True) -# 尝试连接 MQTT 服务器 -while True: - try: - # ceshi-mqtt.lihaink.cn - client.connect('192.168.1.27', 1883) - client.loop_forever() - except Exception as e: - print("Connection failed:", e) - if times == 0: - # 执行本地存储 - on_disconnect(None, None, None) - print("正在尝试重连") - time.sleep(3) - times -= 1 + def start(self): + self.client.connect(self.broker, self.port) + + +if __name__ == '__main__': + MQTT = DataUploadClient('192.168.1.27', 1883, 'lot_mqtt') + while True: + try: + MQTT.start() + if MQTT.client.is_connected(): + print("连接成功") + MQTT.client.loop_forever() + except: + print("重新连接") + time.sleep(1) diff --git a/data_upload3.py b/data_upload3.py new file mode 100755 index 0000000..ea89a47 --- /dev/null +++ b/data_upload3.py @@ -0,0 +1,43 @@ +import os +import subprocess +import time +import paho.mqtt.client as mqtt + +from device import device_name + + +def on_connect(client, userdata, flags, rc): + global times + if rc == 0: + print("连接成功,执行数据推送和本地存储") + client.publish('success', payload='连接成功,执行数据推送和本地存储', qos=0) + # subprocess.Popen(['/usr/bin/bash /home/pi/lot_manager/bash/start_data_upload.sh'], shell=True) + times = 3 + + +def on_disconnect(client, userdata, rc): + print("失败,执行本地存储") + # subprocess.Popen(['/usr/bin/bash /home/pi/lot_manager/bash/stop_data_upload.sh'], shell=True) + +times = 3 + +client = mqtt.Client(client_id=device_name) +client.username_pw_set("demo", "123456") +# Specify callback function +client.on_connect = on_connect +client.on_disconnect = on_disconnect + +# 尝试连接 MQTT 服务器 +while True: + try: + # ceshi-mqtt.lihaink.cn + client.connect('192.168.1.27', 1883) + client.loop_forever() + except Exception as e: + print("Connection failed:", e) + if times == 0: + # 执行本地存储 + on_disconnect(None, None, None) + print("正在尝试重连") + time.sleep(3) + times -= 1