xumu_iotdb/sql/sql.py

382 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import time
from datetime import datetime, timedelta
# Device type codes used as keys below (as handled by is_warning):
#   0 = RFID, 1 = air, 2 = danqi, 3 = jiawan, 4 = zaoyin, 5 = yanwu.

# IoTDB data types per device kind. Each list is positionally aligned with
# the measurement list of the same kind further down.
rfid_type = ["TEXT", "TEXT", "FLOAT", "INT32"]
air_type = ["TEXT", "FLOAT", "FLOAT", "INT32"]
else_type = ["TEXT", "FLOAT", "INT32"]

# Measurement (column) names per device kind.
# The comma between "temperature" and "type" matters: without it Python
# concatenates the two adjacent literals into "temperaturetype", leaving only
# three names for the four RFID data types.
rfid_measurement = ["iccid", "RFID", "temperature", "type"]
air_measurement = ["iccid", "air_temperature", "air_humidity", "type"]
else_measurement = ["iccid", "value", "type"]

# Device type code -> data type list.
dataTypes = {
    0: rfid_type,
    1: air_type,
    2: else_type,
    3: else_type,
    4: else_type,
    5: else_type,
}

# Device type code -> measurement name list.
measurements = {
    0: rfid_measurement,
    1: air_measurement,
    2: else_measurement,
    3: else_measurement,
    4: else_measurement,
    5: else_measurement,
}

# Two-element thresholds are [low, high]; a reading at or beyond either bound
# is treated as a warning by is_warning.
temperature_threshold = [37, 41]
air_temperature_threshold = [0, 40]
air_humidity_threshold = [30, 90]

# Single-element thresholds are upper limits only.
danqi_threshold = [1.24]
jiawan_threshold = [3000]
zaoyin_threshold = [75]
yanwu_threshold = [200]
# Example payloads, one per message shape.
# "m" holds the measurement aliases, "v" the matching values and "t" the
# device type code. The meaning of "l" is not shown in this file.

# RFID device
send1 = dict(
    m=["cid", "r", "v"],
    v=["abcd", "rfid", 10.62],
    t=0,
    l=63,
)

# Air device
send2 = dict(
    m=["cid", "at", "ah"],
    v=["test2", 10.62, 50.22],
    t=1,
    l=65,
)

# Common (single-value) device
send3 = dict(
    m=["cid", "v"],
    v=["test1", 10.62],
    t=2,
    l=50,
)
def has_rfid_sql(rfid):
    """Build the query that looks up the deviceId stored for an RFID value.

    Args:
        rfid: RFID value to match against ``root.rfid``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    # NOTE(review): rfid is interpolated into the statement without escaping.
    statement = "select deviceId from root.rfid where rfid='{}'".format(rfid)
    return {"sql": statement}
def get_device_online_sql(iccid, deviceId):
    """Build the query over ``root.farm.clientId`` with optional filters.

    Args:
        iccid: When truthy, rows are restricted to this iccid.
        deviceId: When truthy, rows are restricted to this deviceId.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    # "time >= 0" is always present, so every optional filter can be
    # chained with a plain " and ".
    conditions = ["time >= 0"]
    if iccid:
        conditions.append(f"iccid = '{iccid}'")
    if deviceId:
        conditions.append(f"deviceId = '{deviceId}'")
    where_clause = " and ".join(conditions)
    return {"sql": "select * from root.farm.clientId where " + where_clause}
def get_device_sql(iccid, deviceId):
    """Build the query over ``root.device`` with optional filters.

    Args:
        iccid: When truthy, rows are restricted to this iccid.
        deviceId: When truthy, rows are restricted to this deviceId.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    # The always-true time predicate lets each filter be appended with " and ".
    conditions = ["time>=0"]
    if iccid:
        conditions.append(f"iccid = '{iccid}'")
    if deviceId:
        conditions.append(f"deviceId = '{deviceId}'")
    return {"sql": "SELECT * FROM root.device where " + " and ".join(conditions)}
def data_query_sql(deviceId):
    """Build the ``select last *`` query for a device under ``root.farm``.

    Args:
        deviceId: Path segment appended to ``root.farm.``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    return {"sql": "select last * from root.farm.{}".format(deviceId)}
