新增push

This commit is contained in:
mkm 2023-06-29 14:22:30 +08:00
parent 95dc8691ce
commit 94400f3715
20 changed files with 3273 additions and 2 deletions

View File

@ -0,0 +1,79 @@
<?php
namespace app\api\controller;
use support\Request;
use plugin\admin\app\controller\Base;
use support\Redis;
use Webman\Push\Api;
class Push extends Base
{
public function index(Request $request)
{
$api = new Api(
'http://127.0.0.1:3232',
config('plugin.webman.push.app.app_key'),
config('plugin.webman.push.app.app_secret')
);
$name = $request->get('name', 'ok');
$uid = $request->get('uid', 1);
// 给订阅 user-1 的所有客户端推送 message 事件的消息
$api->trigger('user-1', 'message', [
'from_uid' => $uid,
'content' => $name
]);
return $this->json(200, 'ok');
}
/**
* 点击通话操作
*/
public function clicks(Request $request)
{
$name = $request->get('name', '');
Redis::set('tonghua-url', $name, 10);
return $this->json(200, 'ok');
}
/**
* 查询通话操作
*/
function querys(Request $request)
{
$a = Redis::get('tonghua-url');
return $this->json(200, 'ok', ['data' => $a]);
}
/**
* 前端事件回调删除redis数据
*/
public function del(Request $request)
{
Redis::del('tonghua-url');
return $this->json(200, 'ok');
}
/**
* 结束通话操作
*/
public function clicksEnd(Request $request)
{
$name = $request->get('name', '');
Redis::set('tonghua-end', $name, 10);
return $this->json(200, 'ok');
}
/**
* 查询通话操作
*/
function clicksQuerys(Request $request)
{
$a = Redis::get('tonghua-end');
return $this->json(200, 'ok', ['data' => $a]);
}
/**
* 结束通话操作
*/
public function clicksDel(Request $request)
{
Redis::del('tonghua-end');
return $this->json(200, 'ok');
}
}

View File

