2023-09-27 17:54:21 +08:00

1106 lines
42 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace GatewayWorker;
use GatewayWorker\Lib\Context;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Autoloader;
use Workerman\Connection\AsyncTcpConnection;
use GatewayWorker\Protocols\GatewayProtocol;
/**
*
* Gateway基于Worker 开发
* 用于转发客户端的数据给Worker处理以及转发Worker的数据给客户端
*
* @author walkor<walkor@workerman.net>
*
*/
class Gateway extends Worker
{
/**
* 版本
*
* @var string
*/
const VERSION = '3.1.0';
/**
* 本机 IP
* 单机部署默认 127.0.0.1,如果是分布式部署,需要设置成本机 IP
*
* @var string
*/
public $lanIp = '127.0.0.1';
/**
* 如果宿主机为192.168.1.2 , gatewayworker in docker container (172.25.0.2)
* 此时 lanIp=192.68.1.2 GatewayClientSDK 能连上,但是$this->_innerTcpWorker stream_socket_server(): Unable to connect to tcp://192.168.1.2:2901 (Address not available) in
* 此时 lanIp=172.25.0.2 GatewayClientSDK stream_socket_server(): Unable to connect to tcp://172.25.0.2:2901 (Address not available) $this->_innerTcpWorker 正常监听
*
* solution:
* $gateway->lanIp=192.168.1.2 ;
* $gateway->innerTcpWorkerListen=172.25.0.2; // || 0.0.0.0
*
* GatewayClientSDK connect 192.168.1.2:lanPort
* $this->_innerTcpWorker listen $gateway->innerTcpWorkerListen:lanPort
*
*/
public $innerTcpWorkerListen='';
/**
* 本机端口
*
* @var string
*/
public $lanPort = 0;
/**
* gateway 内部通讯起始端口,每个 gateway 实例应该都不同步长1000
*
* @var int
*/
public $startPort = 2000;
/**
* 注册服务地址,用于注册 Gateway BusinessWorker使之能够通讯
*
* @var string|array
*/
public $registerAddress = '127.0.0.1:1236';
/**
* 是否可以平滑重启gateway 不能平滑重启,否则会导致连接断开
*
* @var bool
*/
public $reloadable = false;
/**
* 心跳时间间隔
*
* @var int
*/
public $pingInterval = 0;
/**
* $pingNotResponseLimit * $pingInterval 时间内,客户端未发送任何数据,断开客户端连接
*
* @var int
*/
public $pingNotResponseLimit = 0;
/**
* 服务端向客户端发送的心跳数据
*
* @var string
*/
public $pingData = '';
/**
* 秘钥
*
* @var string
*/
public $secretKey = '';
/**
* 路由函数
*
* @var callable|null
*/
public $router = null;
/**
* gateway进程转发给businessWorker进程的发送缓冲区大小
*
* @var int
*/
public $sendToWorkerBufferSize = 10240000;
/**
* gateway进程将数据发给客户端时每个客户端发送缓冲区大小
*
* @var int
*/
public $sendToClientBufferSize = 1024000;
/**
* 协议加速
*
* @var bool
*/
public $protocolAccelerate = false;
/**
* BusinessWorker 连接成功之后触发
*
* @var callable|null
*/
public $onBusinessWorkerConnected = null;
/**
* BusinessWorker 关闭时触发
*
* @var callable|null
*/
public $onBusinessWorkerClose = null;
/**
* 保存客户端的所有 connection 对象
*
* @var array
*/
protected $_clientConnections = array();
/**
* uid 到 connection 的映射,一对多关系
*/
protected $_uidConnections = array();
/**
* group 到 connection 的映射,一对多关系
*
* @var array
*/
protected $_groupConnections = array();
/**
* 保存所有 worker 的内部连接的 connection 对象
*
* @var array
*/
protected $_workerConnections = array();
/**
* gateway 内部监听 worker 内部连接的 worker
*
* @var Worker
*/
protected $_innerTcpWorker = null;
/**
* 当 worker 启动时
*
* @var callable|null
*/
protected $_onWorkerStart = null;
/**
* 当有客户端连接时
*
* @var callable|null
*/
protected $_onConnect = null;
/**
* 当客户端发来消息时
*
* @var callable|null
*/
protected $_onMessage = null;
/**
* 当客户端连接关闭时
*
* @var callable|null
*/
protected $_onClose = null;
/**
* 当 worker 停止时
*
* @var callable|null
*/
protected $_onWorkerStop = null;
/**
* 进程启动时间
*
* @var int
*/
protected $_startTime = 0;
/**
* gateway 监听的端口
*
* @var int
*/
protected $_gatewayPort = 0;
/**
* connectionId 记录器
* @var int
*/
protected static $_connectionIdRecorder = 0;
/**
* 用于保持长连接的心跳时间间隔
*
* @var int
*/
const PERSISTENCE_CONNECTION_PING_INTERVAL = 25;
/**
* 构造函数
*
* @param string $socket_name
* @param array $context_option
*/
public function __construct($socket_name, $context_option = array())
{
parent::__construct($socket_name, $context_option);
$this->_gatewayPort = substr(strrchr($socket_name,':'),1);
$this->router = array("\\GatewayWorker\\Gateway", 'routerBind');
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
}
/**
* {@inheritdoc}
*/
public function run()
{
// 保存用户的回调,当对应的事件发生时触发
$this->_onWorkerStart = $this->onWorkerStart;
$this->onWorkerStart = array($this, 'onWorkerStart');
// 保存用户的回调,当对应的事件发生时触发
$this->_onConnect = $this->onConnect;
$this->onConnect = array($this, 'onClientConnect');
// onMessage禁止用户设置回调
$this->onMessage = array($this, 'onClientMessage');
// 保存用户的回调,当对应的事件发生时触发
$this->_onClose = $this->onClose;
$this->onClose = array($this, 'onClientClose');
// 保存用户的回调,当对应的事件发生时触发
$this->_onWorkerStop = $this->onWorkerStop;
$this->onWorkerStop = array($this, 'onWorkerStop');
if (!is_array($this->registerAddress)) {
$this->registerAddress = array($this->registerAddress);
}
// 记录进程启动的时间
$this->_startTime = time();
// 运行父方法
parent::run();
}
/**
* 当客户端发来数据时转发给worker处理
*
* @param TcpConnection $connection
* @param mixed $data
*/
public function onClientMessage($connection, $data)
{
$connection->pingNotResponseCount = -1;
$this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data);
}
/**
* 当客户端连接上来时,初始化一些客户端的数据
* 包括全局唯一的client_id、初始化session等
*
* @param TcpConnection $connection
*/
public function onClientConnect($connection)
{
$connection->id = self::generateConnectionId();
// 保存该连接的内部通讯的数据包报头,避免每次重新初始化
$connection->gatewayHeader = array(
'local_ip' => ip2long($this->lanIp),
'local_port' => $this->lanPort,
'client_ip' => ip2long($connection->getRemoteIp()),
'client_port' => $connection->getRemotePort(),
'gateway_port' => $this->_gatewayPort,
'connection_id' => $connection->id,
'flag' => 0,
);
// 连接的 session
$connection->session = '';
// 该连接的心跳参数
$connection->pingNotResponseCount = -1;
// 该链接发送缓冲区大小
$connection->maxSendBufferSize = $this->sendToClientBufferSize;
// 保存客户端连接 connection 对象
$this->_clientConnections[$connection->id] = $connection;
// 如果用户有自定义 onConnect 回调,则执行
if ($this->_onConnect) {
call_user_func($this->_onConnect, $connection);
if (isset($connection->onWebSocketConnect)) {
$connection->_onWebSocketConnect = $connection->onWebSocketConnect;
}
}
if ($connection->protocol === '\Workerman\Protocols\Websocket' || $connection->protocol === 'Workerman\Protocols\Websocket') {
$connection->onWebSocketConnect = array($this, 'onWebsocketConnect');
}
$this->sendToWorker(GatewayProtocol::CMD_ON_CONNECT, $connection);
}
/**
* websocket握手时触发
*
* @param $connection
* @param $request
*/
public function onWebsocketConnect($connection, $request)
{
if (isset($connection->_onWebSocketConnect)) {
call_user_func($connection->_onWebSocketConnect, $connection, $request);
unset($connection->_onWebSocketConnect);
}
if (is_object($request)) {
$server = [
'QUERY_STRING' => $request->queryString(),
'REQUEST_METHOD' => $request->method(),
'REQUEST_URI' => $request->uri(),
'SERVER_PROTOCOL' => "HTTP/" . $request->protocolVersion(),
'SERVER_NAME' => $request->host(false),
'CONTENT_TYPE' => $request->header('content-type'),
'REMOTE_ADDR' => $connection->getRemoteIp(),
'REMOTE_PORT' => $connection->getRemotePort(),
'SERVER_PORT' => $connection->getLocalPort(),
];
foreach ($request->header() as $key => $header) {
$key = str_replace('-', '_', strtoupper($key));
$server["HTTP_$key"] = $header;
}
$data = array('get' => $request->get(), 'server' => $server, 'cookie' => $request->cookie());
} else {
$data = array('get' => $_GET, 'server' => $_SERVER, 'cookie' => $_COOKIE);
}
$this->sendToWorker(GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT, $connection, $data);
}
/**
* 生成connection id
* @return int
*/
protected function generateConnectionId()
{
$max_unsigned_int = 4294967295;
if (self::$_connectionIdRecorder >= $max_unsigned_int) {
self::$_connectionIdRecorder = 0;
}
while(++self::$_connectionIdRecorder <= $max_unsigned_int) {
if(!isset($this->_clientConnections[self::$_connectionIdRecorder])) {
break;
}
}
return self::$_connectionIdRecorder;
}
/**
* 发送数据给 worker 进程
*
* @param int $cmd
* @param TcpConnection $connection
* @param mixed $body
* @return bool
*/
protected function sendToWorker($cmd, $connection, $body = '')
{
$gateway_data = $connection->gatewayHeader;
$gateway_data['cmd'] = $cmd;
$gateway_data['body'] = $body;
$gateway_data['ext_data'] = $connection->session;
if ($this->_workerConnections) {
// 调用路由函数选择一个worker把请求转发给它
/** @var TcpConnection $worker_connection */
$worker_connection = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
if (false === $worker_connection->send($gateway_data)) {
$msg = "SendBufferToWorker fail. May be the send buffer are overflow. See http://doc2.workerman.net/send-buffer-overflow.html";
static::log($msg);
return false;
}
} // 没有可用的 worker
else {
// gateway 启动后 1-2 秒内 SendBufferToWorker fail 是正常现象,因为与 worker 的连接还没建立起来,
// 所以不记录日志,只是关闭连接
$time_diff = 2;
if (time() - $this->_startTime >= $time_diff) {
$msg = 'SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. See http://doc2.workerman.net/send-buffer-to-worker-fail.html';
static::log($msg);
}
$connection->destroy();
return false;
}
return true;
}
/**
* 随机路由,返回 worker connection 对象
*
* @param array $worker_connections
* @param TcpConnection $client_connection
* @param int $cmd
* @param mixed $buffer
* @return TcpConnection
*/
public static function routerRand($worker_connections, $client_connection, $cmd, $buffer)
{
return $worker_connections[array_rand($worker_connections)];
}
/**
* client_id 与 worker 绑定
*
* @param array $worker_connections
* @param TcpConnection $client_connection
* @param int $cmd
* @param mixed $buffer
* @return TcpConnection
*/
public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
{
if (!isset($client_connection->businessworker_address) || !isset($worker_connections[$client_connection->businessworker_address])) {
$client_connection->businessworker_address = array_rand($worker_connections);
}
return $worker_connections[$client_connection->businessworker_address];
}
/**
* 当客户端关闭时
*
* @param TcpConnection $connection
*/
public function onClientClose($connection)
{
// 尝试通知 worker触发 Event::onClose
$this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);
unset($this->_clientConnections[$connection->id]);
// 清理 uid 数据
if (!empty($connection->uid)) {
$uid = $connection->uid;
unset($this->_uidConnections[$uid][$connection->id]);
if (empty($this->_uidConnections[$uid])) {
unset($this->_uidConnections[$uid]);
}
}
// 清理 group 数据
if (!empty($connection->groups)) {
foreach ($connection->groups as $group) {
unset($this->_groupConnections[$group][$connection->id]);
if (empty($this->_groupConnections[$group])) {
unset($this->_groupConnections[$group]);
}
}
}
// 触发 onClose
if ($this->_onClose) {
call_user_func($this->_onClose, $connection);
}
}
/**
* 当 Gateway 启动的时候触发的回调函数
*
* @return void
*/
public function onWorkerStart()
{
// 分配一个内部通讯端口
$this->lanPort = $this->startPort + $this->id;
// 如果有设置心跳,则定时执行
if ($this->pingInterval > 0) {
$timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval / 2 : $this->pingInterval;
Timer::add($timer_interval, array($this, 'ping'));
}
// 如果BusinessWorker ip不是127.0.0.1则需要加gateway到BusinessWorker的心跳
if ($this->lanIp !== '127.0.0.1') {
Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker'));
}
if (!class_exists('\Protocols\GatewayProtocol')) {
class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
}
//如为公网IP监听直接换成0.0.0.0 否则用内网IP
$listen_ip=filter_var($this->lanIp,FILTER_VALIDATE_IP,FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE)?'0.0.0.0':$this->lanIp;
//Use scenario to see line 64
if($this->innerTcpWorkerListen != '') {
$listen_ip = $this->innerTcpWorkerListen;
}
// 初始化 gateway 内部的监听,用于监听 worker 的连接已经连接上发来的数据
$this->_innerTcpWorker = new Worker("GatewayProtocol://{$listen_ip}:{$this->lanPort}");
$this->_innerTcpWorker->reusePort = false;
$this->_innerTcpWorker->listen();
$this->_innerTcpWorker->name = 'GatewayInnerWorker';
if ($this->_autoloadRootPath && class_exists(Autoloader::class)) {
Autoloader::setRootPath($this->_autoloadRootPath);
}
// 设置内部监听的相关回调
$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');
$this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');
$this->_innerTcpWorker->onClose = array($this, 'onWorkerClose');
// 注册 gateway 的内部通讯地址worker 去连这个地址,以便 gateway 与 worker 之间建立起 TCP 长连接
$this->registerAddress();
if ($this->_onWorkerStart) {
call_user_func($this->_onWorkerStart, $this);
}
}
/**
* 当 worker 通过内部通讯端口连接到 gateway 时
*
* @param TcpConnection $connection
*/
public function onWorkerConnect($connection)
{
$connection->maxSendBufferSize = $this->sendToWorkerBufferSize;
$connection->authorized = $this->secretKey ? false : true;
}
/**
* 当 worker 发来数据时
*
* @param TcpConnection $connection
* @param mixed $data
* @throws \Exception
*
* @return void
*/
public function onWorkerMessage($connection, $data)
{
$cmd = $data['cmd'];
if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {
self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
$connection->close();
return;
}
switch ($cmd) {
// BusinessWorker连接Gateway
case GatewayProtocol::CMD_WORKER_CONNECT:
$worker_info = json_decode($data['body'], true);
if ($worker_info['secret_key'] !== $this->secretKey) {
self::log("Gateway: Worker key does not match ".var_export($this->secretKey, true)." !== ". var_export($this->secretKey));
$connection->close();
return;
}
$key = $connection->getRemoteIp() . ':' . $worker_info['worker_key'];
// 在一台服务器上businessWorker->name不能相同
if (isset($this->_workerConnections[$key])) {
self::log("Gateway: Worker->name conflict. Key:{$key}");
$connection->close();
return;
}
$connection->key = $key;
$this->_workerConnections[$key] = $connection;
$connection->authorized = true;
if ($this->onBusinessWorkerConnected) {
call_user_func($this->onBusinessWorkerConnected, $connection);
}
return;
// GatewayClient连接Gateway
case GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT:
$worker_info = json_decode($data['body'], true);
if ($worker_info['secret_key'] !== $this->secretKey) {
self::log("Gateway: GatewayClient key does not match ".var_export($this->secretKey, true)." !== ".var_export($this->secretKey, true));
$connection->close();
return;
}
$connection->authorized = true;
return;
// 向某客户端发送数据Gateway::sendToClient($client_id, $message);
case GatewayProtocol::CMD_SEND_TO_ONE:
if (isset($this->_clientConnections[$data['connection_id']])) {
$raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
$body = $data['body'];
if (!$raw && $this->protocolAccelerate && $this->protocol) {
$body = $this->preEncodeForClient($body);
$raw = true;
}
$this->_clientConnections[$data['connection_id']]->send($body, $raw);
}
return;
// 踢出用户Gateway::closeClient($client_id, $message);
case GatewayProtocol::CMD_KICK:
if (isset($this->_clientConnections[$data['connection_id']])) {
$this->_clientConnections[$data['connection_id']]->close($data['body']);
}
return;
// 立即销毁用户连接, Gateway::destroyClient($client_id);
case GatewayProtocol::CMD_DESTROY:
if (isset($this->_clientConnections[$data['connection_id']])) {
$this->_clientConnections[$data['connection_id']]->destroy();
}
return;
// 广播, Gateway::sendToAll($message, $client_id_array)
case GatewayProtocol::CMD_SEND_TO_ALL:
$raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
$body = $data['body'];
if (!$raw && $this->protocolAccelerate && $this->protocol) {
$body = $this->preEncodeForClient($body);
$raw = true;
}
$ext_data = $data['ext_data'] ? json_decode($data['ext_data'], true) : '';
// $client_id_array 不为空时,只广播给 $client_id_array 指定的客户端
if (isset($ext_data['connections'])) {
foreach ($ext_data['connections'] as $connection_id) {
if (isset($this->_clientConnections[$connection_id])) {
$this->_clientConnections[$connection_id]->send($body, $raw);
}
}
} // $client_id_array 为空时,广播给所有在线客户端
else {
$exclude_connection_id = !empty($ext_data['exclude']) ? $ext_data['exclude'] : null;
foreach ($this->_clientConnections as $client_connection) {
if (!isset($exclude_connection_id[$client_connection->id])) {
$client_connection->send($body, $raw);
}
}
}
return;
case GatewayProtocol::CMD_SELECT:
$client_info_array = array();
$ext_data = json_decode($data['ext_data'], true);
if (!$ext_data) {
echo 'CMD_SELECT ext_data=' . var_export($data['ext_data'], true) . '\r\n';
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
}
$fields = $ext_data['fields'];
$where = $ext_data['where'];
if ($where) {
$connection_box_map = array(
'groups' => $this->_groupConnections,
'uid' => $this->_uidConnections
);
// $where = ['groups'=>[x,x..], 'uid'=>[x,x..], 'connection_id'=>[x,x..]]
foreach ($where as $key => $items) {
if ($key !== 'connection_id') {
$connections_box = $connection_box_map[$key];
foreach ($items as $item) {
if (isset($connections_box[$item])) {
foreach ($connections_box[$item] as $connection_id => $client_connection) {
if (!isset($client_info_array[$connection_id])) {
$client_info_array[$connection_id] = array();
// $fields = ['groups', 'uid', 'session']
foreach ($fields as $field) {
$client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
}
}
}
}
}
} else {
foreach ($items as $connection_id) {
if (isset($this->_clientConnections[$connection_id])) {
$client_connection = $this->_clientConnections[$connection_id];
$client_info_array[$connection_id] = array();
// $fields = ['groups', 'uid', 'session']
foreach ($fields as $field) {
$client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
}
}
}
}
}
} else {
foreach ($this->_clientConnections as $connection_id => $client_connection) {
foreach ($fields as $field) {
$client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
}
}
}
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
// 获取在线群组列表
case GatewayProtocol::CMD_GET_GROUP_ID_LIST:
$buffer = serialize(array_keys($this->_groupConnections));
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
// 重新赋值 session
case GatewayProtocol::CMD_SET_SESSION:
if (isset($this->_clientConnections[$data['connection_id']])) {
$this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
}
return;
// session合并
case GatewayProtocol::CMD_UPDATE_SESSION:
if (!isset($this->_clientConnections[$data['connection_id']])) {
return;
} else {
if (!$this->_clientConnections[$data['connection_id']]->session) {
$this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
return;
}
$session = Context::sessionDecode($this->_clientConnections[$data['connection_id']]->session);
$session_for_merge = Context::sessionDecode($data['ext_data']);
$session = array_replace_recursive($session, $session_for_merge);
$this->_clientConnections[$data['connection_id']]->session = Context::sessionEncode($session);
}
return;
case GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID:
if (!isset($this->_clientConnections[$data['connection_id']])) {
$session = serialize(null);
} else {
if (!$this->_clientConnections[$data['connection_id']]->session) {
$session = serialize(array());
} else {
$session = $this->_clientConnections[$data['connection_id']]->session;
}
}
$connection->send(pack('N', strlen($session)) . $session, true);
return;
// 获得客户端sessions
case GatewayProtocol::CMD_GET_ALL_CLIENT_SESSIONS:
$client_info_array = array();
foreach ($this->_clientConnections as $connection_id => $client_connection) {
$client_info_array[$connection_id] = $client_connection->session;
}
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
// 判断某个 client_id 是否在线 Gateway::isOnline($client_id)
case GatewayProtocol::CMD_IS_ONLINE:
$buffer = serialize((int)isset($this->_clientConnections[$data['connection_id']]));
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
// 将 client_id 与 uid 绑定
case GatewayProtocol::CMD_BIND_UID:
$uid = $data['ext_data'];
if (empty($uid)) {
echo "bindUid(client_id, uid) uid empty, uid=" . var_export($uid, true);
return;
}
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (isset($client_connection->uid)) {
$current_uid = $client_connection->uid;
unset($this->_uidConnections[$current_uid][$connection_id]);
if (empty($this->_uidConnections[$current_uid])) {
unset($this->_uidConnections[$current_uid]);
}
}
$client_connection->uid = $uid;
$this->_uidConnections[$uid][$connection_id] = $client_connection;
return;
// client_id 与 uid 解绑 Gateway::unbindUid($client_id, $uid);
case GatewayProtocol::CMD_UNBIND_UID:
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (isset($client_connection->uid)) {
$current_uid = $client_connection->uid;
unset($this->_uidConnections[$current_uid][$connection_id]);
if (empty($this->_uidConnections[$current_uid])) {
unset($this->_uidConnections[$current_uid]);
}
$client_connection->uid_info = '';
$client_connection->uid = null;
}
return;
// 发送数据给 uid Gateway::sendToUid($uid, $msg);
case GatewayProtocol::CMD_SEND_TO_UID:
$raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
$body = $data['body'];
if (!$raw && $this->protocolAccelerate && $this->protocol) {
$body = $this->preEncodeForClient($body);
$raw = true;
}
$uid_array = json_decode($data['ext_data'], true);
foreach ($uid_array as $uid) {
if (!empty($this->_uidConnections[$uid])) {
foreach ($this->_uidConnections[$uid] as $connection) {
/** @var TcpConnection $connection */
$connection->send($body, $raw);
}
}
}
return;
// 将 $client_id 加入用户组 Gateway::joinGroup($client_id, $group);
case GatewayProtocol::CMD_JOIN_GROUP:
$group = $data['ext_data'];
if (empty($group)) {
echo "join(group) group empty, group=" . var_export($group, true);
return;
}
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (!isset($client_connection->groups)) {
$client_connection->groups = array();
}
$client_connection->groups[$group] = $group;
$this->_groupConnections[$group][$connection_id] = $client_connection;
return;
// 将 $client_id 从某个用户组中移除 Gateway::leaveGroup($client_id, $group);
case GatewayProtocol::CMD_LEAVE_GROUP:
$group = $data['ext_data'];
if (empty($group)) {
echo "leave(group) group empty, group=" . var_export($group, true);
return;
}
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (!isset($client_connection->groups[$group])) {
return;
}
unset($client_connection->groups[$group], $this->_groupConnections[$group][$connection_id]);
if (empty($this->_groupConnections[$group])) {
unset($this->_groupConnections[$group]);
}
return;
// 解散分组
case GatewayProtocol::CMD_UNGROUP:
$group = $data['ext_data'];
if (empty($group)) {
echo "leave(group) group empty, group=" . var_export($group, true);
return;
}
if (empty($this->_groupConnections[$group])) {
return;
}
foreach ($this->_groupConnections[$group] as $client_connection) {
unset($client_connection->groups[$group]);
}
unset($this->_groupConnections[$group]);
return;
// 向某个用户组发送消息 Gateway::sendToGroup($group, $msg);
case GatewayProtocol::CMD_SEND_TO_GROUP:
$raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
$body = $data['body'];
if (!$raw && $this->protocolAccelerate && $this->protocol) {
$body = $this->preEncodeForClient($body);
$raw = true;
}
$ext_data = json_decode($data['ext_data'], true);
$group_array = $ext_data['group'];
$exclude_connection_id = $ext_data['exclude'];
foreach ($group_array as $group) {
if (!empty($this->_groupConnections[$group])) {
foreach ($this->_groupConnections[$group] as $connection) {
if(!isset($exclude_connection_id[$connection->id]))
{
/** @var TcpConnection $connection */
$connection->send($body, $raw);
}
}
}
}
return;
// 获取某用户组成员信息 Gateway::getClientSessionsByGroup($group);
case GatewayProtocol::CMD_GET_CLIENT_SESSIONS_BY_GROUP:
$group = $data['ext_data'];
if (!isset($this->_groupConnections[$group])) {
$buffer = serialize(array());
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
}
$client_info_array = array();
foreach ($this->_groupConnections[$group] as $connection_id => $client_connection) {
$client_info_array[$connection_id] = $client_connection->session;
}
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
// 获取用户组成员数 Gateway::getClientCountByGroup($group);
case GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP:
$group = $data['ext_data'];
$count = 0;
if ($group !== '') {
if (isset($this->_groupConnections[$group])) {
$count = count($this->_groupConnections[$group]);
}
} else {
$count = count($this->_clientConnections);
}
$buffer = serialize($count);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
// 获取与某个 uid 绑定的所有 client_id Gateway::getClientIdByUid($uid);
case GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID:
$uid = $data['ext_data'];
if (empty($this->_uidConnections[$uid])) {
$buffer = serialize(array());
} else {
$buffer = serialize(array_keys($this->_uidConnections[$uid]));
}
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
default :
$err_msg = "gateway inner pack err cmd=$cmd";
echo $err_msg;
}
}
/**
* 当worker连接关闭时
*
* @param TcpConnection $connection
*/
public function onWorkerClose($connection)
{
if (isset($connection->key)) {
unset($this->_workerConnections[$connection->key]);
if ($this->onBusinessWorkerClose) {
call_user_func($this->onBusinessWorkerClose, $connection);
}
}
}
/**
* 存储当前 Gateway 的内部通信地址
*
* @return bool
*/
public function registerAddress()
{
$address = $this->lanIp . ':' . $this->lanPort;
foreach ($this->registerAddress as $register_address) {
$register_connection = new AsyncTcpConnection("text://{$register_address}");
$secret_key = $this->secretKey;
$register_connection->onConnect = function($register_connection) use ($address, $secret_key, $register_address){
$register_connection->send('{"event":"gateway_connect", "address":"' . $address . '", "secret_key":"' . $secret_key . '"}');
// 如果Register服务器不在本地服务器则需要保持心跳
if (strpos($register_address, '127.0.0.1') !== 0) {
$register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
$register_connection->send('{"event":"ping"}');
});
}
};
$register_connection->onClose = function ($register_connection) {
if(!empty($register_connection->ping_timer)) {
Timer::del($register_connection->ping_timer);
}
$register_connection->reconnect(1);
};
$register_connection->connect();
}
}
/**
* 心跳逻辑
*
* @return void
*/
public function ping()
{
$ping_data = $this->pingData ? (string)$this->pingData : null;
$raw = false;
if ($this->protocolAccelerate && $ping_data && $this->protocol) {
$ping_data = $this->preEncodeForClient($ping_data);
$raw = true;
}
// 遍历所有客户端连接
foreach ($this->_clientConnections as $connection) {
// 上次发送的心跳还没有回复次数大于限定值就断开
if ($this->pingNotResponseLimit > 0 &&
$connection->pingNotResponseCount >= $this->pingNotResponseLimit * 2
) {
$connection->destroy();
continue;
}
// $connection->pingNotResponseCount 为 -1 说明最近客户端有发来消息,则不给客户端发送心跳
$connection->pingNotResponseCount++;
if ($ping_data) {
if ($connection->pingNotResponseCount === 0 ||
($this->pingNotResponseLimit > 0 && $connection->pingNotResponseCount % 2 === 1)
) {
continue;
}
$connection->send($ping_data, $raw);
}
}
}
/**
* 向 BusinessWorker 发送心跳数据,用于保持长连接
*
* @return void
*/
public function pingBusinessWorker()
{
$gateway_data = GatewayProtocol::$empty;
$gateway_data['cmd'] = GatewayProtocol::CMD_PING;
foreach ($this->_workerConnections as $connection) {
$connection->send($gateway_data);
}
}
/**
* @param mixed $data
*
* @return string
*/
protected function preEncodeForClient($data)
{
foreach ($this->_clientConnections as $client_connection) {
return call_user_func(array($client_connection->protocol, 'encode'), $data, $client_connection);
}
}
/**
* 当 gateway 关闭时触发,清理数据
*
* @return void
*/
public function onWorkerStop()
{
// 尝试触发用户设置的回调
if ($this->_onWorkerStop) {
call_user_func($this->_onWorkerStop, $this);
}
}
/**
* Log.
* @param string $msg
*/
public static function log($msg){
Timer::add(1, function() use ($msg) {
Worker::log($msg);
}, null, false);
}
}