def get_warning_sql(deviceId, limit, offset):
    """Build the paged query over a device's rows in ``root.warning``.

    Args:
        deviceId: Path segment appended to ``root.warning.``.
        limit: When truthy, emitted as a ``limit`` clause.
        offset: When truthy, emitted as an ``offset`` clause.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    pieces = [f"select * from root.warning.{deviceId}"]
    # Falsy values (None, 0, "") leave the corresponding clause out.
    if limit:
        pieces.append(f" limit {limit}")
    if offset:
        pieces.append(f" offset {offset}")
    return {"sql": "".join(pieces)}
def rfid_query_sql(rfid):
    """Build the query that selects the deviceId stored for an RFID value.

    Args:
        rfid: RFID value to match against ``root.rfid``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    # NOTE(review): rfid is interpolated into the statement without escaping.
    query = "select deviceId from root.rfid where rfid='" + f"{rfid}" + "'"
    return {"sql": query}
def rfid_last_query_sql(rfid, deviceId):
    """Build the query for the newest row of a device with a given RFID.

    Args:
        rfid: Value compared against the ``RFID`` column.
        deviceId: Path segment appended to ``root.farm.``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    source = f"root.farm.{deviceId}"
    # Descending time order plus "limit 1" keeps only the most recent row.
    statement = f"select * from {source} where RFID='{rfid}' order by time desc limit 1"
    return {"sql": statement}
def update_register_sql(timestamp, iccid, deviceId, type):
    """Build the insert into ``root.device`` at an explicit timestamp.

    Args:
        timestamp: Row timestamp, emitted unquoted.
        iccid: Emitted as a quoted string.
        deviceId: Emitted as a quoted string.
        type: Device type, emitted unquoted.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    columns = "timestamp, iccid, deviceId, type"
    row = f"{timestamp}, '{iccid}', '{deviceId}', {type}"
    return {"sql": f"insert into root.device({columns}) values({row})"}
def create_register_sql(iccid, deviceId, type):
    """Build the insert into ``root.device`` without an explicit timestamp.

    Args:
        iccid: Emitted as a quoted string.
        deviceId: Emitted as a quoted string.
        type: Device type, emitted unquoted.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    row = f"'{iccid}', '{deviceId}', {type}"
    return {"sql": "insert into root.device(iccid, deviceId, type) values(" + row + ")"}
def insert_device_status_sql(iccid, clientid, deviceId):
    """Build the insert that records a client as online.

    The row is written to ``root.farm.clientId`` with ``is_online`` fixed to
    ``True``.

    Args:
        iccid: Emitted as a quoted string.
        clientid: Emitted as a quoted string in the ``clientId`` column.
        deviceId: Emitted as a quoted string.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    target = "root.farm.clientId(iccid, clientId, is_online, deviceId)"
    row = f"'{iccid}','{clientid}', True, '{deviceId}'"
    return {"sql": f"insert into {target} values({row})"}
def message_publish_sql(deviceId, m, t, v):
    """Build the one-row insert payload for a device under ``root.farm``.

    Args:
        deviceId: String appended to ``"root.farm."`` to form the device path.
        m: Measurement names for the row.
        t: Device type code, used as the key into ``dataTypes``.
        v: Values for the row.

    Returns:
        A dict with the keys ``devices``, ``timestamps``,
        ``measurements_list``, ``data_types_list``, ``values_list`` and
        ``is_aligned``. Each list holds a single entry for this one row.

    Raises:
        KeyError: If ``t`` is not a key of ``dataTypes``.
    """
    device_path = "root.farm." + deviceId
    # The row is stamped with the current wall-clock time in milliseconds.
    now_ms = int(time.time() * 1000)
    return dict(
        devices=[device_path],
        timestamps=[now_ms],
        measurements_list=[m],
        data_types_list=[dataTypes[t]],
        values_list=[v],
        is_aligned=False,
    )
