worker.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. <?php
  2. /**
  3. * 运行环境:php7, php_redis, php_pcntl
  4. */
  5. define('CONFIG_FILE', __DIR__ . '/config.php');
  6. define('LOG_NAME', 'worker');
  7. include __DIR__ . '/functions.php';
  8. include __DIR__ . '/class.async.php';
  9. include __DIR__ . '/class.jobs.php';
  10. //检查参数
  11. if ($argc < 2) {
  12. die("Usage: {$argv[0]} worker_id\n");
  13. }
  14. $workerId = intval($_SERVER['argv'][1]);
  15. //确保只有一个实例在运行
  16. $lockFile = run_single_instance($workerId);
  17. //加载配置
  18. $cfg = [];
  19. load_config();
  20. try {
  21. $async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
  22. $jobs = new Jobs();
  23. debug_log("Worker#{$workerId} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
  24. while (1) {
  25. //取数据
  26. $data = $async->popJob($workerId, $cfg['timeout']);
  27. //超时处理
  28. if (!$data) {
  29. //如果有使用数据库、其它的Redis,需要在此处对常驻的连接进行ping操作,以保持连接状态
  30. continue;
  31. }
  32. //任务处理
  33. $job = array_shift($data);
  34. if ($job == Async::CMD_ADMIN) { //管理命令
  35. $job = array_shift($data);
  36. switch ($job) {
  37. //退出
  38. case 'stop':
  39. case 'quit':
  40. case 'exit':
  41. debug_log("Worker#{$workerId} quiting", 'WARNING', LOG_NAME);
  42. unlink($lockFile);
  43. exit(0);
  44. //重启
  45. case 'restart':
  46. debug_log("Worker#{$worker} restarting", 'WARNING', LOG_NAME);
  47. unlink($lockFile);
  48. if (function_exists('pcntl_exec')) { //有php_pcntl扩展
  49. pcntl_exec($cfg['php_bin_path'], $argv);
  50. debug_log("Worker#{$workerId} restart failed", 'ERROR', LOG_NAME);
  51. exit(1);
  52. } else { //无php_pcntl扩展
  53. shell_exec("{$cfg['php_bin_path']} {$workerId} >/dev/null 2>&1 &");
  54. exit(0);
  55. }
  56. //刷新配置
  57. case 'reload':
  58. load_config();
  59. debug_log("Worker#{$workerId} reloaded", 'WARNING', LOG_NAME);
  60. break;
  61. //其它
  62. default:
  63. debug_log("Worker#{$workerId} discard unknown job {$job}", 'WARNING', LOG_NAME);
  64. break;
  65. }
  66. } else { //异步任务
  67. try {
  68. $jobs->exec($job, $data);
  69. } catch (Exception $e) {
  70. debug_log("Worker#{$workerId} catched an Exception during job {$job} executing:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
  71. } catch (Error $e) {
  72. debug_log("Worker#{$workerId} catched an Error during job {$job} executing" . $e->getMessage(), 'ERROR', LOG_NAME);
  73. }
  74. }
  75. }
  76. } catch (RedisException $e) {
  77. debug_log("Worker#{$workerId} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
  78. }