transfer1.php 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. <?php
  define('CONFIG_FILE', __DIR__ . '/config.php');
  define('LOG_NAME', 'transfer1');
  // Maximum number of bytes read per datagram.
  // Theoretical UDP limit is 65507; suggested value is 1472 on a LAN and 548 over the public internet.
  define('DGRAM_MAXSIZE', 1000);
  include __DIR__ . '/functions.php';
  include __DIR__ . '/class.async.php';

  // Load configuration.
  // NOTE(review): load_config() is defined outside this file and presumably fills the global $cfg — confirm.
  $cfg = [];
  load_config();

  // Address the UDP server binds to.
  $addr = "{$cfg['udp_host']}:{$cfg['udp_port']}";
  // Shutdown / restart requests; set asynchronously by sig_handle().
  $quit = $restart = false;

  // Make sure only one process runs per UDP port.
  $lockFile = run_single_instance($cfg['udp_port']);

  // Install signal handlers: SIGTERM requests exit, SIGUSR1 requests restart.
  pcntl_signal(SIGTERM, 'sig_handle');
  pcntl_signal(SIGUSR1, 'sig_handle');

  // Create the UDP server socket.
  $s = stream_socket_server("udp://{$addr}", $errno, $error, STREAM_SERVER_BIND);
  if (!$s) {
      debug_log("Transfer@{$addr} stream_socket_server() call failed: #{$errno},{$error}", 'ERROR', LOG_NAME);
      // Release the lock file here as well, like every other exit path of this script does.
      unlink($lockFile);
      exit(1);
  }

  // Receive UDP requests and hand them to the async job queue.
  try {
      $async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
      debug_log("Transfer@{$addr} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);

      while (true) {
          $rset = [$s];
          $wset = $eset = [];
          $n = 0;

          // Signal handlers are dispatched on ticks, and this is the only ticked statement:
          // a signal that arrived while the loop was busy elsewhere is delivered right
          // after stream_select() returns, even if the call itself succeeded.
          declare (ticks = 1) {
              $n = stream_select($rset, $wset, $eset, $cfg['timeout']);
          }

          // A failed select that was not caused by one of our signals is a real error.
          if ($n === false && !$quit && !$restart) {
              fclose($s);
              unlink($lockFile);
              debug_log("Transfer@{$addr} stream_select() call failed", 'ERROR', LOG_NAME);
              exit(1);
          }

          // Data is ready: drain the datagram first so it is not lost on shutdown.
          if (is_int($n) && $n > 0) {
              $buffer = stream_socket_recvfrom($s, DGRAM_MAXSIZE, 0, $from);
              if (empty($buffer)) {
                  debug_log("Transfer@{$addr} stream_socket_recvfrom() got empty data from {$from}", 'WARNING', LOG_NAME);
              } else {
                  $data = json_decode($buffer, true);
                  // Only non-empty arrays are queued; anything else is silently dropped.
                  if ($data && is_array($data)) {
                      $async->pushJob($data);
                  }
              }
          }

          // Honour quit/restart requests after EVERY select, not only when select was
          // interrupted; otherwise a signal delivered while the process was busy
          // sets the flag but is never acted upon.
          if ($quit || $restart) {
              fclose($s);
              unlink($lockFile);
              if ($quit) { // Exit takes precedence over restart
                  debug_log("Transfer@{$addr} quiting", 'WARNING', LOG_NAME);
                  exit(0);
              }
              // Restart: replace the current process image with a fresh interpreter.
              debug_log("Transfer@{$addr} restarting", 'WARNING', LOG_NAME);
              pcntl_exec($cfg['php_bin_path'], $argv);
              // pcntl_exec() only returns on failure.
              debug_log("Transfer@{$addr} restart failed", 'ERROR', LOG_NAME);
              exit(1);
          }

          // Timeout with nothing to read: ping the async backend.
          if ($n === 0) {
              $async->ping();
          }
      }
  } catch (RedisException $e) {
      debug_log("Transfer@{$addr} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
  }

  // Reached only after a RedisException: release the socket and the lock file.
  fclose($s);
  unlink($lockFile);
  /**
   * Signal handler.
   *
   * Records the received signal in a global flag and returns immediately:
   * SIGTERM sets $quit, SIGUSR1 sets $restart. Any other signal number is ignored.
   *
   * @param int $signo Signal number
   */
  function sig_handle($signo)
  {
      global $quit, $restart;

      // Loose comparison on purpose: it mirrors the matching rules of a switch statement.
      if ($signo == SIGTERM) {
          // Exit requested
          $quit = true;
      } elseif ($signo == SIGUSR1) {
          // Restart requested
          $restart = true;
      }
  }