transfer2.php 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. <?php
  2. define('CONFIG_FILE', __DIR__ . '/config.php');
  3. define('LOG_NAME', 'transfer2');
  4. define('MSG_MAX_SIZE', 1024);
  5. include __DIR__ . '/functions.php';
  6. include __DIR__ . '/class.async.php';
  7. //检查参数
  8. if ($argc < 2) {
  9. die("Usage: {$argv[0]} TRANSFER_ID\n");
  10. }
  11. //加载配置
  12. $cfg = [];
  13. load_config();
  14. $transferId = intval($argv[1]); //进程的编号
  15. $queueKey = intval($cfg['sysvmsg_key']); //消息队列的KEY
  16. //保障只运行一个进程
  17. $lockFile = run_single_instance($transferId);
  18. //创建队列
  19. $q = msg_get_queue($queueKey, 0666);
  20. if (!$q) {
  21. die("msg_get_queue() failed!\n");
  22. }
  23. //创建子进程
  24. $parentId = getmypid();
  25. $childPid = create_child_process($parentId, $q, $cfg['timeout']);
  26. //修改进程名称
  27. cli_set_process_title('php -f ' . implode(' ', $argv) . ' [master]');
  28. //设置信号处理函数(在子进程创建之后才设置信号处理函数)
  29. $quit = $restart = false;
  30. $async = null;
  31. pcntl_signal(SIGTERM, 'sig_handle'); //quit
  32. pcntl_signal(SIGUSR1, 'sig_handle'); //restart
  33. pcntl_signal(SIGUSR2, 'sig_handle'); //ping
  34. pcntl_signal(SIGCHLD, 'sig_handle'); //child quit
  35. //从队列中取消息
  36. try {
  37. $async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
  38. debug_log("Transfer#{$transferId} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
  39. $flag = false;
  40. while (1) {
  41. declare (ticks = 1) {
  42. $flag = msg_receive($q, 0, $msgtype, MSG_MAX_SIZE, $msg, false, 0, $errcode);
  43. }
  44. if ($flag) {
  45. $data = json_decode($msg, true);
  46. if (!$data || !is_array($data) || count($data) < 1) {
  47. continue;
  48. }
  49. $async->pushJob($data);
  50. } else {
  51. if ($quit) {
  52. debug_log("Transfer#{$transferId} quiting", 'WARNING', LOG_NAME);
  53. unlink($lockFile);
  54. if (kill_child_process($transferId, $childPid)) { //先杀子进程
  55. exit(0);
  56. } else {
  57. debug_log("Transfer#{$transferId} quit failed", 'ERROR', LOG_NAME);
  58. exit(1);
  59. }
  60. } elseif ($restart) {
  61. debug_log("Transfer#{$transferId} restarting", 'WARNING', LOG_NAME);
  62. unlink($lockFile);
  63. if (kill_child_process($transferId, $childPid)) { //先杀子进程
  64. pcntl_exec($cfg['php_bin_path'], $argv);
  65. }
  66. debug_log("Transfer#{$transferId} restart failed", 'ERROR', LOG_NAME);
  67. exit(1);
  68. } else {
  69. $async->ping();
  70. }
  71. }
  72. }
  73. } catch (RedisException $e) {
  74. debug_log("Transfer#{$transferId} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
  75. }
  76. /**
  77. * 信号处理函数
  78. * @param int $signo 信号值
  79. */
  80. function sig_handle($signo)
  81. {
  82. global $cfg, $q, $parentId, $childPid, $quit, $restart;
  83. switch ($signo) {
  84. //退出
  85. case SIGTERM:
  86. $quit = true;
  87. pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD]);
  88. break;
  89. //重启
  90. case SIGUSR1:
  91. $restart = true;
  92. pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD]);
  93. break;
  94. //保持连接
  95. case SIGUSR2:
  96. break;
  97. //子进程退出
  98. case SIGCHLD:
  99. if (!$quit && pcntl_waitpid($childPid, $status, WNOHANG) == $childPid) {
  100. $childPid = create_child_process($parentId, $q, $cfg['timeout']);
  101. }
  102. break;
  103. }
  104. }
  105. /**
  106. * 创建子进程
  107. * @param int $ppid 父进程的ID
  108. * @param resource $queue 消息队列的资源ID
  109. * @param int $timeout SLEEP的时间
  110. * @return int 子进程ID
  111. */
  112. function create_child_process($ppid, $queue, $timeout)
  113. {
  114. global $async;
  115. $pid = pcntl_fork();
  116. //创建失败
  117. if ($pid < 0) {
  118. die("pcntl_fork() failed!\n");
  119. }
  120. //父进程
  121. if ($pid > 0) {
  122. return $pid;
  123. }
  124. //子进程的主循环
  125. pcntl_signal(SIGTERM, SIG_DFL); //quit
  126. pcntl_signal(SIGUSR1, SIG_DFL); //restart
  127. pcntl_signal(SIGUSR2, SIG_DFL); //ping
  128. pcntl_signal(SIGCHLD, SIG_DFL); //child quit
  129. cli_set_process_title('php -f ' . implode(' ', $_SERVER['argv']) . ' [timer]');
  130. if ($async) {
  131. unset($async);
  132. }
  133. while (1) {
  134. sleep($timeout);
  135. posix_kill($ppid, SIGUSR2);
  136. }
  137. exit(0);
  138. }
  139. /**
  140. * 杀死子进程
  141. * @param int $pid 子进程ID
  142. * @return bool 是否成功
  143. */
  144. function kill_child_process($transferId, $pid)
  145. {
  146. if (!posix_kill($pid, SIGTERM)) {
  147. debug_log("Transfer#{$transferId} posix_kill({$pid}) failed", 'ERROR', LOG_NAME);
  148. return false;
  149. }
  150. $returnPid = pcntl_waitpid($pid, $status);
  151. if ($returnPid != $pid) {
  152. debug_log("Transfer#{$transferId} pcntl_waitpid({$pid}) call failed, return value = {$returnPid}", 'ERROR', LOG_NAME);
  153. return false;
  154. }
  155. return true;
  156. }