def get_has_clientId_sql(clientid):
    """Build the query that selects rows of ``root.farm.clientId`` by client.

    Args:
        clientid: Value compared against the ``clientId`` column.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    statement = "select timestamp from root.farm.clientId where clientId='{}'".format(clientid)
    return {"sql": statement}
# 获取空气设备历史7天的报警数据sql
def air_device_past_seven_days_sql(deviceId):
    """Build the 7-day daily-maximum warning query for an air device.

    The statement groups ``root.warning.<deviceId>`` into one-day buckets and
    selects the maximum air temperature and air humidity of each bucket.

    Args:
        deviceId: Path segment appended to ``root.warning.``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    today = datetime.now().date()
    # Half-open window [today - 6 days, tomorrow) spans seven calendar days,
    # today included.
    window_start = today - timedelta(days=6)
    window_end = today + timedelta(days=1)
    columns = "MAX_VALUE(air_temperature),MAX_VALUE(air_humidity)"
    grouping = f"group by ([{window_start}, {window_end}), 1d)"
    return {"sql": f"select {columns} from root.warning.{deviceId} {grouping}"}
# 获取普通设备历史7天的报警数据sql
def common_device_past_seven_days_sql(deviceId):
    """Build the 7-day daily-maximum warning query for a common device.

    The statement groups ``root.warning.<deviceId>`` into one-day buckets and
    selects the maximum ``value`` of each bucket.

    Args:
        deviceId: Path segment appended to ``root.warning.``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    today = datetime.now().date()
    # Half-open window [today - 6 days, tomorrow) spans seven calendar days,
    # today included.
    window_start = today - timedelta(days=6)
    window_end = today + timedelta(days=1)
    grouping = f"group by ([{window_start}, {window_end}), 1d)"
    return {"sql": f"select MAX_VALUE(value) from root.warning.{deviceId} {grouping}"}
# 获取设备所有报警数量的sql
def get_device_all_warning_sql(deviceId):
    """Build the query that counts all warning rows of a device.

    Args:
        deviceId: Path segment appended to ``root.warning.``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    statement = "select count(iccid) from root.warning.{}".format(deviceId)
    return {"sql": statement}
# 获取设备当天报警数量的sql
def get_device_today_warning_sql(deviceId):
    """Build the query that counts a device's warning rows since midnight.

    Args:
        deviceId: Path segment appended to ``root.warning.``.

    Returns:
        A dict with a single ``"sql"`` key holding the statement text.
    """
    # The local calendar date followed by T00:00:00 forms the lower time bound.
    midnight = f"{datetime.now().date()}T00:00:00"
    statement = f"select count(iccid) from root.warning.{deviceId} where time >= {midnight}"
    return {"sql": statement}
# 改变设备在线状态表的sql
def get_client_change_status_sql(timestamp, status):
    """Build the statement that sets ``is_online`` for an existing row.

    Args:
        timestamp: Timestamp of the ``root.farm.clientId`` row to write.
        status: Truthy writes ``True``; falsy writes ``False``.

    Returns:
        A one-element list containing the insert statement.
    """
    online_literal = "True" if status else "False"
    return [
        f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, {online_literal})"
    ]
# 创建RFID设备的sql
def RFID_template(type, deviceId):
    """Build the CREATE TIMESERIES statements for an RFID device.

    Args:
        type: Path segment placed directly under ``root`` (callers in this
            file pass ``"farm"`` or ``"warning"``).
        deviceId: Path segment placed under ``type``.

    Returns:
        A new list of four statements, in the order iccid, temperature,
        RFID, type.
    """
    # (series name with its alias, IoTDB data type)
    series = (
        ("iccid(cid)", "TEXT"),
        ("temperature(v)", "FLOAT"),
        ("RFID(r)", "TEXT"),
        ("type(t)", "INT32"),
    )
    prefix = f"CREATE TIMESERIES root.{type}.{deviceId}."
    return [
        f"{prefix}{column} WITH datatype={datatype},ENCODING=PLAIN"
        for column, datatype in series
    ]
