diff --git a/MQTT.py b/MQTT.py old mode 100755 new mode 100644 index f992039..d9b1bf8 --- a/MQTT.py +++ b/MQTT.py @@ -1,17 +1,10 @@ import json -import os import time + import paho.mqtt.client as mqtt from device import device_name -from tool import push_stream, close_stream, update, exec_sh, get_record, get_list_record, get_status - - -def on_connect(client, userdata, flags, rc): - if rc == 0: - print("成功订阅") - client.subscribe('lot_mqtt') - client.publish('success', payload='成功订阅', qos=0) +from tool import * def valid(msg, client): @@ -28,56 +21,73 @@ def valid(msg, client): return True -# Message receiving callback -def on_message(client, userdata, msg): - if not valid(msg, client): - client.publish('error', payload='验证失败', qos=0) - return - client.publish('success', payload='验证通过', qos=0) - try: - origin_data = json.loads(msg.payload.decode('utf-8')) - data = origin_data["msg"] - if data == "push_stream": - # 启动推流视频 - push_stream(client) - elif data == "close_stream": - # 关闭推流视频 - close_stream(client) - elif data == "exec": - # 执行命令 - exec_sh(msg, client) - elif data == "update": - # git更新项目和配置文件 - update(client) - elif data == "record_list": - # 查看录像列表 - get_list_record() - elif data == "record": - # 获取录像 - get_record(msg, client) - elif data == "status": - # 查看运行状态 - get_status(client) - else: - # 错误类型 - client.publish('error', payload='No Such Type', qos=0) - except Exception as e: - pass +class MQTTClient: + def __init__(self, broker, port, topic): + self.broker = broker + self.port = port + self.topic = topic + self.client = mqtt.Client() + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.on_message = self.on_message + + def on_connect(self, client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + self.client.subscribe(self.topic) + client.subscribe('lot_mqtt') + client.publish('success', payload='成功订阅lot_mqtt' + str(time.time()), qos=0) + + def on_disconnect(self, client, userdata, rc): + print("Disconnected with code " + str(rc)) + + def on_message(self, client, userdata, msg): + if not valid(msg, client): + client.publish('error', payload='验证失败', qos=0) + return + client.publish('success', payload='验证通过', qos=0) + try: + origin_data = json.loads(msg.payload.decode('utf-8')) + data = origin_data["msg"] + if data == "push_stream": + # 启动推流视频 + push_stream(client) + elif data == "close_stream": + # 关闭推流视频 + close_stream(client) + elif data == "exec": + # 执行命令 + exec_sh(msg, client) + elif data == "update": + # git更新项目和配置文件 + update(client) + elif data == "record_list": + # 查看录像列表 + get_list_record() + elif data == "record": + # 获取录像 + get_record(msg, client) + elif data == "status": + # 查看运行状态 + get_status(client) + else: + # 错误类型 + client.publish('error', payload='No Such Type', qos=0) + except Exception as e: + pass + + def start(self): + self.client.connect(self.broker, self.port) -times = 120 if __name__ == '__main__': - client = mqtt.Client(client_id=device_name) - client.username_pw_set("demo", "123456") - # Specify callback function - client.on_connect = on_connect - # 尝试连接 MQTT 服务器 + MQTT = MQTTClient('192.168.1.27', 1883, 'lot_mqtt') while True: try: - # ceshi-mqtt.lihaink.cn - client.connect('192.168.1.27', 1883) - client.loop_forever() - except Exception as e: - print("Connection failed:", e) - time.sleep(10) - print("正在尝试重连") + MQTT.start() + if MQTT.client.is_connected(): + print("连接成功") + MQTT.client.loop_forever() + except: + print("重新连接") + time.sleep(1) + diff --git a/conf/common/mqtt.conf b/conf/common/mqtt.conf index 21ba091..e2d50ef 100644 --- a/conf/common/mqtt.conf +++ b/conf/common/mqtt.conf @@ -1,6 +1,6 @@ [program:mqtt] directory=/home/pi/lot_manager -command=/usr/bin/python test.py +command=/usr/bin/python MQTT.py user=pi autostart=true autorestart=true diff --git a/test.py b/test.py deleted file mode 100644 index a9e1e62..0000000 --- a/test.py +++ /dev/null @@ -1,94 +0,0 @@ -import json -import time - -import paho.mqtt.client as mqtt - -from device import device_name -from tool import * - - -def valid(msg, client): - origin_data = json.loads(msg.payload.decode('utf-8')) - if 'msg' not in origin_data: - client.publish('error', payload='msg must be supplied', qos=0) - return False - if 'device_name' not in origin_data: - client.publish('error', payload='device_name must be supplied', qos=0) - return False - client.publish('error', payload=device_name, qos=0) - if device_name != origin_data['device_name']: - return False - return True - - -class MQTTClient: - def __init__(self, broker, port, topic): - self.broker = broker - self.port = port - self.topic = topic - self.client = mqtt.Client() - self.client.on_connect = self.on_connect - self.client.on_disconnect = self.on_disconnect - self.client.on_message = self.on_message - - def on_connect(self, client, userdata, flags, rc): - print("Connected with result code " + str(rc)) - self.client.subscribe(self.topic) - client.subscribe('lot_mqtt') - client.publish('success', payload='成功订阅lot_mqtt' + str(time.time()), qos=0) - - def on_disconnect(self, client, userdata, rc): - print("Disconnected with code " + str(rc)) - - def on_message(self, client, userdata, msg): - if not valid(msg, client): - client.publish('error', payload='验证失败', qos=0) - return - client.publish('success', payload='验证通过', qos=0) - try: - origin_data = json.loads(msg.payload.decode('utf-8')) - data = origin_data["msg"] - if data == "push_stream": - # 启动推流视频 - push_stream(client) - elif data == "close_stream": - # 关闭推流视频 - close_stream(client) - elif data == "exec": - # 执行命令 - exec_sh(msg, client) - elif data == "update": - # git更新项目和配置文件 - update(client) - elif data == "record_list": - # 查看录像列表 - get_list_record() - elif data == "record": - # 获取录像 - get_record(msg, client) - elif data == "status": - # 查看运行状态 - get_status(client) - else: - # 错误类型 - client.publish('error', payload='No Such Type', qos=0) - except Exception as e: - pass - - def start(self): - self.client.connect(self.broker, self.port) - # self.client.loop_start() - - -if __name__ == '__main__': - MQTT = MQTTClient('192.168.1.27', 1883, 'lot_mqtt') - while True: - try: - MQTT.start() - if MQTT.client.is_connected(): - print("连接成功") - MQTT.client.loop_forever() - except: - print("重新连接") - time.sleep(1) -