import time import paho.mqtt.client as mqtt from device import device_name from tool import * def valid(msg, client): origin_data = json.loads(msg.payload.decode('utf-8')) if 'msg' not in origin_data: client.publish('error', payload='msg must be supplied', qos=0) return False if 'device_name' not in origin_data: client.publish('error', payload='device_name must be supplied', qos=0) return False if device_name != origin_data['device_name']: return False return True class MQTTClient: def __init__(self, broker, port, topic): self.broker = broker self.port = port self.topic = topic self.client = mqtt.Client() self.client.on_connect = self.on_connect self.client.on_message = self.on_message def on_connect(self, client, userdata, flags, rc): self.client.subscribe(self.topic) client.publish('success', payload='成功订阅lot_mqtt' + str(time.time()), qos=0) def on_message(self, client, userdata, msg): if not valid(msg, client): client.publish('error', payload='验证失败', qos=0) return client.publish('success', payload='验证通过', qos=0) try: origin_data = json.loads(msg.payload.decode('utf-8')) data = origin_data["msg"] if data == "push_stream": # 启动推流视频 push_stream(client) elif data == "close_stream": # 关闭推流视频 close_stream(client) elif data == "exec": # 执行命令 exec_sh(msg, client) elif data == "update": # git更新项目和配置文件 update(client) elif data == "record_list": # 查看录像列表 get_list_record() elif data == "record": # 获取录像 get_record(msg, client) elif data == "status": # 查看运行状态 get_status(client) else: # 错误类型 client.publish('error', payload='No Such Type', qos=0) except Exception as e: pass def start(self): self.client.connect(self.broker, self.port) if __name__ == '__main__': MQTT = MQTTClient('192.168.1.27', 1883, 'lot_mqtt') while True: try: MQTT.start() MQTT.client.loop_forever() except: time.sleep(30)