# 创建普通设备的sql
def common_template(type, deviceId):
    """Build the CREATE TIMESERIES statements for a single-value device.

    Args:
        type: Path segment placed directly under ``root`` (callers in this
            file pass ``"farm"`` or ``"warning"``).
        deviceId: Path segment placed under ``type``.

    Returns:
        A new list of three statements, in the order iccid, value, type.
    """
    # (series name with its alias, IoTDB data type)
    series = (
        ("iccid(cid)", "TEXT"),
        ("value(v)", "FLOAT"),
        ("type(t)", "INT32"),
    )
    prefix = f"CREATE TIMESERIES root.{type}.{deviceId}."
    return [
        f"{prefix}{column} WITH datatype={datatype},ENCODING=PLAIN"
        for column, datatype in series
    ]
# 创建空气设备的sql
def air_template(type, deviceId):
    """Build the CREATE TIMESERIES statements for an air device.

    Args:
        type: Path segment placed directly under ``root`` (callers in this
            file pass ``"farm"`` or ``"warning"``).
        deviceId: Path segment placed under ``type``.

    Returns:
        A new list of four statements, in the order iccid, air_temperature,
        air_humidity, type.
    """
    # (series name with its alias, IoTDB data type)
    series = (
        ("iccid(cid)", "TEXT"),
        ("air_temperature(at)", "FLOAT"),
        ("air_humidity(ah)", "FLOAT"),
        ("type(t)", "INT32"),
    )
    prefix = f"CREATE TIMESERIES root.{type}.{deviceId}."
    return [
        f"{prefix}{column} WITH datatype={datatype},ENCODING=PLAIN"
        for column, datatype in series
    ]
# 创建warning表的sql
def warning_sql(deviceId, type):
    """Build the CREATE TIMESERIES statements for a device's warning series.

    The result is the device-kind template rooted at ``root.warning`` plus
    two extra TEXT series, ``reason`` and ``solve``.

    Args:
        deviceId: Path segment placed under ``root.warning``.
        type: Device type code; 0 selects the RFID template, 1 the air
            template, anything else the common template.

    Returns:
        A list of statements: the template's series followed by ``reason``
        and ``solve``.
    """
    table = "warning"
    if type == 0:
        statements = RFID_template(table, deviceId)
    elif type == 1:
        statements = air_template(table, deviceId)
    else:
        statements = common_template(table, deviceId)
    # Every warning table carries the same two extra text columns.
    statements.extend(
        f"CREATE TIMESERIES root.{table}.{deviceId}.{column} WITH datatype=TEXT,ENCODING=PLAIN"
        for column in ("reason(rs)", "solve(s)")
    )
    return statements
# 插入到rfid和deviceId映射关系表
def rfid_deviceId(rfid, deviceId):
    """Build the insert that records an RFID-to-deviceId mapping.

    Args:
        rfid: Emitted as a quoted string in the ``rfid`` column.
        deviceId: Emitted as a quoted string in the ``deviceId`` column.

    Returns:
        A one-element list containing the insert statement.
    """
    row = f"'{rfid}', '{deviceId}'"
    statement = f"insert into root.rfid(rfid, deviceId) values({row})"
    return [statement]
