# 数据库插入类型 import base64 import copy import time rfid_type = ["TEXT", "TEXT", "FLOAT", "INT32"] air_type = ["TEXT", "FLOAT", "FLOAT", "INT32"] else_type = ["TEXT", "FLOAT", "INT32"] rfid_measurement = ["iccid", "RFID", "temperature" "type"] air_measurement = ["iccid", "air_temperature", "air_humidity", "type"] else_measurement = ["iccid", "value", "type"] dataTypes = { 0: rfid_type, 1: air_type, 2: else_type, 3: else_type, 4: else_type, 5: else_type, } measurements = { 0: rfid_measurement, 1: air_measurement, 2: else_measurement, 3: else_measurement, 4: else_measurement, 5: else_measurement, } baseHost = "https://iot.lihaink.cn/iotdb_restapi" # 注意这里前面不能加/ insertUri = "rest/v2/insertRecords" queryUri = "rest/v2/query" nonQueryUri = "rest/v2/nonQuery" # 鉴权 username = 'root' password = 'root' code = (username + ":" + password).encode("utf-8") token = base64.encodebytes(code).decode("utf-8").strip() headers = { 'ContentType': 'application/json', 'Authorization': "Basic " + token } def RFID_template(type, deviceId): return [ f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.temperature(v) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.RFID(r) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", ] def common_template(type, deviceId): return [ f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.value(v) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", ] def air_template(type, deviceId): return [ f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.air_temperature(at) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.air_humidity(ah) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", ] def warning_sql(deviceId, type): template = "warning" match type: case 0: rfid = RFID_template(template, deviceId) rfid.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") rfid.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") return rfid case 1: air = air_template(template, deviceId) air.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") air.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") return air case _: common = common_template(template, deviceId) common.append(f"CREATE TIMESERIES root.{template}.{deviceId}.reason(rs) WITH datatype=TEXT,ENCODING=PLAIN") common.append(f"CREATE TIMESERIES root.{template}.{deviceId}.solve(s) WITH datatype=TEXT,ENCODING=PLAIN") return common def rfid_deviceId(rfid, deviceId): return [ f"insert into root.rfid(rfid, deviceId) values('{rfid}', '{deviceId}')" ] temperature_threshold = [37, 41] air_temperature_threshold = [0, 40] air_humidity_threshold = [30, 80] danqi_threshold = [1.24] jiawan_threshold = [3000] zaoyin_threshold = [55] yanwu_threshold = [200] def is_warning(deviceId, v, t): match t: case 0: temperature = v[1] if temperature_threshold[1] <= temperature or temperature <= temperature_threshold[0]: return True case 1: air_temperature = v[1] air_humidity = v[2] if air_temperature_threshold[1] <= air_temperature or air_temperature <= air_temperature_threshold[0]: return True if air_humidity_threshold[1] <= air_humidity or air_humidity <= air_humidity_threshold[0]: return True case 2: danqi = v[1] if danqi >= danqi_threshold[0]: return True case 3: jiawan = v[1] if jiawan > jiawan_threshold[0]: return True case 4: zaoyin = v[1] if zaoyin >= zaoyin_threshold[0]: return True case 5: yanwu = v[1] if yanwu >= yanwu_threshold[0]: return True return False def insert_to_warning_sql(deviceId, v, t): ml = copy.deepcopy(measurements[t]) dt = copy.deepcopy(dataTypes[t]) ml.append("reason") ml.append("solve") dt.append("TEXT") dt.append("TEXT") # 设置原因和解决方案 # 解决方案可以调用AI进行生成回答 if t == 0: # RFID 温度 temperature = v[1] prompt = f"动物体温为{temperature},正常吗?如果体温过高或过低,如何解决?" if temperature_threshold[1] <= temperature: v.append("体温过高") elif temperature <= temperature_threshold[0]: v.append("体温过低") elif t == 1: # air 空气、湿度 air_temperature = v[1] air_humidity = v[2] prompt = f"室外天气温度为{air_temperature},室外天气湿度{air_humidity},正常吗?如果过高或过低,如何解决?" if air_temperature_threshold[1] <= air_temperature: v.append("室外天气温度偏高") elif air_temperature <= air_temperature_threshold[0]: v.append("室外天气温度偏低") elif air_humidity_threshold[1] <= air_humidity: v.append("室外天气湿度偏高") elif air_humidity <= air_humidity_threshold[0]: v.append("室外天气湿度偏低") elif t == 2: # danqi 氮气 danqi = v[1] prompt = f"空气中氮气浓度为{danqi},正常吗?如果过高或过低,如何解决?" v.append("氮气浓度偏高") elif t == 3: # jiawan 甲烷 jiawan = v[1] prompt = f"空气中甲烷浓度为{jiawan},正常吗?如果过高或过低,如何解决?" v.append("甲烷浓度过高") elif t == 4: # zaoyin 噪音 zaoyin = v[1] prompt = f"噪音分贝为{zaoyin},正常吗?如果过高或过低,如何解决?" v.append("噪音强度过高") elif t == 5: # yanwu 烟雾 yanwu = v[1] prompt = f"烟雾浓度为{yanwu},正常吗?如果过高或过低,如何解决?" v.append("烟雾浓度偏高") v.append("无") return { "devices": ["root.warning." + deviceId], "timestamps": [int(time.time() * 1000)], "measurements_list": [ml], "data_types_list": [dt], "values_list": [v], "is_aligned": False } # 数据库创建字段sql语句 def farm_sql(deviceId, type): template = "farm" match type: case 0: return RFID_template(template, deviceId) case 1: return air_template(template, deviceId) case _: return common_template(template, deviceId) def get_client_change_status_sql(timestamp, status): if status: return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, True)"] else: return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, False)"] # 监控视频接口 def get_video_url(username): return f"http://rtsp.lihaink.cn/live/xumu_{username}.live.mp4" # example # RFID send1 = { "m": ["cid", "r", "v"], "v": ["abcd", "rfid", 10.62], "t": 0, "l": 63 } # 空气 send2 = { "m": ["cid", "at", "ah"], "v": ["test2", 10.62, 50.22], "t": 1, "l": 65 } # 普通 send3 = { "m": ["cid", "v"], "v": ["test1", 10.62], "t": 2, "l": 50 }