redis-queue
A message queue system written in PHP, based on Workerman and backed by Redis.
Install
composer require workerman/redis-queue
Usage
test.php
<?php
require __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\RedisQueue\Client;
$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    $client->subscribe('user-1', function ($data) {
        echo "user-1\n";
        var_export($data);
    });
    $client->subscribe('user-2', function ($data) {
        echo "user-2\n";
        var_export($data);
    });
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "consume failure\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    Timer::add(1, function () use ($client) {
        $client->send('user-1', ['some', 'data']);
    });
};
Worker::runAll();
Run with the command php test.php start, or php test.php start -d to run as a daemon.
API
__construct (string $address, [array $options])
Create an instance from $address and $options.
- $address is the Redis address, for example redis://ip:6379.
- $options is the client connection options:
  - auth: default ''
  - db: default 0
  - retry_seconds: retry interval after consumption failure
  - max_attempts: maximum number of retries after consumption failure
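As a rough sketch of how the options above might be passed, the snippet below creates a client for a password-protected Redis instance. The address, password, database index and retry values are placeholder assumptions for illustration, not recommended settings.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\RedisQueue\Client;

$worker = new Worker();
$worker->onWorkerStart = function () {
    // Every value below is an illustrative assumption.
    $client = new Client('redis://127.0.0.1:6379', [
        'auth'          => 'your-password', // Redis password ('' when none is set)
        'db'            => 1,               // Redis database index
        'retry_seconds' => 5,               // retry interval after a consumption failure
        'max_attempts'  => 3,               // maximum number of retries after a failure
    ]);
    $client->subscribe('user-1', function ($data) {
        var_export($data);
    });
};
Worker::runAll();
```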
send(String $queue, Mixed $data, [int $delay=0])
Send a message to a queue.
- $queue is the queue to publish to, String
- $data is the message to publish, Mixed
- $delay is the number of seconds to delay consumption of the message, Int
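The delay argument is easiest to see next to an immediate send. The sketch below assumes a local Redis at 127.0.0.1:6379. The queue name, payloads and the 10-second delay are arbitrary choices for illustration.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\RedisQueue\Client;

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Print the arrival time so the delay is visible in the output.
    $client->subscribe('user-1', function ($data) {
        echo date('H:i:s'), ' received: ';
        var_export($data);
        echo "\n";
    });
    // No third argument: the message is available for consumption right away.
    $client->send('user-1', ['type' => 'immediate']);
    // Third argument: delay consumption of this message by 10 seconds.
    $client->send('user-1', ['type' => 'delayed'], 10);
};
Worker::runAll();
```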
subscribe(mixed $queue, callable $callback)
Subscribe to a queue or queues.
- $queue is a String queue or an Array which has as keys the queue names to subscribe to.
- $callback is function (Mixed $data), where $data is the data sent by send($queue, $data).
unsubscribe(mixed $queue)
Unsubscribe from a queue or queues.
- $queue is a String queue or an array of queues to unsubscribe from.
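A minimal sketch of subscribe and unsubscribe used together: the worker consumes from user-2 for a while, then stops. The one-shot 10-second timer and the queue name are assumptions chosen for the example. Timer::add is Workerman's timer API, and passing false as its fourth argument makes the timer fire only once.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\RedisQueue\Client;

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    $client->subscribe('user-2', function ($data) {
        echo "user-2\n";
        var_export($data);
    });
    // After 10 seconds, stop consuming from user-2 (runs once, not repeatedly).
    Timer::add(10, function () use ($client) {
        $client->unsubscribe('user-2');
        echo "unsubscribed from user-2\n";
    }, [], false);
};
Worker::runAll();
```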