"""MQTT command listener for a single device.

Connects to an MQTT broker, subscribes to one topic and dispatches each
incoming JSON command (selected by its ``"msg"`` field) to a handler
function. The handlers (``push_stream``, ``exec_sh``, ...) are not defined
in this file; presumably they come from ``from tool import *``.
"""
import configparser
import datetime
import json
import logging
import time

import paho.mqtt.client as mqtt

from device import device_name
from tool import *

_logger = logging.getLogger(__name__)

config = configparser.ConfigParser()
config.read('/home/pi/lot_manager/conf/main/config.conf')
# config.read('/home/lihai/pythonProjects/lot_manager/conf/main/config.conf')


def valid(msg, client):
    """Return True when *msg* is a well-formed command for this device.

    The payload must be UTF-8 encoded JSON holding an object with both a
    ``"msg"`` and a ``"device_name"`` key, and ``"device_name"`` must equal
    this device's ``device_name``.

    Args:
        msg: Received MQTT message; only ``msg.payload`` (bytes) is read.
        client: MQTT client used to publish a reason on the ``error`` topic.

    Returns:
        True if the command should be processed, False otherwise. A message
        addressed to another device is rejected without publishing anything
        from this function.
    """
    try:
        origin_data = json.loads(msg.payload.decode('utf-8'))
    except ValueError:
        # Both UnicodeDecodeError and json.JSONDecodeError are ValueError
        # subclasses. Catching here keeps a bad payload from raising out of
        # the on_message callback, which calls valid() outside its own try.
        client.publish('error', payload='payload must be valid UTF-8 JSON', qos=0)
        return False

    if not isinstance(origin_data, dict):
        # Scalars would raise TypeError on the `in` checks below, and lists
        # or strings would give misleading membership results.
        client.publish('error', payload='payload must be a JSON object', qos=0)
        return False

    if 'msg' not in origin_data:
        client.publish('error', payload='msg must be supplied', qos=0)
        return False
    if 'device_name' not in origin_data:
        client.publish('error', payload='device_name must be supplied', qos=0)
        return False

    return device_name == origin_data['device_name']


class MQTTClient:
    """Thin wrapper around ``paho.mqtt.client.Client`` for device commands."""

    def __init__(self, broker, port, topic, username, password):
        """Store the connection settings and register the callbacks.

        Args:
            broker: Broker host name or IP address.
            port: Broker TCP port (int).
            topic: Topic subscribed to once connected.
            username: Stored on the instance; not applied to the client
                because the ``username_pw_set`` call below is commented out.
            password: Stored on the instance; see ``username``.
        """
        self.broker = broker
        self.port = port
        self.topic = topic
        self.username = username
        self.password = password
        self.client = mqtt.Client()
        # NOTE(review): authentication is disabled; confirm this is intended.
        # self.client.username_pw_set(self.username, self.password)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the command topic and announce the subscription."""
        if rc != 0:
            # A non-zero return code means the broker refused the connection,
            # so subscribing or publishing now would not succeed.
            _logger.error("MQTT connection refused by broker, rc=%s", rc)
            return
        client.subscribe(self.topic)
        client.publish('success', payload='成功订阅lot_mqtt,time=' + str(datetime.datetime.now()), qos=0)

    def on_message(self, client, userdata, msg):
        """Validate an incoming message and run the command it names."""
        if not valid(msg, client):
            # NOTE(review): this also fires for commands addressed to other
            # devices on the same topic; confirm that is desired.
            client.publish('error', payload='验证失败', qos=0)
            return
        client.publish('success', payload='验证通过', qos=0)
        try:
            origin_data = json.loads(msg.payload.decode('utf-8'))
            data = origin_data["msg"]
            if data == "push_stream":
                # Start pushing the video stream.
                push_stream(client)
            elif data == "close_stream":
                # Stop pushing the video stream.
                close_stream(client)
            elif data == "exec":
                # Execute a command.
                exec_sh(msg, client)
            elif data == "update":
                # Update the project and configuration files via git.
                update(client)
            elif data == "record_list":
                # List the recordings.
                # NOTE(review): the only handler called without `client`;
                # verify against its definition.
                get_list_record()
            elif data == "record":
                # Fetch a recording.
                get_record(msg, client)
            elif data == "status":
                # Report the running status.
                get_status(client)
            else:
                # Unknown command type.
                client.publish('error', payload='No Such Type', qos=0)
        except Exception:
            # A failing handler must not take the listener down, but the
            # failure is logged instead of being silently discarded.
            _logger.exception("Failed to handle MQTT command")

    def start(self):
        """Open the connection to the configured broker."""
        self.client.connect(self.broker, self.port)


if __name__ == '__main__':
    # broker = config.get("broker", "host")
    # # port must be an int
    # port = config.getint("broker", "port")
    # topic = config.get("topic", "name")
    # username = config.get("security", "username")
    # password = config.get("security", "password")
    # print(broker, port, topic, username, password)
    # NOTE(review): broker address and credentials are hard-coded here while
    # the config-driven version above is commented out.
    MQTT = MQTTClient("192.168.1.27", 1883, "lot_mqtt", "demo", "123456")
    while True:
        try:
            MQTT.start()
            MQTT.client.loop_forever()
        except Exception:
            # Record why the connection or network loop failed, then wait
            # before trying to reconnect.
            _logger.exception("MQTT connection failed; retrying in 30 seconds")
            time.sleep(30)