123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- <?php
- /**
- * 异步任务队列类
- */
- class Async
- {
- /**
- * Redis的服务地址
- * @var string
- */
- protected $host;
- /**
- * Redis的服务端口
- * @var int
- */
- protected $port;
- /**
- * Worker的数量
- * @var int
- */
- protected $workerNum;
- /**
- * Redis对象
- * @var Redis
- */
- protected $redis;
- /**
- * 管理命令列表
- */
- protected $adminCmdList = ['stop', 'restart', 'reload', 'quit', 'exit'];
- /**
- * 异步任务数据的队列KEY
- * @const string
- */
- const KEY_QUEUE = 'ASYNC_Q';
- /**
- * 发送管理命令的Redis队列KEY
- * @const string
- */
- const KEY_ADMIN = 'ADMIN_Q';
- /**
- * 管理命令的标识
- * @const string
- */
- const CMD_ADMIN = '!';
- /**
- * 构造函数
- * @param string $host Redis服务地址
- * @param string $port Redis服务端口
- * @param int $workerNum Worker的数量
- */
- public function __construct($host, $port, $workerNum)
- {
- $this->host = $host;
- $this->port = $port;
- $this->workerNum = $workerNum;
- $this->redis = new Redis();
- $this->redis->connect($host, $port, 3);
- }
- /**
- * 添加一条管理命令
- * @param int $workerId 指定操作的目标workerId
- * @param string $cmd 管理命令(stop,restart,reload,quit,exit)
- * @return bool 是否成功
- */
- public function admin($cmd, $workerId = -1)
- {
- if (!in_array($cmd, $this->adminCmdList)) {
- return false;
- }
- if ($workerId > -1) {
- $flag = $this->redis->rPush(self::KEY_ADMIN . $workerId, $cmd);
- return $flag !== false;
- } else {
- for ($i = 0; $i < $this->workerNum; $i++) {
- $flag = $this->redis->rPush(self::KEY_ADMIN . $i, $cmd);
- if ($flag === false) {
- return false;
- }
- }
- return true;
- }
- }
- /**
- * 向任务队列中添加一条任务数据
- * @param string $name 任务名
- * @param mixed $arg1 参数1
- * ...
- * @return bool|int 成功返回队列中的消息数量,失败返回false
- */
- public function pushJob()
- {
- $argc = func_num_args();
- $args = func_get_args();
- if ($argc < 1) {
- return false;
- }
- if ($argc == 1 && is_array($args)) {
- $args = $args[0];
- }
- return $this->redis->rPush(self::KEY_QUEUE, json_encode($args));
- }
- /**
- * 从任务队列中取一条任务数据出来(阻塞读取)
- * @param int $workerId 当前workerId,用于提取管理命令
- * @param int $timeout 超时时间
- * @return bool|array 成功返回任务数据,失败返回false
- */
- public function popJob($workerId = -1, $timeout = 5)
- {
- $keys = [self::KEY_QUEUE];
- if ($workerId > -1) {
- $keys[] = self::KEY_ADMIN . $workerId;
- }
- $ret = $this->redis->blPop($keys, $timeout);
- if (!$ret || empty($ret)) {
- return false;
- }
- $key = $ret[0];
- $value = $ret[1];
- if ($key == self::KEY_QUEUE) {
- return json_decode($value, true);
- } else {
- return [self::CMD_ADMIN, $value];
- }
- }
- /**
- * ping操作,用于保持连接不断开
- */
- public function ping()
- {
- return $this->redis->ping();
- }
- /**
- * 魔术方法
- * @param string $method 被调用的方法名
- * @param array $args 传入的参数
- * @return mixed
- */
- public function __call($method, $args)
- {
- if (empty($method) || preg_match('/\W/', $method)) {
- return false;
- }
- if (in_array($method, $this->adminCmdList)) {
- return $this->admin($method);
- } else {
- array_unshift($args, $method);
- return $this->pushJob($args);
- }
- }
- }
|