123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- <?php
- /**
- * 运行环境:php7, php_redis, php_pcntl
- */
- define('CONFIG_FILE', __DIR__ . '/config.php');
- define('LOG_NAME', 'worker');
- include __DIR__ . '/functions.php';
- include __DIR__ . '/class.async.php';
- include __DIR__ . '/class.jobs.php';
- //检查参数
- if ($argc < 2) {
- die("Usage: {$argv[0]} worker_id\n");
- }
- $workerId = intval($_SERVER['argv'][1]);
- //确保只有一个实例在运行
- $lockFile = run_single_instance($workerId);
- //加载配置
- $cfg = [];
- load_config();
- try {
- $async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
- $jobs = new Jobs();
- debug_log("Worker#{$workerId} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
- while (1) {
- //取数据
- $data = $async->popJob($workerId, $cfg['timeout']);
- //超时处理
- if (!$data) {
- //如果有使用数据库、其它的Redis,需要在此处对常驻的连接进行ping操作,以保持连接状态
- continue;
- }
- //任务处理
- $job = array_shift($data);
- if ($job == Async::CMD_ADMIN) { //管理命令
- $job = array_shift($data);
- switch ($job) {
- //退出
- case 'stop':
- case 'quit':
- case 'exit':
- debug_log("Worker#{$workerId} quiting", 'WARNING', LOG_NAME);
- unlink($lockFile);
- exit(0);
- //重启
- case 'restart':
- debug_log("Worker#{$worker} restarting", 'WARNING', LOG_NAME);
- unlink($lockFile);
- if (function_exists('pcntl_exec')) { //有php_pcntl扩展
- pcntl_exec($cfg['php_bin_path'], $argv);
- debug_log("Worker#{$workerId} restart failed", 'ERROR', LOG_NAME);
- exit(1);
- } else { //无php_pcntl扩展
- shell_exec("{$cfg['php_bin_path']} {$workerId} >/dev/null 2>&1 &");
- exit(0);
- }
- //刷新配置
- case 'reload':
- load_config();
- debug_log("Worker#{$workerId} reloaded", 'WARNING', LOG_NAME);
- break;
- //其它
- default:
- debug_log("Worker#{$workerId} discard unknown job {$job}", 'WARNING', LOG_NAME);
- break;
- }
- } else { //异步任务
- try {
- $jobs->exec($job, $data);
- } catch (Exception $e) {
- debug_log("Worker#{$workerId} catched an Exception during job {$job} executing:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
- } catch (Error $e) {
- debug_log("Worker#{$workerId} catched an Error during job {$job} executing" . $e->getMessage(), 'ERROR', LOG_NAME);
- }
- }
- }
- } catch (RedisException $e) {
- debug_log("Worker#{$workerId} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
- }
|