123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- <?php
- define('CONFIG_FILE', __DIR__ . '/config.php');
- define('LOG_NAME', 'transfer2');
- define('MSG_MAX_SIZE', 1024);
- include __DIR__ . '/functions.php';
- include __DIR__ . '/class.async.php';
- //检查参数
- if ($argc < 2) {
- die("Usage: {$argv[0]} TRANSFER_ID\n");
- }
- //加载配置
- $cfg = [];
- load_config();
- $transferId = intval($argv[1]); //进程的编号
- $queueKey = intval($cfg['sysvmsg_key']); //消息队列的KEY
- //保障只运行一个进程
- $lockFile = run_single_instance($transferId);
- //创建队列
- $q = msg_get_queue($queueKey, 0666);
- if (!$q) {
- die("msg_get_queue() failed!\n");
- }
- //创建子进程
- $parentId = getmypid();
- $childPid = create_child_process($parentId, $q, $cfg['timeout']);
- //修改进程名称
- cli_set_process_title('php -f ' . implode(' ', $argv) . ' [master]');
- //设置信号处理函数(在子进程创建之后才设置信号处理函数)
- $quit = $restart = false;
- $async = null;
- pcntl_signal(SIGTERM, 'sig_handle'); //quit
- pcntl_signal(SIGUSR1, 'sig_handle'); //restart
- pcntl_signal(SIGUSR2, 'sig_handle'); //ping
- pcntl_signal(SIGCHLD, 'sig_handle'); //child quit
- //从队列中取消息
- try {
- $async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
- debug_log("Transfer#{$transferId} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
- $flag = false;
- while (1) {
- declare (ticks = 1) {
- $flag = msg_receive($q, 0, $msgtype, MSG_MAX_SIZE, $msg, false, 0, $errcode);
- }
- if ($flag) {
- $data = json_decode($msg, true);
- if (!$data || !is_array($data) || count($data) < 1) {
- continue;
- }
- $async->pushJob($data);
- } else {
- if ($quit) {
- debug_log("Transfer#{$transferId} quiting", 'WARNING', LOG_NAME);
- unlink($lockFile);
- if (kill_child_process($transferId, $childPid)) { //先杀子进程
- exit(0);
- } else {
- debug_log("Transfer#{$transferId} quit failed", 'ERROR', LOG_NAME);
- exit(1);
- }
- } elseif ($restart) {
- debug_log("Transfer#{$transferId} restarting", 'WARNING', LOG_NAME);
- unlink($lockFile);
- if (kill_child_process($transferId, $childPid)) { //先杀子进程
- pcntl_exec($cfg['php_bin_path'], $argv);
- }
- debug_log("Transfer#{$transferId} restart failed", 'ERROR', LOG_NAME);
- exit(1);
- } else {
- $async->ping();
- }
- }
- }
- } catch (RedisException $e) {
- debug_log("Transfer#{$transferId} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
- }
- /**
- * 信号处理函数
- * @param int $signo 信号值
- */
- function sig_handle($signo)
- {
- global $cfg, $q, $parentId, $childPid, $quit, $restart;
- switch ($signo) {
- //退出
- case SIGTERM:
- $quit = true;
- pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD]);
- break;
- //重启
- case SIGUSR1:
- $restart = true;
- pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD]);
- break;
- //保持连接
- case SIGUSR2:
- break;
- //子进程退出
- case SIGCHLD:
- if (!$quit && pcntl_waitpid($childPid, $status, WNOHANG) == $childPid) {
- $childPid = create_child_process($parentId, $q, $cfg['timeout']);
- }
- break;
- }
- }
- /**
- * 创建子进程
- * @param int $ppid 父进程的ID
- * @param resource $queue 消息队列的资源ID
- * @param int $timeout SLEEP的时间
- * @return int 子进程ID
- */
- function create_child_process($ppid, $queue, $timeout)
- {
- global $async;
- $pid = pcntl_fork();
- //创建失败
- if ($pid < 0) {
- die("pcntl_fork() failed!\n");
- }
- //父进程
- if ($pid > 0) {
- return $pid;
- }
- //子进程的主循环
- pcntl_signal(SIGTERM, SIG_DFL); //quit
- pcntl_signal(SIGUSR1, SIG_DFL); //restart
- pcntl_signal(SIGUSR2, SIG_DFL); //ping
- pcntl_signal(SIGCHLD, SIG_DFL); //child quit
- cli_set_process_title('php -f ' . implode(' ', $_SERVER['argv']) . ' [timer]');
- if ($async) {
- unset($async);
- }
- while (1) {
- sleep($timeout);
- posix_kill($ppid, SIGUSR2);
- }
- exit(0);
- }
- /**
- * 杀死子进程
- * @param int $pid 子进程ID
- * @return bool 是否成功
- */
- function kill_child_process($transferId, $pid)
- {
- if (!posix_kill($pid, SIGTERM)) {
- debug_log("Transfer#{$transferId} posix_kill({$pid}) failed", 'ERROR', LOG_NAME);
- return false;
- }
- $returnPid = pcntl_waitpid($pid, $status);
- if ($returnPid != $pid) {
- debug_log("Transfer#{$transferId} pcntl_waitpid({$pid}) call failed, return value = {$returnPid}", 'ERROR', LOG_NAME);
- return false;
- }
- return true;
- }
|