@ -31,7 +31,8 @@
"webman/think-orm": "^1.1",
"vlucas/phpdotenv": "^5.5",
"illuminate/redis": "^8.83",
"illuminate/events": "^8.83"
"illuminate/events": "^8.83",
"webman/push": "^1.0"
},
"suggest": {
"ext-event": "For better performance. "

35
composer.lock generated
View File

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "ed766318d6b5fdc02c786ab7f27bc874",
"content-hash": "048b304db3d9141f5fcfc9460874678d",
"packages": [
{
"name": "doctrine/inflector",
@ -3236,6 +3236,39 @@
},
"time": "2023-03-28T04:01:23+00:00"
},
{
"name": "webman/push",
"version": "v1.0.16",
"source": {
"type": "git",
"url": "https://github.com/webman-php/push.git",
"reference": "cd838ea76ffd90ef165564e4f35cebbe87462e77"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/webman-php/push/zipball/cd838ea76ffd90ef165564e4f35cebbe87462e77",
"reference": "cd838ea76ffd90ef165564e4f35cebbe87462e77",
"shasum": ""
},
"require": {
"php": ">=7.2"
},
"type": "library",
"autoload": {
"psr-4": {
"Webman\\Push\\": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"support": {
"issues": "https://github.com/webman-php/push/issues",
"source": "https://github.com/webman-php/push/tree/v1.0.16"
},
"time": "2023-06-17T13:34:32+00:00"
},
{
"name": "webman/think-orm",
"version": "v1.1.1",

View File

@ -0,0 +1,10 @@
<?php
return [
'enable' => true,
'websocket' => 'websocket://0.0.0.0:3131',
'api' => 'http://0.0.0.0:3232',
'app_key' => '40130278692df9bd1a77f3ff6d64edb6',
'app_secret' => '526598e8713e4ed50ea4cd8ff5e77f23',
'channel_hook' => 'http://127.0.0.1:8787/plugin/webman/push/hook',
'auth' => '/plugin/webman/push/auth'
];

View File

@ -0,0 +1,21 @@
<?php
use Webman\Push\Server;
return [
'server' => [
'handler' => Server::class,
'listen' => config('plugin.webman.push.app.websocket'),
'count' => 1, // 必须是1
'reloadable' => false, // 执行reload不重启
'constructor' => [
'api_listen' => config('plugin.webman.push.app.api'),
'app_info' => [
config('plugin.webman.push.app.app_key') => [
'channel_hook' => config('plugin.webman.push.app.channel_hook'),
'app_secret' => config('plugin.webman.push.app.app_secret'),
],
]
]
]
];

View File

@ -0,0 +1,87 @@
<?php
/**
* This file is part of webman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
use support\Request;
use Webman\Route;
use Webman\Push\Api;
/**
* 推送js客户端文件
*/
Route::get('/plugin/webman/push/push.js', function (Request $request) {
return response()->file(base_path().'/vendor/webman/push/src/push.js');
});
/**
* 私有频道鉴权这里应该使用session辨别当前用户身份然后确定该用户是否有权限监听channel_name
*/
Route::post(config('plugin.webman.push.app.auth'), function (Request $request) {
$pusher = new Api(str_replace('0.0.0.0', '127.0.0.1', config('plugin.webman.push.app.api')), config('plugin.webman.push.app.app_key'), config('plugin.webman.push.app.app_secret'));
$channel_name = $request->post('channel_name');
$session = $request->session();
// 这里应该通过session和channel_name判断当前用户是否有权限监听channel_name
$has_authority = true;
if ($has_authority) {
return response($pusher->socketAuth($channel_name, $request->post('socket_id')));
} else {
return response('Forbidden', 403);
}
});
/**
* 当频道上线以及下线时触发的回调
* 频道上线:是指某个频道从没有连接在线到有连接在线的事件
* 频道下线:是指某个频道的所有连接都断开触发的事件
*/
Route::post(parse_url(config('plugin.webman.push.app.channel_hook'), PHP_URL_PATH), function (Request $request) {
// 没有x-pusher-signature头视为伪造请求
if (!$webhook_signature = $request->header('x-pusher-signature')) {
return response('401 Not authenticated', 401);
}
$body = $request->rawBody();
// 计算签名,$app_secret 是双方使用的密钥,是保密的,外部无从得知
$expected_signature = hash_hmac('sha256', $body, config('plugin.webman.push.app.app_secret'), false);
// 安全校验如果签名不一致可能是伪造的请求返回401状态码
if ($webhook_signature !== $expected_signature) {
return response('401 Not authenticated', 401);
}
// 这里存储这上线 下线的channel数据
$payload = json_decode($body, true);
$channels_online = $channels_offline = [];
foreach ($payload['events'] as $event) {
if ($event['name'] === 'channel_added') {
$channels_online[] = $event['channel'];
} else if ($event['name'] === 'channel_removed') {
$channels_offline[] = $event['channel'];
}
}
// 业务根据需要处理上下线的channel例如将在线状态写入数据库通知其它channel等
// 上线的所有channel
echo 'online channels: ' . implode(',', $channels_online) . "\n";
// 下线的所有channel
echo 'offline channels: ' . implode(',', $channels_offline) . "\n";
return 'OK';
});

View File

@ -16,6 +16,8 @@ use Webman\Route;
Route::group('/api', function () {
Route::any('/push',[app\api\controller\Push::class,'index']);
Route::group('/goview', function () {
Route::group('/sys', function () {
Route::any('/login',[app\api\controller\IndexController::class,'login']);

View File

@ -13,6 +13,7 @@ return array(
'app\\' => array($baseDir . '/app'),
'Workerman\\' => array($vendorDir . '/workerman/workerman'),
'Webman\\ThinkOrm\\' => array($vendorDir . '/webman/think-orm/src'),
'Webman\\Push\\' => array($vendorDir . '/webman/push/src'),
'Webman\\Event\\' => array($vendorDir . '/webman/event/src'),
'Webman\\Captcha\\' => array($vendorDir . '/webman/captcha/src'),
'Webman\\Admin\\' => array($vendorDir . '/webman/admin/src'),

View File

@ -49,6 +49,7 @@ class ComposerStaticInitd74e1fcdbb076080cce04a84d4bbcd73
array (
'Workerman\\' => 10,
'Webman\\ThinkOrm\\' => 16,
'Webman\\Push\\' => 12,
'Webman\\Event\\' => 13,
'Webman\\Captcha\\' => 15,
'Webman\\Admin\\' => 13,
@ -154,6 +155,10 @@ class ComposerStaticInitd74e1fcdbb076080cce04a84d4bbcd73
array (
0 => __DIR__ . '/..' . '/webman/think-orm/src',
),
'Webman\\Push\\' =>
array (
0 => __DIR__ . '/..' . '/webman/push/src',
),
'Webman\\Event\\' =>
array (
0 => __DIR__ . '/..' . '/webman/event/src',

View File

@ -3371,6 +3371,42 @@
},
"install-path": "../webman/event"
},
{
"name": "webman/push",
"version": "v1.0.16",
"version_normalized": "1.0.16.0",
"source": {
"type": "git",
"url": "https://github.com/webman-php/push.git",
"reference": "cd838ea76ffd90ef165564e4f35cebbe87462e77"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/webman-php/push/zipball/cd838ea76ffd90ef165564e4f35cebbe87462e77",
"reference": "cd838ea76ffd90ef165564e4f35cebbe87462e77",
"shasum": ""
},
"require": {
"php": ">=7.2"
},
"time": "2023-06-17T13:34:32+00:00",
"type": "library",
"installation-source": "dist",
"autoload": {
"psr-4": {
"Webman\\Push\\": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"support": {
"issues": "https://github.com/webman-php/push/issues",
"source": "https://github.com/webman-php/push/tree/v1.0.16"
},
"install-path": "../webman/push"
},
{
"name": "webman/think-orm",
"version": "v1.1.1",

View File

@ -470,6 +470,15 @@
'aliases' => array(),
'dev_requirement' => false,
),
'webman/push' => array(
'pretty_version' => 'v1.0.16',
'version' => '1.0.16.0',
'reference' => 'cd838ea76ffd90ef165564e4f35cebbe87462e77',
'type' => 'library',
'install_path' => __DIR__ . '/../webman/push',
'aliases' => array(),
'dev_requirement' => false,
),
'webman/think-orm' => array(
'pretty_version' => 'v1.1.1',
'version' => '1.1.1.0',

3
vendor/webman/push/README.md vendored Normal file
View File

@ -0,0 +1,3 @@
# push
webman push plugin
https://www.workerman.net/plugin/2

240
vendor/webman/push/src/Api.php vendored Normal file
View File

@ -0,0 +1,240 @@
<?php
namespace Webman\Push;
/**
* Modified from https://github.com/pusher/pusher-http-php
*/
class Api
{
/**
* @var array
*/
protected $_settings = [
'timeout' => 2,
];
/**
* @param $api_address
* @param $auth_key
* @param $secret
* @throws PushException
*/
public function __construct($api_address, $auth_key, $secret)
{
$this->checkCompatibility();
$this->_settings['api_address'] = $api_address;
$this->_settings['auth_key'] = $auth_key;
$this->_settings['secret'] = $secret;
$this->_settings['base_path'] = '/apps/1024';
}
/**
* trigger an event by providing event name and payload.
* Optionally provide a socket ID to exclude a client (most likely the sender).
*
* @param array|string $channels An array of channel names to publish the event on.
* @param string $event
* @param mixed $data Event data
* @param string $socket_id [optional]
* @return bool|string
*/
public function trigger($channels, $event, $data, $socket_id = null)
{
if (is_string($channels)) {
$channels = array($channels);
}
$query_params = array();
$s_url = $this->_settings['base_path'] . '/events';
$data_encoded = json_encode($data);
$post_params = array();
$post_params['name'] = $event;
$post_params['data'] = $data_encoded;
$post_params['channels'] = $channels;
if ($socket_id !== null) {
$post_params['socket_id'] = $socket_id;
}
$post_value = json_encode($post_params);
$query_params['body_md5'] = md5($post_value);
$ch = $this->createCurl($this->_settings['api_address'], $s_url, 'POST', $query_params);
curl_setopt($ch, CURLOPT_POSTFIELDS, $post_value);
$response = $this->execCurl($ch);
if ($response['status'] === 200) {
return true;
} else {
return false;
}
}
public function getChannelInfo($channel, $params = array())
{
$this->validateChannel($channel);
$response = $this->get('/channels/' . $channel, $params);
if ($response['status'] === 200) {
$response = json_decode($response['body']);
} else {
$response = false;
}
return $response;
}
public function getChannels($params = array())
{
return $this->get('/channels', $params);
}
private function checkCompatibility()
{
if (!in_array('sha256', hash_algos())) {
throw new PushException('SHA256 appears to be unsupported - make sure you have support for it, or upgrade your version of PHP.');
}
}
private function validateChannel($channel)
{
if (!preg_match('/\A[-a-zA-Z0-9_=@,.;]+\z/', $channel)) {
throw new PushException('Invalid channel name ' . $channel);
}
}
private function validateSocketId($socket_id)
{
if ($socket_id !== null && !preg_match('/\A\d+\.\d+\z/', $socket_id)) {
throw new PushException('Invalid socket ID ' . $socket_id);
}
}
private function createCurl($domain, $s_url, $request_method = 'GET', $query_params = array())
{
static $ch = null;
$signed_query = self::buildAuthQueryString(
$this->_settings['auth_key'],
$this->_settings['secret'],
$request_method,
$s_url,
$query_params);
$full_url = $domain . $s_url . '?' . $signed_query;
if (null === $ch) {
$ch = curl_init();
if ($ch === false) {
throw new PushException('Could not initialise cURL!');
}
}
if (function_exists('curl_reset')) {
curl_reset($ch);
}
curl_setopt($ch, CURLOPT_URL, $full_url);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
'Content-Type: application/json',
'Expect:',
));
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->_settings['timeout']);
if ($request_method === 'POST') {
curl_setopt($ch, CURLOPT_POST, 1);
} elseif ($request_method === 'GET') {
curl_setopt($ch, CURLOPT_POST, 0);
} // Otherwise let the user configure it
return $ch;
}
private function execCurl($ch)
{
$response = array();
$response['body'] = curl_exec($ch);
$response['status'] = curl_getinfo($ch, CURLINFO_HTTP_CODE);
return $response;
}
public static function buildAuthQueryString($auth_key, $auth_secret, $request_method, $request_path,
$query_params = array())
{
$params = array();
$params['auth_key'] = $auth_key;
$params['auth_timestamp'] = time();
$params = array_merge($params, $query_params);
ksort($params);
$string_to_sign = "$request_method\n" . $request_path . "\n" . self::arrayImplode('=', '&', $params);
$auth_signature = hash_hmac('sha256', $string_to_sign, $auth_secret, false);
$params['auth_signature'] = $auth_signature;
ksort($params);
$auth_query_string = self::arrayImplode('=', '&', $params);
return $auth_query_string;
}
public static function arrayImplode($glue, $separator, $array)
{
if (!is_array($array)) {
return $array;
}
$string = array();
foreach ($array as $key => $val) {
if (is_array($val)) {
$val = implode(',', $val);
}
$string[] = "{$key}{$glue}{$val}";
}
return implode($separator, $string);
}
public function get($path, $params = array())
{
$ch = $this->createCurl($this->_settings['api_address'], $this->_settings['base_path'] . $path, 'GET', $params);
$response = $this->execCurl($ch);
if ($response['status'] === 200) {
$response['result'] = json_decode($response['body'], true);
}
return $response;
}
public function socketAuth($channel, $socket_id, $custom_data = null)
{
$this->validateChannel($channel);
$this->validateSocketId($socket_id);
if ($custom_data) {
$signature = hash_hmac('sha256', $socket_id . ':' . $channel . ':' . $custom_data, $this->_settings['secret'], false);
} else {
$signature = hash_hmac('sha256', $socket_id . ':' . $channel, $this->_settings['secret'], false);
}
$signature = array('auth' => $this->_settings['auth_key'] . ':' . $signature);
if ($custom_data) {
$signature['channel_data'] = $custom_data;
}
return json_encode($signature);
}
public function presenceAuth($channel, $socket_id, $user_id, $user_info = null)
{
$user_data = array('user_id' => $user_id);
if ($user_info) {
$user_data['user_info'] = $user_info;
}
return $this->socketAuth($channel, $socket_id, json_encode($user_data));
}
}
class PushException extends \Exception
{
}

77
vendor/webman/push/src/Install.php vendored Normal file
View File

@ -0,0 +1,77 @@
<?php
namespace Webman\Push;
class Install
{
const WEBMAN_PLUGIN = true;
/**
* @var array
*/
protected static $pathRelation = [
'config/plugin/webman/push' => 'config/plugin/webman/push'
];
/**
* Install
* @return void
*/
public static function install()
{
$config_app_path = __DIR__ . '/config/plugin/webman/push/app.php';
$config_app_content = file_get_contents($config_app_path);
$app_key = md5(microtime(true).rand(0, 2100000000));
$app_secret = md5($app_key.rand(0, 2100000000));
$config_app_content = str_replace([
'APP_KEY_TO_REPLACE',
'APP_SECRET_TO_REPLACE'
], [$app_key, $app_secret], $config_app_content);
file_put_contents($config_app_path, $config_app_content);
static::installByRelation();
}
/**
* Uninstall
* @return void
*/
public static function uninstall()
{
self::uninstallByRelation();
}
/**
* installByRelation
* @return void
*/
public static function installByRelation()
{
foreach (static::$pathRelation as $source => $dest) {
if ($pos = strrpos($dest, '/')) {
$parent_dir = base_path().'/'.substr($dest, 0, $pos);
if (!is_dir($parent_dir)) {
mkdir($parent_dir, 0777, true);
}
}
//symlink(__DIR__ . "/$source", base_path()."/$dest");
copy_dir(__DIR__ . "/$source", base_path()."/$dest");
}
}
/**
* uninstallByRelation
* @return void
*/
public static function uninstallByRelation()
{
foreach (static::$pathRelation as $source => $dest) {
$path = base_path()."/$dest";
if (!is_dir($path) && !is_file($path)) {
continue;
}
/*if (is_link($path) {
unlink($path);
}*/
remove_dir($path);
}
}
}

976
vendor/webman/push/src/Server.php vendored Normal file
View File

@ -0,0 +1,976 @@
<?php
/**
* This file is part of webman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Webman\Push;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Response;
use Workerman\Timer;
use Workerman\Worker;
class Server
{
/**
* 应用信息
*
* @var array
*/
public $appInfo = [];
/**
* 心跳时间
*
* @var int
*/
public $keepAliveTimeout = 60;
/**
* api监听的ip端口
*
* @var string
*/
public $apiListen = 'http://0.0.0.0:1080';
/**
* webhook 延迟设置
*
* @var int
*/
public $webHookDelay = 3;
/**
* @var array
*/
protected $_globalDataSnapshot = [];
/**
* 事件对应的客户端链接
*
* @var array
*/
protected $_eventClients = [];
/**
* 所有的客户端链接
*
* @var array
*/
protected $_allClients = [];
/**
* array(
* 'app_key1' => array(
* 'channel1' => array(
* 'users' => array(
* 'uid1' => array('user_info'=>[], 'ref_count' => x),
* 'uid2' => array('user_info'=>[], 'ref_count' => x),
* ),
* 'type' => 'presence',
* 'subscription_count' => x
* ),
* 'channel2' => array(
* 'users' => array(
* 'uid3' => array('user_info'=>[], 'ref_count' => x)
* ),
* 'type' => 'presence',
* 'subscription_count' => x
* ),
* ),
* 'app_key2' => array(
* 'channel1' => array(
* 'type' => 'private',
* 'subscription_count' => x
* ),
* 'channel2' => array(
* 'type' => 'public',
* 'subscription_count' => x
* ),
* )
* )
* @var array
*/
protected $_globalData = [];
/**
* 当前进程全局唯一订阅id
*
* @var string
*/
protected $_globalID = 1;
/**
* 构造函数
*
* @param string $socket_name
* @param array $context
*/
public function __construct($api_listen, $app_info)
{
$this->apiListen = $api_listen;
$this->appInfo = $app_info;
}
/**
* @param $worker
* @return void
* @throws \Exception
*/
public function onWorkerStart($worker)
{
$api_worker = new Worker($this->apiListen);
$api_worker->onMessage = array($this, 'onApiClientMessage');
$api_worker->listen();
Timer::add($this->keepAliveTimeout/2, array($this, 'checkHeartbeat'));
Timer::add($this->webHookDelay, array($this, 'webHookCheck'));
}
/**
* 客户端连接后
*
* @param $connection
*/
public function onConnect($connection) {
// 客户端有多少次没在规定时间发送心跳
$connection->clientNotSendPingCount = 0;
// 设置websocket握手事件回调
$connection->onWebSocketConnect = array($this, 'onWebSocketConnect');
}
/**
* 当websocket握手时
* @param $connection
* @return mixed
*/
public function onWebSocketConnect(TcpConnection $connection, $header)
{
// /app/1234567890abcdefghig?protocol=7&client=js&version=3.2.4&flash=false
if (!preg_match('/\/app\/([^\/^\?^ ]+)/', (string)$header, $match)) {
echo "app_key not found\n$header\n";
$connection->pauseRecv();
return;
}
$app_key = $match[1];
if (!isset($this->appInfo[$app_key])) {
echo "Invalid app_key $app_key\n";
$connection->pauseRecv();
return;
}
$socket_id = $this->createsocketID($connection);
$connection->appKey = $app_key;
$connection->socketID = $socket_id;
$connection->channels = array(''=>'');
$connection->channelUidMap = [];
$connection->clientNotSendPingCount = 0;
$this->_eventClients[$app_key][''][$socket_id] = $connection;
$this->_allClients[$socket_id] = $connection;
/*
* 向客户端发送链接成功的消息
* {"event":"pusher:connection_established","data":"{\"socket_id\":\"208836.27464492\",\"activity_timeout\":120}"}
*/
$data = array(
'event' => 'pusher:connection_established',
'data' => json_encode(array(
'socket_id' => $socket_id,
'activity_timeout' => 55
))
);
$connection->send(json_encode($data));
}
/**
* 客户端关闭链接时
*
* @param $connection
*/
public function onClose($connection)
{
if (!isset($connection->socketID)) {
return;
}
$socket_id = $connection->socketID;
$app_key = $connection->appKey;
unset($this->_allClients[$socket_id]);
unset($this->_eventClients[$app_key][''][$socket_id]);
if (isset($connection->channels)) {
$app_key = $connection->appKey;
foreach ($connection->channels as $channel => $uid) {
if ('' === $channel) {
continue;
}
if ($uid === '') {
$this->unsubscribePublicChannel($connection, $channel);
} else {
$this->unsubscribePresenceChannel($connection, $channel, $uid);
}
unset($this->_eventClients[$app_key][$channel][$socket_id]);
}
}
}
/**
* 客户端发来消息时
*
* @param $connection
* @param $data
*
* @return void
*/
public function onMessage($connection, $data)
{
$connection->clientNotSendPingCount = 0;
$data = json_decode($data, true);
if (!$data) {
return;
}
if (!isset($data['event'])) {
$connection->send($this->error(null, 'Empty event'));
return;
}
$event = $data['event'];
switch ($event) {
case 'pusher:ping':
$connection->send('{"event":"pusher:pong","data":"{}"}');
return;
// {"event":"pusher:subscribe","data":{"channel":"my-channel"}}
case 'pusher:subscribe':
$channel = $data['data']['channel'];
// private- 和 presence- 开头的channel需要验证
$channel_type = $this->getChannelType($channel);
if ($channel_type === 'presence') {
// {"event":"pusher:subscribe","data":{"auth":"b054014693241bcd9c26:10e3b628cb78e8bc4d1f44d47c9294551b446ae6ec10ef113d3d7e84e99763e6","channel_data":"{\"user_id\":100,\"user_info\":{\"name\":\"123\"}}","channel":"presence-channel"}}
$client_auth = $data['data']['auth'];
if (!isset($data['data']['channel_data'])) {
$connection->send($this->error(null, 'Empty channel_data'));
return;
}
$auth = $connection->appKey.':'.hash_hmac('sha256', $connection->socketID.':'.$channel.':'.$data['data']['channel_data'], $this->appInfo[$connection->appKey]['app_secret'], false);
// {"event":"pusher:error","data":{"code":null,"message":"Received invalid JSON"}}
if ($client_auth !== $auth) {
return $connection->send($this->error(null, 'Received invalid Auth '.$auth));
}
$user_data = json_decode($data['data']['channel_data'], true);
if (!$user_data || !isset($user_data['user_id']) || !isset($user_data['user_info'])) {
$connection->send($this->error(null, 'Bad channel_data'));
return;
}
$this->subscribePresence($connection, $channel, $user_data['user_id'], $user_data['user_info']);
return;
} elseif($channel_type === 'private') {
// {"event":"pusher:subscribe","data":{"auth":"b054014693241bcd9c26:10e3b628cb78e8bc4d1f44d47c9294551b446ae6ec10ef113d3d7e84e99763e6","channel_data":"{\"user_id\":100,\"user_info\":{\"name\":\"123\"}}","channel":"presence-channel"}}
$client_auth = $data['data']['auth'];
$auth = $connection->appKey.':'.hash_hmac('sha256', $connection->socketID.':'.$channel, $this->appInfo[$connection->appKey]['app_secret'], false);
// {"event":"pusher:error","data":{"code":null,"message":"Received invalid Auth"}}
if ($client_auth !== $auth) {
return $connection->send($this->error(null, 'Received invalid Auth '.$auth));
}
$this->subscribePrivateChannel($connection, $channel);
} else {
$this->subscribePublicChannel($connection, $channel);
}
// {"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"my-channel"}
$connection->send(json_encode(
array(
'event' => 'pusher_internal:subscription_succeeded',
'data' => '{}',
'channel' => $channel
), JSON_UNESCAPED_UNICODE
));
return;
// {"event":"pusher:unsubscribe","data":{"channel":"my-channel"}}
case 'pusher:unsubscribe':
$app_key = $connection->appKey;
$channel = $data['data']['channel'];
$channel_type = $this->getChannelType($channel);
switch ($channel_type) {
case 'public':
$this->unsubscribePublicChannel($connection, $channel);
break;
case 'private':
$this->unsubscribePrivateChannel($connection, $channel);
break;
case 'presence':
$uid = $connection->channels[$channel];
$this->unsubscribePresenceChannel($connection, $channel, $uid);
break;
}
return;
// {"event":"client-event","data":{"your":"hi"},"channel":"presence-channel"}
default:
if (strpos($event, 'pusher:') === 0) {
return $connection->send($this->error(null, 'Unknown event'));
}
if (!isset($data['channel'])) {
$connection->send($this->error(null, 'Empty channel'));
return;
}
$channel = $data['channel'];
// 客户端触发事件必须是private 或者 presence的channel
$channel_type = $this->getChannelType($channel);
if ($channel_type !== 'private' && $channel_type !== 'presence') {
// {"event":"pusher:error","data":{"code":null,"message":"Client event rejected - only supported on private and presence channels"}}
return $connection->send($this->error(null, 'Client event rejected - only supported on private and presence channels'));
}
// 当前链接没有订阅这个channel
if (!isset($connection->channels[$channel])) {
return $connection->send($this->error(null, 'Client event rejected - you didn\'t subscribe this channel'));
}
// 事件必须以client-为前缀
if (strpos($event, 'client-') !== 0) {
return $connection->send($this->error(null, 'Client event rejected - client events must be prefixed by \'client-\''));
}
// @todo 检查是否设置了可前端发布事件
// {"event":"pusher:error","data":{"code":null,"message":"To send client events, you must enable this feature in the Settings page of your dashboard."}}
// 全局发布事件
$this->publishToClients($connection->appKey, $channel, $event, json_encode($data['data'], JSON_UNESCAPED_UNICODE), $connection->socketID);
}
}
/**
* 获得channel类型
*
* @param $channel
* @return string
*/
protected function getChannelType($channel) {
if (strpos($channel, 'private-') === 0) {
return 'private';
} elseif (strpos($channel, 'presence-') === 0) {
return 'presence';
}
return 'public';
}
/**
* 组装失败信息
*
* @param $code
* @param $message
* @return string
*/
protected function error($code, $message)
{
return json_encode(array('event'=>'pusher:error', 'data' => array('code' => $code, 'message' => $message)), JSON_UNESCAPED_UNICODE);
}
/**
* 客户端订阅channel
*
* @param $connection
* @param $channel
*
* @return void
*/
public function subscribePublicChannel($connection, $channel) {
$app_key = $connection->appKey;
$connection->channels[$channel] = '';
$this->_eventClients[$app_key][$channel][$connection->socketID] = $connection;
if (!isset($this->_globalData[$app_key][$channel])) {
$this->_globalData[$app_key][$channel] = array(
'type' => 'presence',
'subscription_count' => 0
);
}
$this->_globalData[$app_key][$channel]['subscription_count'] += 1;
}
/**
* 客户端订阅channel
*
* @param $connection
* @param $channel
*
* @return void
*/
public function subscribePrivateChannel($connection, $channel) {
$this->subscribePublicChannel($connection, $channel);
}
/**
* 客户端订阅channel
*
* @param $connection
* @param $channel
*
* @return void
*/
public function subscribePresence($connection, $channel, $uid, $user_info) {
$app_key = $connection->appKey;
$connection->channels[$channel] = $uid;
$this->_eventClients[$app_key][$channel][$connection->socketID] = $connection;
if (!isset($this->_globalData[$app_key][$channel])) {
$this->_globalData[$app_key][$channel] = array(
'type' => 'presence',
'users' => [],
'subscription_count' => 0
);
}
$this->_globalData[$app_key][$channel]['subscription_count'] += 1;
$member_added = false;
if (!isset($this->_globalData[$app_key][$channel]['users'][$uid]['user_info'])) {
$this->_globalData[$app_key][$channel]['users'][$uid] = array('user_info'=>$user_info, 'ref_count' => 0);
$member_added = true;
}
$this->_globalData[$app_key][$channel]['users'][$uid]['ref_count'] += 1;
$presence_data = $this->getPresenceChannelDataForSubscribe($app_key, $channel);
if ($member_added) {
// {"event":"pusher_internal:member_added","data":"{\"user_id\":1488465780,\"user_info\":{\"name\":\"123\",\"sex\":\"1\"}}","channel":"presence-channel"}
$this->publishToClients($app_key, $channel, 'pusher_internal:member_added', json_encode(array(
'user_id' => $uid,
'user_info' => $user_info
), JSON_UNESCAPED_UNICODE), $connection->socketID);
}
// {"event":"pusher_internal:subscription_succeeded","data":"{\"presence\":{\"count\":2,\"ids\":[\"1488465780\",\"14884657802\"],\"hash\":{\"1488465780\":{\"name\":\"123\",\"sex\":\"1\"},\"14884657802\":{\"name\":\"123\",\"sex\":\"1\"}}}}","channel":"presence-channel"}
$connection->send(json_encode(array(
'event' => 'pusher_internal:subscription_succeeded',
'data' => json_encode($presence_data),
'channel' => $channel
),JSON_UNESCAPED_UNICODE
));
}
public function getPresenceChannelDataForSubscribe($app_key, $channel)
{
$hash = [];
$count = 100;
if (isset($this->_globalData[$app_key][$channel])) {
foreach ($this->_globalData[$app_key][$channel]['users'] as $uid => $item) {
$hash[$uid] = $item['user_info'];
if ($count-- <= 0) {
break;
}
}
//$hash = array_slice($this->_globalData[$app_key][$channel]['users'], 0, 100, true);
}
return array(
'presence' => array(
'count' => count($this->_globalData[$app_key][$channel]['users']),
'ids' => array_keys($hash),
'hash' => $hash
)
);
}
/**
* 客户端取消订阅channel
*
* @param $connection
* @param $channel
*
* @return void
*/
public function unsubscribePublicChannel($connection, $channel) {
$app_key = $connection->appKey;
$this->_globalData[$app_key][$channel]['subscription_count']--;
if ($this->_globalData[$app_key][$channel]['subscription_count'] <= 0) {
unset($this->_globalData[$app_key][$channel]);
}
unset($connection->channels[$channel], $this->_eventClients[$connection->appKey][$channel][$connection->socketID]);
}
/**
* 客户端取消订阅channel
*
* @param $connection
* @param $channel
*
* @return void
*/
public function unsubscribePrivateChannel($connection, $channel) {
$this->unsubscribePublicChannel($connection, $channel);
}
/**
* 客户端取消订阅channel
*
* @param $connection
* @param $channel
*
* @return void
*/
public function unsubscribePresenceChannel($connection, $channel, $uid) {
$app_key = $connection->appKey;
$member_removed = false;
$this->_globalData[$app_key][$channel]['subscription_count']--;
if ($this->_globalData[$app_key][$channel]['subscription_count'] <= 0) {
unset($this->_globalData[$app_key][$channel]);
$member_removed = true;
} else {
if (!isset($this->_globalData[$app_key][$channel]['users'][$uid]['ref_count'])) {
error_log("\$this->_globalData[$app_key][$channel]['users'][$uid]['ref_count'] not exist\n");
return;
}
$this->_globalData[$app_key][$channel]['users'][$uid]['ref_count']--;
$ref_count = $this->_globalData[$app_key][$channel]['users'][$uid]['ref_count'];
if ($ref_count <= 0) {
unset($this->_globalData[$app_key][$channel]['users'][$uid]);
$member_removed = true;
}
}
if ($member_removed) {
// {"event":"pusher_internal:member_removed","data":"{\"user_id\":\"14884657801\"}","channel":"presence-channel"}
$this->publishToClients($app_key, $channel, 'pusher_internal:member_removed', json_encode(array('user_id'=>$uid), JSON_UNESCAPED_UNICODE));
}
unset($connection->channels[$channel], $this->_eventClients[$connection->appKey][$channel][$connection->socketID]);
}
/**
* 发布事件
*
* @param $data
*/
public function publishToClients($app_key, $channel, $event, $data, $socket_id = null)
{
if (!isset($this->_eventClients[$app_key][$channel])) {
return;
}
$data = json_encode(array(
'event' => $event,
'data' => $data,
'channel' => $channel
), JSON_UNESCAPED_UNICODE);
foreach ($this->_eventClients[$app_key][$channel] as $connection) {
if ($connection->socketID === $socket_id) {
continue;
}
$connection->clientNotSendPingCount = 0;
// {"event":"my-event","data":"{\"message\":\"hello world\"}","channel":"my-channel"}
$connection->send($data);
}
}
/**
* 检查心跳,将心跳超时的客户端关闭
*
* @return void
*/
public function checkHeartbeat()
{
foreach ($this->_allClients as $connection) {
if ($connection->clientNotSendPingCount > 1) {
$connection->destroy();
}
$connection->clientNotSendPingCount ++;
}
}
/**
* 创建一个全局的客户端id
*
* @param $connection
* @return string
*/
protected function createsocketID($connection)
{
$socket_id = "{$this->_globalID}.{$connection->id}";
return $socket_id;
}
/**
* 创建channel key用于监听分发给该channel的事件
*
* @param $app_key
* @param $channel
* @return string
*/
protected function createChannelKey($app_key, $channel)
{
return "$app_key:$channel";
}
/**
* POST /apps/1024/events?auth_key=b054014693241bcd9c26&auth_signature=ed7f5b604e6bbd21a888a861ed536a430a9d5e4df210937a241a811bd17fcf97&auth_timestamp=1487428415&auth_version=1.0&body_md5=15d251b35306a6da7efa515a0e971f80 HTTP/1.1
* {"name":"my-event","data":"{\"message\":\"hello world\"}","channels":["my-channel"]}
* {"name":"my-event","data":"{\"message\":\"haha\"}","channels":["my-channel"],"socket_id":"123.456"}
*
* GET /apps/1024/channels/my-channel?auth_key=b054014693241bcd9c26&auth_signature=5226650be00a064b417d50d49229e42bbb918e969c42e63aaa63b9d1c6cf9803&auth_timestamp=1489898340&auth_version=1.0
*
* GET /apps/1024/channels/presence-channel?auth_key=b054014693241bcd9c26&auth_signature=d46281bf69ccadfe9da270176c85daa88d4b9da55b1f3c2570d48fa1236f0b2c&auth_timestamp=1489903433&auth_version=1.0&info=subscription_count,user_count
*
* GET /apps/1024/channels/presence-channel/users?auth_key=b054014693241bcd9c26&auth_signature=2eee0ca6292e17b00484bdcb0bba686a47e8a7365a1b190248946182fc926309&auth_timestamp=1489904560&auth_version=1.0
*/
public function onApiClientMessage($connection, Request $request)
{
if (!($app_key = $request->get('auth_key'))) {
return $connection->send(new Response(400, [], 'Bad Request'));
}
if (!isset($this->appInfo[$app_key])) {
return $connection->send(new Response(401, [], 'Invalid app_key'));
}
$path = $request->path();
$explode = explode('/', trim($path, '/'));
if (count($explode) < 3) {
return $connection->send(new Response(400, [], 'Bad Request'));
}
$auth_signature = $request->get('auth_signature');
$params = $request->get();
unset($params['auth_signature']);
ksort($params);
$string_to_sign = $request->method()."\n" . $path . "\n" . self::array_implode('=', '&', $params);
$real_auth_signature = hash_hmac('sha256', $string_to_sign, $this->appInfo[$app_key]['app_secret'], false);
if ($auth_signature !== $real_auth_signature) {
return $connection->send(new Response(401, [], 'Invalid signature'));
}
$type = $explode[2];
switch ($type) {
case 'batch_events':
$packages = json_decode($request->rawBody(), true);
if (!$packages || !isset($packages['batch'])) {
return $connection->send(new Response(400, [], 'Bad request'));
}
$packages = $packages['batch'];
foreach ($packages as $package) {
$channel = $package['channel'];
$event = $package['name'];
$data = $package['data'];
$socket_id = isset($package['socket_id']) ? isset($package['socket_id']) : null;
$this->publishToClients($app_key, $channel, $event, $data, $socket_id);
}
return $connection->send('{}');
break;
case 'events':
$package = json_decode($request->rawBody(), true);
if (!$package) {
return $connection->send(new Response(401, [], 'Invalid signature'));
}
$channels = $package['channels'];
$event = $package['name'];
$data = $package['data'];
foreach ($channels as $channel) {
$socket_id = isset($package['socket_id']) ? isset($package['socket_id']) : null;
$this->publishToClients($app_key, $channel, $event, $data, $socket_id);
}
return $connection->send('{}');
case 'channels':
// info
$request_info = explode(',', $request->get('info', ''));
if (!isset($explode[3])) {
$channels = [];
$prefix = $request->get('filter_by_prefix');
$return_subscription_count = in_array('subscription_count', $request_info);
foreach ($this->_globalData[$app_key] ?? [] as $channel => $item) {
if ($prefix !== null) {
if (strpos($channel, $prefix) !== 0) {
continue;
}
}
$channels[$channel] = [];
if ($return_subscription_count) {
$channels[$channel]['subscription_count'] = $item['subscription_count'];
}
}
return $connection->send(json_encode(['channels' => $channels], JSON_UNESCAPED_UNICODE));
}
$channel = $explode[3];
// users
if (isset($explode[4])) {
if ($explode[4] !== 'users') {
return $connection->send(new Response(400, [], 'Bad Request'));
}
$id_array = isset($this->_globalData[$app_key][$channel]['users']) ?
array_keys($this->_globalData[$app_key][$channel]['users']) : array();
$user_id_array = array();
foreach ($id_array as $id) {
$user_id_array[] = array('id' => $id);
}
$connection->send(json_encode($user_id_array, JSON_UNESCAPED_UNICODE));
}
$occupied = isset($this->_globalData[$app_key][$channel]);
$user_count = isset($this->_globalData[$app_key][$channel]['users']) ? count($this->_globalData[$app_key][$channel]['users']) : 0;
$subscription_count = $occupied ? $this->_globalData[$app_key][$channel]['subscription_count'] : 0;
$channel_info = array(
'occupied' => $occupied
);
foreach ($request_info as $name) {
switch ($name) {
case 'user_count':
$channel_info['user_count'] = $user_count;
break;
case 'subscription_count':
$channel_info['subscription_count'] = $subscription_count;
break;
}
}
$connection->send(json_encode($channel_info, JSON_UNESCAPED_UNICODE));
break;
default:
return $connection->send(new Response(400, [], 'Bad Request'));
}
}
public function webHookCheck()
{
$channel_events = [];
$user_events = [];
$all_app_keys = array_unique(array_merge(array_keys($this->_globalData), array_keys($this->_globalDataSnapshot)));
foreach($all_app_keys as $app_key)
{
if (empty($this->appInfo[$app_key])) {
continue;
}
$snapshot_items = isset($this->_globalDataSnapshot[$app_key]) ? $this->_globalDataSnapshot[$app_key] : [];
$items = isset($this->_globalData[$app_key]) ? $this->_globalData[$app_key] : [];
$channels_added = array_diff_key($items, $snapshot_items);
$channels_removed = array_diff_key($snapshot_items, $items);
if ($channels_added) {
$channel_events[$app_key]['channels_added'] = array_keys($channels_added);
}
if ($channels_removed) {
$channel_events[$app_key]['channels_removed'] = array_keys($channels_removed);
}
$all_channels = [];
foreach ($items as $channel => $foo) {
if ($foo['type'] === 'presence') {
$all_channels[$channel] = $channel;
}
}
foreach ($snapshot_items as $channel => $foo) {
if ($foo['type'] === 'presence' && !isset($all_channels[$channel])) {
$all_channels[$channel] = $channel;
}
}
foreach ($all_channels as $channel) {
$user_array_snapshot = isset($snapshot_items[$channel]['users']) ? $snapshot_items[$channel]['users'] : [];
$user_array = isset($items[$channel]['users']) ? $items[$channel]['users'] : [];
$user_added = array_diff_key($user_array, $user_array_snapshot);
$user_removed = array_diff_key($user_array_snapshot, $user_array);
if ($user_added) {
$user_events[$app_key][$channel]['user_added'] = array_keys($user_added);
}
if ($user_removed) {
$user_events[$app_key][$channel]['user_removed'] = array_keys($user_removed);
}
}
}
$this->_globalDataSnapshot = $this->_globalData;
$this->webHookSend(array('channel_events' => $channel_events, 'user_events' => $user_events));
}
protected function webHookSend($data)
{
$channel_events = $data['channel_events'];
$user_events = $data['user_events'];
$time_ms = microtime(true);
foreach ($user_events as $app_key => $items) {
// 没设置user_event回调则忽略
if (empty($this->appInfo[$app_key]['user_hook'])) {
continue;
}
// {"time_ms":1494300453609,"events":[{"channel":"presence-channel2","user_id":"59094971a","name":"member_added"}]}
$http_events_body = array(
'time_ms' => $time_ms,
'events' => []
);
foreach ($items as $channel => $item) {
if (isset($item['user_added'])) {
foreach ($item['user_added'] as $user_id) {
$http_events_body['events'][] = array(
'channel' => $channel,
'user_id' => $user_id,
'name' => 'user_added'
);
}
}
}
foreach ($items as $channel => $item) {
if (isset($item['user_removed'])) {
foreach ($item['user_removed'] as $user_id) {
$http_events_body['events'][] = array(
'channel' => $channel,
'user_id' => $user_id,
'name' => 'user_removed'
);
}
}
}
if ($http_events_body['events']) {
$this->sendHttpRequest($this->appInfo[$app_key]['user_hook'],
$app_key,
$this->appInfo[$app_key]['app_secret'],
json_encode($http_events_body, JSON_UNESCAPED_UNICODE));
}
}
foreach ($channel_events as $app_key => $item) {
// 没设置channel_event回调则忽略
if (empty($this->appInfo[$app_key]['channel_hook'])) {
continue;
}
// {"time_ms":1494300446592,"events":[{"channel":"presence-channel2","name":"channel_added"}]}
$http_events_body = array(
'time_ms' => $time_ms,
'events' => []
);
if (isset($item['channels_added'])) {
foreach ($item['channels_added'] as $channel) {
$http_events_body['events'][] = array(
'channel' => $channel,
'name' => 'channel_added'
);
}
}
if (isset($item['channels_removed'])) {
foreach ($item['channels_removed'] as $channel) {
$http_events_body['events'][] = array(
'channel' => $channel,
'name' => 'channel_removed'
);
}
}
if ($http_events_body['events']) {
$this->sendHttpRequest($this->appInfo[$app_key]['channel_hook'],
$app_key,
$this->appInfo[$app_key]['app_secret'],
json_encode($http_events_body, JSON_UNESCAPED_UNICODE));
}
}
}
protected function sendHttpRequest($address, $app_key, $secret, $body, $redirect_count = 0)
{
$address_info = parse_url($address);
if (!$address_info) {
echo new \Exception('bad remote_address');
return false;
}
$scheme = isset($address_info['scheme']) && $address_info['scheme'] === 'https' ? 'ssl' : 'tcp';
if (!isset($address_info['port'])) {
$address_info['port'] = $scheme == 'ssl' ? 443 : 80;
}
if (!isset($address_info['path'])) {
$address_info['path'] = '/';
}
if (!isset($address_info['query'])) {
$address_info['query'] = '';
} else {
$address_info['query'] = '?' . $address_info['query'];
}
$remote_address = "{$address_info['host']}:{$address_info['port']}";
$remote_host = $address_info['host'];
$remote_URI = "{$address_info['path']}{$address_info['query']}";
$signature = hash_hmac('sha256', $body, $secret, false);
$base_url = $scheme == 'ssl' ? "https://$remote_address/" : "http://$remote_address/";
$header = "POST $remote_URI HTTP/1.0\r\n";
$header .= "Host: $remote_host\r\n";
$header .= "Connection: close\r\n";
$header .= "X-Pusher-Key: $app_key\r\n";
$header .= "X-Pusher-Signature: $signature\r\n";
$header .= "Content-Type: application/json\r\n";
$header .= "Content-Length: " . strlen($body);
$http_buffer = $header . "\r\n\r\n" . $body;
$client = new AsyncTcpConnection('tcp://' . $remote_address);
if ($scheme == 'ssl') {
$client->transport = 'ssl';
}
$client->onConnect = function ($client) use ($http_buffer) {
$client->send($http_buffer);
};
$client->onMessage = function ($client, $buffer) use ($address, $app_key, $secret, $body, $base_url, $redirect_count) {
$client->close();
if (!preg_match("/HTTP\/1\.\d (\d*?) .*?\r\n/", $buffer, $match)) {
echo "http code not found $buffer\n";
return;
}
$http_code = $match[1];
$base_code = intval($http_code / 100);
if ($base_code == 3 && preg_match("/Location: (.*?)\r\n/", $buffer, $match)) {
if (++$redirect_count > 3) {
$msg = date('Y-m-d H:i:s') . "\nURL:$address\nAPP_KEY:$app_key ERR:too many redirect\n$buffer";
echo $msg;
return;
}
$location = $match[1];
if (strpos($location, 'http://') === 0 || strpos($location, 'https://') === 0) {
$this->sendHttpRequest($location, $app_key, $secret, $body, $redirect_count);
} else {
$this->sendHttpRequest($base_url . $location, $app_key, $secret, $body, $redirect_count);
}
}
if ($base_code !== 2) {
$msg = date('Y-m-d H:i:s') . "\nURL:$address\nAPP_KEY:$app_key\n$buffer";
echo $msg;
}
};
Timer::add(10, array($client, 'close'), null, false);
$client->connect();
}
/**
* array_implode
*
* @param $glue
* @param $separator
* @param $array
* @return string
*/
public static function array_implode($glue, $separator, $array)
{
if (!is_array($array)) {
return $array;
}
$string = array();
foreach ($array as $key => $val) {
if (is_array($val)) {
$val = implode(',', $val);
}
$string[] = "{$key}{$glue}{$val}";
}
return implode($separator, $string);
}
}

View File

@ -0,0 +1,10 @@
<?php
return [
'enable' => true,
'websocket' => 'websocket://0.0.0.0:3131',
'api' => 'http://0.0.0.0:3232',
'app_key' => '40130278692df9bd1a77f3ff6d64edb6',
'app_secret' => '526598e8713e4ed50ea4cd8ff5e77f23',
'channel_hook' => 'http://127.0.0.1:8787/plugin/webman/push/hook',
'auth' => '/plugin/webman/push/auth'
];

View File

@ -0,0 +1,21 @@
<?php
use Webman\Push\Server;
return [
'server' => [
'handler' => Server::class,
'listen' => config('plugin.webman.push.app.websocket'),
'count' => 1, // 必须是1
'reloadable' => false, // 执行reload不重启
'constructor' => [
'api_listen' => config('plugin.webman.push.app.api'),
'app_info' => [
config('plugin.webman.push.app.app_key') => [
'channel_hook' => config('plugin.webman.push.app.channel_hook'),
'app_secret' => config('plugin.webman.push.app.app_secret'),
],
]
]
]
];

View File

@ -0,0 +1,87 @@
<?php
/**
* This file is part of webman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
use support\Request;
use Webman\Route;
use Webman\Push\Api;
/**
* 推送js客户端文件
*/
Route::get('/plugin/webman/push/push.js', function (Request $request) {
return response()->file(base_path().'/vendor/webman/push/src/push.js');
});
/**
* 私有频道鉴权这里应该使用session辨别当前用户身份然后确定该用户是否有权限监听channel_name
*/
Route::post(config('plugin.webman.push.app.auth'), function (Request $request) {
$pusher = new Api(str_replace('0.0.0.0', '127.0.0.1', config('plugin.webman.push.app.api')), config('plugin.webman.push.app.app_key'), config('plugin.webman.push.app.app_secret'));
$channel_name = $request->post('channel_name');
$session = $request->session();
// 这里应该通过session和channel_name判断当前用户是否有权限监听channel_name
$has_authority = true;
if ($has_authority) {
return response($pusher->socketAuth($channel_name, $request->post('socket_id')));
} else {
return response('Forbidden', 403);
}
});
/**
* 当频道上线以及下线时触发的回调
* 频道上线:是指某个频道从没有连接在线到有连接在线的事件
* 频道下线:是指某个频道的所有连接都断开触发的事件
*/
Route::post(parse_url(config('plugin.webman.push.app.channel_hook'), PHP_URL_PATH), function (Request $request) {
// 没有x-pusher-signature头视为伪造请求
if (!$webhook_signature = $request->header('x-pusher-signature')) {
return response('401 Not authenticated', 401);
}
$body = $request->rawBody();
// 计算签名,$app_secret 是双方使用的密钥,是保密的,外部无从得知
$expected_signature = hash_hmac('sha256', $body, config('plugin.webman.push.app.app_secret'), false);
// 安全校验如果签名不一致可能是伪造的请求返回401状态码
if ($webhook_signature !== $expected_signature) {
return response('401 Not authenticated', 401);
}
// 这里存储这上线 下线的channel数据
$payload = json_decode($body, true);
$channels_online = $channels_offline = [];
foreach ($payload['events'] as $event) {
if ($event['name'] === 'channel_added') {
$channels_online[] = $event['channel'];
} else if ($event['name'] === 'channel_removed') {
$channels_offline[] = $event['channel'];
}
}
// 业务根据需要处理上下线的channel例如将在线状态写入数据库通知其它channel等
// 上线的所有channel
echo 'online channels: ' . implode(',', $channels_online) . "\n";
// 下线的所有channel
echo 'offline channels: ' . implode(',', $channels_offline) . "\n";
return 'OK';
});

827
vendor/webman/push/src/push-uniapp.js vendored Normal file
View File

@ -0,0 +1,827 @@
function Push(options) {
this.doNotConnect = 0;
options = options || {};
options.heartbeat = options.heartbeat || 25000;
options.pingTimeout = options.pingTimeout || 10000;
this.config = options;
this.uid = 0;
this.channels = {};
this.connection = null;
this.pingTimeoutTimer = 0;
Push.instances.push(this);
this.createConnection();
}
Push.prototype.checkoutPing = function() {
var _this = this;
_this.checkoutPingTimer && clearTimeout(_this.checkoutPingTimer);
_this.checkoutPingTimer = setTimeout(function () {
_this.checkoutPingTimer = 0;
if (_this.connection.state === 'connected') {
_this.connection.send('{"event":"pusher:ping","data":{}}');
if (_this.pingTimeoutTimer) {
clearTimeout(_this.pingTimeoutTimer);
_this.pingTimeoutTimer = 0;
}
_this.pingTimeoutTimer = setTimeout(function () {
_this.connection.closeAndClean();
if (!_this.connection.doNotConnect) {
_this.connection.waitReconnect();
}
}, _this.config.pingTimeout);
}
}, this.config.heartbeat);
};
Push.prototype.channel = function (name) {
return this.channels.find(name);
};
Push.prototype.allChannels = function () {
return this.channels.all();
};
Push.prototype.createConnection = function () {
if (this.connection) {
throw Error('Connection already exist');
}
var _this = this;
var url = this.config.url;
function updateSubscribed () {
for (var i in _this.channels) {
_this.channels[i].subscribed = false;
}
}
this.connection = new Connection({
url: url,
app_key: this.config.app_key,
onOpen: function () {
_this.connection.state = 'connecting';
_this.checkoutPing();
},
onMessage: function(params) {
if(_this.pingTimeoutTimer) {
clearTimeout(_this.pingTimeoutTimer);
_this.pingTimeoutTimer = 0;
}
params = JSON.parse(params.data);
var event = params.event;
var channel_name = params.channel;
if (event === 'pusher:pong') {
_this.checkoutPing();
return;
}
if (event === 'pusher:error') {
throw Error(params.data.message);
}
var data = JSON.parse(params.data), channel;
if (event === 'pusher_internal:subscription_succeeded') {
channel = _this.channels[channel_name];
channel.subscribed = true;
channel.processQueue();
channel.emit('pusher:subscription_succeeded');
return;
}
if (event === 'pusher:connection_established') {
_this.connection.socket_id = data.socket_id;
_this.connection.updateNetworkState('connected');
_this.subscribeAll();
}
if (event.indexOf('pusher_internal') !== -1) {
console.log("Event '"+event+"' not implement");
return;
}
channel = _this.channels[channel_name];
if (channel) {
channel.emit(event, data);
}
},
onClose: function () {
updateSubscribed();
},
onError: function () {
updateSubscribed();
}
});
};
Push.prototype.disconnect = function () {
this.connection.doNotConnect = 1;
this.connection.close();
};
Push.prototype.subscribeAll = function () {
if (this.connection.state !== 'connected') {
return;
}
for (var channel_name in this.channels) {
//this.connection.send(JSON.stringify({event:"pusher:subscribe", data:{channel:channel_name}}));
this.channels[channel_name].processSubscribe();
}
};
Push.prototype.unsubscribe = function (channel_name) {
if (this.channels[channel_name]) {
delete this.channels[channel_name];
if (this.connection.state === 'connected') {
this.connection.send(JSON.stringify({event:"pusher:unsubscribe", data:{channel:channel_name}}));
}
}
};
Push.prototype.unsubscribeAll = function () {
var channels = Object.keys(this.channels);
if (channels.length) {
if (this.connection.state === 'connected') {
for (var channel_name in this.channels) {
this.unsubscribe(channel_name);
}
}
}
this.channels = {};
};
Push.prototype.subscribe = function (channel_name) {
if (this.channels[channel_name]) {
return this.channels[channel_name];
}
if (channel_name.indexOf('private-') === 0) {
return createPrivateChannel(channel_name, this);
}
if (channel_name.indexOf('presence-') === 0) {
return createPresenceChannel(channel_name, this);
}
return createChannel(channel_name, this);
};
Push.instances = [];
function createChannel(channel_name, push)
{
var channel = new Channel(push.connection, channel_name);
push.channels[channel_name] = channel;
channel.subscribeCb = function () {
push.connection.send(JSON.stringify({event:"pusher:subscribe", data:{channel:channel_name}}));
}
channel.processSubscribe();
return channel;
}
function createPrivateChannel(channel_name, push)
{
var channel = new Channel(push.connection, channel_name);
push.channels[channel_name] = channel;
channel.subscribeCb = function () {
__ajax({
url: push.config.auth,
type: 'POST',
data: {channel_name: channel_name, socket_id: push.connection.socket_id},
success: function (data) {
data = JSON.parse(data);
data.channel = channel_name;
push.connection.send(JSON.stringify({event:"pusher:subscribe", data:data}));
},
error: function (e) {
throw Error(e);
}
});
};
channel.processSubscribe();
return channel;
}
function createPresenceChannel(channel_name, push)
{
return createPrivateChannel(channel_name, push);
}
uni.onNetworkStatusChange(function (res) {
if(res.isConnected) {
for (var i in Push.instances) {
con = Push.instances[i].connection;
con.reconnectInterval = 1;
if (con.state === 'connecting') {
con.connect();
}
}
}
});
function Connection(options) {
this.dispatcher = new Dispatcher();
__extends(this, this.dispatcher);
var properies = ['on', 'off', 'emit'];
for (var i in properies) {
this[properies[i]] = this.dispatcher[properies[i]];
}
this.options = options;
this.state = 'initialized'; //initialized connecting connected disconnected
this.doNotConnect = 0;
this.reconnectInterval = 1;
this.connection = null;
this.reconnectTimer = 0;
this.connect();
}
Connection.prototype.updateNetworkState = function(state){
var old_state = this.state;
this.state = state;
if (old_state !== state) {
this.emit('state_change', { previous: old_state, current: state });
}
};
Connection.prototype.connect = function () {
this.doNotConnect = 0;
if (this.networkState == 'connecting' || this.networkState == 'established') {
console.log('networkState is ' + this.networkState + ' and do not need connect');
return;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = 0;
}
this.closeAndClean();
var options = this.options;
var _this = this;
_this.updateNetworkState('connecting');
var cb = function(){
uni.onSocketOpen(function (res) {
_this.reconnectInterval = 1;
if (_this.doNotConnect) {
_this.updateNetworkState('closing');
uni.closeSocket();
return;
}
_this.updateNetworkState('established');
if (options.onOpen) {
options.onOpen(res);
}
});
if (options.onMessage) {
uni.onSocketMessage(options.onMessage);
}
uni.onSocketClose(function (res) {
_this.updateNetworkState('disconnected');
if (!_this.doNotConnect) {
_this.waitReconnect();
}
if (options.onClose) {
options.onClose(res);
}
});
uni.onSocketError(function (res) {
_this.close();
if (!_this.doNotConnect) {
_this.waitReconnect();
}
if (options.onError) {
options.onError(res);
}
});
};
uni.connectSocket({
url: options.url,
fail: function (res) {
console.log('uni.connectSocket fail');
console.log(res);
_this.updateNetworkState('disconnected');
_this.waitReconnect();
},
success: function() {
}
});
cb();
}
Connection.prototype.connect = function () {
this.doNotConnect = 0;
if (this.state === 'connected') {
console.log('networkState is "' + this.state + '" and do not need connect');
return;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = 0;
}
this.closeAndClean();
var options = this.options;
this.updateNetworkState('connecting');
var _this = this;
var cb = function(){
uni.onSocketOpen(function (res) {
_this.reconnectInterval = 1;
if (_this.doNotConnect) {
_this.updateNetworkState('disconnected');
uni.closeSocket();
return;
}
if (options.onOpen) {
options.onOpen(res);
}
});
if (options.onMessage) {
uni.onSocketMessage(options.onMessage);
}
uni.onSocketClose(function (res) {
_this.updateNetworkState('disconnected');
if (!_this.doNotConnect) {
_this.waitReconnect();
}
if (options.onClose) {
options.onClose(res);
}
});
uni.onSocketError(function (res) {
_this.close();
if (!_this.doNotConnect) {
_this.waitReconnect();
}
if (options.onError) {
options.onError(res);
}
});
};
uni.connectSocket({
url: options.url+'/app/'+options.app_key,
fail: function (res) {
console.log('uni.connectSocket fail');
console.log(res);
_this.updateNetworkState('disconnected');
_this.waitReconnect();
},
success: function() {
}
});
cb();
}
Connection.prototype.closeAndClean = function () {
if (this.state === 'connected') {
uni.closeSocket();
}
this.updateNetworkState('disconnected');
};
Connection.prototype.waitReconnect = function () {
if (this.state === 'connected' || this.state === 'connecting') {
return;
}
if (!this.doNotConnect) {
this.updateNetworkState('connecting');
var _this = this;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
this.reconnectTimer = setTimeout(function(){
_this.connect();
}, this.reconnectInterval);
if (this.reconnectInterval < 1000) {
this.reconnectInterval = 1000;
} else {
// 每次重连间隔增大一倍
this.reconnectInterval = this.reconnectInterval * 2;
}
// 有网络的状态下重连间隔最大2秒
if (this.reconnectInterval > 2000 && navigator.onLine) {
_this.reconnectInterval = 2000;
}
}
}
Connection.prototype.send = function(data) {
if (this.state !== 'connected') {
console.trace('networkState is "' + this.state + '", can not send ' + data);
return;
}
uni.sendSocketMessage({
data: data
});
}
Connection.prototype.close = function(){
this.updateNetworkState('disconnected');
uni.closeSocket();
}
var __extends = (this && this.__extends) || function (d, b) {
for (var p in b) if (b.hasOwnProperty(p)) {d[p] = b[p];}
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
function Channel(connection, channel_name) {
this.subscribed = false;
this.dispatcher = new Dispatcher();
this.connection = connection;
this.channelName = channel_name;
this.subscribeCb = null;
this.queue = [];
__extends(this, this.dispatcher);
var properies = ['on', 'off', 'emit'];
for (var i in properies) {
this[properies[i]] = this.dispatcher[properies[i]];
}
}
Channel.prototype.processSubscribe = function () {
if (this.connection.state !== 'connected') {
return;
}
this.subscribeCb();
};
Channel.prototype.processQueue = function () {
if (this.connection.state !== 'connected' || !this.subscribed) {
return;
}
for (var i in this.queue) {
this.queue[i]();
}
this.queue = [];
};
Channel.prototype.trigger = function (event, data) {
if (event.indexOf('client-') !== 0) {
throw new Error("Event '" + event + "' should start with 'client-'");
}
var _this = this;
this.queue.push(function () {
_this.connection.send(JSON.stringify({ event: event, data: data, channel: _this.channelName }));
});
this.processQueue();
};
////////////////
var Collections = (function () {
var exports = {};
function extend(target) {
var sources = [];
for (var _i = 1; _i < arguments.length; _i++) {
sources[_i - 1] = arguments[_i];
}
for (var i = 0; i < sources.length; i++) {
var extensions = sources[i];
for (var property in extensions) {
if (extensions[property] && extensions[property].constructor &&
extensions[property].constructor === Object) {
target[property] = extend(target[property] || {}, extensions[property]);
}
else {
target[property] = extensions[property];
}
}
}
return target;
}
exports.extend = extend;
function stringify() {
var m = ["Push"];
for (var i = 0; i < arguments.length; i++) {
if (typeof arguments[i] === "string") {
m.push(arguments[i]);
}
else {
m.push(safeJSONStringify(arguments[i]));
}
}
return m.join(" : ");
}
exports.stringify = stringify;
function arrayIndexOf(array, item) {
var nativeIndexOf = Array.prototype.indexOf;
if (array === null) {
return -1;
}
if (nativeIndexOf && array.indexOf === nativeIndexOf) {
return array.indexOf(item);
}
for (var i = 0, l = array.length; i < l; i++) {
if (array[i] === item) {
return i;
}
}
return -1;
}
exports.arrayIndexOf = arrayIndexOf;
function objectApply(object, f) {
for (var key in object) {
if (Object.prototype.hasOwnProperty.call(object, key)) {
f(object[key], key, object);
}
}
}
exports.objectApply = objectApply;
function keys(object) {
var keys = [];
objectApply(object, function (_, key) {
keys.push(key);
});
return keys;
}
exports.keys = keys;
function values(object) {
var values = [];
objectApply(object, function (value) {
values.push(value);
});
return values;
}
exports.values = values;
function apply(array, f, context) {
for (var i = 0; i < array.length; i++) {
f.call(context || (window), array[i], i, array);
}
}
exports.apply = apply;
function map(array, f) {
var result = [];
for (var i = 0; i < array.length; i++) {
result.push(f(array[i], i, array, result));
}
return result;
}
exports.map = map;
function mapObject(object, f) {
var result = {};
objectApply(object, function (value, key) {
result[key] = f(value);
});
return result;
}
exports.mapObject = mapObject;
function filter(array, test) {
test = test || function (value) {
return !!value;
};
var result = [];
for (var i = 0; i < array.length; i++) {
if (test(array[i], i, array, result)) {
result.push(array[i]);
}
}
return result;
}
exports.filter = filter;
function filterObject(object, test) {
var result = {};
objectApply(object, function (value, key) {
if ((test && test(value, key, object, result)) || Boolean(value)) {
result[key] = value;
}
});
return result;
}
exports.filterObject = filterObject;
function flatten(object) {
var result = [];
objectApply(object, function (value, key) {
result.push([key, value]);
});
return result;
}
exports.flatten = flatten;
function any(array, test) {
for (var i = 0; i < array.length; i++) {
if (test(array[i], i, array)) {
return true;
}
}
return false;
}
exports.any = any;
function all(array, test) {
for (var i = 0; i < array.length; i++) {
if (!test(array[i], i, array)) {
return false;
}
}
return true;
}
exports.all = all;
function encodeParamsObject(data) {
return mapObject(data, function (value) {
if (typeof value === "object") {
value = safeJSONStringify(value);
}
return encodeURIComponent(base64_1["default"](value.toString()));
});
}
exports.encodeParamsObject = encodeParamsObject;
function buildQueryString(data) {
var params = filterObject(data, function (value) {
return value !== undefined;
});
return map(flatten(encodeParamsObject(params)), util_1["default"].method("join", "=")).join("&");
}
exports.buildQueryString = buildQueryString;
function decycleObject(object) {
var objects = [], paths = [];
return (function derez(value, path) {
var i, name, nu;
switch (typeof value) {
case 'object':
if (!value) {
return null;
}
for (i = 0; i < objects.length; i += 1) {
if (objects[i] === value) {
return {$ref: paths[i]};
}
}
objects.push(value);
paths.push(path);
if (Object.prototype.toString.apply(value) === '[object Array]') {
nu = [];
for (i = 0; i < value.length; i += 1) {
nu[i] = derez(value[i], path + '[' + i + ']');
}
}
else {
nu = {};
for (name in value) {
if (Object.prototype.hasOwnProperty.call(value, name)) {
nu[name] = derez(value[name], path + '[' + JSON.stringify(name) + ']');
}
}
}
return nu;
case 'number':
case 'string':
case 'boolean':
return value;
}
}(object, '$'));
}
exports.decycleObject = decycleObject;
function safeJSONStringify(source) {
try {
return JSON.stringify(source);
}
catch (e) {
return JSON.stringify(decycleObject(source));
}
}
exports.safeJSONStringify = safeJSONStringify;
return exports;
})();
var Dispatcher = (function () {
function Dispatcher(failThrough) {
this.callbacks = new CallbackRegistry();
this.global_callbacks = [];
this.failThrough = failThrough;
}
Dispatcher.prototype.on = function (eventName, callback, context) {
this.callbacks.add(eventName, callback, context);
return this;
};
Dispatcher.prototype.on_global = function (callback) {
this.global_callbacks.push(callback);
return this;
};
Dispatcher.prototype.off = function (eventName, callback, context) {
this.callbacks.remove(eventName, callback, context);
return this;
};
Dispatcher.prototype.emit = function (eventName, data) {
var i;
for (i = 0; i < this.global_callbacks.length; i++) {
this.global_callbacks[i](eventName, data);
}
var callbacks = this.callbacks.get(eventName);
if (callbacks && callbacks.length > 0) {
for (i = 0; i < callbacks.length; i++) {
callbacks[i].fn.call(callbacks[i].context || (window), data);
}
}
else if (this.failThrough) {
this.failThrough(eventName, data);
}
return this;
};
return Dispatcher;
}());
var CallbackRegistry = (function () {
function CallbackRegistry() {
this._callbacks = {};
}
CallbackRegistry.prototype.get = function (name) {
return this._callbacks[prefix(name)];
};
CallbackRegistry.prototype.add = function (name, callback, context) {
var prefixedEventName = prefix(name);
this._callbacks[prefixedEventName] = this._callbacks[prefixedEventName] || [];
this._callbacks[prefixedEventName].push({
fn: callback,
context: context
});
};
CallbackRegistry.prototype.remove = function (name, callback, context) {
if (!name && !callback && !context) {
this._callbacks = {};
return;
}
var names = name ? [prefix(name)] : Collections.keys(this._callbacks);
if (callback || context) {
this.removeCallback(names, callback, context);
}
else {
this.removeAllCallbacks(names);
}
};
CallbackRegistry.prototype.removeCallback = function (names, callback, context) {
Collections.apply(names, function (name) {
this._callbacks[name] = Collections.filter(this._callbacks[name] || [], function (oning) {
return (callback && callback !== oning.fn) ||
(context && context !== oning.context);
});
if (this._callbacks[name].length === 0) {
delete this._callbacks[name];
}
}, this);
};
CallbackRegistry.prototype.removeAllCallbacks = function (names) {
Collections.apply(names, function (name) {
delete this._callbacks[name];
}, this);
};
return CallbackRegistry;
}());
function prefix(name) {
return "_" + name;
}
function __ajax(options){
options=options||{};
options.type=(options.type||'GET').toUpperCase();
options.dataType=options.dataType||'json';
params=formatParams(options.data);
var xhr;
if(window.XMLHttpRequest){
xhr=new XMLHttpRequest();
}else{
xhr=ActiveXObject('Microsoft.XMLHTTP');
}
xhr.onreadystatechange=function(){
if(xhr.readyState === 4){
var status=xhr.status;
if(status>=200 && status<300){
options.success&&options.success(xhr.responseText,xhr.responseXML);
}else{
options.error&&options.error(status);
}
}
}
if(options.type==='GET'){
xhr.open('GET',options.url+'?'+params,true);
xhr.send(null);
}else if(options.type==='POST'){
xhr.open('POST',options.url,true);
xhr.setRequestHeader("Content-Type", "application/x-www-form-urlencoded");
xhr.send(params);
}
}
function formatParams(data){
var arr=[];
for(var name in data){
arr.push(encodeURIComponent(name)+'='+encodeURIComponent(data[name]));
}
return arr.join('&');
}
export default Push

746
vendor/webman/push/src/push.js vendored Normal file
View File

@ -0,0 +1,746 @@
function Push(options) {
this.doNotConnect = 0;
options = options || {};
options.heartbeat = options.heartbeat || 25000;
options.pingTimeout = options.pingTimeout || 10000;
this.config = options;
this.uid = 0;
this.channels = {};
this.connection = null;
this.pingTimeoutTimer = 0;
Push.instances.push(this);
this.createConnection();
}
Push.prototype.checkoutPing = function() {
var _this = this;
_this.checkoutPingTimer && clearTimeout(_this.checkoutPingTimer);
_this.checkoutPingTimer = setTimeout(function () {
_this.checkoutPingTimer = 0;
if (_this.connection.state === 'connected') {
_this.connection.send('{"event":"pusher:ping","data":{}}');
if (_this.pingTimeoutTimer) {
clearTimeout(_this.pingTimeoutTimer);
_this.pingTimeoutTimer = 0;
}
_this.pingTimeoutTimer = setTimeout(function () {
_this.connection.closeAndClean();
if (!_this.connection.doNotConnect) {
_this.connection.waitReconnect();
}
}, _this.config.pingTimeout);
}
}, this.config.heartbeat);
};
Push.prototype.channel = function (name) {
return this.channels.find(name);
};
Push.prototype.allChannels = function () {
return this.channels.all();
};
Push.prototype.createConnection = function () {
if (this.connection) {
throw Error('Connection already exist');
}
var _this = this;
var url = this.config.url;
function updateSubscribed () {
for (var i in _this.channels) {
_this.channels[i].subscribed = false;
}
}
this.connection = new Connection({
url: url,
app_key: this.config.app_key,
onOpen: function () {
_this.connection.state ='connecting';
_this.checkoutPing();
},
onMessage: function(params) {
if(_this.pingTimeoutTimer) {
clearTimeout(_this.pingTimeoutTimer);
_this.pingTimeoutTimer = 0;
}
params = JSON.parse(params.data);
var event = params.event;
var channel_name = params.channel;
if (event === 'pusher:pong') {
_this.checkoutPing();
return;
}
if (event === 'pusher:error') {
throw Error(params.data.message);
}
var data = JSON.parse(params.data), channel;
if (event === 'pusher_internal:subscription_succeeded') {
channel = _this.channels[channel_name];
channel.subscribed = true;
channel.processQueue();
channel.emit('pusher:subscription_succeeded');
return;
}
if (event === 'pusher:connection_established') {
_this.connection.socket_id = data.socket_id;
_this.connection.updateNetworkState('connected');
_this.subscribeAll();
}
if (event.indexOf('pusher_internal') !== -1) {
console.log("Event '"+event+"' not implement");
return;
}
channel = _this.channels[channel_name];
if (channel) {
channel.emit(event, data);
}
},
onClose: function () {
updateSubscribed();
},
onError: function () {
updateSubscribed();
}
});
};
Push.prototype.disconnect = function () {
this.connection.doNotConnect = 1;
this.connection.close();
};
Push.prototype.subscribeAll = function () {
if (this.connection.state !== 'connected') {
return;
}
for (var channel_name in this.channels) {
//this.connection.send(JSON.stringify({event:"pusher:subscribe", data:{channel:channel_name}}));
this.channels[channel_name].processSubscribe();
}
};
Push.prototype.unsubscribe = function (channel_name) {
if (this.channels[channel_name]) {
delete this.channels[channel_name];
if (this.connection.state === 'connected') {
this.connection.send(JSON.stringify({event:"pusher:unsubscribe", data:{channel:channel_name}}));
}
}
};
Push.prototype.unsubscribeAll = function () {
var channels = Object.keys(this.channels);
if (channels.length) {
if (this.connection.state === 'connected') {
for (var channel_name in this.channels) {
this.unsubscribe(channel_name);
}
}
}
this.channels = {};
};
Push.prototype.subscribe = function (channel_name) {
if (this.channels[channel_name]) {
return this.channels[channel_name];
}
if (channel_name.indexOf('private-') === 0) {
return createPrivateChannel(channel_name, this);
}
if (channel_name.indexOf('presence-') === 0) {
return createPresenceChannel(channel_name, this);
}
return createChannel(channel_name, this);
};
Push.instances = [];
function createChannel(channel_name, push)
{
var channel = new Channel(push.connection, channel_name);
push.channels[channel_name] = channel;
channel.subscribeCb = function () {
push.connection.send(JSON.stringify({event:"pusher:subscribe", data:{channel:channel_name}}));
}
channel.processSubscribe();
return channel;
}
function createPrivateChannel(channel_name, push)
{
var channel = new Channel(push.connection, channel_name);
push.channels[channel_name] = channel;
channel.subscribeCb = function () {
__ajax({
url: push.config.auth,
type: 'POST',
data: {channel_name: channel_name, socket_id: push.connection.socket_id},
success: function (data) {
data = JSON.parse(data);
data.channel = channel_name;
push.connection.send(JSON.stringify({event:"pusher:subscribe", data:data}));
},
error: function (e) {
throw Error(e);
}
});
};
channel.processSubscribe();
return channel;
}
function createPresenceChannel(channel_name, push)
{
return createPrivateChannel(channel_name, push);
}
/*window.addEventListener('online', function(){
var con;
for (var i in Push.instances) {
con = Push.instances[i].connection;
con.reconnectInterval = 1;
if (con.state === 'connecting') {
con.connect();
}
}
});*/
function Connection(options) {
this.dispatcher = new Dispatcher();
__extends(this, this.dispatcher);
var properies = ['on', 'off', 'emit'];
for (var i in properies) {
this[properies[i]] = this.dispatcher[properies[i]];
}
this.options = options;
this.state = 'initialized'; //initialized connecting connected disconnected
this.doNotConnect = 0;
this.reconnectInterval = 1;
this.connection = null;
this.reconnectTimer = 0;
this.connect();
}
Connection.prototype.updateNetworkState = function(state){
var old_state = this.state;
this.state = state;
if (old_state !== state) {
this.emit('state_change', { previous: old_state, current: state });
}
};
Connection.prototype.connect = function () {
this.doNotConnect = 0;
if (this.state === 'connected') {
console.log('networkState is "' + this.state + '" and do not need connect');
return;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = 0;
}
this.closeAndClean();
var options = this.options;
var websocket = new WebSocket(options.url+'/app/'+options.app_key);
this.updateNetworkState('connecting');
var _this = this;
websocket.onopen = function (res) {
_this.reconnectInterval = 1;
if (_this.doNotConnect) {
_this.updateNetworkState('disconnected');
websocket.close();
return;
}
if (options.onOpen) {
options.onOpen(res);
}
};
if (options.onMessage) {
websocket.onmessage = options.onMessage;
}
websocket.onclose = function (res) {
websocket.onmessage = websocket.onopen = websocket.onclose = websocket.onerror = null;
_this.updateNetworkState('disconnected');
if (!_this.doNotConnect) {
_this.waitReconnect();
}
if (options.onClose) {
options.onClose(res);
}
};
websocket.onerror = function (res) {
_this.close();
if (!_this.doNotConnect) {
_this.waitReconnect();
}
if (options.onError) {
options.onError(res);
}
};
this.connection = websocket;
}
Connection.prototype.closeAndClean = function () {
if(this.connection) {
var websocket = this.connection;
websocket.onmessage = websocket.onopen = websocket.onclose = websocket.onerror = null;
try {
websocket.close();
} catch (e) {}
this.updateNetworkState('disconnected');
}
};
Connection.prototype.waitReconnect = function () {
if (this.state === 'connected' || this.state === 'connecting') {
return;
}
if (!this.doNotConnect) {
this.updateNetworkState('connecting');
var _this = this;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
this.reconnectTimer = setTimeout(function(){
_this.connect();
}, this.reconnectInterval);
if (this.reconnectInterval < 1000) {
this.reconnectInterval = 1000;
} else {
// 每次重连间隔增大一倍
this.reconnectInterval = this.reconnectInterval * 2;
}
// 有网络的状态下重连间隔最大2秒
if (this.reconnectInterval > 2000 && navigator.onLine) {
_this.reconnectInterval = 2000;
}
}
}
Connection.prototype.send = function(data) {
if (this.state !== 'connected') {
console.trace('networkState is "' + this.state + '", can not send ' + data);
return;
}
this.connection.send(data);
}
Connection.prototype.close = function(){
this.updateNetworkState('disconnected');
this.connection.close();
}
var __extends = (this && this.__extends) || function (d, b) {
for (var p in b) if (b.hasOwnProperty(p)) {d[p] = b[p];}
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
function Channel(connection, channel_name) {
this.subscribed = false;
this.dispatcher = new Dispatcher();
this.connection = connection;
this.channelName = channel_name;
this.subscribeCb = null;
this.queue = [];
__extends(this, this.dispatcher);
var properies = ['on', 'off', 'emit'];
for (var i in properies) {
this[properies[i]] = this.dispatcher[properies[i]];
}
}
Channel.prototype.processSubscribe = function () {
if (this.connection.state !== 'connected') {
return;
}
this.subscribeCb();
};
Channel.prototype.processQueue = function () {
if (this.connection.state !== 'connected' || !this.subscribed) {
return;
}
for (var i in this.queue) {
this.queue[i]();
}
this.queue = [];
};
Channel.prototype.trigger = function (event, data) {
if (event.indexOf('client-') !== 0) {
throw new Error("Event '" + event + "' should start with 'client-'");
}
var _this = this;
this.queue.push(function () {
_this.connection.send(JSON.stringify({ event: event, data: data, channel: _this.channelName }));
});
this.processQueue();
};
////////////////
var Collections = (function () {
var exports = {};
function extend(target) {
var sources = [];
for (var _i = 1; _i < arguments.length; _i++) {
sources[_i - 1] = arguments[_i];
}
for (var i = 0; i < sources.length; i++) {
var extensions = sources[i];
for (var property in extensions) {
if (extensions[property] && extensions[property].constructor &&
extensions[property].constructor === Object) {
target[property] = extend(target[property] || {}, extensions[property]);
}
else {
target[property] = extensions[property];
}
}
}
return target;
}
exports.extend = extend;
function stringify() {
var m = ["Push"];
for (var i = 0; i < arguments.length; i++) {
if (typeof arguments[i] === "string") {
m.push(arguments[i]);
}
else {
m.push(safeJSONStringify(arguments[i]));
}
}
return m.join(" : ");
}
exports.stringify = stringify;
function arrayIndexOf(array, item) {
var nativeIndexOf = Array.prototype.indexOf;
if (array === null) {
return -1;
}
if (nativeIndexOf && array.indexOf === nativeIndexOf) {
return array.indexOf(item);
}
for (var i = 0, l = array.length; i < l; i++) {
if (array[i] === item) {
return i;
}
}
return -1;
}
exports.arrayIndexOf = arrayIndexOf;
function objectApply(object, f) {
for (var key in object) {
if (Object.prototype.hasOwnProperty.call(object, key)) {
f(object[key], key, object);
}
}
}
exports.objectApply = objectApply;
function keys(object) {
var keys = [];
objectApply(object, function (_, key) {
keys.push(key);
});
return keys;
}
exports.keys = keys;
function values(object) {
var values = [];
objectApply(object, function (value) {
values.push(value);
});
return values;
}
exports.values = values;
function apply(array, f, context) {
for (var i = 0; i < array.length; i++) {
f.call(context || (window), array[i], i, array);
}
}
exports.apply = apply;
function map(array, f) {
var result = [];
for (var i = 0; i < array.length; i++) {
result.push(f(array[i], i, array, result));
}
return result;
}
exports.map = map;
function mapObject(object, f) {
var result = {};
objectApply(object, function (value, key) {
result[key] = f(value);
});
return result;
}
exports.mapObject = mapObject;
function filter(array, test) {
test = test || function (value) {
return !!value;
};
var result = [];
for (var i = 0; i < array.length; i++) {
if (test(array[i], i, array, result)) {
result.push(array[i]);
}
}
return result;
}
exports.filter = filter;
function filterObject(object, test) {
var result = {};
objectApply(object, function (value, key) {
if ((test && test(value, key, object, result)) || Boolean(value)) {
result[key] = value;
}
});
return result;
}
exports.filterObject = filterObject;
function flatten(object) {
var result = [];
objectApply(object, function (value, key) {
result.push([key, value]);
});
return result;
}
exports.flatten = flatten;
function any(array, test) {
for (var i = 0; i < array.length; i++) {
if (test(array[i], i, array)) {
return true;
}
}
return false;
}
exports.any = any;
function all(array, test) {
for (var i = 0; i < array.length; i++) {
if (!test(array[i], i, array)) {
return false;
}
}
return true;
}
exports.all = all;
function encodeParamsObject(data) {
return mapObject(data, function (value) {
if (typeof value === "object") {
value = safeJSONStringify(value);
}
return encodeURIComponent(base64_1["default"](value.toString()));
});
}
exports.encodeParamsObject = encodeParamsObject;
function buildQueryString(data) {
var params = filterObject(data, function (value) {
return value !== undefined;
});
return map(flatten(encodeParamsObject(params)), util_1["default"].method("join", "=")).join("&");
}
exports.buildQueryString = buildQueryString;
function decycleObject(object) {
var objects = [], paths = [];
return (function derez(value, path) {
var i, name, nu;
switch (typeof value) {
case 'object':
if (!value) {
return null;
}
for (i = 0; i < objects.length; i += 1) {
if (objects[i] === value) {
return {$ref: paths[i]};
}
}
objects.push(value);
paths.push(path);
if (Object.prototype.toString.apply(value) === '[object Array]') {
nu = [];
for (i = 0; i < value.length; i += 1) {
nu[i] = derez(value[i], path + '[' + i + ']');
}
}
else {
nu = {};
for (name in value) {
if (Object.prototype.hasOwnProperty.call(value, name)) {
nu[name] = derez(value[name], path + '[' + JSON.stringify(name) + ']');
}
}
}
return nu;
case 'number':
case 'string':
case 'boolean':
return value;
}
}(object, '$'));
}
exports.decycleObject = decycleObject;
function safeJSONStringify(source) {
try {
return JSON.stringify(source);
}
catch (e) {
return JSON.stringify(decycleObject(source));
}
}
exports.safeJSONStringify = safeJSONStringify;
return exports;
})();
var Dispatcher = (function () {
function Dispatcher(failThrough) {
this.callbacks = new CallbackRegistry();
this.global_callbacks = [];
this.failThrough = failThrough;
}
Dispatcher.prototype.on = function (eventName, callback, context) {
this.callbacks.add(eventName, callback, context);
return this;
};
Dispatcher.prototype.on_global = function (callback) {
this.global_callbacks.push(callback);
return this;
};
Dispatcher.prototype.off = function (eventName, callback, context) {
this.callbacks.remove(eventName, callback, context);
return this;
};
Dispatcher.prototype.emit = function (eventName, data) {
var i;
for (i = 0; i < this.global_callbacks.length; i++) {
this.global_callbacks[i](eventName, data);
}
var callbacks = this.callbacks.get(eventName);
if (callbacks && callbacks.length > 0) {
for (i = 0; i < callbacks.length; i++) {
callbacks[i].fn.call(callbacks[i].context || (window), data);
}
}
else if (this.failThrough) {
this.failThrough(eventName, data);
}
return this;
};
return Dispatcher;
}());
var CallbackRegistry = (function () {
function CallbackRegistry() {
this._callbacks = {};
}
CallbackRegistry.prototype.get = function (name) {
return this._callbacks[prefix(name)];
};
CallbackRegistry.prototype.add = function (name, callback, context) {
var prefixedEventName = prefix(name);
this._callbacks[prefixedEventName] = this._callbacks[prefixedEventName] || [];
this._callbacks[prefixedEventName].push({
fn: callback,
context: context
});
};
CallbackRegistry.prototype.remove = function (name, callback, context) {
if (!name && !callback && !context) {
this._callbacks = {};
return;
}
var names = name ? [prefix(name)] : Collections.keys(this._callbacks);
if (callback || context) {
this.removeCallback(names, callback, context);
}
else {
this.removeAllCallbacks(names);
}
};
CallbackRegistry.prototype.removeCallback = function (names, callback, context) {
Collections.apply(names, function (name) {
this._callbacks[name] = Collections.filter(this._callbacks[name] || [], function (oning) {
return (callback && callback !== oning.fn) ||
(context && context !== oning.context);
});
if (this._callbacks[name].length === 0) {
delete this._callbacks[name];
}
}, this);
};
CallbackRegistry.prototype.removeAllCallbacks = function (names) {
Collections.apply(names, function (name) {
delete this._callbacks[name];
}, this);
};
return CallbackRegistry;
}());
function prefix(name) {
return "_" + name;
}
function __ajax(options){
options=options||{};
options.type=(options.type||'GET').toUpperCase();
options.dataType=options.dataType||'json';
params=formatParams(options.data);
var xhr;
if(window.XMLHttpRequest){
xhr=new XMLHttpRequest();
}else{
xhr=ActiveXObject('Microsoft.XMLHTTP');
}
xhr.onreadystatechange=function(){
if(xhr.readyState === 4){
var status=xhr.status;
if(status>=200 && status<300){
options.success&&options.success(xhr.responseText,xhr.responseXML);
}else{
options.error&&options.error(status);
}
}
}
if(options.type==='GET'){
xhr.open('GET',options.url+'?'+params,true);
xhr.send(null);
}else if(options.type==='POST'){
xhr.open('POST',options.url,true);
xhr.setRequestHeader("Content-Type", "application/x-www-form-urlencoded");
xhr.send(params);
}
}
function formatParams(data){
var arr=[];
for(var name in data){
arr.push(encodeURIComponent(name)+'='+encodeURIComponent(data[name]));
}
return arr.join('&');
}