mqtt_client = new MqttClient($this->host, $this->port); $this->settings = new ConnectionSettings(); $this->settings = $this->settings->setUsername($this->username)->setPassword($this->password); } /** * @throws ConfigurationInvalidException * @throws ConnectingToBrokerFailedException */ public function upload(): void { try { # 获取mqtt请求体 $msg = $this->request->post(); $payload = json_decode($msg["payload"], true); # 获取json字符串长度,用于校验是否丢失包 $payload_receive_len = strlen(json_encode($payload)); # 获取l参数,得到原始发送包长度 $payload_send_len = $payload["l"]; # 如果不相等,name返回 if ($payload_receive_len != $payload_send_len) { return; } # 获取measurements参数,表示字段的名称 $measurements = $payload["m"]; # 获取字段相应的值 $values = $payload["v"]; # 拼接字符串,得到iotdb的数据库设备名称 $device = sprintf("%s.%s", "root.farm", $payload["v"][0]); # 构造json,通过mqtt发送到iotdb数据库 $send_array = array( "device" => $device, "timestamp" => intval(microtime(true) * 1000), "measurements" => $measurements, "values" => $values ); # 数组转为json字符串 $send_json = json_encode($send_array); dump($send_json); # 连接mqtt $this->mqtt_client->connect($this->settings); # 发送消息 $this->mqtt_client->publish("xumu", $send_json); } catch (Exception $e) { dump($e); } } }