diff --git a/xumu.py b/xumu.py index 0756616..73fcf67 100644 --- a/xumu.py +++ b/xumu.py @@ -262,9 +262,13 @@ async def process_data(request: Request): data = json.loads(data) event = data["event"] clientid = data["clientid"] - pattern = r"test" - a = re.match(pattern, clientid) - if not a: + pattern = r'^farm_[a-zA-Z0-9]{12}$' + amatch = re.match(pattern, clientid) + if not amatch: + return BaseResponse(code=500, msg="不符合畜牧clientId") + pattern = r'^test' + amatch = re.match(pattern, clientid) + if not amatch: return BaseResponse(code=500, msg="不符合畜牧clientId") # 连接成功处理 if event == "client.connected": diff --git a/xumu_ori.py b/xumu_ori.py index 2544052..2777865 100644 --- a/xumu_ori.py +++ b/xumu_ori.py @@ -164,8 +164,6 @@ async def process_data(request: Request): try: data = await request.body() data = data.decode("utf-8") - print(data) - return data = json.loads(data) # 连接成功处理 # 断开连接处理 diff --git a/xumu_test.py b/xumu_test.py deleted file mode 100644 index 7340199..0000000 --- a/xumu_test.py +++ /dev/null @@ -1,258 +0,0 @@ -import time -import json -from typing import Any - -import requests -import uvicorn -from fastapi import FastAPI, Request -from pydantic import BaseModel -from config import * - -# fastapi -app = FastAPI() - - -# 统一返回数据 -class BaseResponse(BaseModel): - code: int = 200 - msg: str = "success" - data: Any = None - - -@app.get("/api/xumu/video") -async def data_query(username): - return BaseResponse(data=get_video_url(username)) - - -# 数据查询接口 -@app.get("/api/xumu/data/query") -async def data_query(deviceId): - try: - if deviceId is None or deviceId == "" or len(deviceId) != 4: - return BaseResponse(code=500, msg="参数错误") - sql = f"select last * from root.farm.{deviceId}" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 原生查询接口 -@app.post("/api/xumu/rest/v2/query") -async def rest_query(request: Request): - data = await request.json() - r = requests.post(baseHost + queryUri, headers=headers, json=data) - return BaseResponse(data=r.json()) - - -# RFID查询接口 -@app.get("/api/xumu/rfid/query") -async def rfid_query(rfid): - try: - sql = f"select deviceId from root.rfid where rfid='{rfid}'" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - if r.status_code != 200: - return BaseResponse(data=r.json(), code=404, msg="404") - r = r.json() - values = r["values"] - if len(values) == 0: - return BaseResponse(msg="No Such RFID", code=500) - deviceId = values[0][0] - return await data_query(deviceId) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -async def has_rfid(rfid): - try: - sql = f"select deviceId from root.rfid where rfid='{rfid}'" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - if r.status_code != 200: - return BaseResponse(data=r.json(), code=404, msg="404") - r = r.json() - values = r["values"] - if len(values) == 0: - return BaseResponse(msg="No Such RFID", code=500) - return BaseResponse(data=r) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 设备查询接口 -@app.get("/api/xumu/device/query") -async def get_device(iccid, deviceId): - try: - sql = "SELECT * FROM root.device where time>=0" - # 检查iccid是否有值,如果有,添加到SQL语句中 - if iccid: - sql += f" and iccid = '{iccid}'" - # 检查deviceId是否有值,如果有,添加到SQL语句中 - if deviceId: - sql += f" and deviceId = '{deviceId}'" - send_json = { - "sql": sql - } - r = requests.post(baseHost + queryUri, headers=headers, json=send_json) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 设备注册接口 -@app.post("/api/xumu/device/register") -async def register(request: Request): - try: - data = await request.body() - data = data.decode("utf-8") - data = json.loads(data) - clientid = data["clientid"] - payload = data["payload"] - receive_len = len(payload) - payload = json.loads(payload) - send_len = payload["l"] - if receive_len != send_len: - print(receive_len) - return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}") - deviceId = payload["d"] - iccid = payload["cid"] - type = payload["t"] - # 该设备是否已经创建 - a_device = await get_device(iccid, deviceId) - if a_device.code == 200: - v = a_device.data["values"] - if len(v) != 0: - return BaseResponse(code=302, msg="该设备已经注册过了") - # 创建该设备 - send_json = { - "devices": ["root.device"], - "timestamps": [int(time.time() * 1000)], - "measurements_list": [["iccid", "deviceId", "type"]], - "data_types_list": [["TEXT", "TEXT", "INT32"]], - "values_list": [[iccid, deviceId, type]], - "is_aligned": False - } - # TODO 测试 - # send_json = { - # "sql": f"insert into root.device(iccid, deviceId, type) values('{iccid}', '{deviceId}', {type})" - # } - res = [] - r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) - res.append(r.json()) - # 插入设备状态表 - send_json = { - "sql": f"insert into root.farm.clientId(iccid, clientId, is_online, deviceId) values('{iccid}','{clientid}', False, '{deviceId}')" - } - r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) - res.append(r.json()) - # 创建相应的数据库和字段 - sql_list = get_sql(deviceId, type) - for sql in sql_list: - send_json = { - "sql": sql - } - r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) - res.append(r.json()) - return BaseResponse(data=res) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -async def message_publish(data): - try: - payload = data["payload"].strip() - receive_len = len(payload) - payload = json.loads(payload) - send_len = payload["l"] - if receive_len != send_len: - print("校验错误,收到的消息长度:", receive_len) - return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}") - m = payload["m"] - v = payload["v"] - t = payload["t"] - deviceId = v[0] - m.append("t") - v.append(t) - res = [] - # 判断t是否为0,0则代表是RFID设备,需要做一个映射 - if t == 0: - rfid = v[1] - r = await has_rfid(rfid) - if r.code != 200: - sql_list = rfid_deviceId(rfid, deviceId) - sql = sql_list[0] - send_json = { - "sql": sql - } - r = requests.post(baseHost + nonQueryUri, headers=headers, json=send_json) - if r.status_code == 200: - r = r.json() - if r["code"] != 200: - return BaseResponse(code=500, msg="RFID创建映射失败", data=r) - res.append(r) - send_json = { - "devices": ["root.farm." + deviceId], - "timestamps": [int(time.time() * 1000)], - "measurements_list": [m], - "data_types_list": [dataTypes[t]], - "values_list": [v], - "is_aligned": False - } - r = requests.post(baseHost + insertUri, headers=headers, json=send_json) - r = r.json() - res.append(r) - return BaseResponse(data=res) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -# 改变设备状态接口 -async def client_change_status(clientid, status): - print(clientid) - try: - sql_query = f"select timestamp from root.farm.clientId where clientId='{clientid}'" - r = requests.post(baseHost + queryUri, headers=headers, json={"sql": sql_query}) - if r.status_code == 200: - r = r.json() - values = r["timestamps"] - if len(values) > 0: - timestamp = values[0] - sql_update_list = get_client_change_status_sql(timestamp, status) - sql_update = sql_update_list[0] - r = requests.post(baseHost + nonQueryUri, headers=headers, json={"sql": sql_update}) - return BaseResponse(data=r.json()) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -@app.post("/api/xumu/data/collect") -async def process_data(request: Request): - try: - data = await request.body() - data = data.decode("utf-8") - data = json.loads(data) - event = data["event"] - clientid = data["clientid"] - # 连接成功处理 - if event == "client.connected": - return await client_change_status(clientid, True) - # 断开连接处理 - elif event == "client.disconnected": - return await client_change_status(clientid, False) - # 消息体处理 - elif event == "message.publish": - return await message_publish(data) - except Exception as e: - return BaseResponse(code=500, msg=str(e)) - - -if __name__ == '__main__': - uvicorn.run(app, host="0.0.0.0", port=8002)