import re
import json
from typing import Any

import requests
import uvicorn
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

from config import *
from sql.sql import *

# fastapi
app = FastAPI()

# Upper bound (seconds) for a single upstream database call. Without a timeout
# `requests.post` can wait forever on an unresponsive upstream.
_REQUEST_TIMEOUT_SECONDS = 30

# Shape of the client ids accepted by the /data/collect hook: "f" + 1..10 digits.
_CLIENT_ID_PATTERN = re.compile(r'^f\d{1,10}$')


# Unified response envelope
class BaseResponse(BaseModel):
    """Envelope returned by every endpoint: status code, message and payload."""

    code: int = 200
    msg: str = "success"
    data: Any = None


async def _post(uri, payload):
    """POST ``payload`` as JSON to ``baseHost + uri`` and return the response.

    ``requests`` is a blocking client; calling it directly inside an
    ``async def`` handler stalls the event loop for every other request, so
    the call is pushed onto the worker thread pool instead.
    NOTE(review): handlers used to run without interleaving because of that
    blocking; confirm nothing relied on it (e.g. concurrent registrations).
    """
    return await run_in_threadpool(
        requests.post,
        baseHost + uri,
        headers=headers,
        json=payload,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )


def _is_valid_device_id(device_id):
    """Return True when ``device_id`` is a non-empty value of length 4."""
    return device_id is not None and device_id != "" and len(device_id) == 4


async def _first_cell(send_json, default=0):
    """Run a query and return ``values[0][0]``, or ``default`` if ``values`` is empty."""
    r = await _post(queryUri, send_json)
    values = r.json()["values"]
    if len(values) == 0:
        return default
    return values[0][0]


# Surveillance video endpoint
@app.get("/api/xumu/video")
async def video_query(username):
    """Return the result of ``get_video_url(username)`` in the response envelope."""
    return BaseResponse(data=get_video_url(username))


# Warning data endpoint
@app.get("/api/xumu/warning")
async def get_warning(deviceId, limit, offset):
    """Run the warning query built by ``get_warning_sql`` and return its JSON."""
    try:
        send_json = get_warning_sql(deviceId, limit, offset)
        r = await _post(queryUri, send_json)
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


@app.get("/api/xumu/device/online")
async def device_online_query(iccid, deviceId):
    """Run the query built by ``get_device_online_sql`` and return its JSON."""
    try:
        send_json = get_device_online_sql(iccid, deviceId)
        r = await _post(queryUri, send_json)
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Data query endpoint
@app.get("/api/xumu/data/query")
async def data_query(deviceId):
    """Validate ``deviceId`` (length 4) and return the ``data_query_sql`` result."""
    try:
        if not _is_valid_device_id(deviceId):
            return BaseResponse(code=500, msg="参数错误")
        send_json = data_query_sql(deviceId)
        r = await _post(queryUri, send_json)
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


async def rfid_last_query(rfid, deviceId):
    """Validate ``deviceId`` (length 4) and return the ``rfid_last_query_sql`` result.

    Not a route itself; called by ``rfid_query`` once the device is known.
    """
    try:
        if not _is_valid_device_id(deviceId):
            return BaseResponse(code=500, msg="参数错误")
        send_json = rfid_last_query_sql(rfid, deviceId)
        r = await _post(queryUri, send_json)
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Raw query passthrough endpoint
@app.post("/api/xumu/rest/v2/query")
async def rest_query(request: Request):
    """Forward the request's JSON body unchanged to the upstream query URI.

    NOTE(review): this forwards an arbitrary caller-supplied body to the
    database query endpoint and no authentication is visible in this file;
    confirm it is protected elsewhere.
    """
    try:
        data = await request.json()
        r = await _post(queryUri, data)
        return BaseResponse(data=r.json())
    except Exception as e:
        # Same error envelope as every other handler in this module.
        return BaseResponse(code=500, msg=str(e))


# RFID query endpoint
@app.get("/api/xumu/rfid/query")
async def rfid_query(rfid):
    """Look up the device mapped to ``rfid`` and return that device's RFID data.

    Returns code 404 when the upstream answers with a non-200 status and
    code 500 with "No Such RFID" when the lookup yields no values.
    """
    # TODO: one rfid may be uploaded by several iccid devices; all of their
    # data should be fetched and returned. Currently only a single device is
    # used, and it is not necessarily the most recent one.
    try:
        send_json = rfid_query_sql(rfid)
        r = await _post(queryUri, send_json)
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=404, msg="404")
        values = r.json()["values"]
        if len(values) == 0:
            return BaseResponse(msg="No Such RFID", code=500)
        deviceId = values[0][0]
        return await rfid_last_query(rfid, deviceId)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


