"""MQTT command listener for a single device.

Subscribes to the topic configured in ``config.conf``, validates each
incoming JSON message and dispatches it to the matching handler imported
from ``tool``.
"""
import configparser
import datetime
import json
import logging
import time

import paho.mqtt.client as mqtt

from device import device_name
# Kept last so any name ``tool`` re-exports (the original relied on it for
# ``json``) still takes precedence exactly as before.
from tool import *

logger = logging.getLogger(__name__)

config = configparser.ConfigParser()
config.read('/home/pi/lot_manager/conf/main/config.conf')


def _load_payload(msg):
    """Return the message payload decoded as a JSON object, or None.

    None is returned when the payload is not valid UTF-8, not valid JSON,
    or is JSON but not an object (dict).
    """
    try:
        data = json.loads(msg.payload.decode('utf-8'))
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError.
        return None
    return data if isinstance(data, dict) else None


def valid(msg, client):
    """Check that *msg* is a well-formed command addressed to this device.

    Args:
        msg: Received MQTT message; ``msg.payload`` holds UTF-8 JSON bytes.
        client: MQTT client used to publish diagnostics on the ``error`` topic.

    Returns:
        True when the payload is a JSON object containing ``msg`` and a
        ``device_name`` equal to this device's name; False otherwise.
    """
    origin_data = _load_payload(msg)
    if origin_data is None:
        # Previously a malformed payload raised out of the MQTT callback.
        client.publish('error', payload='payload must be a JSON object', qos=0)
        return False
    if 'msg' not in origin_data:
        client.publish('error', payload='msg must be supplied', qos=0)
        return False
    if 'device_name' not in origin_data:
        client.publish('error', payload='device_name must be supplied', qos=0)
        return False
    # A message for another device is rejected without a specific error.
    return device_name == origin_data['device_name']


class MQTTClient:
    """Thin wrapper around a paho ``mqtt.Client`` that dispatches commands."""

    def __init__(self, broker, port, topic, username, password):
        """Store connection settings and register the paho callbacks.

        Args:
            broker: Broker host name or address.
            port: Broker TCP port (int).
            topic: Topic subscribed to once connected.
            username: Broker user name.
            password: Broker password.
        """
        self.broker = broker
        self.port = port
        self.topic = topic
        self.username = username
        self.password = password
        # Never pass a client_id here, otherwise things break badly.
        # NOTE(review): presumably devices sharing one id kick each other
        # off the broker -- confirm.
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the command topic when the connection succeeds.

        A result code of 0 is treated as success; any other code is logged.
        """
        if rc == 0:
            self.client.subscribe(self.topic)
            client.publish('success', payload='成功订阅lot_mqtt,time=' + str(datetime.datetime.now()), qos=0)
        else:
            logger.warning("MQTT connection refused, rc=%s", rc)

    def on_message(self, client, userdata, msg):
        """Validate an incoming message and run the command it names."""
        if not valid(msg, client):
            client.publish('error', payload='验证失败', qos=0)
            return
        client.publish('success', payload='验证通过', qos=0)
        try:
            origin_data = _load_payload(msg)
            data = origin_data["msg"]
            if data == "push_stream":
                # Start pushing the video stream.
                push_stream(client)
            elif data == "close_stream":
                # Stop pushing the video stream.
                close_stream(client)
            elif data == "exec":
                # Execute a command.
                # NOTE(review): presumably runs a command taken from the
                # payload -- make sure the broker is access-controlled.
                exec_sh(msg, client)
            elif data == "update":
                # Update the project and config files via git.
                update(client)
            elif data == "record_list":
                # List the recordings.
                # NOTE(review): unlike its siblings this handler is not
                # given the client -- verify that is intended.
                get_list_record()
            elif data == "record":
                # Fetch a recording.
                get_record(msg, client)
            elif data == "status":
                # Report the running status.
                get_status(client)
            else:
                # Unknown command type.
                client.publish('error', payload='No Such Type', qos=0)
        except Exception:
            # Keep the network loop alive, but leave a trace instead of
            # silently discarding the failure.
            logger.exception("Handler failed for message on %s", getattr(msg, 'topic', None))

    def start(self):
        """Set the credentials and open the connection to the broker."""
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.broker, self.port)


def main():
    """Read the broker settings and run the client, retrying on failure."""
    broker = config.get("broker", "host")
    # The port must be an int.
    port = config.getint("broker", "port")
    topic = config.get("topic", "name")
    username = config.get("security", "username")
    password = config.get("security", "password")
    # Fixed argument order: the configured port and topic are now passed in
    # their own slots (previously 1883 was hard-coded and the port was
    # passed as the topic).
    mqtt_client = MQTTClient(broker, port, topic, username, password)
    while True:
        try:
            mqtt_client.start()
            mqtt_client.client.loop_forever()
        except Exception:
            logger.exception("MQTT loop failed; retrying in 30 seconds")
            time.sleep(30)


if __name__ == '__main__':
    main()