popJob($workerId, $cfg['timeout']);

        // Timeout handling: popJob() returned nothing within $cfg['timeout'].
        if (!$data) {
            // If the worker keeps long-lived connections to a database or to
            // other Redis instances, ping them here to keep them alive.
            continue;
        }

        // Job handling: the first element of the payload is the job name,
        // the remaining elements are passed on as the job's data.
        $job = array_shift($data);

        if ($job == Async::CMD_ADMIN) {
            // Administrative command: the next element is the command name.
            $job = array_shift($data);

            switch ($job) {
                // Quit: remove the lock file and terminate the worker.
                case 'stop':
                case 'quit':
                case 'exit':
                    debug_log("Worker#{$workerId} quiting", 'WARNING', LOG_NAME);
                    unlink($lockFile);
                    exit(0);

                // Restart: remove the lock file, then re-launch the worker.
                case 'restart':
                    // Fixed: this message previously interpolated the undefined
                    // variable $worker instead of $workerId.
                    debug_log("Worker#{$workerId} restarting", 'WARNING', LOG_NAME);
                    unlink($lockFile);

                    if (function_exists('pcntl_exec')) {
                        // pcntl extension available: replace the current process.
                        // pcntl_exec() only returns when it failed.
                        pcntl_exec($cfg['php_bin_path'], $argv);
                        debug_log("Worker#{$workerId} restart failed", 'ERROR', LOG_NAME);
                        exit(1);
                    } else {
                        // No pcntl extension: spawn a detached background
                        // process through the shell and exit this one.
                        // NOTE(review): unlike the pcntl_exec() branch, which
                        // passes all of $argv, this command line carries only
                        // $workerId after the PHP binary -- confirm that
                        // $cfg['php_bin_path'] is expected to include the
                        // script path in this setup.
                        shell_exec("{$cfg['php_bin_path']} {$workerId} >/dev/null 2>&1 &");
                        exit(0);
                    }

                // Reload: re-read the configuration and keep running.
                case 'reload':
                    load_config();
                    debug_log("Worker#{$workerId} reloaded", 'WARNING', LOG_NAME);
                    break;

                // Anything else: log and drop the unknown command.
                default:
                    debug_log("Worker#{$workerId} discard unknown job {$job}", 'WARNING', LOG_NAME);
                    break;
            }
        } else {
            // Asynchronous job: run it and log any failure, so that a single
            // failing job does not terminate the worker loop.
            try {
                $jobs->exec($job, $data);
            } catch (Exception $e) {
                debug_log("Worker#{$workerId} catched an Exception during job {$job} executing:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
            } catch (Error $e) {
                debug_log("Worker#{$workerId} catched an Error during job {$job} executing" . $e->getMessage(), 'ERROR', LOG_NAME);
            }
        }
    }
} catch (RedisException $e) {
    // A Redis-level failure escapes the polling loop and is logged here.
    debug_log("Worker#{$workerId} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
}