"""MQTT command listener.

Subscribes to a command topic, validates each incoming JSON message and
dispatches it to the matching handler (imported via ``from tool import *``).
Replies and errors are published on ``info_topic``.
"""
# json/subprocess were previously only reachable through ``from tool import *``;
# import them explicitly (before the star import, so names exported by
# ``tool`` still take precedence exactly as before).
import json
import logging
import subprocess
import time

import paho.mqtt.client as mqtt

from tool import *
from config import broker, port, subscribe_topic, username, password, info_topic

_logger = logging.getLogger(__name__)

_START_STREAM_SCRIPT = '/home/pi/agri_xumu/bash/start_push_stream.sh'
_STOP_STREAM_SCRIPT = '/home/pi/agri_xumu/bash/stop_push_stream.sh'


def _run_script(path):
    """Launch a bash script in the background without waiting for it.

    A failure to spawn the process is logged instead of raised, so callers
    (an MQTT callback and the reconnect loop) keep running.
    """
    try:
        # argv list, no intermediate shell needed.
        subprocess.Popen(['/bin/bash', path])
    except OSError:
        _logger.exception("failed to launch %s", path)


def valid(msg, client):
    """Check that an incoming MQTT message is a JSON object with a 'msg' key.

    Args:
        msg: received message; ``msg.payload`` must be UTF-8 encoded bytes.
        client: MQTT client used to publish the error reply on ``info_topic``.

    Returns:
        True when the payload decodes to a JSON object containing 'msg';
        False otherwise (an error reply has been published in that case).
    """
    try:
        origin_data = json.loads(msg.payload.decode('utf-8'))
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError: a malformed
        # payload must not raise out of the MQTT callback.
        client.publish(info_topic,
                       payload=publish_payload(code=400, msg="payload must be valid JSON"),
                       qos=0)
        return False
    # Non-object JSON (number, string, list, ...) cannot carry a 'msg' field.
    if not isinstance(origin_data, dict) or 'msg' not in origin_data:
        client.publish(info_topic,
                       payload=publish_payload(code=404, msg="msg must be supplied"),
                       qos=0)
        return False
    return True


class MQTTClient:
    """Thin wrapper around a paho-mqtt client that dispatches text commands."""

    def __init__(self, broker, port, topic, username, password):
        """Store connection settings and wire up the paho callbacks.

        Args:
            broker: broker host passed to ``connect``.
            port: broker port passed to ``connect``.
            topic: topic subscribed to once the connection succeeds.
            username: user name for broker authentication.
            password: password for broker authentication.
        """
        self.broker = broker
        self.port = port
        self.topic = topic
        self.username = username
        self.password = password
        # NOTE(review): the original comment here warned "never specify a
        # client_id, or things break badly", yet a client_id IS passed (the
        # module-level subscribe_topic). Behaviour is kept as-is; confirm
        # which of the two is intended.
        self.client = mqtt.Client(client_id=subscribe_topic)
        self.client.username_pw_set(self.username, self.password)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe, announce the subscription and start the push stream.

        Only acts when ``rc == 0``; any other return code is logged.
        """
        if rc != 0:
            _logger.error("MQTT connection refused, rc=%s", rc)
            return
        self.client.subscribe(self.topic)
        client.publish(info_topic,
                       payload=publish_payload(code=200, msg='成功订阅' + self.topic),
                       qos=0)
        _run_script(_START_STREAM_SCRIPT)

    def on_message(self, client, userdata, msg):
        """Validate an incoming message and dispatch on its 'msg' field.

        Unknown commands get a 404 reply on ``info_topic``. Handler errors
        are logged and never propagate out of the callback.
        """
        if not valid(msg, client):
            return
        try:
            origin_data = json.loads(msg.payload.decode('utf-8'))
            data = origin_data["msg"]
            if data == "push_stream":
                # Start pushing the video stream.
                push_stream(client)
            elif data == "close_stream":
                # Stop pushing the video stream.
                close_stream(client)
            elif data == "exec":
                # Execute a command.
                exec_sh(msg, client)
            elif data == "update":
                # Update the project and config files via git.
                update(client)
            elif data == "reload":
                # Reload the configuration.
                reload(client)
            elif data == "status":
                # Report the running status.
                get_status(client)
            else:
                # Unknown message type.
                client.publish(info_topic,
                               payload=publish_payload(code=404, msg='No Such Msg Type'),
                               qos=0)
        except Exception:
            # Best-effort: keep the listener alive, but leave a trace of the
            # failure instead of swallowing it silently.
            _logger.exception("error while handling MQTT message")

    def start(self):
        """Connect to the configured broker (does not start the network loop)."""
        self.client.connect(self.broker, self.port)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # MQTT client.
    mqtt_client = MQTTClient(broker, port, subscribe_topic, username, password)

    # Connect in a loop.
    while True:
        try:
            mqtt_client.start()
            # Block and listen.
            mqtt_client.client.loop_forever()
        except Exception:
            # On failure: stop the stream, wait, then try to connect again.
            _logger.exception("MQTT loop failed; retrying in 30 seconds")
            _run_script(_STOP_STREAM_SCRIPT)
            time.sleep(30)