diff --git a/app/api/controller/IndexController.php b/app/api/controller/IndexController.php index 4b143bd..69ac922 100644 --- a/app/api/controller/IndexController.php +++ b/app/api/controller/IndexController.php @@ -6,6 +6,8 @@ use app\admin\validate\tools\GenerateTableValidate; use app\admin\logic\tools\GeneratorLogic; use app\common\service\pay\PayService; use app\common\service\wechat\WeChatOaService; +use Webman\Config; +use Yansongda\Pay\Pay; class IndexController extends BaseApiController { @@ -13,43 +15,24 @@ class IndexController extends BaseApiController public function index() { - $template=[ - 'touser'=>'ocqhF6UfFQXE-SbzbP5YVQJlQAh0', - 'template_id'=>'hfvTch-DcP_UQ83VkD6Z-eMimeRrK8P8zpWC9j2dOKc', - 'miniprogram'=>'data', - 'appid'=>'wxdee751952c8c2027', - 'pagepath'=>'pages/index/index', - 'data'=>[ - 'thing2' => ['value'=>'圆珠笔采购订单'], - 'time6' => ['value'=>date('Y-m-d H:i:s')], - 'thing4' =>['value'=>'张三'], - 'phone_number5' =>['value'=>18982406440], - ] - ]; - $token='80_u14-Yey-hmXxx-GPD0Pk8X0l_0OQ0B8U-VWNw8h9cs3RYCefviQMK_idliIZ5qUvBCcfuW7NPUUL3CTCB87gJgTFpwRWULWG-nqq9gTDxtX_Su7EjVcW9QJsjuYENNbAAAQZZ'; - $url="https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=".$token; - $a=new WeChatOaService(); - d($a->getClient()->post($url,$template)); + Pay::config(Config::get('payment')); $order = [ 'description' => '测试 - yansongda - 1', 'out_trade_no' => time().'', 'payer' => [ - 'auth_code' => '123456789123456789' + 'auth_code' => 'xxxxxxxxxxx' ], 'amount' => [ 'total' => 1, ], 'scene_info' => [ - 'device_id' => '013467007045764', - 'device_ip' => '128.0.0.1', - "store_info"=>[ - "id" => "0001", - "out_id" => "A1111" + 'store_info'=>[ + 'id' => '5678' ] ], ]; - d($a->wechat->pos($order)); + $result = Pay::wechat()->pos($order); return json(['msg' =>create_password(123456, '11d3')]); } diff --git a/app/api/controller/order/RetailOrderController.php b/app/api/controller/order/RetailOrderController.php index 26a488e..d92845e 100644 --- a/app/api/controller/order/RetailOrderController.php +++ b/app/api/controller/order/RetailOrderController.php @@ -11,6 +11,7 @@ use app\api\service\WechatUserService; use app\common\logic\order\RetailOrderLogic; use app\common\enum\PayEnum; use app\common\logic\PaymentLogic; +use app\common\logic\PayNotifyLogic; use app\common\model\order\Cart; use app\common\model\retail\Cashierclass; use app\common\model\user\User; @@ -192,7 +193,10 @@ class RetailOrderController extends BaseApiController if (PaymentLogic::hasError()) { return $this->fail(PaymentLogic::getError(), $params); } - return $this->success('', $result); + if($result['trade_state_desc']=='支付成功'){ + PayNotifyLogic::handle('cashierclass', $result['out_trade_no'], $result); + } + return $this->success('支付成功',['order_id'=>$order['order_id']]); break; default: return $this->fail('支付方式错误'); diff --git a/app/common/logic/PaymentLogic.php b/app/common/logic/PaymentLogic.php index 62d0545..3972453 100644 --- a/app/common/logic/PaymentLogic.php +++ b/app/common/logic/PaymentLogic.php @@ -7,6 +7,10 @@ namespace app\common\logic; use app\common\enum\PayEnum; use app\common\model\user\UserAuth; use app\common\service\pay\PayService; +use Yansongda\Artful\Exception\Exception as ExceptionException; +use Yansongda\Pay\Exception\Exception; + +use function DI\string; /** * 支付逻辑 @@ -75,22 +79,26 @@ class PaymentLogic extends BaseLogic 'description' => '条码商品', 'out_trade_no' => $order['number'], 'payer' => [ - 'auth_code' => $auth_code + 'auth_code' => (string)$auth_code ], 'amount' => [ 'total' => intval($order['actual'] * 100), ], 'scene_info' => [ "store_info" => [ - 'id' => $order['merchant'] + 'id' => (string)$order['merchant'] ] ], ]; $wechat = new PayService(1); try { - $result = $wechat->wechat->pot($order)->toArray(); - } catch (\Exception $e) { - self::$error = $e->getMessage(); + $result = $wechat->wechat->pos($order)->toArray(); + } catch (ExceptionException $e) { + if(getenv('APP_DEBUG')==true){ + self::$error=$e->extra['message']; + }else{ + self::$error = $e->getMessage(); + } return false; } return $result; diff --git a/config/plugin/webman/redis-queue/app.php b/config/plugin/webman/redis-queue/app.php new file mode 100644 index 0000000..8f9c426 --- /dev/null +++ b/config/plugin/webman/redis-queue/app.php @@ -0,0 +1,4 @@ + true, +]; \ No newline at end of file diff --git a/config/plugin/webman/redis-queue/command.php b/config/plugin/webman/redis-queue/command.php new file mode 100644 index 0000000..8bfe2a1 --- /dev/null +++ b/config/plugin/webman/redis-queue/command.php @@ -0,0 +1,7 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +return [ + 'default' => [ + 'handlers' => [ + [ + 'class' => Monolog\Handler\RotatingFileHandler::class, + 'constructor' => [ + runtime_path() . '/logs/redis-queue/queue.log', + 7, //$maxFiles + Monolog\Logger::DEBUG, + ], + 'formatter' => [ + 'class' => Monolog\Formatter\LineFormatter::class, + 'constructor' => [null, 'Y-m-d H:i:s', true], + ], + ] + ], + ] +]; diff --git a/config/plugin/webman/redis-queue/process.php b/config/plugin/webman/redis-queue/process.php new file mode 100644 index 0000000..c8d4da1 --- /dev/null +++ b/config/plugin/webman/redis-queue/process.php @@ -0,0 +1,11 @@ + [ + 'handler' => Webman\RedisQueue\Process\Consumer::class, + 'count' => 8, // 可以设置多进程同时消费 + 'constructor' => [ + // 消费者类目录 + 'consumer_dir' => app_path() . '/queue/redis' + ] + ] +]; \ No newline at end of file diff --git a/config/plugin/webman/redis-queue/redis.php b/config/plugin/webman/redis-queue/redis.php new file mode 100644 index 0000000..0876302 --- /dev/null +++ b/config/plugin/webman/redis-queue/redis.php @@ -0,0 +1,13 @@ + [ + 'host' => 'redis://127.0.0.1:6379', + 'options' => [ + 'auth' => null, // 密码,字符串类型,可选参数 + 'db' => 0, // 数据库 + 'prefix' => '', // key 前缀 + 'max_attempts' => 5, // 消费失败后,重试次数 + 'retry_seconds' => 5, // 重试间隔,单位秒 + ] + ], +]; diff --git a/vendor/composer/autoload_psr4.php b/vendor/composer/autoload_psr4.php index 4ef02bc..5aaad5a 100644 --- a/vendor/composer/autoload_psr4.php +++ b/vendor/composer/autoload_psr4.php @@ -16,9 +16,12 @@ return array( 'Yansongda\\Supports\\' => array($vendorDir . '/yansongda/supports/src'), 'Yansongda\\Pay\\' => array($vendorDir . '/yansongda/pay/src'), 'Yansongda\\Artful\\' => array($vendorDir . '/yansongda/artful/src'), + 'Workerman\\Redis\\' => array($vendorDir . '/workerman/redis/src'), + 'Workerman\\RedisQueue\\' => array($vendorDir . '/workerman/redis-queue/src'), 'Workerman\\' => array($vendorDir . '/workerman/workerman'), 'Webmozart\\Assert\\' => array($vendorDir . '/webmozart/assert/src'), 'Webman\\ThinkOrm\\' => array($vendorDir . '/webman/think-orm/src'), + 'Webman\\RedisQueue\\' => array($vendorDir . '/webman/redis-queue/src'), 'Webman\\Log\\' => array($vendorDir . '/webman/log/src'), 'Webman\\Console\\' => array($vendorDir . '/webman/console/src'), 'Webman\\' => array($vendorDir . '/workerman/webman-framework/src'), @@ -56,7 +59,7 @@ return array( 'Qcloud\\Cos\\' => array($vendorDir . '/qcloud/cos-sdk-v5/src'), 'Psr\\SimpleCache\\' => array($vendorDir . '/psr/simple-cache/src'), 'Psr\\Log\\' => array($vendorDir . '/psr/log/src'), - 'Psr\\Http\\Message\\' => array($vendorDir . '/psr/http-message/src', $vendorDir . '/psr/http-factory/src'), + 'Psr\\Http\\Message\\' => array($vendorDir . '/psr/http-factory/src', $vendorDir . '/psr/http-message/src'), 'Psr\\Http\\Client\\' => array($vendorDir . '/psr/http-client/src'), 'Psr\\EventDispatcher\\' => array($vendorDir . '/psr/event-dispatcher/src'), 'Psr\\Container\\' => array($vendorDir . '/psr/container/src'), @@ -74,7 +77,7 @@ return array( 'Matrix\\' => array($vendorDir . '/markbaker/matrix/classes/src'), 'Laravel\\SerializableClosure\\' => array($vendorDir . '/laravel/serializable-closure/src'), 'Invoker\\' => array($vendorDir . '/php-di/invoker/src'), - 'Illuminate\\Support\\' => array($vendorDir . '/illuminate/macroable', $vendorDir . '/illuminate/conditionable', $vendorDir . '/illuminate/collections', $vendorDir . '/illuminate/support'), + 'Illuminate\\Support\\' => array($vendorDir . '/illuminate/collections', $vendorDir . '/illuminate/conditionable', $vendorDir . '/illuminate/macroable', $vendorDir . '/illuminate/support'), 'Illuminate\\Redis\\' => array($vendorDir . '/illuminate/redis'), 'Illuminate\\Contracts\\' => array($vendorDir . '/illuminate/contracts'), 'Hyperf\\Pimple\\' => array($vendorDir . '/hyperf/pimple/src'), diff --git a/vendor/composer/autoload_static.php b/vendor/composer/autoload_static.php index 38fc226..976c0d9 100644 --- a/vendor/composer/autoload_static.php +++ b/vendor/composer/autoload_static.php @@ -74,9 +74,12 @@ class ComposerStaticInitb985d5bd8942750003fe2a54df074341 ), 'W' => array ( + 'Workerman\\Redis\\' => 16, + 'Workerman\\RedisQueue\\' => 21, 'Workerman\\' => 10, 'Webmozart\\Assert\\' => 17, 'Webman\\ThinkOrm\\' => 16, + 'Webman\\RedisQueue\\' => 18, 'Webman\\Log\\' => 11, 'Webman\\Console\\' => 15, 'Webman\\' => 7, @@ -255,6 +258,14 @@ class ComposerStaticInitb985d5bd8942750003fe2a54df074341 array ( 0 => __DIR__ . '/..' . '/yansongda/artful/src', ), + 'Workerman\\Redis\\' => + array ( + 0 => __DIR__ . '/..' . '/workerman/redis/src', + ), + 'Workerman\\RedisQueue\\' => + array ( + 0 => __DIR__ . '/..' . '/workerman/redis-queue/src', + ), 'Workerman\\' => array ( 0 => __DIR__ . '/..' . '/workerman/workerman', @@ -267,6 +278,10 @@ class ComposerStaticInitb985d5bd8942750003fe2a54df074341 array ( 0 => __DIR__ . '/..' . '/webman/think-orm/src', ), + 'Webman\\RedisQueue\\' => + array ( + 0 => __DIR__ . '/..' . '/webman/redis-queue/src', + ), 'Webman\\Log\\' => array ( 0 => __DIR__ . '/..' . '/webman/log/src', @@ -417,8 +432,8 @@ class ComposerStaticInitb985d5bd8942750003fe2a54df074341 ), 'Psr\\Http\\Message\\' => array ( - 0 => __DIR__ . '/..' . '/psr/http-message/src', - 1 => __DIR__ . '/..' . '/psr/http-factory/src', + 0 => __DIR__ . '/..' . '/psr/http-factory/src', + 1 => __DIR__ . '/..' . '/psr/http-message/src', ), 'Psr\\Http\\Client\\' => array ( @@ -490,9 +505,9 @@ class ComposerStaticInitb985d5bd8942750003fe2a54df074341 ), 'Illuminate\\Support\\' => array ( - 0 => __DIR__ . '/..' . '/illuminate/macroable', + 0 => __DIR__ . '/..' . '/illuminate/collections', 1 => __DIR__ . '/..' . '/illuminate/conditionable', - 2 => __DIR__ . '/..' . '/illuminate/collections', + 2 => __DIR__ . '/..' . '/illuminate/macroable', 3 => __DIR__ . '/..' . '/illuminate/support', ), 'Illuminate\\Redis\\' => diff --git a/vendor/composer/installed.json b/vendor/composer/installed.json index b1ee798..911aed5 100644 --- a/vendor/composer/installed.json +++ b/vendor/composer/installed.json @@ -6592,6 +6592,40 @@ }, "install-path": "../webman/log" }, + { + "name": "webman/redis-queue", + "version": "v1.3.2", + "version_normalized": "1.3.2.0", + "source": { + "type": "git", + "url": "https://github.com/webman-php/redis-queue.git", + "reference": "80b9ddca0405bbb6d02e6b368e8036b3b1a13814" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/webman-php/redis-queue/zipball/80b9ddca0405bbb6d02e6b368e8036b3b1a13814", + "reference": "80b9ddca0405bbb6d02e6b368e8036b3b1a13814", + "shasum": "" + }, + "require": { + "workerman/redis-queue": "^1.2" + }, + "time": "2024-04-03T02:00:20+00:00", + "type": "library", + "installation-source": "dist", + "autoload": { + "psr-4": { + "Webman\\RedisQueue\\": "./src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "description": "Redis message queue plugin for webman.", + "support": { + "issues": "https://github.com/webman-php/redis-queue/issues", + "source": "https://github.com/webman-php/redis-queue/tree/v1.3.2" + }, + "install-path": "../webman/redis-queue" + }, { "name": "webman/think-orm", "version": "v1.1.1", @@ -6690,6 +6724,84 @@ }, "install-path": "../webmozart/assert" }, + { + "name": "workerman/redis", + "version": "v2.0.2", + "version_normalized": "2.0.2.0", + "source": { + "type": "git", + "url": "https://github.com/walkor/redis.git", + "reference": "542f10c243ba846f1f3b4c07a26136c5fa80d972" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/walkor/redis/zipball/542f10c243ba846f1f3b4c07a26136c5fa80d972", + "reference": "542f10c243ba846f1f3b4c07a26136c5fa80d972", + "shasum": "" + }, + "require": { + "php": ">=7", + "workerman/workerman": "^4.1.0||^5.0.0" + }, + "time": "2023-06-08T01:39:47+00:00", + "type": "library", + "installation-source": "dist", + "autoload": { + "psr-4": { + "Workerman\\Redis\\": "./src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "homepage": "http://www.workerman.net", + "support": { + "issues": "https://github.com/walkor/redis/issues", + "source": "https://github.com/walkor/redis/tree/v2.0.2" + }, + "install-path": "../workerman/redis" + }, + { + "name": "workerman/redis-queue", + "version": "v1.2.0", + "version_normalized": "1.2.0.0", + "source": { + "type": "git", + "url": "https://github.com/walkor/redis-queue.git", + "reference": "7b6aee70d69e5c9427c0411d85f8398027831b42" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/walkor/redis-queue/zipball/7b6aee70d69e5c9427c0411d85f8398027831b42", + "reference": "7b6aee70d69e5c9427c0411d85f8398027831b42", + "shasum": "" + }, + "require": { + "php": ">=7.0", + "workerman/redis": "^1.0||^2.0", + "workerman/workerman": ">=4.0.20" + }, + "time": "2024-02-28T07:00:03+00:00", + "type": "library", + "installation-source": "dist", + "autoload": { + "psr-4": { + "Workerman\\RedisQueue\\": "./src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Message queue system written in PHP based on workerman and backed by Redis.", + "homepage": "http://www.workerman.net", + "support": { + "issues": "https://github.com/walkor/redis-queue/issues", + "source": "https://github.com/walkor/redis-queue/tree/v1.2.0" + }, + "install-path": "../workerman/redis-queue" + }, { "name": "workerman/webman-framework", "version": "v1.5.16", diff --git a/vendor/composer/installed.php b/vendor/composer/installed.php index 43d4142..eb274d9 100644 --- a/vendor/composer/installed.php +++ b/vendor/composer/installed.php @@ -929,6 +929,15 @@ 'aliases' => array(), 'dev_requirement' => false, ), + 'webman/redis-queue' => array( + 'pretty_version' => 'v1.3.2', + 'version' => '1.3.2.0', + 'reference' => '80b9ddca0405bbb6d02e6b368e8036b3b1a13814', + 'type' => 'library', + 'install_path' => __DIR__ . '/../webman/redis-queue', + 'aliases' => array(), + 'dev_requirement' => false, + ), 'webman/think-orm' => array( 'pretty_version' => 'v1.1.1', 'version' => '1.1.1.0', @@ -947,6 +956,24 @@ 'aliases' => array(), 'dev_requirement' => false, ), + 'workerman/redis' => array( + 'pretty_version' => 'v2.0.2', + 'version' => '2.0.2.0', + 'reference' => '542f10c243ba846f1f3b4c07a26136c5fa80d972', + 'type' => 'library', + 'install_path' => __DIR__ . '/../workerman/redis', + 'aliases' => array(), + 'dev_requirement' => false, + ), + 'workerman/redis-queue' => array( + 'pretty_version' => 'v1.2.0', + 'version' => '1.2.0.0', + 'reference' => '7b6aee70d69e5c9427c0411d85f8398027831b42', + 'type' => 'library', + 'install_path' => __DIR__ . '/../workerman/redis-queue', + 'aliases' => array(), + 'dev_requirement' => false, + ), 'workerman/webman' => array( 'pretty_version' => '1.0.0+no-version-set', 'version' => '1.0.0.0', diff --git a/vendor/webman/redis-queue/README.md b/vendor/webman/redis-queue/README.md new file mode 100644 index 0000000..277ddf9 --- /dev/null +++ b/vendor/webman/redis-queue/README.md @@ -0,0 +1,5 @@ +# redis-queue +Message queue system written in PHP for [webman](https://github.com/walkor/webman). + +# Document +https://www.workerman.net/doc/webman#/queue/redis diff --git a/vendor/webman/redis-queue/composer.json b/vendor/webman/redis-queue/composer.json new file mode 100644 index 0000000..39e3f0d --- /dev/null +++ b/vendor/webman/redis-queue/composer.json @@ -0,0 +1,10 @@ +{ + "name": "webman/redis-queue", + "description": "Redis message queue plugin for webman.", + "require": { + "workerman/redis-queue": "^1.2" + }, + "autoload": { + "psr-4": {"Webman\\RedisQueue\\": "./src"} + } +} diff --git a/vendor/webman/redis-queue/src/Client.php b/vendor/webman/redis-queue/src/Client.php new file mode 100644 index 0000000..82779b0 --- /dev/null +++ b/vendor/webman/redis-queue/src/Client.php @@ -0,0 +1,64 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ +namespace Webman\RedisQueue; + +use support\Log; +use Workerman\RedisQueue\Client as RedisClient; + +/** + * Class RedisQueue + * @package support + * + * Strings methods + * @method static void send($queue, $data, $delay=0) + */ +class Client +{ + /** + * @var Client[] + */ + protected static $_connections = null; + + + /** + * @param string $name + * @return RedisClient + */ + public static function connection($name = 'default') { + if (!isset(static::$_connections[$name])) { + $config = config('redis_queue', config('plugin.webman.redis-queue.redis', [])); + if (!isset($config[$name])) { + throw new \RuntimeException("RedisQueue connection $name not found"); + } + $host = $config[$name]['host']; + $options = $config[$name]['options']; + $client = new RedisClient($host, $options); + if (method_exists($client, 'logger')) { + $client->logger(Log::channel('plugin.webman.redis-queue.default')); + } + static::$_connections[$name] = $client; + } + return static::$_connections[$name]; + } + + /** + * @param $name + * @param $arguments + * @return mixed + */ + public static function __callStatic($name, $arguments) + { + return static::connection('default')->{$name}(... $arguments); + } +} diff --git a/vendor/webman/redis-queue/src/Command/MakeConsumerCommand.php b/vendor/webman/redis-queue/src/Command/MakeConsumerCommand.php new file mode 100644 index 0000000..d9c2c04 --- /dev/null +++ b/vendor/webman/redis-queue/src/Command/MakeConsumerCommand.php @@ -0,0 +1,92 @@ +addArgument('name', InputArgument::REQUIRED, 'Consumer name'); + } + + /** + * @param InputInterface $input + * @param OutputInterface $output + * @return int + */ + protected function execute(InputInterface $input, OutputInterface $output): int + { + $name = $input->getArgument('name'); + $output->writeln("Make consumer $name"); + + $path = ''; + $namespace = 'app\\queue\\redis'; + if ($pos = strrpos($name, DIRECTORY_SEPARATOR)) { + $path = substr($name, 0, $pos + 1); + $name = substr($name, $pos + 1); + $namespace .= '\\' . str_replace(DIRECTORY_SEPARATOR, '\\', trim($path, DIRECTORY_SEPARATOR)); + } + $class = Util::nameToClass($name); + $queue = Util::classToName($name); + + $file = app_path() . "/queue/redis/{$path}$class.php"; + $this->createConsumer($namespace, $class, $queue, $file); + + return self::SUCCESS; + } + + /** + * @param $class + * @param $queue + * @param $file + * @return void + */ + protected function createConsumer($namspace, $class, $queue, $file) + { + $path = pathinfo($file, PATHINFO_DIRNAME); + if (!is_dir($path)) { + mkdir($path, 0777, true); + } + $controller_content = << + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +namespace Webman\RedisQueue; + + +/** + * Interface Consumer + * @package Webman\RedisQueue + */ +interface Consumer +{ + public function consume($data); +} \ No newline at end of file diff --git a/vendor/webman/redis-queue/src/Install.php b/vendor/webman/redis-queue/src/Install.php new file mode 100644 index 0000000..3a2e9f5 --- /dev/null +++ b/vendor/webman/redis-queue/src/Install.php @@ -0,0 +1,72 @@ + 'config/plugin/webman/redis-queue', +); + + /** + * Install + * @return void + */ + public static function install() + { + static::installByRelation(); + if (!is_dir(app_path() . '/queue/redis')){ + mkdir(app_path() . '/queue/redis', 0777, true); + } + } + + /** + * Uninstall + * @return void + */ + public static function uninstall() + { + self::uninstallByRelation(); + } + + /** + * installByRelation + * @return void + */ + public static function installByRelation() + { + foreach (static::$pathRelation as $source => $dest) { + if ($pos = strrpos($dest, '/')) { + $parent_dir = base_path().'/'.substr($dest, 0, $pos); + if (!is_dir($parent_dir)) { + mkdir($parent_dir, 0777, true); + } + } + //symlink(__DIR__ . "/$source", base_path()."/$dest"); + copy_dir(__DIR__ . "/$source", base_path()."/$dest"); + } + } + + /** + * uninstallByRelation + * @return void + */ + public static function uninstallByRelation() + { + foreach (static::$pathRelation as $source => $dest) { + $path = base_path()."/$dest"; + if (!is_dir($path) && !is_file($path)) { + continue; + } + /*if (is_link($path) { + unlink($path); + }*/ + remove_dir($path); + } + } + +} diff --git a/vendor/webman/redis-queue/src/Process/Consumer.php b/vendor/webman/redis-queue/src/Process/Consumer.php new file mode 100644 index 0000000..653d64e --- /dev/null +++ b/vendor/webman/redis-queue/src/Process/Consumer.php @@ -0,0 +1,88 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +namespace Webman\RedisQueue\Process; + +use support\Container; +use Webman\RedisQueue\Client; + +/** + * Class Consumer + * @package process + */ +class Consumer +{ + /** + * @var string + */ + protected $_consumerDir = ''; + + /** + * @var array + */ + protected $_consumers = []; + + /** + * StompConsumer constructor. + * @param string $consumer_dir + */ + public function __construct($consumer_dir = '') + { + $this->_consumerDir = $consumer_dir; + } + + /** + * onWorkerStart. + */ + public function onWorkerStart() + { + if (!is_dir($this->_consumerDir)) { + echo "Consumer directory {$this->_consumerDir} not exists\r\n"; + return; + } + $dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir); + $iterator = new \RecursiveIteratorIterator($dir_iterator); + foreach ($iterator as $file) { + if (is_dir($file)) { + continue; + } + $fileinfo = new \SplFileInfo($file); + $ext = $fileinfo->getExtension(); + if ($ext === 'php') { + $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4)); + if (is_a($class, 'Webman\RedisQueue\Consumer', true)) { + $consumer = Container::get($class); + $connection_name = $consumer->connection ?? 'default'; + $queue = $consumer->queue; + if (!$queue) { + echo "Consumer {$class} queue not exists\r\n"; + continue; + } + $this->_consumers[$queue] = $consumer; + $connection = Client::connection($connection_name); + $connection->subscribe($queue, [$consumer, 'consume']); + if (method_exists($connection, 'onConsumeFailure')) { + $connection->onConsumeFailure(function ($exeption, $package) { + $consumer = $this->_consumers[$package['queue']] ?? null; + if ($consumer && method_exists($consumer, 'onConsumeFailure')) { + return call_user_func([$consumer, 'onConsumeFailure'], $exeption, $package); + } + }); + } + } + } + } + } +} diff --git a/vendor/webman/redis-queue/src/Redis.php b/vendor/webman/redis-queue/src/Redis.php new file mode 100644 index 0000000..3c202a6 --- /dev/null +++ b/vendor/webman/redis-queue/src/Redis.php @@ -0,0 +1,78 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ +namespace Webman\RedisQueue; + +use Workerman\Timer; + +/** + * Class RedisQueue + * @package support + * + * Strings methods + * @method static bool send($queue, $data, $delay=0) + */ +class Redis +{ + /** + * @var RedisConnection[] + */ + protected static $_connections = []; + + /** + * @param string $name + * @return RedisConnection + */ + public static function connection($name = 'default') { + if (!isset(static::$_connections[$name])) { + $configs = config('redis_queue', config('plugin.webman.redis-queue.redis', [])); + if (!isset($configs[$name])) { + throw new \RuntimeException("RedisQueue connection $name not found"); + } + $config = $configs[$name]; + static::$_connections[$name] = static::connect($config); + } + return static::$_connections[$name]; + } + + protected static function connect($config) + { + if (!extension_loaded('redis')) { + throw new \RuntimeException('Please make sure the PHP Redis extension is installed and enabled.'); + } + + $redis = new RedisConnection(); + $address = $config['host']; + $config = [ + 'host' => parse_url($address, PHP_URL_HOST), + 'port' => parse_url($address, PHP_URL_PORT), + 'db' => $config['options']['database'] ?? $config['options']['db'] ?? 0, + 'auth' => $config['options']['auth'] ?? '', + 'timeout' => $config['options']['timeout'] ?? 2, + 'ping' => $config['options']['ping'] ?? 55, + 'prefix' => $config['options']['prefix'] ?? '', + ]; + $redis->connectWithConfig($config); + return $redis; + } + + /** + * @param $name + * @param $arguments + * @return mixed + */ + public static function __callStatic($name, $arguments) + { + return static::connection('default')->{$name}(... $arguments); + } +} diff --git a/vendor/webman/redis-queue/src/RedisConnection.php b/vendor/webman/redis-queue/src/RedisConnection.php new file mode 100644 index 0000000..157e55e --- /dev/null +++ b/vendor/webman/redis-queue/src/RedisConnection.php @@ -0,0 +1,99 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ +namespace Webman\RedisQueue; + +use Workerman\Timer; +use Workerman\Worker; + +class RedisConnection extends \Redis +{ + /** + * @var array + */ + protected $config = []; + + /** + * @param array $config + * @return void + */ + public function connectWithConfig(array $config = []) + { + static $timer; + if ($config) { + $this->config = $config; + } + if (false === $this->connect($this->config['host'], $this->config['port'], $this->config['timeout'] ?? 2)) { + throw new \RuntimeException("Redis connect {$this->config['host']}:{$this->config['port']} fail."); + } + if (!empty($this->config['auth'])) { + $this->auth($this->config['auth']); + } + if (!empty($this->config['db'])) { + $this->select($this->config['db']); + } + if (!empty($this->config['prefix'])) { + $this->setOption(\Redis::OPT_PREFIX, $this->config['prefix']); + } + if (Worker::getAllWorkers() && !$timer) { + $timer = Timer::add($this->config['ping'] ?? 55, function () { + $this->execCommand('ping'); + }); + } + } + + /** + * @param $command + * @param ...$args + * @return mixed + * @throws \Throwable + */ + protected function execCommand($command, ...$args) + { + try { + return $this->{$command}(...$args); + } catch (\Throwable $e) { + $msg = strtolower($e->getMessage()); + if ($msg === 'connection lost' || strpos($msg, 'went away')) { + $this->connectWithConfig(); + return $this->{$command}(...$args); + } + throw $e; + } + } + + /** + * @param $queue + * @param $data + * @param $delay + * @return bool + */ + public function send($queue, $data, $delay = 0) + { + $queue_waiting = '{redis-queue}-waiting'; + $queue_delay = '{redis-queue}-delayed'; + $now = time(); + $package_str = json_encode([ + 'id' => time().rand(), + 'time' => $now, + 'delay' => $delay, + 'attempts' => 0, + 'queue' => $queue, + 'data' => $data + ]); + if ($delay) { + return (bool)$this->execCommand('zAdd' ,$queue_delay, $now + $delay, $package_str); + } + return (bool)$this->execCommand('lPush', $queue_waiting.$queue, $package_str); + } +} diff --git a/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/app.php b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/app.php new file mode 100644 index 0000000..8f9c426 --- /dev/null +++ b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/app.php @@ -0,0 +1,4 @@ + true, +]; \ No newline at end of file diff --git a/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/command.php b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/command.php new file mode 100644 index 0000000..8bfe2a1 --- /dev/null +++ b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/command.php @@ -0,0 +1,7 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +return [ + 'default' => [ + 'handlers' => [ + [ + 'class' => Monolog\Handler\RotatingFileHandler::class, + 'constructor' => [ + runtime_path() . '/logs/redis-queue/queue.log', + 7, //$maxFiles + Monolog\Logger::DEBUG, + ], + 'formatter' => [ + 'class' => Monolog\Formatter\LineFormatter::class, + 'constructor' => [null, 'Y-m-d H:i:s', true], + ], + ] + ], + ] +]; diff --git a/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/process.php b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/process.php new file mode 100644 index 0000000..c8d4da1 --- /dev/null +++ b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/process.php @@ -0,0 +1,11 @@ + [ + 'handler' => Webman\RedisQueue\Process\Consumer::class, + 'count' => 8, // 可以设置多进程同时消费 + 'constructor' => [ + // 消费者类目录 + 'consumer_dir' => app_path() . '/queue/redis' + ] + ] +]; \ No newline at end of file diff --git a/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/redis.php b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/redis.php new file mode 100644 index 0000000..0876302 --- /dev/null +++ b/vendor/webman/redis-queue/src/config/plugin/webman/redis-queue/redis.php @@ -0,0 +1,13 @@ + [ + 'host' => 'redis://127.0.0.1:6379', + 'options' => [ + 'auth' => null, // 密码,字符串类型,可选参数 + 'db' => 0, // 数据库 + 'prefix' => '', // key 前缀 + 'max_attempts' => 5, // 消费失败后,重试次数 + 'retry_seconds' => 5, // 重试间隔,单位秒 + ] + ], +]; diff --git a/vendor/workerman/redis-queue/README.md b/vendor/workerman/redis-queue/README.md new file mode 100644 index 0000000..f8a7e1a --- /dev/null +++ b/vendor/workerman/redis-queue/README.md @@ -0,0 +1,97 @@ +# redis-queue +Message queue system written in PHP based on [workerman](https://github.com/walkor/workerman) and backed by Redis. + +# Install +``` +composer require workerman/redis-queue +``` + +# Usage +test.php +```php +onWorkerStart = function () { + $client = new Client('redis://127.0.0.1:6379'); + $client->subscribe('user-1', function($data){ + echo "user-1\n"; + var_export($data); + }); + $client->subscribe('user-2', function($data){ + echo "user-2\n"; + var_export($data); + }); + $client->onConsumeFailure(function (\Throwable $exception, $package) { + echo "consume failure\n"; + echo $exception->getMessage(), "\n"; + var_export($package); + }); + Timer::add(1, function()use($client){ + $client->send('user-1', ['some', 'data']); + }); +}; + +Worker::runAll(); +``` + +Run with command `php test.php start` or `php test.php start -d`. + +# API + + * Client::__construct() + * Client::send() + * Client::subscribe() + * Client::unsubscribe() + +------------------------------------------------------- + + +### __construct (string $address, [array $options]) + +Create an instance by $address and $options. + + * `$address` for example `redis://ip:6379`. + + * `$options` is the client connection options. Defaults: + * `auth`: default '' + * `db`: default 0 + * `retry_seconds`: Retry interval after consumption failure + * `max_attempts`: Maximum number of retries after consumption failure + +------------------------------------------------------- + + +### send(String $queue, Mixed $data, [int $dely=0]) + +Send a message to a queue + +* `$queue` is the queue to publish to, `String` +* `$data` is the message to publish, `Mixed` +* `$dely` is delay seconds for delayed consumption, `Int` + +------------------------------------------------------- + + +### subscribe(mixed $queue, callable $callback) + +Subscribe to a queue or queues + +* `$queue` is a `String` queue or an `Array` which has as keys the queue name to subscribe. +* `$callback` - `function (Mixed $data)`, `$data` is the data sent by `send($queue, $data)`. + +------------------------------------------------------- + + +### unsubscribe(mixed $queue) + +Unsubscribe from a queue or queues + +* `$queue` is a `String` queue or an array of queue to unsubscribe from + +------------------------------------------------------- diff --git a/vendor/workerman/redis-queue/composer.json b/vendor/workerman/redis-queue/composer.json new file mode 100644 index 0000000..cd9bb84 --- /dev/null +++ b/vendor/workerman/redis-queue/composer.json @@ -0,0 +1,15 @@ +{ + "name" : "workerman/redis-queue", + "type" : "library", + "homepage": "http://www.workerman.net", + "license" : "MIT", + "description": "Message queue system written in PHP based on workerman and backed by Redis.", + "require": { + "php": ">=7.0", + "workerman/redis" : "^1.0||^2.0", + "workerman/workerman" : ">=4.0.20" + }, + "autoload": { + "psr-4": {"Workerman\\RedisQueue\\": "./src"} + } +} diff --git a/vendor/workerman/redis-queue/examples/test.php b/vendor/workerman/redis-queue/examples/test.php new file mode 100644 index 0000000..60546c3 --- /dev/null +++ b/vendor/workerman/redis-queue/examples/test.php @@ -0,0 +1,35 @@ +onWorkerStart = function () { + $client = new Client('redis://127.0.0.1:6379'); + $client->subscribe('user-1', function ($data) { + echo "user-1\n"; + var_export($data); + }, function ($data) { + echo "user-1 failed\n"; + var_export($data); + }); + $client->subscribe('user-2', function ($data) { + echo "user-2\n"; + var_export($data); + }, function ($data) { + echo "user-2 failed\n"; + var_export($data); + }); + $client->onConsumeFailure(function ($package) { + echo "consume failure\n"; + var_export($package); + }); + Timer::add(1, function () use ($client) { + $client->send('user-1', [666, 777]); + }); +}; + +Worker::runAll(); diff --git a/vendor/workerman/redis-queue/src/Client.php b/vendor/workerman/redis-queue/src/Client.php new file mode 100644 index 0000000..449023e --- /dev/null +++ b/vendor/workerman/redis-queue/src/Client.php @@ -0,0 +1,321 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +namespace Workerman\RedisQueue; + +use RuntimeException; +use Workerman\Timer; +use Workerman\Redis\Client as Redis; +use Psr\Log\LoggerInterface; + +/** + * Class Client + * @package Workerman\RedisQueue + */ +#[\AllowDynamicProperties] +class Client +{ + /** + * Queue waiting for consumption + */ + const QUEUE_WAITING = '{redis-queue}-waiting'; + + /** + * Queue with delayed consumption + */ + const QUEUE_DELAYED = '{redis-queue}-delayed'; + + /** + * Queue with consumption failure + */ + const QUEUE_FAILED = '{redis-queue}-failed'; + + /** + * @var Redis + */ + protected $_redisSubscribe; + + /** + * @var Redis + */ + protected $_redisSend; + + /** + * @var array + */ + protected $_subscribeQueues = []; + + /** + * @var LoggerInterface + */ + protected $_logger = null; + + /** + * consume failure callback + * @var callable + */ + protected $_consumeFailure = null; + /** + * @var array + */ + protected $_options = [ + 'retry_seconds' => 5, + 'max_attempts' => 5, + 'auth' => '', + 'db' => 0, + 'prefix' => '', + ]; + + /** + * Client constructor. + * @param $address + * @param array $options + */ + public function __construct($address, $options = []) + { + $this->_redisSubscribe = new Redis($address, $options); + $this->_redisSubscribe->brPoping = 0; + $this->_redisSend = new Redis($address, $options); + if (isset($options['auth']) && $options['auth'] !== '') { + $this->_redisSubscribe->auth($options['auth']); + $this->_redisSend->auth($options['auth']); + } + if (isset($options['db'])) { + $this->_redisSubscribe->select($options['db']); + $this->_redisSend->select($options['db']); + } + $this->_options = array_merge($this->_options, $options); + } + + /** + * Send. + * + * @param $queue + * @param $data + * @param int $delay + * @param callable $cb + */ + public function send($queue, $data, $delay = 0, $cb = null) + { + static $_id = 0; + $id = \microtime(true) . '.' . (++$_id); + $now = time(); + $package_str = \json_encode([ + 'id' => $id, + 'time' => $now, + 'delay' => $delay, + 'attempts' => 0, + 'queue' => $queue, + 'data' => $data + ]); + if (\is_callable($delay)) { + $cb = $delay; + $delay = 0; + } + if ($cb) { + $cb = function ($ret) use ($cb) { + $cb((bool)$ret); + }; + if ($delay == 0) { + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $queue, $package_str, $cb); + } else { + $this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $now + $delay, $package_str, $cb); + } + return; + } + if ($delay == 0) { + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $queue, $package_str); + } else { + $this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $now + $delay, $package_str); + } + } + + /** + * Set the consume failure callback. + * + * @param callable $callback + */ + public function onConsumeFailure(callable $callback) + { + $this->_consumeFailure = $callback; + } + + /** + * Subscribe. + * + * @param string|array $queue + * @param callable $callback + */ + public function subscribe($queue, callable $callback) + { + $queue = (array)$queue; + foreach ($queue as $q) { + $redis_key = $this->_options['prefix'] . static::QUEUE_WAITING . $q; + $this->_subscribeQueues[$redis_key] = $callback; + } + $this->pull(); + } + + /** + * Unsubscribe. + * + * @param string|array $queue + * @return void + */ + public function unsubscribe($queue) + { + $queue = (array)$queue; + foreach ($queue as $q) { + $redis_key = $this->_options['prefix'] . static::QUEUE_WAITING . $q; + unset($this->_subscribeQueues[$redis_key]); + } + } + + /** + * tryToPullDelayQueue. + */ + protected function tryToPullDelayQueue() + { + static $retry_timer = 0; + if ($retry_timer) { + return; + } + $retry_timer = Timer::add(1, function () { + $now = time(); + $options = ['LIMIT', 0, 128]; + $this->_redisSend->zrevrangebyscore($this->_options['prefix'] . static::QUEUE_DELAYED, $now, '-inf', $options, function ($items) { + if ($items === false) { + throw new RuntimeException($this->_redisSend->error()); + } + foreach ($items as $package_str) { + $this->_redisSend->zRem($this->_options['prefix'] . static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) { + if ($result !== 1) { + return; + } + $package = \json_decode($package_str, true); + if (!$package) { + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str); + return; + } + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $package['queue'], $package_str); + }); + } + }); + }); + } + + /** + * pull. + */ + public function pull() + { + $this->tryToPullDelayQueue(); + if (!$this->_subscribeQueues || $this->_redisSubscribe->brPoping) { + return; + } + $cb = function ($data) use (&$cb) { + if ($data) { + $this->_redisSubscribe->brPoping = 0; + $redis_key = $data[0]; + $package_str = $data[1]; + $package = json_decode($package_str, true); + if (!$package) { + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str); + } else { + if (!isset($this->_subscribeQueues[$redis_key])) { + // 取消订阅,放回队列 + $this->_redisSend->rPush($redis_key, $package_str); + } else { + $callback = $this->_subscribeQueues[$redis_key]; + try { + \call_user_func($callback, $package['data']); + } catch (\Throwable $e) { + $this->log((string)$e); + $package['max_attempts'] = $this->_options['max_attempts']; + $package['error'] = $e->getMessage(); + $package_modified = null; + if ($this->_consumeFailure) { + try { + $package_modified = \call_user_func($this->_consumeFailure, $e, $package); + } catch (\Throwable $ta) { + $this->log((string)$ta); + } + } + if (is_array($package_modified)) { + $package['data'] = $package_modified['data'] ?? $package['data']; + $package['attempts'] = $package_modified['attempts'] ?? $package['attempts']; + $package['max_attempts'] = $package_modified['max_attempts'] ?? $package['max_attempts']; + $package['error'] = $package_modified['error'] ?? $package['error']; + } + if (++$package['attempts'] > $package['max_attempts']) { + $this->fail($package); + } else { + $this->retry($package); + } + } + } + } + } + if ($this->_subscribeQueues) { + $this->_redisSubscribe->brPoping = 1; + Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb], false); + } + }; + $this->_redisSubscribe->brPoping = 1; + $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb); + } + + /** + * @param $package + */ + protected function retry($package) + { + $delay = time() + $this->_options['retry_seconds'] * ($package['attempts']); + $this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $delay, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); + } + + /** + * @param $package + */ + protected function fail($package) + { + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); + } + + /** + * @param $message + * @return void + */ + protected function log($message) + { + if ($this->_logger) { + $this->_logger->info($message); + return; + } + echo $message . PHP_EOL; + } + + /** + * @param $logger + * @return mixed|LoggerInterface + */ + public function logger($logger = null) + { + if ($logger) { + $this->_logger = $logger; + } + return $this->_logger; + } +} diff --git a/vendor/workerman/redis/.gitignore b/vendor/workerman/redis/.gitignore new file mode 100644 index 0000000..f3f9e18 --- /dev/null +++ b/vendor/workerman/redis/.gitignore @@ -0,0 +1,6 @@ +logs +.buildpath +.project +.settings +.idea +.DS_Store diff --git a/vendor/workerman/redis/LICENSE b/vendor/workerman/redis/LICENSE new file mode 100644 index 0000000..667e8cd --- /dev/null +++ b/vendor/workerman/redis/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 walkor + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/workerman/redis/README.md b/vendor/workerman/redis/README.md new file mode 100644 index 0000000..badde39 --- /dev/null +++ b/vendor/workerman/redis/README.md @@ -0,0 +1,37 @@ +# redis +Asynchronous redis client for PHP based on workerman. + +# Install + +``` +composer require workerman/redis +``` + +# Usage +```php + +require_once __DIR__ . '/vendor/autoload.php'; +use Workerman\Redis\Client; +use Workerman\Worker; + +$worker = new Worker('http://0.0.0.0:6161'); + +$worker->onWorkerStart = function() { + global $redis; + $redis = new Client('redis://127.0.0.1:6379'); +}; + +$worker->onMessage = function($connection, $data) { + global $redis; + $redis->set('key', 'hello world'); + $redis->get('key', function ($result) use ($connection) { + $connection->send($result); + }); +}; + +Worker::runAll(); +``` + +## Document + +http://doc.workerman.net/components/workerman-redis.html diff --git a/vendor/workerman/redis/composer.json b/vendor/workerman/redis/composer.json new file mode 100644 index 0000000..4bbdbaf --- /dev/null +++ b/vendor/workerman/redis/composer.json @@ -0,0 +1,13 @@ +{ + "name" : "workerman/redis", + "type" : "library", + "homepage": "http://www.workerman.net", + "license" : "MIT", + "require": { + "php": ">=7", + "workerman/workerman": "^4.1.0||^5.0.0" + }, + "autoload": { + "psr-4": {"Workerman\\Redis\\": "./src"} + } +} diff --git a/vendor/workerman/redis/src/Client.php b/vendor/workerman/redis/src/Client.php new file mode 100644 index 0000000..483af75 --- /dev/null +++ b/vendor/workerman/redis/src/Client.php @@ -0,0 +1,1014 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ +namespace Workerman\Redis; + +use Revolt\EventLoop; +use Workerman\Connection\AsyncTcpConnection; +use Workerman\Redis\Protocols\Redis; +use Workerman\Timer; + +/** + * Class Client + * @package Workerman\Redis + * + * Strings methods + * @method static int append($key, $value, $cb = null) + * @method static int bitCount($key, $cb = null) + * @method static int decrBy($key, $value, $cb = null) + * @method static string|bool get($key, $cb = null) + * @method static int getBit($key, $offset, $cb = null) + * @method static string getRange($key, $start, $end, $cb = null) + * @method static string getSet($key, $value, $cb = null) + * @method static int incrBy($key, $value, $cb = null) + * @method static float incrByFloat($key, $value, $cb = null) + * @method static array mGet(array $keys, $cb = null) + * @method static array getMultiple(array $keys, $cb = null) + * @method static bool setBit($key, $offset, $value, $cb = null) + * @method static bool setEx($key, $ttl, $value, $cb = null) + * @method static bool pSetEx($key, $ttl, $value, $cb = null) + * @method static bool setNx($key, $value, $cb = null) + * @method static string setRange($key, $offset, $value, $cb = null) + * @method static int strLen($key, $cb = null) + * Keys methods + * @method static int del(...$keys, $cb = null) + * @method static int unlink(...$keys, $cb = null) + * @method static false|string dump($key, $cb = null) + * @method static int exists(...$keys, $cb = null) + * @method static bool expire($key, $ttl, $cb = null) + * @method static bool pexpire($key, $ttl, $cb = null) + * @method static bool expireAt($key, $timestamp, $cb = null) + * @method static bool pexpireAt($key, $timestamp, $cb = null) + * @method static array keys($pattern, $cb = null) + * @method static void migrate($host, $port, $keys, $dbIndex, $timeout, $copy = false, $replace = false, $cb = null) + * @method static bool move($key, $dbIndex, $cb = null) + * @method static string|int|bool object($information, $key, $cb = null) + * @method static bool persist($key, $cb = null) + * @method static string randomKey($cb = null) + * @method static bool rename($srcKey, $dstKey, $cb = null) + * @method static bool renameNx($srcKey, $dstKey, $cb = null) + * @method static string type($key, $cb = null) + * @method static int ttl($key, $cb = null) + * @method static int pttl($key, $cb = null) + * @method static void restore($key, $ttl, $value, $cb = null) + * Hashes methods + * @method static false|int hSet($key, $hashKey, $value, $cb = null) + * @method static bool hSetNx($key, $hashKey, $value, $cb = null) + * @method static false|string hGet($key, $hashKey, $cb = null) + * @method static false|int hLen($key, $cb = null) + * @method static false|int hDel($key, ...$hashKeys, $cb = null) + * @method static array hKeys($key, $cb = null) + * @method static array hVals($key, $cb = null) + * @method static bool hExists($key, $hashKey, $cb = null) + * @method static int hIncrBy($key, $hashKey, $value, $cb = null) + * @method static float hIncrByFloat($key, $hashKey, $value, $cb = null) + * @method static int hStrLen($key, $hashKey, $cb = null) + * Lists methods + * @method static array blPop($keys, $timeout, $cb = null) + * @method static array brPop($keys, $timeout, $cb = null) + * @method static false|string bRPopLPush($srcKey, $dstKey, $timeout, $cb = null) + * @method static false|string lIndex($key, $index, $cb = null) + * @method static int lInsert($key, $position, $pivot, $value, $cb = null) + * @method static false|string lPop($key, $cb = null) + * @method static false|int lPush($key, ...$entries, $cb = null) + * @method static false|int lPushx($key, $value, $cb = null) + * @method static array lRange($key, $start, $end, $cb = null) + * @method static false|int lRem($key, $value, $count, $cb = null) + * @method static bool lSet($key, $index, $value, $cb = null) + * @method static false|array lTrim($key, $start, $end, $cb = null) + * @method static false|string rPop($key, $cb = null) + * @method static false|string rPopLPush($srcKey, $dstKey, $cb = null) + * @method static false|int rPush($key, ...$entries, $cb = null) + * @method static false|int rPushX($key, $value, $cb = null) + * @method static false|int lLen($key, $cb = null) + * Sets methods + * @method static int sAdd($key, $value, $cb = null) + * @method static int sCard($key, $cb = null) + * @method static array sDiff($keys, $cb = null) + * @method static false|int sDiffStore($dst, $keys, $cb = null) + * @method static false|array sInter($keys, $cb = null) + * @method static false|int sInterStore($dst, $keys, $cb = null) + * @method static bool sIsMember($key, $member, $cb = null) + * @method static array sMembers($key, $cb = null) + * @method static bool sMove($src, $dst, $member, $cb = null) + * @method static false|string|array sPop($key, $count = 0, $cb = null) + * @method static false|string|array sRandMember($key, $count = 0, $cb = null) + * @method static int sRem($key, ...$members, $cb = null) + * @method static array sUnion(...$keys, $cb = null) + * @method static false|int sUnionStore($dst, ...$keys, $cb = null) + * Sorted sets methods + * @method static array bzPopMin($keys, $timeout, $cb = null) + * @method static array bzPopMax($keys, $timeout, $cb = null) + * @method static int zAdd($key, $score, $value, $cb = null) + * @method static int zCard($key, $cb = null) + * @method static int zCount($key, $start, $end, $cb = null) + * @method static double zIncrBy($key, $value, $member, $cb = null) + * @method static int zinterstore($keyOutput, $arrayZSetKeys, $arrayWeights = [], $aggregateFunction = '', $cb = null) + * @method static array zPopMin($key, $count, $cb = null) + * @method static array zPopMax($key, $count, $cb = null) + * @method static array zRange($key, $start, $end, $withScores = false, $cb = null) + * @method static array zRangeByScore($key, $start, $end, $options = [], $cb = null) + * @method static array zRevRangeByScore($key, $start, $end, $options = [], $cb = null) + * @method static array zRangeByLex($key, $min, $max, $offset = 0, $limit = 0, $cb = null) + * @method static int zRank($key, $member, $cb = null) + * @method static int zRevRank($key, $member, $cb = null) + * @method static int zRem($key, ...$members, $cb = null) + * @method static int zRemRangeByRank($key, $start, $end, $cb = null) + * @method static int zRemRangeByScore($key, $start, $end, $cb = null) + * @method static array zRevRange($key, $start, $end, $withScores = false, $cb = null) + * @method static double zScore($key, $member, $cb = null) + * @method static int zunionstore($keyOutput, $arrayZSetKeys, $arrayWeights = [], $aggregateFunction = '', $cb = null) + * HyperLogLogs methods + * @method static int pfAdd($key, $values, $cb = null) + * @method static int pfCount($keys, $cb = null) + * @method static bool pfMerge($dstKey, $srcKeys, $cb = null) + * Geocoding methods + * @method static int geoAdd($key, $longitude, $latitude, $member, ...$items, $cb = null) + * @method static array geoHash($key, ...$members, $cb = null) + * @method static array geoPos($key, ...$members, $cb = null) + * @method static double geoDist($key, $members, $unit = '', $cb = null) + * @method static int|array geoRadius($key, $longitude, $latitude, $radius, $unit, $options = [], $cb = null) + * @method static array geoRadiusByMember($key, $member, $radius, $units, $options = [], $cb = null) + * Streams methods + * @method static int xAck($stream, $group, $arrMessages, $cb = null) + * @method static string xAdd($strKey, $strId, $arrMessage, $iMaxLen = 0, $booApproximate = false, $cb = null) + * @method static array xClaim($strKey, $strGroup, $strConsumer, $minIdleTime, $arrIds, $arrOptions = [], $cb = null) + * @method static int xDel($strKey, $arrIds, $cb = null) + * @method static mixed xGroup($command, $strKey, $strGroup, $strMsgId, $booMKStream = null, $cb = null) + * @method static mixed xInfo($command, $strStream, $strGroup = null, $cb = null) + * @method static int xLen($stream, $cb = null) + * @method static array xPending($strStream, $strGroup, $strStart = 0, $strEnd = 0, $iCount = 0, $strConsumer = null, $cb = null) + * @method static array xRange($strStream, $strStart, $strEnd, $iCount = 0, $cb = null) + * @method static array xRead($arrStreams, $iCount = 0, $iBlock = null, $cb = null) + * @method static array xReadGroup($strGroup, $strConsumer, $arrStreams, $iCount = 0, $iBlock = null, $cb = null) + * @method static array xRevRange($strStream, $strEnd, $strStart, $iCount = 0, $cb = null) + * @method static int xTrim($strStream, $iMaxLen, $booApproximate = null, $cb = null) + * Pub/sub methods + * @method static mixed publish($channel, $message, $cb = null) + * @method static mixed pubSub($keyword, $argument = null, $cb = null) + * Generic methods + * @method static mixed rawCommand(...$commandAndArgs, $cb = null) + * Transactions methods + * @method static multi($cb = null) + * @method static mixed exec($cb = null) + * @method static mixed discard($cb = null) + * @method static mixed watch($keys, $cb = null) + * @method static mixed unwatch($keys, $cb = null) + * Scripting methods + * @method static mixed eval($script, $args = [], $numKeys = 0, $cb = null) + * @method static mixed evalSha($sha, $args = [], $numKeys = 0, $cb = null) + * @method static mixed script($command, ...$scripts, $cb = null) + * @method static mixed client(...$args, $cb = null) + * @method static null|string getLastError($cb = null) + * @method static bool clearLastError($cb = null) + * @method static mixed _prefix($value, $cb = null) + * @method static mixed _serialize($value, $cb = null) + * @method static mixed _unserialize($value, $cb = null) + * Introspection methods + * @method static bool isConnected($cb = null) + * @method static mixed getHost($cb = null) + * @method static mixed getPort($cb = null) + * @method static false|int getDbNum($cb = null) + * @method static false|double getTimeout($cb = null) + * @method static mixed getReadTimeout($cb = null) + * @method static mixed getPersistentID($cb = null) + * @method static mixed getAuth($cb = null) + */ +#[\AllowDynamicProperties] +class Client +{ + /** + * @var AsyncTcpConnection + */ + protected $_connection = null; + + /** + * @var array + */ + protected $_options = []; + + /** + * @var string + */ + protected $_address = ''; + + /** + * @var array + */ + protected $_queue = []; + + /** + * @var int + */ + protected $_db = 0; + + /** + * @var string|array + */ + protected $_auth = null; + + /** + * @var bool + */ + protected $_waiting = true; + + /** + * @var Timer + */ + protected $_connectTimeoutTimer = null; + + /** + * @var Timer + */ + protected $_reconnectTimer = null; + + /** + * @var callable + */ + protected $_connectionCallback = null; + + /** + * @var Timer + */ + protected $_waitTimeoutTimer = null; + + /** + * @var string + */ + protected $_error = ''; + + /** + * @var bool + */ + protected $_subscribe = false; + + /** + * @var bool + */ + protected $_firstConnect = true; + + /** + * Client constructor. + * @param $address + * @param array $options + * @param null $callback + */ + public function __construct($address, $options = [], $callback = null) + { + if (!\class_exists('Protocols\Redis')) { + \class_alias('Workerman\Redis\Protocols\Redis', 'Protocols\Redis'); + } + $this->_address = $address; + $this->_options = $options; + $this->_connectionCallback = $callback; + $this->connect(); + $timer = Timer::add(1, function () use (&$timer) { + if (empty($this->_queue)) { + return; + } + if ($this->_subscribe) { + Timer::del($timer); + return; + } + reset($this->_queue); + $current_queue = current($this->_queue); + $current_command = $current_queue[0][0]; + $ignore_first_queue = in_array($current_command, ['BLPOP', 'BRPOP']); + $time = time(); + $timeout = isset($this->_options['wait_timeout']) ? $this->_options['wait_timeout'] : 600; + $has_timeout = false; + $first_queue = true; + foreach ($this->_queue as $key => $queue) { + if ($first_queue && $ignore_first_queue) { + $first_queue = false; + continue; + } + if ($time - $queue[1] > $timeout) { + $has_timeout = true; + unset($this->_queue[$key]); + $msg = "Workerman Redis Wait Timeout ($timeout seconds)"; + if ($queue[2]) { + $this->_error = $msg; + \call_user_func($queue[2], false, $this); + } else { + echo new Exception($msg); + } + } + } + if ($has_timeout && !$ignore_first_queue) { + $this->closeConnection(); + $this->connect(); + } + }); + } + + /** + * connect + */ + public function connect() + { + if ($this->_connection) { + return; + } + + $timeout = isset($this->_options['connect_timeout']) ? $this->_options['connect_timeout'] : 5; + $context = isset($this->_options['context']) ? $this->_options['context'] : []; + $this->_connection = new AsyncTcpConnection($this->_address, $context); + $this->_connection->protocol = Redis::class; + if(!empty($this->_options['ssl'])){ + $this->_connection->transport = 'ssl'; + } + + $this->_connection->onConnect = function () { + $this->_waiting = false; + Timer::del($this->_connectTimeoutTimer); + if ($this->_reconnectTimer) { + Timer::del($this->_reconnectTimer); + $this->_reconnectTimer = null; + } + + if ($this->_db) { + $this->_queue = \array_merge([[['SELECT', $this->_db], time(), null]], $this->_queue); + } + + if ($this->_auth) { + $this->_queue = \array_merge([[['AUTH', $this->_auth], time(), null]], $this->_queue); + } + + $this->_connection->onError = function ($connection, $code, $msg) { + echo new \Exception("Workerman Redis Connection Error $code $msg"); + }; + $this->process(); + $this->_firstConnect && $this->_connectionCallback && \call_user_func($this->_connectionCallback, true, $this); + $this->_firstConnect = false; + }; + + $time_start = microtime(true); + $this->_connection->onError = function ($connection) use ($time_start) { + $time = microtime(true) - $time_start; + $msg = "Workerman Redis Connection Failed ($time seconds)"; + $this->_error = $msg; + $exception = new \Exception($msg); + if (!$this->_connectionCallback) { + echo $exception; + return; + } + $this->_firstConnect && \call_user_func($this->_connectionCallback, false, $this); + }; + + $this->_connection->onClose = function () use ($time_start) { + $this->_subscribe = false; + if ($this->_connectTimeoutTimer) { + Timer::del($this->_connectTimeoutTimer); + } + if ($this->_reconnectTimer) { + Timer::del($this->_reconnectTimer); + $this->_reconnectTimer = null; + } + $this->closeConnection(); + if (microtime(true) - $time_start > 5) { + $this->connect(); + } else { + $this->_reconnectTimer = Timer::add(5, function () { + $this->connect(); + }, null, false); + } + }; + + $this->_connection->onMessage = function ($connection, $data) { + $this->_error = ''; + $this->_waiting = false; + reset($this->_queue); + $queue = current($this->_queue); + $cb = $queue[2]; + $type = $data[0]; + if (!$this->_subscribe) { + unset($this->_queue[key($this->_queue)]); + } + if (empty($this->_queue)) { + $this->_queue = []; + } + $success = !($type === '-' || $type === '!'); + $exception = false; + $result = false; + if ($success) { + $result = $data[1]; + if ($type === '+' && $result === 'OK') { + $result = true; + } + } else { + $this->_error = $data[1]; + } + if (!$cb) { + $this->process(); + return; + } + // format. + if (!empty($queue[3])) { + $result = \call_user_func($queue[3], $result); + } + try { + \call_user_func($cb, $result, $this); + } catch (\Exception $exception) { + } + + if ($type === '!') { + $this->closeConnection(); + $this->connect(); + } else { + $this->process(); + } + if ($exception) { + throw $exception; + } + }; + + $this->_connectTimeoutTimer = Timer::add($timeout, function () use ($timeout) { + $this->_connectTimeoutTimer = null; + if ($this->_connection && $this->_connection->getStatus(false) === 'ESTABLISHED') { + return; + } + $this->closeConnection(); + $this->_error = "Workerman Redis Connection to {$this->_address} timeout ({$timeout} seconds)"; + if ($this->_firstConnect && $this->_connectionCallback) { + \call_user_func($this->_connectionCallback, false, $this); + } else { + echo $this->_error . "\n"; + } + + }); + $this->_connection->connect(); + } + + /** + * process + */ + public function process() + { + if (!$this->_connection || $this->_waiting || empty($this->_queue) || $this->_subscribe) { + return; + } + \reset($this->_queue); + $queue = \current($this->_queue); + if ($queue[0][0] === 'SUBSCRIBE' || $queue[0][0] === 'PSUBSCRIBE') { + $this->_subscribe = true; + } + $this->_waiting = true; + $this->_connection->send($queue[0]); + $this->_error = ''; + } + + /** + * subscribe + * + * @param $channels + * @param $cb + */ + public function subscribe($channels, $cb) + { + $new_cb = function ($result) use ($cb) { + if (!$result) { + echo $this->error(); + return; + } + $response_type = $result[0]; + switch ($response_type) { + case 'subscribe': + return; + case 'message': + \call_user_func($cb, $result[1], $result[2], $this); + return; + default: + echo 'unknow response type for subscribe. buffer:' . serialize($result) . "\n"; + } + }; + $this->_queue[] = [['SUBSCRIBE', $channels], time(), $new_cb]; + $this->process(); + } + + /** + * psubscribe + * + * @param $patterns + * @param $cb + */ + public function pSubscribe($patterns, $cb) + { + $new_cb = function ($result) use ($cb) { + if (!$result) { + echo $this->error(); + return; + } + $response_type = $result[0]; + switch ($response_type) { + case 'psubscribe': + return; + case 'pmessage': + \call_user_func($cb, $result[1], $result[2], $result[3], $this); + return; + default: + echo 'unknow response type for psubscribe. buffer:' . serialize($result) . "\n"; + } + }; + $this->_queue[] = [['PSUBSCRIBE', $patterns], time(), $new_cb]; + $this->process(); + } + + /** + * select + * + * @param $db + * @param null $cb + * @return mixed + */ + public function select($db, $cb = null) + { + $format = function ($result) use ($db) { + $this->_db = $db; + return $result; + }; + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['SELECT', $db], time(), $cb, $format]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * auth + * + * @param string|array $auth + * @param null $cb + * @return mixed + */ + public function auth($auth, $cb = null) + { + $format = function ($result) use ($auth) { + $this->_auth = $auth; + return $result; + }; + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['AUTH', $auth], time(), $cb, $format]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * set + * + * @param $key + * @param $value + * @param null $cb + * @return mixed + */ + public function set($key, $value, $cb = null) + { + $args = func_get_args(); + if ($cb !== null && !\is_callable($cb)) { + $timeout = $cb; + $cb = null; + if (\count($args) > 3) { + $cb = $args[3]; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['SETEX', $key, $timeout, $value], time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['SET', $key, $value], time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * incr + * + * @param $key + * @param null $cb + * @return mixed + */ + public function incr($key, $cb = null) + { + $args = func_get_args(); + if ($cb !== null && !\is_callable($cb)) { + $num = $cb; + $cb = null; + if (\count($args) > 2) { + $cb = $args[2]; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['INCRBY', $key, $num], time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['INCR', $key], time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + + /** + * decr + * + * @param $key + * @param null $cb + * @return mixed + */ + public function decr($key, $cb = null) + { + $args = func_get_args(); + if ($cb !== null && !\is_callable($cb)) { + $num = $cb; + $cb = null; + if (\count($args) > 2) { + $cb = $args[2]; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['DECRBY', $key, $num], time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['DECR', $key], time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * sort + * + * @param $key + * @param $options + * @param null $cb + * @return mixed + */ + function sort($key, $options, $cb = null) + { + $args = []; + if (isset($options['sort'])) { + $args[] = $options['sort']; + unset($options['sort']); + } + + foreach ($options as $op => $value) { + $args[] = $op; + if (!is_array($value)) { + $args[] = $value; + continue; + } + foreach ($value as $sub_value) { + $args[] = $sub_value; + } + } + \array_unshift($args, 'SORT', $key); + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [$args, time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * mSet + * + * @param array $array + * @param null $cb + */ + public function mSet(array $array, $cb = null) + { + return $this->mapCb('MSET', $array, $cb); + } + + /** + * mSetNx + * + * @param array $array + * @param null $cb + */ + public function mSetNx(array $array, $cb = null) + { + return $this->mapCb('MSETNX', $array, $cb); + } + + /** + * mapCb + * + * @param $command + * @param array $array + * @param $cb + * @return mixed + */ + protected function mapCb($command, array $array, $cb) + { + $args = [$command]; + foreach ($array as $key => $value) { + $args[] = $key; + $args[] = $value; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [$args, time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * hMSet + * + * @param $key + * @param array $array + * @param null $cb + * @return mixed + */ + public function hMSet($key, array $array, $cb = null) + { + return $this->keyMapCb('HMSET', $key, $array, $cb); + } + + /** + * hMGet + * + * @param $key + * @param array $array + * @param null $cb + * @return mixed + */ + public function hMGet($key, array $array, $cb = null) + { + $format = function ($result) use ($array) { + if (!is_array($result)) { + return $result; + } + return \array_combine($array, $result); + }; + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['HMGET', $key, $array], time(), $cb, $format]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * hGetAll + * + * @param $key + * @param null $cb + * @return mixed + */ + public function hGetAll($key, $cb = null) + { + $format = function ($result) { + if (!\is_array($result)) { + return $result; + } + $return = []; + $key = ''; + foreach ($result as $index => $item) { + if ($index % 2 == 0) { + $key = $item; + continue; + } + $return[$key] = $item; + } + return $return; + }; + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [['HGETALL', $key], time(), $cb, $format]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * keyMapCb + * + * @param $command + * @param $key + * @param array $array + * @param $cb + * @return mixed + */ + protected function keyMapCb($command, $key, array $array, $cb) + { + $args = [$command, $key]; + foreach ($array as $key => $value) { + $args[] = $key; + $args[] = $value; + } + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [$args, time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * __call + * + * @param $method + * @param $args + * @return mixed + */ + public function __call($method, $args) + { + $cb = null; + if (count($args) > 1 || in_array($method, ['randomKey', 'multi', 'exec', 'discard'])) { + if (\is_callable(end($args))) { + $cb = array_pop($args); + } + } + + \array_unshift($args, \strtoupper($method)); + $need_suspend = !$cb && class_exists(EventLoop::class, false); + if ($need_suspend) { + [$suspension, $cb] = $this->suspenstion(); + } + $this->_queue[] = [$args, time(), $cb]; + $this->process(); + if ($need_suspend) { + return $suspension->suspend(); + } + return null; + } + + /** + * @return array + */ + protected function suspenstion() + { + $suspension = EventLoop::getSuspension(); + $cb = function ($result) use ($suspension) { + $suspension->resume($result); + }; + return [$suspension, $cb]; + } + + /** + * closeConnection + */ + public function closeConnection() + { + if (!$this->_connection) { + return; + } + $this->_subscribe = false; + $this->_connection->onConnect = $this->_connection->onError = $this->_connection->onClose = + $this->_connection->onMessage = null; + $this->_connection->close(); + $this->_connection = null; + if ($this->_connectTimeoutTimer) { + Timer::del($this->_connectTimeoutTimer); + } + if ($this->_reconnectTimer) { + Timer::del($this->_reconnectTimer); + } + } + + /** + * error + * + * @return string + */ + function error() + { + return $this->_error; + } + + /** + * close + */ + public function close() + { + $this->closeConnection(); + $this->_queue = []; + gc_collect_cycles(); + if (function_exists('gc_mem_caches')) { + gc_mem_caches(); + } + } + + /** + * scan + * + * @throws Exception + */ + public function scan() + { + throw new Exception('Not implemented'); + } + + /** + * hScan + * + * @throws Exception + */ + public function hScan() + { + throw new Exception('Not implemented'); + } + + /** + * hScan + * + * @throws Exception + */ + public function sScan() + { + throw new Exception('Not implemented'); + } + + /** + * hScan + * + * @throws Exception + */ + public function zScan() + { + throw new Exception('Not implemented'); + } + +} diff --git a/vendor/workerman/redis/src/Exception.php b/vendor/workerman/redis/src/Exception.php new file mode 100644 index 0000000..391fe0a --- /dev/null +++ b/vendor/workerman/redis/src/Exception.php @@ -0,0 +1,19 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ +namespace Workerman\Redis; + +class Exception extends \Exception +{ + +} \ No newline at end of file diff --git a/vendor/workerman/redis/src/Protocols/Redis.php b/vendor/workerman/redis/src/Protocols/Redis.php new file mode 100644 index 0000000..64c05a2 --- /dev/null +++ b/vendor/workerman/redis/src/Protocols/Redis.php @@ -0,0 +1,179 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ +namespace Workerman\Redis\Protocols; + +use Workerman\Connection\ConnectionInterface; +use Workerman\Redis\Exception; + +/** + * Redis Protocol. + */ +class Redis +{ + /** + * Check the integrity of the package. + * + * @param string $buffer + * @param ConnectionInterface $connection + * @return int + */ + public static function input($buffer, ConnectionInterface $connection) { + $type = $buffer[0]; + $pos = \strpos($buffer, "\r\n"); + if (false === $pos) { + return 0; + } + switch ($type) { + case ':': + case '+': + case '-': + return $pos + 2; + case '$': + if(0 === strpos($buffer, '$-1')) { + return 5; + } + return $pos + 4 + (int)substr($buffer, 1, $pos); + case '*': + if(0 === strpos($buffer, '*-1')) { + return 5; + } + $count = (int)substr($buffer, 1, $pos - 1); + while ($count --) { + $next_pos = strpos($buffer, "\r\n", $pos + 2); + if (!$next_pos) { + return 0; + } + $sub_type = $buffer[$pos + 2]; + switch ($sub_type) { + case ':': + case '+': + case '-': + $pos = $next_pos; + break; + case '$': + if($pos + 2 === strpos($buffer, '$-1', $pos)) { + $pos = $next_pos; + break; + } + $length = (int)substr($buffer, $pos + 3, $next_pos - $pos -3); + $pos = $next_pos + $length + 2; + if (strlen($buffer) < $pos) { + return 0; + } + break; + default: + return \strlen($buffer); + } + } + return $pos + 2; + default: + return \strlen($buffer); + } + } + + + /** + * Encode. + * + * @param array $data + * @return string + */ + public static function encode(array $data) + { + $cmd = ''; + $count = \count($data); + foreach ($data as $item) + { + if (\is_array($item)) { + $count += \count($item) - 1; + foreach ($item as $str) + { + $cmd .= '$' . \strlen($str) . "\r\n$str\r\n"; + } + continue; + } + $cmd .= '$' . \strlen($item) . "\r\n$item\r\n"; + } + return "*$count\r\n$cmd"; + } + + /** + * Decode. + * + * @param string $buffer + * @return string + */ + public static function decode($buffer) + { + $type = $buffer[0]; + switch ($type) { + case ':': + return [$type ,(int) substr($buffer, 1)]; + case '+': + return [$type, \substr($buffer, 1, strlen($buffer) - 3)]; + case '-': + return [$type, \substr($buffer, 1, strlen($buffer) - 3)]; + case '$': + if(0 === strpos($buffer, '$-1')) { + return [$type, null]; + } + $pos = \strpos($buffer, "\r\n"); + return [$type, \substr($buffer, $pos + 2, (int)substr($buffer, 1, $pos))]; + case '*': + if(0 === strpos($buffer, '*-1')) { + return [$type, null]; + } + $pos = \strpos($buffer, "\r\n"); + $value = []; + $count = (int)substr($buffer, 1, $pos - 1); + while ($count --) { + $next_pos = strpos($buffer, "\r\n", $pos + 2); + if (!$next_pos) { + return 0; + } + $sub_type = $buffer[$pos + 2]; + switch ($sub_type) { + case ':': + $value[] = (int) substr($buffer, $pos + 3, $next_pos - $pos - 3); + $pos = $next_pos; + break; + case '+': + $value[] = substr($buffer, $pos + 3, $next_pos - $pos - 3); + $pos = $next_pos; + break; + case '-': + $value[] = substr($buffer, $pos + 3, $next_pos - $pos - 3); + $pos = $next_pos; + break; + case '$': + if($pos + 2 === strpos($buffer, '$-1', $pos)) { + $pos = $next_pos; + $value[] = null; + break; + } + $length = (int)substr($buffer, $pos + 3, $next_pos - $pos -3); + $value[] = substr($buffer, $next_pos + 2, $length); + $pos = $next_pos + $length + 2; + break; + default: + return ['!', "protocol error, got '$sub_type' as reply type byte. buffer:".bin2hex($buffer)." pos:$pos"]; + } + } + return [$type, $value]; + default: + return ['!', "protocol error, got '$type' as reply type byte. buffer:".bin2hex($buffer)]; + + } + } +}