async def has_rfid(rfid):
    """Return code 200 with the query result when ``has_rfid_sql`` yields values.

    Returns code 404 on a non-200 upstream status and code 500 with
    "No Such RFID" when ``values`` is empty.
    """
    try:
        send_json = has_rfid_sql(rfid)
        r = await _post(queryUri, send_json)
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=404, msg="404")
        body = r.json()
        if len(body["values"]) == 0:
            return BaseResponse(msg="No Such RFID", code=500)
        return BaseResponse(data=body)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Device query endpoint
@app.get("/api/xumu/device/query")
async def get_device(iccid, deviceId):
    """Run the query built by ``get_device_sql`` and return its JSON."""
    try:
        send_json = get_device_sql(iccid, deviceId)
        r = await _post(queryUri, send_json)
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Device registration endpoint
@app.post("/api/xumu/device/register")
async def register(request: Request):
    """Register the device described by the request body.

    The body is JSON with ``clientid`` and ``payload``; ``payload`` is itself
    a JSON string carrying ``l`` (declared length), ``d`` (device id),
    ``cid`` (iccid) and ``t`` (device type).

    Returns code 301 when the payload length does not match ``l``, 302 when
    the stored iccid equals the uploaded one, 500 on failures, otherwise the
    list of upstream responses collected along the way.
    """
    try:
        data = await request.body()
        data = json.loads(data.decode("utf-8"))
        clientid = data["clientid"]
        payload = data["payload"]
        # Integrity check: the raw payload length must equal its own "l" field.
        receive_len = len(payload)
        payload = json.loads(payload)
        send_len = payload["l"]
        if receive_len != send_len:
            return BaseResponse(code=301, msg=f"data valid error, receive_len:{receive_len}")
        deviceId = payload["d"]
        iccid = payload["cid"]
        device_type = payload["t"]  # renamed from `type` to stop shadowing the builtin
        res = []

        # Check whether the device has already been created.
        a_device = await get_device(None, deviceId)
        if a_device.code == 200:
            v = a_device.data["values"]
            t = a_device.data["timestamps"]
            if len(v) != 0:
                timestamp = t[0]
                # Other handlers in this file index results as values[col][row];
                # comparing the whole column list to the iccid string can never
                # match, so take its first cell when the entry is a list.
                # NOTE(review): confirm the result layout of get_device_sql.
                theIccid = v[0]
                if isinstance(theIccid, list) and theIccid:
                    theIccid = theIccid[0]
                # Same iccid as the one stored: the device is already registered.
                if theIccid == iccid:
                    return BaseResponse(code=302, msg="该设备已经注册过了")
                # Different iccid: update the existing registration.
                send_json = update_register_sql(timestamp, iccid, deviceId, device_type)
                r = await _post(nonQueryUri, send_json)
                res.append(r.json())
            else:
                # NOTE(review): indentation was lost in the source; this `else`
                # is assumed to pair with the `len(v) != 0` check above.
                # Create the device.
                send_json = create_register_sql(iccid, deviceId, device_type)
                r = await _post(nonQueryUri, send_json)
                res.append(r.json())

        # Device status table: insert a row only when none exists for this client.
        send_json = has_device_status_sql(clientid)
        r = await _post(queryUri, send_json)
        if r.status_code != 200:
            return BaseResponse(data=r.json(), code=500, msg="500 Internal Error")
        data = r.json()
        if len(data["values"]) == 0:
            send_json = insert_device_status_sql(iccid, clientid, deviceId)
            r = await _post(nonQueryUri, send_json)
            res.append(r.json())
        else:
            # NOTE(review): an existing status row aborts the registration with
            # a 500, as in the original; confirm this is intended.
            return BaseResponse(code=500, msg="500 Internal error")

        # Create the data storage and fields for the device.
        for sql in farm_sql(deviceId, device_type):
            r = await _post(nonQueryUri, {"sql": sql})
            res.append(r.json())

        # Create the warning storage and fields for the device.
        for sql in warning_sql(deviceId, device_type):
            r = await _post(nonQueryUri, {"sql": sql})
            res.append(r.json())

        return BaseResponse(data=res)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Data publish handler
async def message_publish(data):
    """Store one published message and, when it is a warning, its warning row.

    ``data["payload"]`` is a JSON string carrying ``l`` (declared length),
    ``m`` and ``v`` (lists passed to ``message_publish_sql``) and ``t``.
    Returns code 301 on a length mismatch, otherwise the list of upstream
    responses collected along the way.
    """
    try:
        payload = data["payload"].strip()
        # Integrity check: the stripped payload length must equal its "l" field.
        receive_len = len(payload)
        payload = json.loads(payload)
        send_len = payload["l"]
        if receive_len != send_len:
            return BaseResponse(code=301, msg=f"data valid error,receive_len:{receive_len}")
        m = payload["m"]
        v = payload["v"]
        t = payload["t"]
        deviceId = v[0]
        # Append t under the name "t" so it is stored alongside the other values.
        m.append("t")
        v.append(t)
        res = []

        # t == 0 marks an RFID device, which needs an rfid -> device mapping.
        if t == 0:
            rfid = v[1]
            existing = await has_rfid(rfid)
            if existing.code != 200:
                sql = rfid_deviceId(rfid, deviceId)[0]
                mapping = await _post(nonQueryUri, {"sql": sql})
                if mapping.status_code == 200:
                    mapping_body = mapping.json()
                    if mapping_body["code"] != 200:
                        return BaseResponse(code=500, msg="RFID创建映射失败", data=mapping_body)
                    res.append(mapping_body)
                else:
                    # Never put the raw Response object into `res`: it cannot
                    # be serialized to JSON. Record a plain summary instead and
                    # carry on with the insert.
                    res.append({"status_code": mapping.status_code, "text": mapping.text})

        send_json = message_publish_sql(deviceId, m, t, v)
        r = await _post(insertUri, send_json)
        res.append(r.json())

        if is_warning(deviceId, v, t):
            # Warning data is additionally written to the warning table.
            send_json = insert_to_warning_sql(deviceId, v, t)
            r = await _post(insertUri, send_json)
            res.append(r.json())
        return BaseResponse(data=res)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Device status change handler
