diff --git a/.idea/modules.xml b/.idea/modules.xml index d18be63..abc4740 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/.idea/xumu.iml b/.idea/xumu.iml deleted file mode 100644 index 502bcd2..0000000 --- a/.idea/xumu.iml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/config.py b/config.py index 3bd3058..f5b6b99 100644 --- a/config.py +++ b/config.py @@ -1,31 +1,6 @@ # 数据库插入类型 import base64 -import copy -import time - - -rfid_type = ["TEXT", "TEXT", "FLOAT", "INT32"] -air_type = ["TEXT", "FLOAT", "FLOAT", "INT32"] -else_type = ["TEXT", "FLOAT", "INT32"] -rfid_measurement = ["iccid", "RFID", "temperature" "type"] -air_measurement = ["iccid", "air_temperature", "air_humidity", "type"] -else_measurement = ["iccid", "value", "type"] -dataTypes = { - 0: rfid_type, - 1: air_type, - 2: else_type, - 3: else_type, - 4: else_type, - 5: else_type, -} -measurements = { - 0: rfid_measurement, - 1: air_measurement, - 2: else_measurement, - 3: else_measurement, - 4: else_measurement, - 5: else_measurement, -} +# url baseHost = "https://iot.lihaink.cn/iotdb_restapi" # 注意这里前面不能加/ insertUri = "rest/v2/insertRecords" @@ -40,206 +15,3 @@ headers = { 'ContentType': 'application/json', 'Authorization': "Basic " + token } - - -def RFID_template(type, deviceId): - return [ - f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.temperature(v) WITH datatype=FLOAT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.RFID(r) WITH datatype=TEXT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", - ] - - -def common_template(type, deviceId): - return [ - f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.value(v) WITH datatype=FLOAT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", - ] - - -def air_template(type, deviceId): - return [ - f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.air_temperature(at) WITH datatype=FLOAT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.air_humidity(ah) WITH datatype=FLOAT,ENCODING=PLAIN", - f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", - ] - - -def warning_sql(deviceId, type): - template = "warning" - match type: - case 0: - rfid = RFID_template(template, deviceId) - rfid.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") - rfid.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") - return rfid - case 1: - air = air_template(template, deviceId) - air.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") - air.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") - return air - case _: - common = common_template(template, deviceId) - common.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") - common.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") - return common - - -def rfid_deviceId(rfid, deviceId): - return [ - f"insert into root.rfid(rfid, deviceId) values('{rfid}', '{deviceId}')" - ] - - -temperature_threshold = [37, 41] -air_temperature_threshold = [0, 40] -air_humidity_threshold = [30, 80] -danqi_threshold = [1.24] -jiawan_threshold = [3000] -zaoyin_threshold = [55] -yanwu_threshold = [200] - - -def is_warning(deviceId, v, t): - match t: - case 0: - temperature = v[1] - if temperature_threshold[1] <= temperature or temperature <= temperature_threshold[0]: - return True - case 1: - air_temperature = v[1] - air_humidity = v[2] - if air_temperature_threshold[1] <= air_temperature or air_temperature <= air_temperature_threshold[0]: - return True - if air_humidity_threshold[1] <= air_humidity or air_humidity <= air_humidity_threshold[0]: - return True - case 2: - danqi = v[1] - if danqi >= danqi_threshold[0]: - return True - case 3: - jiawan = v[1] - if jiawan > jiawan_threshold[0]: - return True - case 4: - zaoyin = v[1] - if zaoyin >= zaoyin_threshold[0]: - return True - case 5: - yanwu = v[1] - if yanwu >= yanwu_threshold[0]: - return True - return False - - -def insert_to_warning_sql(deviceId, v, t): - ml = copy.deepcopy(measurements[t]) - dt = copy.deepcopy(dataTypes[t]) - ml.append("reason") - ml.append("solve") - dt.append("TEXT") - dt.append("TEXT") - # 设置原因和解决方案 - # 解决方案可以调用AI进行生成回答 - if t == 0: - # RFID 温度 - temperature = v[1] - prompt = f"动物体温为{temperature},正常吗?如果体温过高或过低,如何解决?" - if temperature_threshold[1] <= temperature: - v.append("体温过高") - elif temperature <= temperature_threshold[0]: - v.append("体温过低") - elif t == 1: - # air 空气、湿度 - air_temperature = v[1] - air_humidity = v[2] - prompt = f"室外天气温度为{air_temperature},室外天气湿度{air_humidity},正常吗?如果过高或过低,如何解决?" - if air_temperature_threshold[1] <= air_temperature: - v.append("室外天气温度偏高") - elif air_temperature <= air_temperature_threshold[0]: - v.append("室外天气温度偏低") - elif air_humidity_threshold[1] <= air_humidity: - v.append("室外天气湿度偏高") - elif air_humidity <= air_humidity_threshold[0]: - v.append("室外天气湿度偏低") - elif t == 2: - # danqi 氮气 - danqi = v[1] - prompt = f"空气中氮气浓度为{danqi},正常吗?如果过高或过低,如何解决?" - v.append("氮气浓度偏高") - elif t == 3: - # jiawan 甲烷 - jiawan = v[1] - prompt = f"空气中甲烷浓度为{jiawan},正常吗?如果过高或过低,如何解决?" - v.append("甲烷浓度过高") - elif t == 4: - # zaoyin 噪音 - zaoyin = v[1] - prompt = f"噪音分贝为{zaoyin},正常吗?如果过高或过低,如何解决?" - v.append("噪音强度过高") - elif t == 5: - # yanwu 烟雾 - yanwu = v[1] - prompt = f"烟雾浓度为{yanwu},正常吗?如果过高或过低,如何解决?" - v.append("烟雾浓度偏高") - - v.append("无") - return { - "devices": ["root.warning." + deviceId], - "timestamps": [int(time.time() * 1000)], - "measurements_list": [ml], - "data_types_list": [dt], - "values_list": [v], - "is_aligned": False - } - - -# 数据库创建字段sql语句 -def farm_sql(deviceId, type): - template = "farm" - match type: - case 0: - return RFID_template(template, deviceId) - case 1: - return air_template(template, deviceId) - case _: - return common_template(template, deviceId) - - -def get_client_change_status_sql(timestamp, status): - if status: - return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, True)"] - else: - return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, False)"] - - -# 监控视频接口 -def get_video_url(username): - return f"http://rtsp.lihaink.cn/live/xumu_{username}.live.mp4" - - -# example -# RFID -send1 = { - "m": ["cid", "r", "v"], - "v": ["abcd", "rfid", 10.62], - "t": 0, - "l": 63 -} -# 空气 -send2 = { - "m": ["cid", "at", "ah"], - "v": ["test2", 10.62, 50.22], - "t": 1, - "l": 65 -} -# 普通 -send3 = { - "m": ["cid", "v"], - "v": ["test1", 10.62], - "t": 2, - "l": 50 -} diff --git a/sql/sql.py b/sql/sql.py new file mode 100644 index 0000000..c41c8ef --- /dev/null +++ b/sql/sql.py @@ -0,0 +1,381 @@ +import copy +import time +from datetime import datetime, timedelta + +rfid_type = ["TEXT", "TEXT", "FLOAT", "INT32"] +air_type = ["TEXT", "FLOAT", "FLOAT", "INT32"] +else_type = ["TEXT", "FLOAT", "INT32"] +rfid_measurement = ["iccid", "RFID", "temperature" "type"] +air_measurement = ["iccid", "air_temperature", "air_humidity", "type"] +else_measurement = ["iccid", "value", "type"] +dataTypes = { + 0: rfid_type, + 1: air_type, + 2: else_type, + 3: else_type, + 4: else_type, + 5: else_type, +} +measurements = { + 0: rfid_measurement, + 1: air_measurement, + 2: else_measurement, + 3: else_measurement, + 4: else_measurement, + 5: else_measurement, +} +temperature_threshold = [37, 41] +air_temperature_threshold = [0, 40] +air_humidity_threshold = [30, 90] +danqi_threshold = [1.24] +jiawan_threshold = [3000] +zaoyin_threshold = [75] +yanwu_threshold = [200] +# example +# RFID +send1 = { + "m": ["cid", "r", "v"], + "v": ["abcd", "rfid", 10.62], + "t": 0, + "l": 63 +} +# 空气 +send2 = { + "m": ["cid", "at", "ah"], + "v": ["test2", 10.62, 50.22], + "t": 1, + "l": 65 +} +# 普通 +send3 = { + "m": ["cid", "v"], + "v": ["test1", 10.62], + "t": 2, + "l": 50 +} + + +def has_rfid_sql(rfid): + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + return send_json + + +def get_device_online_sql(iccid, deviceId): + sql = "select * from root.farm.clientId where time >= 0" + # 检查iccid是否有值,如果有,添加到SQL语句中 + if iccid: + sql += f" and iccid = '{iccid}'" + # 检查deviceId是否有值,如果有,添加到SQL语句中 + if deviceId: + sql += f" and deviceId = '{deviceId}'" + send_json = { + "sql": sql + } + return send_json + + +def get_device_sql(iccid, deviceId): + sql = "SELECT * FROM root.device where time>=0" + # 检查iccid是否有值,如果有,添加到SQL语句中 + if iccid: + sql += f" and iccid = '{iccid}'" + # 检查deviceId是否有值,如果有,添加到SQL语句中 + if deviceId: + sql += f" and deviceId = '{deviceId}'" + send_json = { + "sql": sql + } + return send_json + + +def data_query_sql(deviceId): + sql = f"select last * from root.farm.{deviceId}" + send_json = { + "sql": sql + } + return send_json + + +def get_warning_sql(deviceId, limit, offset): + sql = f"select * from root.warning.{deviceId}" + if limit: + sql += f" limit {limit}" + if offset: + sql += f" offset {offset}" + send_json = { + "sql": sql + } + return send_json + + +def rfid_query_sql(rfid): + sql = f"select deviceId from root.rfid where rfid='{rfid}'" + send_json = { + "sql": sql + } + return send_json + + +def rfid_last_query_sql(rfid, deviceId): + sql = f"select * from root.farm.{deviceId} where RFID='{rfid}' order by time desc limit 1" + send_json = { + "sql": sql + } + return send_json + + +def update_register_sql(timestamp, iccid, deviceId, type): + send_json = { + "sql": f"insert into root.device(timestamp, iccid, deviceId, type) values({timestamp}, '{iccid}', '{deviceId}', {type})" + } + return send_json + + +def create_register_sql(iccid, deviceId, type): + send_json = { + "sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})" + } + return send_json + + +def insert_device_status_sql(iccid, clientid, deviceId): + send_json = { + "sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', True, '{deviceId}')" + } + return send_json + + +def message_publish_sql(deviceId, m, t, v): + send_json = { + "devices": ["root.farm." + deviceId], + "timestamps": [int(time.time() * 1000)], + "measurements_list": [m], + "data_types_list": [dataTypes[t]], + "values_list": [v], + "is_aligned": False + } + return send_json + + +def get_has_clientId_sql(clientid): + send_json = { + "sql": f"select timestamp from root.farm.clientId where clientId='{clientid}'" + } + return send_json + + +# 获取空气设备历史7天的报警数据sql +def air_device_past_seven_days_sql(deviceId): + current_date = datetime.now().date() + tomorrow = current_date + timedelta(days=1) + past_6_days_date = current_date - timedelta(days=6) + sql = f"select MAX_VALUE(air_temperature),MAX_VALUE(air_humidity) from root.warning.{deviceId} group by ([{past_6_days_date}, {tomorrow}), 1d)" + send_json = { + "sql": sql + } + return send_json + +# 获取普通设备历史7天的报警数据sql +def common_device_past_seven_days_sql(deviceId): + current_date = datetime.now().date() + tomorrow = current_date + timedelta(days=1) + past_6_days_date = current_date - timedelta(days=6) + sql = f"select MAX_VALUE(value) from root.warning.{deviceId} group by ([{past_6_days_date}, {tomorrow}), 1d)" + send_json = { + "sql": sql + } + return send_json + + +# 获取设备所有报警数量的sql +def get_device_all_warning_sql(deviceId): + return {"sql": f"select count(iccid) from root.warning.{deviceId}"} + + +# 获取设备当天报警数量的sql +def get_device_today_warning_sql(deviceId): + now = datetime.now().date() + return { + "sql": f"select count(iccid) from root.warning.{deviceId} where time >= {now}T00:00:00" + } + + +# 改变设备在线状态表的sql +def get_client_change_status_sql(timestamp, status): + if status: + return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, True)"] + else: + return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, False)"] + + +# 创建RFID设备的sql +def RFID_template(type, deviceId): + return [ + f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.temperature(v) WITH datatype=FLOAT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.RFID(r) WITH datatype=TEXT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", + ] + + +# 创建普通设备的sql +def common_template(type, deviceId): + return [ + f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.value(v) WITH datatype=FLOAT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", + ] + + +# 创建空气设备的sql +def air_template(type, deviceId): + return [ + f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.air_temperature(at) WITH datatype=FLOAT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.air_humidity(ah) WITH datatype=FLOAT,ENCODING=PLAIN", + f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", + ] + + +# 创建warning表的sql +def warning_sql(deviceId, type): + template = "warning" + match type: + case 0: + rfid = RFID_template(template, deviceId) + rfid.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") + rfid.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") + return rfid + case 1: + air = air_template(template, deviceId) + air.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") + air.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") + return air + case _: + common = common_template(template, deviceId) + common.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") + common.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") + return common + + +# 插入到rfid和deviceId映射关系表 +def rfid_deviceId(rfid, deviceId): + return [ + f"insert into root.rfid(rfid, deviceId) values('{rfid}', '{deviceId}')" + ] + + +# 判断是否为报警数据的函数 +def is_warning(deviceId, v, t): + match t: + case 0: + temperature = v[1] + if temperature_threshold[1] <= temperature or temperature <= temperature_threshold[0]: + return True + case 1: + air_temperature = v[1] + air_humidity = v[2] + if air_temperature_threshold[1] <= air_temperature or air_temperature <= air_temperature_threshold[0]: + return True + if air_humidity_threshold[1] <= air_humidity or air_humidity <= air_humidity_threshold[0]: + return True + case 2: + danqi = v[1] + if danqi >= danqi_threshold[0]: + return True + case 3: + jiawan = v[1] + if jiawan > jiawan_threshold[0]: + return True + case 4: + zaoyin = v[1] + if zaoyin >= zaoyin_threshold[0]: + return True + case 5: + yanwu = v[1] + if yanwu >= yanwu_threshold[0]: + return True + return False + + +# 插入到warning表的sql +def insert_to_warning_sql(deviceId, v, t): + ml = copy.deepcopy(measurements[t]) + dt = copy.deepcopy(dataTypes[t]) + ml.append("reason") + ml.append("solve") + dt.append("TEXT") + dt.append("TEXT") + # 设置原因和解决方案 + # 解决方案可以调用AI进行生成回答 + if t == 0: + # RFID 温度 + temperature = v[1] + prompt = f"动物体温为{temperature},正常吗?如果体温过高或过低,如何解决?" + if temperature_threshold[1] <= temperature: + v.append("体温过高") + elif temperature <= temperature_threshold[0]: + v.append("体温过低") + elif t == 1: + # air 空气、湿度 + air_temperature = v[1] + air_humidity = v[2] + prompt = f"室外天气温度为{air_temperature},室外天气湿度{air_humidity},正常吗?如果过高或过低,如何解决?" + if air_temperature_threshold[1] <= air_temperature: + v.append("室外天气温度偏高") + elif air_temperature <= air_temperature_threshold[0]: + v.append("室外天气温度偏低") + elif air_humidity_threshold[1] <= air_humidity: + v.append("室外天气湿度偏高") + elif air_humidity <= air_humidity_threshold[0]: + v.append("室外天气湿度偏低") + elif t == 2: + # danqi 氮气 + danqi = v[1] + prompt = f"空气中氮气浓度为{danqi},正常吗?如果过高或过低,如何解决?" + v.append("氮气浓度偏高") + elif t == 3: + # jiawan 甲烷 + jiawan = v[1] + prompt = f"空气中甲烷浓度为{jiawan},正常吗?如果过高或过低,如何解决?" + v.append("甲烷浓度过高") + elif t == 4: + # zaoyin 噪音 + zaoyin = v[1] + prompt = f"噪音分贝为{zaoyin},正常吗?如果过高或过低,如何解决?" + v.append("噪音强度过高") + elif t == 5: + # yanwu 烟雾 + yanwu = v[1] + prompt = f"烟雾浓度为{yanwu},正常吗?如果过高或过低,如何解决?" + v.append("烟雾浓度偏高") + + v.append("无") + return { + "devices": ["root.warning." + deviceId], + "timestamps": [int(time.time() * 1000)], + "measurements_list": [ml], + "data_types_list": [dt], + "values_list": [v], + "is_aligned": False + } + + +# 数据库创建字段sql语句 +def farm_sql(deviceId, type): + template = "farm" + match type: + case 0: + return RFID_template(template, deviceId) + case 1: + return air_template(template, deviceId) + case _: + return common_template(template, deviceId) + + +# 监控视频接口 +def get_video_url(username): + return f"http://rtsp.lihaink.cn/live/xumu_{username}.live.mp4" diff --git a/xumu.py b/xumu.py index 75166f6..fd2fc3d 100644 --- a/xumu.py +++ b/xumu.py @@ -1,12 +1,12 @@ import re import json from typing import Any -from datetime import datetime, timedelta import requests import uvicorn from fastapi import FastAPI, Request from pydantic import BaseModel from config import * +from sql.sql import * # fastapi app = FastAPI() @@ -29,15 +29,7 @@ async def video_query(username): @app.get("/api/xumu/warning") async def get_warning(deviceId, limit, offset): try: - sql = f"select * from root.warning.{deviceId}" - if limit: - sql += f" limit {limit}" - if offset: - sql += f" offset {offset}" - # 检查iccid是否有值,如果有,添加到SQL语句中 - send_json = { - "sql": sql - } + send_json = get_warning_sql(deviceId, limit, offset) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: @@ -47,16 +39,7 @@ async def get_warning(deviceId, limit, offset): @app.get("/api/xumu/device/online") async def device_online_query(iccid, deviceId): try: - sql = "select * from root.farm.clientId where time >= 0" - # 检查iccid是否有值,如果有,添加到SQL语句中 - if iccid: - sql += f" and iccid = '{iccid}'" - # 检查deviceId是否有值,如果有,添加到SQL语句中 - if deviceId: - sql += f" and deviceId = '{deviceId}'" - send_json = { - "sql": sql - } + send_json = get_device_online_sql(iccid, deviceId) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: @@ -69,10 +52,7 @@ async def data_query(deviceId): try: if deviceId is None or deviceId == "" or len(deviceId) != 4: return BaseResponse(code=500, msg="参数错误") - sql = f"select last * from root.farm.{deviceId}" - send_json = { - "sql": sql - } + send_json = data_query_sql(deviceId) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: @@ -83,10 +63,7 @@ async def rfid_last_query(rfid, deviceId): try: if deviceId is None or deviceId == "" or len(deviceId) != 4: return BaseResponse(code=500, msg="参数错误") - sql = f"select * from root.farm.{deviceId} where RFID='{rfid}' order by time desc limit 1" - send_json = { - "sql": sql - } + send_json = rfid_last_query_sql(rfid, deviceId) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: @@ -106,10 +83,7 @@ async def rest_query(request: Request): async def rfid_query(rfid): # TODO 可能一个rfid有多个iccid设备上传,需要获取所有的设备数据,然后返回,现在这个功能只有一个设备 不一定是最新的 try: - sql = f"select deviceId from root.rfid where rfid='{rfid}'" - send_json = { - "sql": sql - } + send_json = rfid_query_sql(rfid) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) if r.status_code != 200: return BaseResponse(data=r.json(), code=404, msg="404") @@ -125,10 +99,7 @@ async def rfid_query(rfid): async def has_rfid(rfid): try: - sql = f"select deviceId from root.rfid where rfid='{rfid}'" - send_json = { - "sql": sql - } + send_json = has_rfid_sql(rfid) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) if r.status_code != 200: return BaseResponse(data=r.json(), code=404, msg="404") @@ -145,16 +116,7 @@ async def has_rfid(rfid): @app.get("/api/xumu/device/query") async def get_device(iccid, deviceId): try: - sql = "SELECT * FROM root.device where time>=0" - # 检查iccid是否有值,如果有,添加到SQL语句中 - if iccid: - sql += f" and iccid = '{iccid}'" - # 检查deviceId是否有值,如果有,添加到SQL语句中 - if deviceId: - sql += f" and deviceId = '{deviceId}'" - send_json = { - "sql": sql - } + send_json = get_device_sql(iccid, deviceId) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) return BaseResponse(data=r.json()) except Exception as e: @@ -191,24 +153,18 @@ async def register(request: Request): if theIccid == iccid: return BaseResponse(code=302, msg="该设备已经注册过了") # 如果不一致,则直接进行更新注册 - send_json = { - "sql": f"insert into root.device(timestamp, iccid, deviceId, type) values({timestamp}, '{iccid}', '{deviceId}', {type})" - } + send_json = update_register_sql(timestamp, iccid, deviceId, type) r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) res.append(r.json()) else: # 创建该设备 - send_json = { - "sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})" - } + send_json = create_register_sql(iccid, deviceId, type) r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) res.append(r.json()) else: return BaseResponse(code=500, msg="500 Internal error") # 插入设备状态表 - send_json = { - "sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', True, '{deviceId}')" - } + send_json = insert_device_status_sql(iccid, clientid, deviceId) r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) res.append(r.json()) # 创建相应的数据库和字段 @@ -264,14 +220,7 @@ async def message_publish(data): if r["code"] != 200: return BaseResponse(code=500, msg="RFID创建映射失败", data=r) res.append(r) - send_json = { - "devices": ["root.farm." + deviceId], - "timestamps": [int(time.time() * 1000)], - "measurements_list": [m], - "data_types_list": [dataTypes[t]], - "values_list": [v], - "is_aligned": False - } + send_json = message_publish_sql(deviceId, m, t, v) r = requests.post(baseHost + insertUri, headers=headers, json=send_json) r = r.json() res.append(r) @@ -289,8 +238,8 @@ async def message_publish(data): # 改变设备状态接口 async def client_change_status(clientid, status): try: - sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'" - r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query}) + send_json = get_has_clientId_sql(clientid) + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) if r.status_code == 200: r = r.json() values = r["timestamps"] @@ -347,13 +296,10 @@ async def past_seven_days(deviceId): device_type = device_values[1][0] # 如果是空气设备,则有两个值 if device_type == 1: - sql = f"select MAX_VALUE(air_temperature),MAX_VALUE(air_humidity) from root.warning.{deviceId} group by ([{past_6_days_date}, {tomorrow}), 1d)" + send_json = air_device_past_seven_days_sql(deviceId) # 如果是其他设备,则有一个值,RFID不算 else: - sql = f"select MAX_VALUE(value) from root.warning.{deviceId} group by ([{past_6_days_date}, {tomorrow}), 1d)" - send_json = { - "sql": sql - } + send_json = common_device_past_seven_days_sql(deviceId) r = requests.post(baseHost + queryUri, headers=headers, json=send_json) data = r.json() timestamps = data["timestamps"] @@ -376,18 +322,16 @@ async def warning_statistics(deviceId): if deviceId is None or deviceId == "" or len(deviceId) != 4: return BaseResponse(code=500, msg="参数错误") # 获取设备总报警数 - r = requests.post(baseHost + queryUri, headers=headers, - json={"sql": f"select count(iccid) from root.warning.{deviceId}"}) + send_json = get_device_all_warning_sql(deviceId) + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) data = r.json() values = data["values"] total_count = 0 if len(values) != 0: total_count = values[0][0] # 获取设备当天报警数 - now = datetime.now().date() - r = requests.post(baseHost + queryUri, headers=headers, - json={ - "sql": f"select count(iccid) from root.warning.{deviceId} where time >= {now}T00:00:00"}) + send_json = get_device_today_warning_sql(deviceId) + r = requests.post(baseHost + queryUri, headers=headers, json=send_json) data = r.json() values = data["values"] today_count = 0