# 判断是否为报警数据的函数
def is_warning(deviceId, v, t):
match t:
case 0:
temperature = v[1]
if temperature_threshold[1] <= temperature or temperature <= temperature_threshold[0]:
return True
case 1:
air_temperature = v[1]
air_humidity = v[2]
if air_temperature_threshold[1] <= air_temperature or air_temperature <= air_temperature_threshold[0]:
return True
if air_humidity_threshold[1] <= air_humidity or air_humidity <= air_humidity_threshold[0]:
return True
case 2:
danqi = v[1]
if danqi >= danqi_threshold[0]:
return True
case 3:
jiawan = v[1]
if jiawan > jiawan_threshold[0]:
return True
case 4:
zaoyin = v[1]
if zaoyin >= zaoyin_threshold[0]:
return True
case 5:
yanwu = v[1]
if yanwu >= yanwu_threshold[0]:
return True
return False
# 插入到warning表的sql
def insert_to_warning_sql(deviceId, v, t):
    """Build the one-row insert payload for a device under ``root.warning``.

    The row is the original reading plus two TEXT columns: ``reason`` (why
    the reading is a warning) and ``solve`` (currently always empty).

    Args:
        deviceId: String appended to ``"root.warning."`` to form the device
            path.
        v: Values row laid out like ``measurements[t]``. It is not modified;
            the payload carries a copy with the two extra values appended.
        t: Device type code (0 RFID, 1 air, 2 danqi, 3 jiawan, 4 zaoyin,
            5 yanwu).

    Returns:
        A dict with the keys ``devices``, ``timestamps``,
        ``measurements_list``, ``data_types_list``, ``values_list`` and
        ``is_aligned``. Each list holds a single entry for this one row.

    Raises:
        KeyError: If ``t`` is not a key of ``measurements``.
    """
    # Concatenation builds fresh lists, so the shared module-level tables are
    # never modified.
    ml = measurements[t] + ["reason", "solve"]
    dt = dataTypes[t] + ["TEXT", "TEXT"]

    # Reasons for the kinds that only have an upper limit.
    fixed_reasons = {
        2: "氮气浓度偏高",
        3: "甲烷浓度过高",
        4: "噪音强度过高",
        5: "烟雾浓度偏高",
    }

    # Default to an empty reason so the row always has one value per
    # measurement, even if no threshold branch below matches.
    reason = ""
    if t == 0:
        # Index 1 of an RFID row is the RFID text; the temperature is at
        # index 2.
        temperature = v[2]
        if temperature_threshold[1] <= temperature:
            reason = "体温过高"
        elif temperature <= temperature_threshold[0]:
            reason = "体温过低"
    elif t == 1:
        air_temperature = v[1]
        air_humidity = v[2]
        # Only the first matching condition is reported; temperature takes
        # precedence over humidity.
        if air_temperature_threshold[1] <= air_temperature:
            reason = "室外天气温度偏高"
        elif air_temperature <= air_temperature_threshold[0]:
            reason = "室外天气温度偏低"
        elif air_humidity_threshold[1] <= air_humidity:
            reason = "室外天气湿度偏高"
        elif air_humidity <= air_humidity_threshold[0]:
            reason = "室外天气湿度偏低"
    elif t in fixed_reasons:
        reason = fixed_reasons[t]

    # Copy the row so the caller's list is left untouched. The solution
    # column is left empty; it could later be filled with generated advice.
    values = list(v)
    values.append(reason)
    values.append("")

    return {
        "devices": ["root.warning." + deviceId],
        "timestamps": [int(time.time() * 1000)],
        "measurements_list": [ml],
        "data_types_list": [dt],
        "values_list": [values],
        "is_aligned": False
    }
# 数据库创建字段sql语句
def farm_sql(deviceId, type):
    """Build the CREATE TIMESERIES statements for a device's data series.

    Args:
        deviceId: Path segment placed under ``root.farm``.
        type: Device type code; 0 selects the RFID template, 1 the air
            template, anything else the common template.

    Returns:
        The list of statements produced by the selected template.
    """
    table = "farm"
    if type == 0:
        return RFID_template(table, deviceId)
    if type == 1:
        return air_template(table, deviceId)
    return common_template(table, deviceId)
# 监控视频接口
def get_video_url(username):
    """Return the live-stream URL for a user's monitoring video.

    Args:
        username: Inserted into the stream name after the ``xumu_`` prefix.

    Returns:
        The stream URL as a string.
    """
    stream_name = "xumu_{}.live.mp4".format(username)
    return "http://rtsp.lihaink.cn/live/" + stream_name