commit 5387def0123ad3ada0520a3b8f968b8ef285147a Author: xyj <10908227994@qq.com> Date: Sat Dec 2 10:53:31 2023 +0800 first diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..53670f4 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,33 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/lot_manager2.iml b/.idea/lot_manager2.iml new file mode 100644 index 0000000..4cfde48 --- /dev/null +++ b/.idea/lot_manager2.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..419a75c --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..af6e32c --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/MQTT.py b/MQTT.py new file mode 100644 index 0000000..501de0f --- /dev/null +++ b/MQTT.py @@ -0,0 +1,48 @@ +import json + +import paho.mqtt.client as mqtt + +from tool import push_stream, close_stream, update, exec_sh + + +class MQTT: + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + client.subscribe('lot_mqtt') + + # Message receiving callback + def on_message(self, client, userdata, msg): + data = json.loads(msg.payload.decode('utf-8'))["msg"] + if data == "push_stream": + # 启动推流视频 + push_stream() + client.publish('success', payload='push_stream success', qos=0) + elif data == "close_stream": + # 关闭推流视频 + close_stream() + client.publish('success', payload='close_stream success', qos=0) + elif data == "exec": + # 执行命令 + exec_sh(msg) + client.publish('success', payload='exec_sh success', qos=0) + elif data == "update": + # git更新项目和配置文件 + update() + client.publish('success', payload='update success', qos=0) + else: + client.publish('error', payload='No Such Type', qos=0) + + def __init__(self): + self.client = mqtt.Client() + self.client.username_pw_set("demo", "123456") + # Specify callback function + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + # Establish a connection + self.client.connect('ceshi-mqtt.lihaink.cn', 1883) + # Publish a message + self.client.loop_forever(retry_first_connection=True) + + +if __name__ == '__main__': + mq = MQTT() diff --git a/api.py b/api.py new file mode 100644 index 0000000..9d42af0 --- /dev/null +++ b/api.py @@ -0,0 +1,33 @@ +import os +import subprocess + +import uvicorn +from db.models.base import BaseResponse +from db.models.log_data_model import LOT_DATA, LOT_DATA_MODEL +from db.repository import add_kb_to_db, get_kb_detail, get_kb_detail_by_time, delete_kb_detail_by_time + +from fastapi import FastAPI + + +def add(data: LOT_DATA): + try: + add_kb_to_db(data) + return BaseResponse() + except Exception as e: + return BaseResponse(code=500, msg=e) + + +def delete(start_time, end_time): + try: + delete_kb_detail_by_time(start_time, end_time) + return BaseResponse() + except Exception as e: + return BaseResponse(code=500, msg=e) + + +def get_data(): + try: + data = get_kb_detail() + return BaseResponse(data=data) + except Exception as e: + return BaseResponse(code=404, msg=e) diff --git a/ceshi.py b/ceshi.py new file mode 100644 index 0000000..b2b0fb9 --- /dev/null +++ b/ceshi.py @@ -0,0 +1,413 @@ +# cd demo/ceshi-1/ +# python ceshi.py + +import serial +import time +import struct +import json +import paho.mqtt.client as mqtt + +from api import add +from db.models.log_data_model import LOT_DATA + + +def hex_to_float(hex_str): + hex_int = int(hex_str, 16) + return struct.unpack('!f', struct.pack('!I', hex_int))[0] + + +def print_json(data): + print(json.dumps(data, sort_keys=True, indent=4, separators=(', ', ': '), ensure_ascii=False)) + + +def run_with_client(client): + temp_send = '06 03 01 F4 00 02 85 B2 ' # 温湿度查询指令 + co2_send = '06 03 01 F7 00 02 75 B2 ' # 二氧化碳查询指令 + pressure_send = '06 03 01 F9 00 02 14 71 ' # 气压查询指令 + sun_send = '06 03 01 FA 00 02 E4 71 ' # 光照查询指令 + + soil_send = '02 03 00 00 00 04 44 3A' # 土壤查询指令 + danlinjia_send = '02 03 00 04 00 03 44 39' # 氮磷钾查询指令 + + rainfall_send = '03 03 00 00 00 01 85 E8' # 雨量查询指令 + + windspeed_send = '04 03 00 00 00 02 C4 5E' # 风速查询指令 + winddirection_send = '05 03 00 00 00 02 C5 8F' # 风向查询指令 + + ser = serial.Serial("/dev/ttyS2", 9600) + + # 发送的数据转为2进制b'\x01\x03\x00\x00\x00\x02\xc4\x0b' + temp_send = bytes.fromhex(temp_send) + co2_send = bytes.fromhex(co2_send) + pressure_send = bytes.fromhex(pressure_send) + sun_send = bytes.fromhex(sun_send) + soil_send = bytes.fromhex(soil_send) + danlinjia_send = bytes.fromhex(danlinjia_send) + rainfall_send = bytes.fromhex(rainfall_send) + windspeed_send = bytes.fromhex(windspeed_send) + winddirection_send = bytes.fromhex(winddirection_send) + + while True: + # if ser.is_open: + ser.write(temp_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + tempbuffer_data = ser.in_waiting + if tempbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(tempbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + airtemp_data = int(return_data_hex[6:10], 16) / 10 + airhumi_data = int(return_data_hex[10:14], 16) / 10 + time.sleep(5) + + ser.write(co2_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + co2buffer_data = ser.in_waiting + if co2buffer_data: + return_data = ser.read(co2buffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + co2_data = int(return_data_hex[6:10], 16) + time.sleep(5) + + ser.write(pressure_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + pressurebuffer_data = ser.in_waiting + if pressurebuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(pressurebuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + pressure_data = int(return_data_hex[6:10], 16) / 10 + time.sleep(5) + + ser.write(sun_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + sunbuffer_data = ser.in_waiting + if sunbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(sunbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + sun_data = int(return_data_hex[6:14], 16) + time.sleep(5) + + # print('send soil directives') + ser.write(soil_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + soilbuffer_data = ser.in_waiting + # print(buffer_data, 'buffer_data') + if soilbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(soilbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + humidity_data = int(return_data_hex[6:10], 16) / 10 + temperature_data = int(return_data_hex[10:14], 16) / 10 + electrical_data = int(return_data_hex[14:18], 16) / 10 + PH_data = int(return_data_hex[18:22], 16) / 10 + time.sleep(5) + + # print('send danlinjia directives') + ser.write(danlinjia_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + danlinjiabuffer_data = ser.in_waiting + # print(buffer_data, 'buffer_data') + if danlinjiabuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(danlinjiabuffer_data) + # print('返回的数据2进制:', return_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # # 对返回的数据进行解析,获取温度和湿度数据 + dan_data = int(return_data_hex[6:10], 16) / 10 + lin_data = int(return_data_hex[10:14], 16) / 10 + jia_data = int(return_data_hex[14:18], 16) / 10 + time.sleep(5) + + # print('send rainfall directives') + ser.write(rainfall_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + rainfallbuffer_data = ser.in_waiting + # print(buffer_data, 'buffer_data') + if rainfallbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(rainfallbuffer_data) + # print('返回的数据2进制:', return_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # print('返回的数据转换为16进制:', return_data_hex) + # # 对返回的数据进行解析,获取温度和湿度数据 + # print("当前雨量值为:", int(return_data_hex[6:10], 16)/10)#单位mm + rainfall_data = int(return_data_hex[6:10], 16) / 10 + time.sleep(5) + + ser.write(windspeed_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + windspeedbuffer_data = ser.in_waiting + # print(buffer_data, 'buffer_data') + if windspeedbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(windspeedbuffer_data) + # print('返回的数据2进制:', return_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # print('返回的数据转换为16进制:', return_data_hex) + # print("当前风速为:", int(return_data_hex[6:10], 16)/10)#单位mm + speedwind_data = int(return_data_hex[6:10], 16) / 10 + time.sleep(5) + + # print('send winddirection directives') + ser.write(winddirection_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + winddirectionbuffer_data = ser.in_waiting + # print(buffer_data, 'buffer_data') + if winddirectionbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(winddirectionbuffer_data) + # print('返回的数据2进制:', return_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # print('返回的数据转换为16进制:', return_data_hex) + # print("当前风向为:", int(return_data_hex[10:14], 16))#单位mm + winddirection_data = int(return_data_hex[10:14], 16) + time.sleep(5) + # print('{"name":"%d","name1":"%d"}', 123,456) + # data = [{'ngvhgv': airtemp_data}, {'nvjgvjvj':airhumi_data}] + data = {'ambient_temperature': airtemp_data, + 'ambient_humidity': airhumi_data, + 'carbon_dioxide': co2_data, + 'ambient_air_pressure': pressure_data, + 'ambient_lighting': sun_data, + 'soil_moisture': humidity_data, + 'soil_temperature': temperature_data, + 'soil_conductivity': electrical_data, + 'soil_PH': PH_data, + 'soil_potassium_phosphate_nitrogen': dan_data, + 'soil_potassium_phosphate_phosphorus': lin_data, + 'soil_potassium_phosphate_potassium': jia_data, + 'rainfall': rainfall_data, + 'wind_speed': speedwind_data, + 'wind_direction': winddirection_data, + 'create_time': int(time.time()) + } + t = LOT_DATA(**data) + # TODO 判断数据是否正常 + # 发送给服务器 + client.publish('demo', payload=json.dumps(data, ensure_ascii=False), qos=0) + add(t) + + +def run_no_client(): + temp_send = '06 03 01 F4 00 02 85 B2 ' # 温湿度查询指令 + co2_send = '06 03 01 F7 00 02 75 B2 ' # 二氧化碳查询指令 + pressure_send = '06 03 01 F9 00 02 14 71 ' # 气压查询指令 + sun_send = '06 03 01 FA 00 02 E4 71 ' # 光照查询指令 + + soil_send = '02 03 00 00 00 04 44 3A' # 土壤查询指令 + danlinjia_send = '02 03 00 04 00 03 44 39' # 氮磷钾查询指令 + + rainfall_send = '03 03 00 00 00 01 85 E8' # 雨量查询指令 + + windspeed_send = '04 03 00 00 00 02 C4 5E' # 风速查询指令 + winddirection_send = '05 03 00 00 00 02 C5 8F' # 风向查询指令 + + ser = serial.Serial("/dev/ttyS2", 9600) + + # 发送的数据转为2进制b'\x01\x03\x00\x00\x00\x02\xc4\x0b' + temp_send = bytes.fromhex(temp_send) + co2_send = bytes.fromhex(co2_send) + pressure_send = bytes.fromhex(pressure_send) + sun_send = bytes.fromhex(sun_send) + soil_send = bytes.fromhex(soil_send) + danlinjia_send = bytes.fromhex(danlinjia_send) + rainfall_send = bytes.fromhex(rainfall_send) + windspeed_send = bytes.fromhex(windspeed_send) + winddirection_send = bytes.fromhex(winddirection_send) + + while True: + ser.write(temp_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + tempbuffer_data = ser.in_waiting + if tempbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(tempbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + airtemp_data = int(return_data_hex[6:10], 16) / 10 + airhumi_data = int(return_data_hex[10:14], 16) / 10 + time.sleep(5) + + ser.write(co2_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + co2buffer_data = ser.in_waiting + if co2buffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(co2buffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + co2_data = int(return_data_hex[6:10], 16) + time.sleep(5) + + ser.write(pressure_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + pressurebuffer_data = ser.in_waiting + if pressurebuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(pressurebuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + pressure_data = int(return_data_hex[6:10], 16) / 10 + time.sleep(5) + + ser.write(sun_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + sunbuffer_data = ser.in_waiting + if sunbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(sunbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + sun_data = int(return_data_hex[6:14], 16) + time.sleep(5) + + ser.write(soil_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + soilbuffer_data = ser.in_waiting + if soilbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(soilbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # 对返回的数据进行解析,获取温度和湿度数据 + humidity_data = int(return_data_hex[6:10], 16) / 10 + temperature_data = int(return_data_hex[10:14], 16) / 10 + electrical_data = int(return_data_hex[14:18], 16) / 10 + PH_data = int(return_data_hex[18:22], 16) / 10 + time.sleep(5) + + ser.write(danlinjia_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + danlinjiabuffer_data = ser.in_waiting + if danlinjiabuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(danlinjiabuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # # 对返回的数据进行解析,获取温度和湿度数据 + dan_data = int(return_data_hex[6:10], 16) / 10 + lin_data = int(return_data_hex[10:14], 16) / 10 + jia_data = int(return_data_hex[14:18], 16) / 10 + time.sleep(5) + + ser.write(rainfall_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + rainfallbuffer_data = ser.in_waiting + if rainfallbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(rainfallbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + # # 对返回的数据进行解析,获取温度和湿度数据 + rainfall_data = int(return_data_hex[6:10], 16) / 10 + time.sleep(5) + + ser.write(windspeed_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + windspeedbuffer_data = ser.in_waiting + if windspeedbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(windspeedbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + speedwind_data = int(return_data_hex[6:10], 16) / 10 + time.sleep(5) + + ser.write(winddirection_send) + time.sleep(1) + # 获取返回的缓冲data,获取的是buffer_data的长度 9 + winddirectionbuffer_data = ser.in_waiting + if winddirectionbuffer_data: + # 返回的数据为2进制:b'\x01\x03\x04\x01\x08\x022\xfa\xb8' + return_data = ser.read(winddirectionbuffer_data) + # 二进制转换为16进制:010304010802307b79 + return_data_hex = str(return_data.hex()) + winddirection_data = int(return_data_hex[10:14], 16) + time.sleep(5) + data = {'ambient_temperature': airtemp_data, + 'ambient_humidity': airhumi_data, + 'carbon_dioxide': co2_data, + 'ambient_air_pressure': pressure_data, + 'ambient_lighting': sun_data, + 'soil_moisture': humidity_data, + 'soil_temperature': temperature_data, + 'soil_conductivity': electrical_data, + 'soil_PH': PH_data, + 'soil_potassium_phosphate_nitrogen': dan_data, + 'soil_potassium_phosphate_phosphorus': lin_data, + 'soil_potassium_phosphate_potassium': jia_data, + 'rainfall': rainfall_data, + 'wind_speed': speedwind_data, + 'wind_direction': winddirection_data} + data = LOT_DATA(**data, create_time=int(time.time())) + # TODO 判断数据是否正常 + add(data) + + +class UPLOAD: + def __init__(self): + self.times = 3 + self.client = mqtt.Client(transport="websockets") + self.client.username_pw_set("demo", "123456") + # Specify callback function + self.client.on_connect = self.on_connect + # Establish a connection + self.client.connect('ceshi-mqtt.lihaink.cn', 8083) + # Publish a message + self.client.loop_forever() + + def on_connect(self, client, userdata, flags, rc): + from threading import Thread + if rc == 0: + print("连接成功,执行数据推送和本地存储") + nt1 = Thread(target=self.t1) + nt1.start() + else: + if self.times != 0: + self.times -= 1 + client.reconnect() + else: + print("3次失败,执行本地存储") + nt2 = Thread(target=self.t2) + nt2.start() + + def t2(self): + run_no_client() + + def t1(self): + run_with_client(self.client) diff --git a/conf/data_upload.conf b/conf/data_upload.conf new file mode 100644 index 0000000..e76b4fc --- /dev/null +++ b/conf/data_upload.conf @@ -0,0 +1,13 @@ +[program:data_upload] +directory=/home/pi/lot_manager +command=/usr/bin/python ceshi.py +user=pi +;是否随开机自启 或者reload自启动 +autostart=true +;失败重启 +autorestart=true +;重启次数 +restart_times=3 +redirect_stderr=true +stopsignal=TERM +stopasgroup=True diff --git a/conf/push_stream.conf b/conf/push_stream.conf new file mode 100644 index 0000000..d9d4dc3 --- /dev/null +++ b/conf/push_stream.conf @@ -0,0 +1,9 @@ +[program:push_stream] +directory=/home/pi/lot_manager +command=/usr/bin/ffmpeg -rtsp_transport tcp -re -i rtsp://admin:123456@192.168.0.226:554/mpeg4 -c:v copy -c:a aac -preset ultrafast -r 20 -flvflags no_duration_filesize -f rtsp -rtsp_transport tcp rstp://192.168.1.27:554/live/test +user=pi +autostart=false +autorestart=true +redirect_stderr=true +stopsignal=TERM +stopasgroup=True \ No newline at end of file diff --git a/conf/test.conf b/conf/test.conf new file mode 100644 index 0000000..b8ec499 --- /dev/null +++ b/conf/test.conf @@ -0,0 +1,13 @@ +[program:test] +directory=/home/pi/lot_manager +command=/usr/bin/python test.py +user=pi +;是否随开机自启 或者reload自启动 +autostart=false +;失败重启 +autorestart=true +;重启次数 +restart_times=3 +redirect_stderr=true +stopsignal=TERM +stopasgroup=True diff --git a/db/__init__.py b/db/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/db/base.py b/db/base.py new file mode 100755 index 0000000..f7e4b6c --- /dev/null +++ b/db/base.py @@ -0,0 +1,19 @@ +import os + +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta +from sqlalchemy.orm import sessionmaker + +import json +KB_ROOT_PATH = "./" +DB_ROOT_PATH = os.path.join(KB_ROOT_PATH, "lot_data.db") +SQLALCHEMY_DATABASE_URI = f"sqlite:///{DB_ROOT_PATH}" + +engine = create_engine( + SQLALCHEMY_DATABASE_URI, + json_serializer=lambda obj: json.dumps(obj, ensure_ascii=False), +) + +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +Base: DeclarativeMeta = declarative_base() diff --git a/db/models/__init__.py b/db/models/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/db/models/base.py b/db/models/base.py new file mode 100755 index 0000000..43c821d --- /dev/null +++ b/db/models/base.py @@ -0,0 +1,25 @@ +from datetime import datetime +from typing import Any, List + +import pydantic +from pydantic import BaseModel +from sqlalchemy import Column, DateTime, String, Integer + +from db.models.log_data_model import LOT_DATA_MODEL, LOT_DATA + + +class BaseResponse(BaseModel): + code: int = 200 + msg: str = "success" + data: Any = None + + +class BaseModel: + """ + 基础模型 + """ + id = Column(Integer, primary_key=True, index=True, comment="主键ID") + create_time = Column(DateTime, default=datetime.utcnow, comment="创建时间") + update_time = Column(DateTime, default=None, onupdate=datetime.utcnow, comment="更新时间") + create_by = Column(String, default=None, comment="创建者") + update_by = Column(String, default=None, comment="更新者") diff --git a/db/models/log_data_model.py b/db/models/log_data_model.py new file mode 100755 index 0000000..f281a4a --- /dev/null +++ b/db/models/log_data_model.py @@ -0,0 +1,49 @@ +from pydantic import BaseModel, Field +from sqlalchemy import * +from db.base import Base + + +class LOT_DATA(BaseModel): + create_time: int = Field(None, description='创建时间(时间戳) ') + wind_speed: float = Field(None, description='风速:(0到30)m/s ') + wind_direction: float = Field(None, description='风向:0~360°') + ambient_temperature: float = Field(None, description='环境温度:℃') + ambient_humidity: float = Field(None, description='环境湿度:%RH') + carbon_dioxide: float = Field(None, description='二氧化碳:0~5000ppm') + ambient_air_pressure: float = Field(None, description='环境气压:0~120KPa') + rainfall: float = Field(None, description='雨量:mm') + ambient_lighting: float = Field(None, description='环境光照:0-65535Lux;0-20万Lux') + soil_temperature: float = Field(None, description='土壤温度:-40~80℃') + soil_moisture: float = Field(None, description='土壤湿度:0-100%RH') + soil_conductivity: float = Field(None, description='土壤电导率:0-20000μS/cm') + soil_PH: float = Field(None, description='土壤PH:3~9PH') + soil_potassium_phosphate_nitrogen: float = Field(None, description='土壤磷酸钾:氮的标准值在140-225mg/kg') + soil_potassium_phosphate_phosphorus: float = Field(None, description='土壤磷酸钾:磷的标准值在57-100mg/kg,') + soil_potassium_phosphate_potassium: float = Field(None, description='土壤磷酸钾:钾的标准值在106-150mg/kg') + + +class LOT_DATA_MODEL(Base): + """ + 物联网数据模型 + """ + __tablename__ = 'LOT_DATA_MODEL' + id = Column(Integer, primary_key=True, autoincrement=True, comment='ID') + create_time = Column(Integer, comment="创建时间") + wind_speed = Column(Float, comment='风速:(0到30)m/s ') + wind_direction = Column(Float, comment='风向:0~360°') + ambient_temperature = Column(Float, comment='环境温度:℃') + ambient_humidity = Column(Float, comment='环境湿度:%RH') + carbon_dioxide = Column(Float, comment='二氧化碳:0~5000ppm') + ambient_air_pressure = Column(Float, comment='环境气压:0~120KPa') + rainfall = Column(Float, comment='雨量:mm') + ambient_lighting = Column(Float, comment='环境光照:0-65535Lux;0-20万Lux') + soil_temperature = Column(Float, comment='土壤温度:-40~80℃') + soil_moisture = Column(Float, comment='土壤湿度:0-100%RH') + soil_conductivity = Column(Float, comment='土壤电导率:0-20000μS/cm') + soil_PH = Column(Float, comment='土壤PH:3~9PH') + soil_potassium_phosphate_nitrogen = Column(Float, comment='土壤磷酸钾:氮的标准值在140-225mg/kg') + soil_potassium_phosphate_phosphorus = Column(Float, comment='土壤磷酸钾:磷的标准值在57-100mg/kg,') + soil_potassium_phosphate_potassium = Column(Float, comment='土壤磷酸钾:钾的标准值在106-150mg/kg') + + def __repr__(self): + return f"LOT_DATA_MODEL(id={self.id}, wind_speed={self.wind_speed}, wind_direction={self.wind_direction}, ambient_temperature={self.ambient_temperature}, ambient_humidity={self.ambient_humidity}, carbon_dioxide={self.carbon_dioxide}, ambient_air_pressure={self.ambient_air_pressure}, rainfall={self.rainfall}, ambient_lighting={self.ambient_lighting}, soil_temperature={self.soil_temperature}, soil_moisture={self.soil_moisture}, soil_conductivity={self.soil_conductivity}, Soil_PH={self.soil_PH}, soil_potassium_phosphate_nitrogen={self.soil_potassium_phosphate_nitrogen}, soil_potassium_phosphate_phosphorus={self.soil_potassium_phosphate_phosphorus}, soil_potassium_phosphate_potassium={self.soil_potassium_phosphate_potassium})" diff --git a/db/repository/__init__.py b/db/repository/__init__.py new file mode 100755 index 0000000..747de98 --- /dev/null +++ b/db/repository/__init__.py @@ -0,0 +1 @@ +from .lot_data_repository import * diff --git a/db/repository/lot_data_repository.py b/db/repository/lot_data_repository.py new file mode 100755 index 0000000..e3c5ea7 --- /dev/null +++ b/db/repository/lot_data_repository.py @@ -0,0 +1,51 @@ +import time + +from db.models.log_data_model import LOT_DATA_MODEL, LOT_DATA +from db.session import with_session + + +@with_session +def add_kb_to_db(session, data: LOT_DATA): + # 创建知识库实例 + kb = LOT_DATA_MODEL(**data.__dict__) + session.add(kb) + + +@with_session +def delete_kb_from_db(session, id): + kb = session.query(LOT_DATA_MODEL).filter_by(id=id).first() + if kb: + session.delete(kb) + return True + + +# 直接查询所有数据 +@with_session +def get_kb_detail(session): + all_data = session.query(LOT_DATA_MODEL).all() + data = [LOT_DATA(**d.__dict__) for d in all_data] + return data + + +# 根据时间查询数据 +@with_session +def get_kb_detail_by_time(session, start_time, end_time): + all_data = session.query(LOT_DATA_MODEL).filter( + start_time <= LOT_DATA_MODEL.create_time, LOT_DATA_MODEL.create_time <= end_time).all() + data = [LOT_DATA(**d.__dict__) for d in all_data] + return data + + +# 删除数据 +@with_session +def delete_kb_detail_by_time(session, start_time, end_time): + d = session.query(LOT_DATA_MODEL).filter(start_time <= LOT_DATA_MODEL.create_time, LOT_DATA_MODEL.create_time <= end_time).delete() + return d + + +# 查询数据 +@with_session +def get_data_by_field(session, field, value): + query = session.query(LOT_DATA_MODEL).filter(getattr(LOT_DATA_MODEL, field) == value) + result = query.all() + return result diff --git a/db/session.py b/db/session.py new file mode 100755 index 0000000..47cb59f --- /dev/null +++ b/db/session.py @@ -0,0 +1,46 @@ +from functools import wraps +from contextlib import contextmanager +from db.base import SessionLocal +from sqlalchemy.orm import Session + + +@contextmanager +def session_scope() -> Session: + """上下文管理器用于自动获取 Session, 避免错误""" + session = SessionLocal() + try: + yield session + session.commit() + except: + session.rollback() + raise + finally: + session.close() + + +def with_session(f): + @wraps(f) + def wrapper(*args, **kwargs): + with session_scope() as session: + try: + result = f(session, *args, **kwargs) + session.commit() + return result + except: + session.rollback() + raise + + return wrapper + + +def get_db() -> SessionLocal: + db = SessionLocal() + try: + yield db + finally: + db.close() + + +def get_db0() -> SessionLocal: + db = SessionLocal() + return db diff --git a/lot_data.db b/lot_data.db new file mode 100644 index 0000000..d43d67f Binary files /dev/null and b/lot_data.db differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0bfef0a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +pydantic~=1.10.13 +sqlalchemy~=2.0.19 +uvicorn~=0.23.2 +fastapi~=0.95.1 +paho-mqtt~=1.6.1 +requests~=2.31.0 \ No newline at end of file diff --git a/start_push.sh b/start_push.sh new file mode 100644 index 0000000..d536bca --- /dev/null +++ b/start_push.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +supervisorctl start push_stream diff --git a/stop_push.sh b/stop_push.sh new file mode 100644 index 0000000..19aa07e --- /dev/null +++ b/stop_push.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +supervisorctl stop push_stream diff --git a/test.py b/test.py new file mode 100644 index 0000000..619a846 --- /dev/null +++ b/test.py @@ -0,0 +1,30 @@ +import datetime +import time + +import requests + + +while (True): + print(time.time()) + time.sleep(10.0) +# data = { +# "wind_speed": 26, +# "wind_direction": 6, +# "ambient_temperature": 66, +# "ambient_humidity": 78, +# "carbon_dioxide": 94, +# "ambient_air_pressure": 48, +# "rainfall": 3, +# "ambient_lighting": 59, +# "soil_temperature": 71, +# "soil_moisture": 19, +# "soil_conductivity": 62, +# "soil_PH": 49, +# "soil_potassium_phosphate_nitrogen": 5, +# "soil_potassium_phosphate_phosphorus": 5, +# "soil_potassium_phosphate_potassium": 86 +# } +# data = LOT_DATA(**data, create_time=int(time.time())) +# add(data) +# print(data) +# requests.post("http://192.168.1.27:8000/add/lot_data", json=data) diff --git a/tool.py b/tool.py new file mode 100644 index 0000000..28242ff --- /dev/null +++ b/tool.py @@ -0,0 +1,19 @@ +import json +import subprocess + + +def push_stream(): + subprocess.Popen(['/bin/bash start_push.sh'], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + +def close_stream(): + subprocess.Popen(['/bin/bash stop_push.sh'], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + +def exec_sh(msg): + cmd = json.loads(msg.payload.decode('utf-8'))["data"] + subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + +def update(): + subprocess.Popen(['/bin/bash update.sh'], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/update.sh b/update.sh new file mode 100644 index 0000000..2e62734 --- /dev/null +++ b/update.sh @@ -0,0 +1,9 @@ +#!/bin/bash +set -e +# git更新 +git pull origin device1 +# 配置文件复制到supervisor管理 +cp -r conf/*.conf /etc/supervisor/conf.d/ +# 更新配置文件 +supervisorctl reread +supervisorctl update