123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
<?php
/**
 * UDP transfer daemon.
 *
 * Binds a UDP socket on the configured host/port, decodes every received
 * datagram as JSON and hands non-empty arrays to Async::pushJob(). When the
 * socket stays idle for the configured timeout, Async::ping() is called.
 *
 * Signals (flags are set by sig_handle()):
 *   SIGTERM - close the socket, remove the lock file and exit with status 0.
 *   SIGUSR1 - close the socket, remove the lock file and re-exec the script.
 */
define('CONFIG_FILE', __DIR__ . '/config.php');
define('LOG_NAME', 'transfer1');
// Maximum number of bytes read per datagram. Theoretical limit is 65507;
// suggested value is 1472 on an intranet and 548 on the public Internet.
define('DGRAM_MAXSIZE', 1000);
include __DIR__ . '/functions.php';
include __DIR__ . '/class.async.php';

// Load configuration.
// NOTE(review): load_config() presumably populates the global $cfg - confirm in functions.php.
$cfg = [];
load_config();

// Listen address and the shutdown/restart flags toggled by sig_handle().
$addr = "{$cfg['udp_host']}:{$cfg['udp_port']}";
$quit = $restart = false;

// Make sure only one process runs; the returned lock file is removed on exit.
$lockFile = run_single_instance($cfg['udp_port']);

// Install the signal handlers.
pcntl_signal(SIGTERM, 'sig_handle');
pcntl_signal(SIGUSR1, 'sig_handle');

// Create the UDP server socket.
$s = stream_socket_server("udp://{$addr}", $errno, $error, STREAM_SERVER_BIND);
if (!$s) {
    debug_log("Transfer@{$addr} stream_socket_server() call failed: #{$errno},{$error}", 'ERROR', LOG_NAME);
    exit(1);
}

// Receive UDP requests.
try {
    $async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
    debug_log("Transfer@{$addr} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);

    while (true) {
        $rset = [$s];
        $wset = $eset = [];
        $n = 0;
        // Ticks are enabled only around the blocking call so that a signal
        // interrupting stream_select() is dispatched to sig_handle() right
        // after the call returns.
        declare (ticks = 1) {
            $n = stream_select($rset, $wset, $eset, $cfg['timeout']);
        }

        // stream_select() returns false both on a real error and when it is
        // interrupted by a signal; without a pending flag it is a real error.
        if ($n === false && !$quit && !$restart) {
            fclose($s);
            unlink($lockFile);
            debug_log("Transfer@{$addr} stream_select() call failed", 'ERROR', LOG_NAME);
            exit(1);
        }

        if ($n === 0) {
            // Timed out without traffic.
            $async->ping();
        } elseif ($n > 0) {
            // A datagram is ready to be read.
            $buffer = stream_socket_recvfrom($s, DGRAM_MAXSIZE, 0, $from);
            if ($buffer === false || $buffer === '') {
                debug_log("Transfer@{$addr} stream_socket_recvfrom() got empty data from {$from}", 'WARNING', LOG_NAME);
            } else {
                $data = json_decode($buffer, true);
                // Payloads that are not a non-empty JSON array/object are dropped silently.
                if (is_array($data) && count($data) > 0) {
                    $async->pushJob($data);
                }
            }
        }

        // A signal that arrived outside the ticks block (e.g. while a job was
        // being pushed) does not make stream_select() return false, so the
        // flags must be checked on every iteration; otherwise the quit or
        // restart request would be lost.
        pcntl_signal_dispatch();
        if ($quit || $restart) {
            fclose($s);
            unlink($lockFile);
            if ($quit) {
                debug_log("Transfer@{$addr} quiting", 'WARNING', LOG_NAME);
                exit(0);
            }
            debug_log("Transfer@{$addr} restarting", 'WARNING', LOG_NAME);
            // pcntl_exec() only returns when the exec itself failed.
            pcntl_exec($cfg['php_bin_path'], $argv);
            debug_log("Transfer@{$addr} restart failed", 'ERROR', LOG_NAME);
            exit(1);
        }
    }
} catch (RedisException $e) {
    debug_log("Transfer@{$addr} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
}

// Reached only after a RedisException: release the socket and the lock file.
// NOTE(review): the script then ends with exit status 0 - confirm that is what the supervisor expects.
fclose($s);
unlink($lockFile);
/**
 * Signal handler: records the requested action in the global flags.
 *
 * SIGTERM sets $quit, SIGUSR1 sets $restart; any other signal leaves both
 * flags untouched.
 *
 * @param int $signo signal number that was received
 */
function sig_handle($signo)
{
    global $quit, $restart;

    if ($signo == SIGTERM) {
        // Request shutdown.
        $quit = true;
    } elseif ($signo == SIGUSR1) {
        // Request restart.
        $restart = true;
    }
}
|