"demo2",
            "password" => "123456",
            "client_id" => "admin_123",
        ));
        $mqtt->connect();
        $connection->mqtt = $mqtt;
    }

    /**
     * Handle one message from a TCP client that asks to add or remove an MQTT subscription.
     *
     * The message is expected to be a JSON object such as:
     *
     *     {"topic":"demo","type":"add"}
     *
     * A "type" of exactly "add" subscribes to the topic; any other non-empty
     * string unsubscribes from it. A short status text is always sent back on
     * the same connection.
     *
     * @param TcpConnection $connection Client connection; its `mqtt` property is used as the MQTT client.
     * @param mixed         $data       Raw message text; it is decoded with json_decode().
     *
     * @return false|null False when the payload is rejected, null otherwise.
     */
    public function onMessage(TcpConnection $connection, $data)
    {
        $payload = json_decode($data, true);

        // Only a non-empty decoded object/array can carry the two required fields.
        // The former loose `== null` test let scalars such as 5 or "x" through.
        if (!is_array($payload) || $payload === array()) {
            $connection->send("参数不能为空");
            return false;
        }

        // Require a real, non-empty string. With loose comparison `[] == ''` is
        // false, so an empty array (or `true`) used to pass as a valid topic.
        if (!isset($payload['topic']) || !is_string($payload['topic']) || $payload['topic'] === '') {
            $connection->send("topic为空");
            return false;
        }

        // Same rule for the action: `true == 'add'` is true in PHP, so a boolean
        // type used to be treated as a subscribe request.
        if (!isset($payload['type']) || !is_string($payload['type']) || $payload['type'] === '') {
            $connection->send("type为空");
            return false;
        }

        $topic = $payload['topic'];
        $mqtt = $connection->mqtt;

        // NOTE(review): a null-ish return value is reported as success. If the
        // MQTT client's subscribe()/unsubscribe() are asynchronous and return
        // nothing, this always reports success and the real outcome is only
        // available through a completion callback - confirm against the client
        // class in use. The loose `== null` is kept on purpose so the reported
        // result does not change for whatever that client returns today.
        if ($payload['type'] === 'add') {
            $result = $mqtt->subscribe($topic);
            $connection->send($result == null ? "添加成功" : "添加失败");
        } else {
            // NOTE(review): every type other than "add" unsubscribes, including
            // typos; confirm whether unknown types should be rejected instead.
            $result = $mqtt->unsubscribe($topic);
            $connection->send($result == null ? "删除成功" : "删除失败");
        }
    }
}