# 数据库插入类型 import base64 import time rfid_type = ["TEXT", "TEXT", "FLOAT", "INT32"] air_type = ["TEXT", "FLOAT", "FLOAT", "INT32"] else_type = ["TEXT", "FLOAT", "INT32"] rfid_measurement = ["iccid", "temperature", "RFID", "type"] air_measurement = ["iccid", "air_temperature", "air_humidity", "type"] else_measurement = ["iccid", "value", "type"] dataTypes = { 0: rfid_type, 1: air_type, 2: else_type, 3: else_type, 4: else_type, 5: else_type, } measurements = { 0: rfid_measurement, 1: air_measurement, 2: else_measurement, 3: else_measurement, 4: else_measurement, 5: else_measurement, } baseHost = "https://iot.lihaink.cn/iotdb_restapi" # 注意这里前面不能加/ insertUri = "rest/v2/insertRecords" queryUri = "rest/v2/query" nonQueryUri = "rest/v2/nonQuery" # 鉴权 username = 'root' password = 'root' code = (username + ":" + password).encode("utf-8") token = base64.encodebytes(code).decode("utf-8").strip() headers = { 'ContentType': 'application/json', 'Authorization': "Basic " + token } def RFID_template(type, deviceId): return [ f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.temperature(v) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.RFID(r) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", ] def common_template(type, deviceId): return [ f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.value(v) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", ] def air_template(type, deviceId): return [ f"CREATE TIMESERIES root.{type}.{deviceId}.iccid(cid) WITH datatype=TEXT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.air_temperature(at) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.air_humidity(ah) WITH datatype=FLOAT,ENCODING=PLAIN", f"CREATE TIMESERIES root.{type}.{deviceId}.type(t) WITH datatype=INT32,ENCODING=PLAIN", ] def warning_sql(deviceId, type): template = "warning" match type: case 0: return RFID_template(template, deviceId) case 1: return air_template(template, deviceId) case _: return common_template(template, deviceId) def rfid_deviceId(rfid, deviceId): return [ f"insert into root.rfid(rfid, deviceId) values('{rfid}', '{deviceId}')" ] def is_warning(deviceId, v, t): match t: case 0: temperature = v[1] if 41.0 <= temperature or temperature <= 37.0: return True case 1: air_temperature = v[1] air_humidity = v[2] if 40.0 <= air_temperature or air_temperature <= 0: return True if 80.0 <= air_humidity or air_humidity <= 30.0: return True case 2: danqi = v[1] if danqi >= 1.24: return True case 3: jiawan = v[1] if jiawan > 1000: return True case 4: zaoyin = v[1] if zaoyin >= 55: return True case 5: yanwu = v[1] if yanwu >= 200: return True return False def insert_to_warning_sql(deviceId, v, t): return { "devices": ["root.warning." + deviceId], "timestamps": [int(time.time() * 1000)], "measurements_list": [measurements[t]], "data_types_list": [dataTypes[t]], "values_list": [v], "is_aligned": False } # 数据库创建字段sql语句 def farm_sql(deviceId, type): template = "farm" match type: case 0: return RFID_template(template, deviceId) case 1: return air_template(template, deviceId) case _: return common_template(template, deviceId) def get_client_change_status_sql(timestamp, status): if status: return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, True)"] else: return [f"insert into root.farm.clientId(timestamp, is_online) values({timestamp}, False)"] # 监控视频接口 def get_video_url(username): return f"http://rtsp.lihaink.cn/live/xumu_{username}.live.mp4" # example # RFID send1 = { "m": ["cid", "r", "v"], "v": ["abcd", "rfid", 10.62], "t": 0, "l": 63 } # 空气 send2 = { "m": ["cid", "at", "ah"], "v": ["test2", 10.62, 50.22], "t": 1, "l": 65 } # 普通 send3 = { "m": ["cid", "v"], "v": ["test1", 10.62], "t": 2, "l": 50 }