"""MQTT command listener.

Subscribes to a topic on the configured broker and dispatches each incoming
JSON command (``{"msg": "<command>", ...}``) to a handler imported from
``tool``. Outcomes are reported on the ``success`` and ``error`` topics.
"""
import datetime
import json
import time

import paho.mqtt.client as mqtt

# Keep this order: names imported from ``config`` must win over anything the
# star import from ``tool`` might also export.
from tool import *
from config import broker, port, subscribe_topic, publish_topic, username, password


def _decode_payload(msg):
    """Return the JSON-decoded payload of *msg*, or ``None`` if undecodable.

    ``None`` is returned when the payload is not valid UTF-8 or not valid
    JSON. Note that a payload consisting of the JSON literal ``null`` also
    decodes to ``None``; callers treat both cases as "not a command".
    """
    try:
        return json.loads(msg.payload.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def valid(msg, client):
    """Check that *msg* carries a JSON object containing a ``"msg"`` key.

    On failure an explanatory message is published to the ``error`` topic
    through *client* and ``False`` is returned; otherwise ``True``.

    Malformed payloads (bad UTF-8, bad JSON, or JSON that is not an object)
    are rejected here instead of raising, so a single bad message cannot
    escape the MQTT callback as an exception.
    """
    origin_data = _decode_payload(msg)
    if not isinstance(origin_data, dict):
        client.publish('error', payload='payload must be a JSON object', qos=0)
        return False
    if 'msg' not in origin_data:
        client.publish('error', payload='msg must be supplied', qos=0)
        return False
    return True


class MQTTClient:
    """Thin wrapper around ``paho.mqtt.client.Client`` for one subscription.

    Stores the connection settings, wires up the connect/message callbacks,
    and exposes ``start()`` to open the connection. The caller drives the
    network loop through ``self.client``.
    """

    def __init__(self, broker, port, topic, username, password):
        """Store connection settings and create the underlying paho client.

        Args:
            broker: Broker host passed to ``connect()``.
            port: Broker port passed to ``connect()``.
            topic: Topic subscribed to once the connection succeeds.
            username: Username for broker authentication.
            password: Password for broker authentication.
        """
        self.broker = broker
        self.port = port
        self.topic = topic
        self.username = username
        self.password = password
        # Do NOT pass a client_id here; the original author warns that doing
        # so breaks things badly.
        # NOTE(review): presumably a fixed id collides with other devices
        # using the same id on the broker - confirm.
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe and announce success once the broker accepts the connection.

        Args:
            client: The paho client that connected.
            userdata: Unused.
            flags: Unused.
            rc: Connection result code; ``0`` means success.
        """
        if rc != 0:
            # Previously ignored silently; surface it so a refused connection
            # (e.g. bad credentials) is visible to the operator.
            print('MQTT connection refused, rc=' + str(rc))
            return
        self.client.subscribe(self.topic)
        # NOTE(review): 'device_name' is a literal string here, not a
        # variable - looks like a placeholder; confirm intended value.
        client.publish(
            'success',
            payload='成功订阅' + 'device_name' + ',time=' + str(datetime.datetime.now()),
            qos=0,
        )

    def on_message(self, client, userdata, msg):
        """Validate an incoming message and dispatch it by its ``"msg"`` value.

        Publishes to ``error`` when validation fails, when the command is
        unknown, or when a handler raises; publishes to ``success`` when
        validation passes. Handler exceptions are reported rather than
        propagated so one failing command does not take the listener down.

        Args:
            client: The paho client that received the message.
            userdata: Unused.
            msg: The received message; ``msg.payload`` holds the raw bytes.
        """
        if not valid(msg, client):
            client.publish('error', payload='验证失败', qos=0)
            return
        client.publish('success', payload='验证通过', qos=0)

        # valid() has already established that the payload is a JSON object
        # with a "msg" key, so this lookup cannot fail.
        data = _decode_payload(msg)["msg"]
        try:
            if data == "push_stream":
                # Start pushing the video stream.
                push_stream(client)
            elif data == "close_stream":
                # Stop pushing the video stream.
                close_stream(client)
            elif data == "exec":
                # Execute a command.
                # NOTE(review): presumably runs a shell command taken from
                # the message - anything able to publish to this topic could
                # then run commands on this host; confirm broker ACLs.
                exec_sh(msg, client)
            elif data == "update":
                # Update the project and config files via git.
                update(client)
            elif data == "record_list":
                # List the recordings.
                # NOTE(review): unlike the other handlers this one is not
                # given `client` - confirm against tool.get_list_record.
                get_list_record()
            elif data == "record":
                # Fetch a recording.
                get_record(msg, client)
            elif data == "status":
                # Report the running status.
                get_status(client)
            else:
                # Unknown command type.
                client.publish('error', payload='No Such Type', qos=0)
        except Exception as e:
            # Best-effort boundary: keep the listener alive, but report the
            # failure instead of discarding it.
            print('command {!r} failed: {!r}'.format(data, e))
            client.publish(
                'error',
                payload='command {} failed: {}'.format(data, e),
                qos=0,
            )

    def start(self):
        """Open the connection to the configured broker and port."""
        self.client.connect(self.broker, self.port)


if __name__ == '__main__':
    # The password is deliberately not printed, to keep the credential out of
    # stdout and any captured logs.
    print(broker, port, subscribe_topic, publish_topic, username)
    # MQTT client.
    MQTT = MQTTClient(broker, port, subscribe_topic, username, password)
    # Reconnect loop.
    while True:
        try:
            MQTT.start()
            # Block here, processing network traffic.
            MQTT.client.loop_forever()
        except Exception as e:
            print(e)
            # Wait after a failure before trying to connect again.
            time.sleep(30)