host = $host; $this->port = $port; $this->workerNum = $workerNum; $this->redis = new Redis(); $this->redis->connect($host, $port, 3); } /** * 添加一条管理命令 * @param int $workerId 指定操作的目标workerId * @param string $cmd 管理命令(stop,restart,reload,quit,exit) * @return bool 是否成功 */ public function admin($cmd, $workerId = -1) { if (!in_array($cmd, $this->adminCmdList)) { return false; } if ($workerId > -1) { $flag = $this->redis->rPush(self::KEY_ADMIN . $workerId, $cmd); return $flag !== false; } else { for ($i = 0; $i < $this->workerNum; $i++) { $flag = $this->redis->rPush(self::KEY_ADMIN . $i, $cmd); if ($flag === false) { return false; } } return true; } } /** * 向任务队列中添加一条任务数据 * @param string $name 任务名 * @param mixed $arg1 参数1 * ... * @return bool|int 成功返回队列中的消息数量,失败返回false */ public function pushJob() { $argc = func_num_args(); $args = func_get_args(); if ($argc < 1) { return false; } if ($argc == 1 && is_array($args)) { $args = $args[0]; } return $this->redis->rPush(self::KEY_QUEUE, json_encode($args)); } /** * 从任务队列中取一条任务数据出来(阻塞读取) * @param int $workerId 当前workerId,用于提取管理命令 * @param int $timeout 超时时间 * @return bool|array 成功返回任务数据,失败返回false */ public function popJob($workerId = -1, $timeout = 5) { $keys = [self::KEY_QUEUE]; if ($workerId > -1) { $keys[] = self::KEY_ADMIN . $workerId; } $ret = $this->redis->blPop($keys, $timeout); if (!$ret || empty($ret)) { return false; } $key = $ret[0]; $value = $ret[1]; if ($key == self::KEY_QUEUE) { return json_decode($value, true); } else { return [self::CMD_ADMIN, $value]; } } /** * ping操作,用于保持连接不断开 */ public function ping() { return $this->redis->ping(); } /** * 魔术方法 * @param string $method 被调用的方法名 * @param array $args 传入的参数 * @return mixed */ public function __call($method, $args) { if (empty($method) || preg_match('/\W/', $method)) { return false; } if (in_array($method, $this->adminCmdList)) { return $this->admin($method); } else { array_unshift($args, $method); return $this->pushJob($args); } } }