From 3791368e677d5b74bc0ac2da1eaf4b4438b3a8a1 Mon Sep 17 00:00:00 2001 From: xyj <10908227994@qq.com> Date: Mon, 18 Dec 2023 10:51:06 +0800 Subject: [PATCH] update --- MQTT.py | 19 +++--------- conf/device/device.conf | 6 ++++ conf/main/common.conf | 4 +-- config.py | 69 +++++++++++++++++++++-------------------- data_upload.py | 4 +-- tool.py | 38 ++++++++++++++--------- 6 files changed, 72 insertions(+), 68 deletions(-) diff --git a/MQTT.py b/MQTT.py index 05cb3b2..a1f7496 100644 --- a/MQTT.py +++ b/MQTT.py @@ -1,25 +1,16 @@ -import datetime import time import paho.mqtt.client as mqtt from tool import * -from config import broker, port, subscribe_topic, publish_topic, username, password, publish_pwd +from config import broker, port, subscribe_topic, username, password, info_topic def valid(msg, client): origin_data = json.loads(msg.payload.decode('utf-8')) if 'msg' not in origin_data: - client.publish('error', payload='msg must be supplied', qos=0) - return False - # TODO 增加安全性 - if 'pwd' not in origin_data: - client.publish('error', payload='pwd must be supplied', qos=0) - return False - # 判断密码是否正确 - if publish_pwd != origin_data['pwd']: - client.publish('error', payload='pwd is not corrected', qos=0) + client.publish(info_topic, payload=BaseResponse(code=404, msg="msg must be supplied"), qos=0) return False return True @@ -40,14 +31,12 @@ class MQTTClient: def on_connect(self, client, userdata, flags, rc): if rc == 0: self.client.subscribe(self.topic) - client.publish('success', payload='成功订阅' + subscribe_topic + ',time=' + str(datetime.datetime.now()), + client.publish(info_topic, payload=BaseResponse(code=200, msg='成功订阅' + self.topic), qos=0) def on_message(self, client, userdata, msg): if not valid(msg, client): - client.publish('error', payload='验证失败', qos=0) return - client.publish('success', payload='验证通过', qos=0) try: origin_data = json.loads(msg.payload.decode('utf-8')) data = origin_data["msg"] @@ -77,7 +66,7 @@ class MQTTClient: get_status(client) else: # 错误类型 - client.publish('error', payload='No Such Type', qos=0) + client.publish(info_topic, payload=BaseResponse(code=404, msg='No Such Msg Type'), qos=0) except Exception as e: pass diff --git a/conf/device/device.conf b/conf/device/device.conf index 8cf0cd3..fa5bb8f 100644 --- a/conf/device/device.conf +++ b/conf/device/device.conf @@ -5,6 +5,7 @@ subscribe_topic=lihai_lot_walnutpi_dev_1 # 发布消息的主题 publish_topic=camera_1 +info_topic=info_dev_1 username=lihai_lot_land_1 password=123456 #################################### @@ -14,6 +15,7 @@ password=123456 subscribe_topic=lihai_lot_walnutpi_dev_2 # 发布消息的主题 publish_topic=camera_2 +info_topic=info_dev_2 username=lihai_lot_land_1 password=123456 #################################### @@ -23,6 +25,7 @@ password=123456 subscribe_topic=lihai_lot_walnutpi_dev_3 # 发布消息的主题 publish_topic=camera_3 +info_topic=info_dev_3 username=lihai_lot_land_1 password=123456 #################################### @@ -32,6 +35,7 @@ password=123456 subscribe_topic=lihai_lot_walnutpi_dev_4 # 发布消息的主题 publish_topic=camera_4 +info_topic=info_dev_4 username=lihai_lot_land_1 password=123456 #################################### @@ -41,6 +45,7 @@ password=123456 subscribe_topic=lihai_lot_walnutpi_dev_5 # 发布消息的主题 publish_topic=camera_5 +info_topic=info_dev_5 username=lihai_lot_land_1 password=123456 #################################### @@ -50,5 +55,6 @@ password=123456 subscribe_topic=lihai_lot_walnutpi_dev_6 # 发布消息的主题 publish_topic=camera_6 +info_topic=info_dev_6 username=lihai_lot_land_1 password=123456 \ No newline at end of file diff --git a/conf/main/common.conf b/conf/main/common.conf index c208843..eebf322 100644 --- a/conf/main/common.conf +++ b/conf/main/common.conf @@ -4,6 +4,4 @@ host=mqtt.lihaink.cn port=1883 [record] post_record_list_url=https://ceshi-iot.lihaink.cn/api/index/file_list -post_record_url=https://ceshi-iot.lihaink.cn/api/index/upload -[publish_pwd] -value=123456 \ No newline at end of file +post_record_url=https://ceshi-iot.lihaink.cn/api/index/upload \ No newline at end of file diff --git a/config.py b/config.py index 9d74a1f..bc77ef9 100644 --- a/config.py +++ b/config.py @@ -2,40 +2,43 @@ import configparser import subprocess -p = subprocess.Popen(['cat /home/pi/device_name'], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) -out, err = p.communicate() -# 设备名称,必须要有 -device_name = out.decode('utf-8').strip() +try: + p = subprocess.Popen(['cat /home/pi/device_name'], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = p.communicate() + # 设备名称,必须要有 + device_name = out.decode('utf-8').strip() -# 读取配置 -config = configparser.ConfigParser() -# 读取公共配置 -config.read('conf/main/common.conf') -# 域名 -broker = config.get("broker", "host") -# 端口,这里必须是int类型 -port = config.getint("broker", "port") -# 录像地址 -post_record_list_url = config.get("record", "post_record_list_url") -post_record_url = config.get("record", "post_record_url") -# 发布消息的认证pwd -publish_pwd = config.get("publish_pwd", "value") + # 读取配置 + config = configparser.ConfigParser() + # 读取公共配置 + config.read('conf/main/common.conf') + # 域名 + broker = config.get("broker", "host") + # 端口,这里必须是int类型 + port = config.getint("broker", "port") + # 录像地址 + post_record_list_url = config.get("record", "post_record_list_url") + post_record_url = config.get("record", "post_record_url") + # 读取设备配置 + config.read('conf/device/device.conf') + # 订阅的主题 + subscribe_topic = config.get(device_name, "subscribe_topic") + # 发布数据的主题 + publish_topic = config.get(device_name, "publish_topic") + # 发布信息的主题 + info_topic = config.get(device_name, "info_topic") + # 用户 + username = config.get(device_name, "username") + # 密码 + password = config.get(device_name, "password") -# 读取设备配置 -config.read('conf/device/device.conf') -# 订阅的主题 -subscribe_topic = config.get(device_name, "subscribe_topic") -# 发布的主题 -publish_topic = config.get(device_name, "publish_topic") -# 用户 -username = config.get(device_name, "username") -# 密码 -password = config.get(device_name, "password") + # 特殊配置 + config.read('conf/zhanguan/topic.conf') + zhanguan_device_name = config.get("device", "name") -# 特殊配置 -config.read('conf/zhanguan/topic.conf') -zhanguan_device_name = config.get("device", "name") - -# tool配置 -mp4_path = '/home/pi/mp4' \ No newline at end of file + # tool配置 + mp4_path = '/home/pi/mp4' +except Exception as e: + # print(e) + pass diff --git a/data_upload.py b/data_upload.py index 15891e9..4dc15f3 100644 --- a/data_upload.py +++ b/data_upload.py @@ -3,7 +3,7 @@ import time import paho.mqtt.client as mqtt from tool import * -from config import broker, port, subscribe_topic, username, password +from config import broker, port, subscribe_topic, username, password, info_topic times = 6 @@ -24,7 +24,7 @@ class DataUploadClient: def on_connect(self, client, userdata, flags, rc): global times times = 6 - self.client.publish('success', payload=subscribe_topic + ':连接成功,执行数据推送和本地存储' + str(time.time()), + self.client.publish(info_topic, payload=subscribe_topic + ':连接成功,执行数据推送和本地存储' + str(time.time()), qos=0) subprocess.Popen(['/usr/bin/bash /home/pi/lot_manager/bash/start_data_upload.sh'], shell=True) diff --git a/tool.py b/tool.py index 63bec6f..4862dab 100755 --- a/tool.py +++ b/tool.py @@ -2,9 +2,18 @@ import json import os import subprocess +import pydantic import requests +from pydantic import BaseModel + +from config import mp4_path, post_record_list_url, post_record_url, info_topic + + +# 统一返回 +class BaseResponse(BaseModel): + code: int = pydantic.Field(200, description="MQTT Return Status Code") + msg: str = pydantic.Field("success", description="MQTT Status Message") -from config import mp4_path, post_record_list_url, post_record_url def exception_handler(func): def wrapper(*args, **kwargs): @@ -14,6 +23,8 @@ def exception_handler(func): print(f"函数{func.__name__}中发生了异常:{e}") return wrapper + + def push_stream(client): p = subprocess.Popen(['/bin/bash /home/pi/lot_manager/bash/start_push_stream.sh'], shell=True, @@ -21,7 +32,7 @@ def push_stream(client): stderr=subprocess.PIPE) out, err = p.communicate() output = out.decode('utf-8').strip() - client.publish('success', payload=json.dumps(output, ensure_ascii=False), qos=0) + client.publish(info_topic, payload=BaseResponse(code=200, msg=json.dumps(output, ensure_ascii=False)), qos=0) def close_stream(client): @@ -31,13 +42,13 @@ def close_stream(client): stderr=subprocess.PIPE) out, err = p.communicate() output = out.decode('utf-8').strip() - client.publish('success', payload=json.dumps(output, ensure_ascii=False), qos=0) + client.publish(info_topic, payload=BaseResponse(code=200, msg=json.dumps(output, ensure_ascii=False)), qos=0) def exec_sh(msg, client): origin_data = json.loads(msg.payload.decode('utf-8')) if 'data' not in origin_data: - client.publish('error', payload='data must be supplied', qos=0) + client.publish(info_topic, payload='data must be supplied', qos=0) return cmd = origin_data["data"] if cmd in ["supervisorctl stop __mqtt__", @@ -45,13 +56,13 @@ def exec_sh(msg, client): "supervisorctl stop all"]: return if cmd == "supervisorctl reload": - client.publish('success', payload='reloading', qos=0) + client.publish(info_topic, payload='reloading', qos=0) subprocess.Popen([cmd], shell=True) return p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() output = out.decode('utf-8').strip() - client.publish('success', payload=json.dumps(output, ensure_ascii=False), qos=0) + client.publish(info_topic, payload=BaseResponse(code=200, msg=json.dumps(output, ensure_ascii=False)), qos=0) def get_status(client): @@ -61,7 +72,7 @@ def get_status(client): stderr=subprocess.PIPE) out, err = p.communicate() output = out.decode('utf-8').strip() - client.publish('success', payload=json.dumps(output, ensure_ascii=False), qos=0) + client.publish(info_topic, payload=BaseResponse(code=200, msg=json.dumps(output, ensure_ascii=False)), qos=0) def update(client): @@ -71,11 +82,11 @@ def update(client): stderr=subprocess.PIPE) out, err = p.communicate() output = out.decode('utf-8').strip() - client.publish('success', payload=json.dumps(output, ensure_ascii=False), qos=0) + client.publish(info_topic, payload=BaseResponse(code=200, msg=json.dumps(output, ensure_ascii=False)), qos=0) def reload(client): - client.publish('success', payload="reloading", qos=0) + client.publish(info_topic, payload="reloading", qos=0) subprocess.Popen(['supervisorctl reload'], shell=True) @@ -84,18 +95,15 @@ def get_list_record(client): "data": os.listdir(mp4_path) } r = requests.post(post_record_list_url, json=data) - client.publish('success', payload=str(r), qos=0) + client.publish(info_topic, payload=BaseResponse(code=200, msg=str(r)), qos=0) def get_record(msg, client): filename = json.loads(msg.payload.decode('utf-8'))["data"] if filename is None or filename == '': - client.publish('error', payload='没有该文件', qos=0) + client.publish(info_topic, payload=BaseResponse(code=404, msg='没有该文件'), qos=0) return files = {filename: open(os.path.join(mp4_path, filename), 'rb'), "Content-Type": "application/octet-stream"} r = requests.post(post_record_url, files=files) - client.publish('success', payload=str(r), qos=0) - - - + client.publish(info_topic, payload=BaseResponse(code=200, msg=str(r)), qos=0)