async def client_change_status(clientid, status):
    """Update the stored status for ``clientid``.

    Looks up the client's row via ``get_has_clientId_sql`` and runs the first
    statement from ``get_client_change_status_sql`` against its timestamp.
    Always returns a ``BaseResponse``; previously a failed or empty lookup
    fell off the end of the function and returned ``None``.
    """
    try:
        lookup = await _post(queryUri, get_has_clientId_sql(clientid))
        if lookup.status_code != 200:
            return BaseResponse(code=500, msg="client status lookup failed", data=lookup.text)
        timestamps = lookup.json()["timestamps"]
        if len(timestamps) == 0:
            return BaseResponse(code=500, msg="clientid not found")
        sql_update = get_client_change_status_sql(timestamps[0], status)[0]
        r = await _post(nonQueryUri, {"sql": sql_update})
        return BaseResponse(data=r.json())
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


@app.post("/api/xumu/data/collect")
async def process_data(request: Request):
    """Dispatch an event body (``event``, ``clientid``, ...) to its handler.

    Handles ``client.connected``, ``client.disconnected`` and
    ``message.publish``; any other event gets a default success response.
    """
    try:
        data = await request.body()
        data = json.loads(data.decode("utf-8"))
        event = data["event"]
        clientid = data["clientid"]
        # Reject client ids that do not match the expected pattern.
        if not _CLIENT_ID_PATTERN.match(clientid):
            return BaseResponse(code=500, msg="clientid error")
        # Connection established
        if event == "client.connected":
            return await client_change_status(clientid, True)
        # Connection closed
        if event == "client.disconnected":
            return await client_change_status(clientid, False)
        # Message body
        if event == "message.publish":
            return await message_publish(data)
        return BaseResponse()
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Warning data for the past 7 days
@app.get("/api/xumu/warning/past_seven_days")
async def past_seven_days(deviceId):
    """Return the past-seven-days warning query with timestamps as dates.

    The query is chosen by device type (``values[1][0]`` of the device
    lookup): type 1 uses the air-device query, anything else the common one.
    Millisecond timestamps in the result are replaced by ``YYYY-MM-DD`` strings.
    """
    # Imported here so the name is guaranteed to be the class, whatever the
    # star imports at the top of the file happen to export.
    from datetime import datetime

    try:
        the_device = await get_device(None, deviceId)
        device_type = None
        if the_device.code == 200:
            device_values = the_device.data["values"]
            if len(device_values) != 0:
                device_type = device_values[1][0]
        if device_type == 1:
            # Air devices report two values.
            send_json = air_device_past_seven_days_sql(deviceId)
        else:
            # Other devices report one value; RFID is not counted.
            send_json = common_device_past_seven_days_sql(deviceId)
        r = await _post(queryUri, send_json)
        data = r.json()
        # Timestamps are in milliseconds, hence the division by 1000.
        # NOTE(review): fromtimestamp uses the server's local timezone.
        data["timestamps"] = [
            datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d')
            for timestamp in data["timestamps"]
        ]
        return BaseResponse(data=data)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


# Warning statistics endpoint
@app.get("/api/xumu/warning/statistics")
async def warning_statistics(deviceId):
    """Return the total and today's warning counts for ``deviceId``.

    Each count is ``values[0][0]`` of its query, or 0 when ``values`` is empty.
    """
    try:
        if not _is_valid_device_id(deviceId):
            return BaseResponse(code=500, msg="参数错误")
        # Total number of warnings for the device.
        total_count = await _first_cell(get_device_all_warning_sql(deviceId))
        # Number of warnings for the device today.
        today_count = await _first_cell(get_device_today_warning_sql(deviceId))
        # The misspelled "totoal" key is part of the API contract; keep it.
        return_data = {"totoal_warning_count": total_count, "today_warning_count": today_count}
        return BaseResponse(data=return_data)
    except Exception as e:
        return BaseResponse(code=500, msg=str(e))


if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8002)