diff --git a/xumu_plus.py b/xumu_plus.py deleted file mode 100644 index 15538b4..0000000 --- a/xumu_plus.py +++ /dev/null @@ -1,286 +0,0 @@ -import re -import time -import json -from typing import Any - -import requests -import uvicorn -from fastapi import FastAPI, Request -from pydantic import BaseModel -from config import * - -# fastapi -app = FastAPI() - - -# 统一返回数据 -class BaseResponse(BaseModel): - code: int = 200 - msg: str = "success" - data: Any = None - - -# 监控视频接口 -@app.get("/api/xumu/video") -async def video_query(username): - return BaseResponse(data=get_video_url(username)) - - -@app.get("/api/xumu/device/online") -async def device_online_query(iccid, deviceId): - try: - sql = "select * from root.farm.clientId where time >= 0" - # 检查iccid是否有值,如果有,添加到SQL语句中 - if iccid: - sql += f" and iccid = '{iccid}'" - # 检查deviceId是否有值,如果有,添加到SQL语句中 - if deviceId: - sql += f" and deviceId = '{deviceId}'" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 数据查询接口 -@app.get("/api/xumu/data/query") -async def data_query(deviceId): - try: - if deviceId is None or deviceId == "" or len(deviceId) != 4: - return BaseResponse(code=500, msg="参数错误") - sql = f"select last * from root.farm.{deviceId}" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -async def rfid_last_query(rfid, deviceId): - try: - if deviceId is None or deviceId == "" or len(deviceId) != 4: - return BaseResponse(code=500, msg="参数错误") - sql = f"select * from root.farm.{deviceId} where RFID='{rfid}' order by time desc limit 1" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 原生查询接口 -@app.post("/api/xumu/rest/v2/query") -async def rest_query(request: Request): - data = await request.json() - r = requests.post(baseHost + queryUri, headers=headers, json=data) - return BaseResponse(data=r.json()) - - -# RFID查询接口 -@app.get("/api/xumu/rfid/query") -async def rfid_query(rfid): - try: - sql = f"select deviceId from root.rfid where rfid='{rfid}'" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - if r.status_code != 200: - return BaseResponse(data=r.json(), code=404, msg="404") - r = r.json() - values = r["values"] - if len(values) == 0: - return BaseResponse(msg="No Such RFID", code=500) - deviceId = values[0][0] - return await rfid_last_query(rfid, deviceId) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -async def has_rfid(rfid): - try: - sql = f"select deviceId from root.rfid where rfid='{rfid}'" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - if r.status_code != 200: - return BaseResponse(data=r.json(), code=404, msg="404") - r = r.json() - values = r["values"] - if len(values) == 0: - return BaseResponse(msg="No Such RFID", code=500) - return BaseResponse(data=r) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 设备查询接口 -@app.get("/api/xumu/device/query") -async def get_device(iccid, deviceId): - try: - sql = "SELECT * FROM root.device where time>=0" - # 检查iccid是否有值,如果有,添加到SQL语句中 - if iccid: - sql += f" and iccid = '{iccid}'" - # 检查deviceId是否有值,如果有,添加到SQL语句中 - if deviceId: - sql += f" and deviceId = '{deviceId}'" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 设备注册接口 -@app.post("/api/xumu/device/register") -async def register(request: Request): - try: - data = await request.body() - data = data.decode("utf-8") - data = json.loads(data) - clientid = data["clientid"] - payload = data["payload"] - receive_len = len(payload) - payload = json.loads(payload) - send_len = payload["l"] - if receive_len != send_len: - return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}") - deviceId = payload["d"] - iccid = payload["cid"] - type = payload["t"] - # 该设备是否已经创建 - a_device = await get_device(iccid, deviceId) - if a_device.code == 200: - v = a_device.data["values"] - if len(v) != 0: - return BaseResponse(code=302, msg="该设备已经注册过了") - # 注册 - send_json = { - "devices": ["root.device", "root.farm.clientId"], - "timestamps": [int(time.time() * 1000), int(time.time() * 1000)], - "measurements_list": [["iccid", "deviceId", "type"], ["iccid", "clientId", "is_online", "deviceId"]], - "data_types_list": [["TEXT", "TEXT", "INT32"], ["TEXT", "TEXT", "BOOLEAN", "TEXT"]], - "values_list": [[iccid, deviceId, type], [iccid, clientid, True, deviceId]], - "is_aligned": False - } - r = requests.post(baseHost + insertUri, headers=headers, json=send_json) - res = [r.json()] - # 创建相应的数据库和字段 - sql_list = get_sql(deviceId, type) - for sql in sql_list: - send_json = { - "sql": sql - } - r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) - res.append(r.json()) - return BaseResponse(data=res) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -async def message_publish(data): - try: - payload = data["payload"].strip() - receive_len = len(payload) - payload = json.loads(payload) - send_len = payload["l"] - if receive_len != send_len: - return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}") - m = payload["m"] - v = payload["v"] - t = payload["t"] - deviceId = v[0] - m.append("t") - v.append(t) - res = [] - # 判断t是否为0,0则代表是RFID设备,需要做一个映射 - if t == 0: - rfid = v[1] - r = await has_rfid(rfid) - if r.code != 200: - sql_list = rfid_deviceId(rfid, deviceId) - sql = sql_list[0] - send_json = { - "sql": sql - } - r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) - if r.status_code == 200: - r = r.json() - if r["code"] != 200: - return BaseResponse(code=500, msg="RFID创建映射失败", data=r) - res.append(r) - send_json = { - "devices": ["root.farm." + deviceId], - "timestamps": [int(time.time() * 1000)], - "measurements_list": [m], - "data_types_list": [dataTypes[t]], - "values_list": [v], - "is_aligned": False - } - r = requests.post(baseHost + insertUri, headers=headers, json=send_json) - r = r.json() - res.append(r) - return BaseResponse(data=res) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 改变设备状态接口 -async def client_change_status(clientid, status): - try: - sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'" - r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query}) - if r.status_code == 200: - r = r.json() - values = r["timestamps"] - if len(values) > 0: - timestamp = values[0] - sql_update_list = get_client_change_status_sql(timestamp, status) - sql_update = sql_update_list[0] - r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update}) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -@app.post("/api/xumu/data/collect") -async def process_data(request: Request): - try: - data = await request.body() - data = data.decode("utf-8") - data = json.loads(data) - event = data["event"] - clientid = data["clientid"] - # print(clientid + "======" + event) - pattern = r'^f\d{1,10}$' - amatch1 = re.match(pattern, clientid) - # 如果两个都不满足,那就错误 - if not amatch1: - return BaseResponse(code=200, msg="error") - # 连接成功处理 - if event == "client.connected": - return await client_change_status(clientid, True) - # 断开连接处理 - elif event == "client.disconnected": - return await client_change_status(clientid, False) - # 消息体处理 - elif event == "message.publish": - return await message_publish(data) - return BaseResponse() - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -if __name__ == '__main__': - uvicorn.run(app, host="0.0.0.0", port=8002)