Browse Source

添加代码

weicky 4 years ago
commit
d5a3465924
17 changed files with 1323 additions and 0 deletions
  1. 166 0
      class.async.php
  2. 59 0
      class.jobs.php
  3. 14 0
      config.php
  4. 55 0
      demo.php
  5. 172 0
      functions.php
  6. 19 0
      redis-cmd.php
  7. 192 0
      redis-stat.php
  8. 18 0
      start.sh
  9. 19 0
      stop.sh
  10. 22 0
      sysvmsg-cmd.php
  11. 111 0
      sysvmsg-stat.php
  12. 96 0
      transfer1.php
  13. 91 0
      transfer1b.php
  14. 7 0
      transfer1b.sh
  15. 170 0
      transfer2.php
  16. 25 0
      udp-cmd.php
  17. 87 0
      worker.php

+ 166 - 0
class.async.php

@@ -0,0 +1,166 @@
+<?php
+/**
+ * 异步任务队列类
+ */
+
+class Async
+{
+	/**
+	 * Redis的服务地址
+	 * @var string
+	 */
+	protected $host;
+
+	/**
+	 * Redis的服务端口
+	 * @var int
+	 */
+	protected $port;
+
+	/**
+	 * Worker的数量
+	 * @var int
+	 */
+	protected $workerNum;
+
+	/**
+	 * Redis对象
+	 * @var Redis
+	 */
+	protected $redis;
+
+	/**
+	 * 管理命令列表
+	 */
+	protected $adminCmdList = ['stop', 'restart', 'reload', 'quit', 'exit'];
+
+	/**
+	 * 异步任务数据的队列KEY
+	 * @const string
+	 */
+	const KEY_QUEUE = 'ASYNC_Q';
+
+	/**
+	 * 发送管理命令的Redis队列KEY
+	 * @const string
+	 */
+	const KEY_ADMIN = 'ADMIN_Q';
+
+	/**
+	 * 管理命令的标识
+	 * @const string
+	 */
+	const CMD_ADMIN = '!';
+
+	/**
+	 * 构造函数
+	 * @param string $host Redis服务地址
+	 * @param string $port Redis服务端口
+	 * @param int $workerNum Worker的数量
+	 */
+	public function __construct($host, $port, $workerNum)
+	{
+		$this->host = $host;
+		$this->port = $port;
+		$this->workerNum = $workerNum;
+		$this->redis = new Redis();
+		$this->redis->connect($host, $port, 3);
+	}
+
+	/**
+	 * 添加一条管理命令
+	 * @param int $workerId 指定操作的目标workerId
+	 * @param string $cmd 管理命令(stop,restart,reload,quit,exit)
+	 * @return bool 是否成功
+	 */
+	public function admin($cmd, $workerId = -1)
+	{
+		if (!in_array($cmd, $this->adminCmdList)) {
+			return false;
+		}
+		if ($workerId > -1) {
+			$flag = $this->redis->rPush(self::KEY_ADMIN . $workerId, $cmd);
+			return $flag !== false;
+		} else {
+			for ($i = 0; $i < $this->workerNum; $i++) {
+				$flag = $this->redis->rPush(self::KEY_ADMIN . $i, $cmd);
+				if ($flag === false) {
+					return false;
+				}
+			}
+			return true;
+		}
+	}
+
+	/**
+	 * 向任务队列中添加一条任务数据
+	 * @param string $name 任务名
+	 * @param mixed $arg1 参数1
+	 * ...
+	 * @return bool|int 成功返回队列中的消息数量,失败返回false
+	 */
+	public function pushJob()
+	{
+		$argc = func_num_args();
+		$args = func_get_args();
+		if ($argc < 1) {
+			return false;
+		}
+		if ($argc == 1 && is_array($args)) {
+			$args = $args[0];
+		}
+		return $this->redis->rPush(self::KEY_QUEUE, json_encode($args));
+	}
+
+	/**
+	 * 从任务队列中取一条任务数据出来(阻塞读取)
+	 * @param int $workerId 当前workerId,用于提取管理命令
+	 * @param int $timeout 超时时间
+	 * @return bool|array 成功返回任务数据,失败返回false
+	 */
+	public function popJob($workerId = -1, $timeout = 5)
+	{
+		$keys = [self::KEY_QUEUE];
+		if ($workerId > -1) {
+			$keys[] = self::KEY_ADMIN . $workerId;
+		}
+		$ret = $this->redis->blPop($keys, $timeout);
+		if (!$ret || empty($ret)) {
+			return false;
+		}
+		$key = $ret[0];
+		$value = $ret[1];
+		if ($key == self::KEY_QUEUE) {
+			return json_decode($value, true);
+		} else {
+			return [self::CMD_ADMIN, $value];
+		}
+	}
+
+	/**
+	 * ping操作,用于保持连接不断开
+	 */
+	public function ping()
+	{
+		return $this->redis->ping();
+	}
+
+	/**
+	 * 魔术方法
+	 * @param string $method 被调用的方法名
+	 * @param array $args 传入的参数
+	 * @return mixed
+	 */
+	public function __call($method, $args)
+	{
+		if (empty($method) || preg_match('/\W/', $method)) {
+			return false;
+		}
+		if (in_array($method, $this->adminCmdList)) {
+			return $this->admin($method);
+		} else {
+			array_unshift($args, $method);
+			return $this->pushJob($args);
+		}
+	}
+}

+ 59 - 0
class.jobs.php

@@ -0,0 +1,59 @@
+<?php
+/**
+ * 异步任务执行类
+ */
+
+class Jobs
+{
+	/**
+	 * 执行任务统一入口
+	 * @param string $name 任务名
+	 * @param array $args 参数列表
+	 */
+	public function exec($name, $args)
+	{
+		if (!method_exists($this, $name)) {
+			debug_log("method {$name} is not exits!", 'DEBUG', 'jobs');
+			return false;
+		}
+		return call_user_func_array([$this, $name], $args);
+	}
+
+	/**
+	 * 测试
+	 */
+	protected function test()
+	{
+		$args = func_get_args();
+		debug_log('test(' . implode(',', $args) . ');', 'DEBUG', 'jobs');
+	}
+
+	/**
+	 * 抓百度页面
+	 */
+	protected function curl_get($url)
+	{
+		if (empty($url)) {
+			return;
+		}
+		$get = [];
+		$opt = [];
+		$error = '';
+		$errno = 0;
+		$code = 0;
+		$resp = curl_get($url, $get, $opt, $error, $errno, $code);
+		if ($errno || $code != 200) {
+			debug_log("curl_get({$url}) failed: #{$errno},{$error},http-status={$code}", 'ERROR', 'jobs');
+		} else {
+			debug_log("curl_get({$url}) => {$resp}", 'DEBUG', 'jobs');
+		}
+	}
+
+	/**
+	 * 打印当前进程的id
+	 */
+	protected function pid()
+	{
+		debug_log('Current process\'s id is: ' . getmypid(), 'DEBUG', 'jobs');
+	}
+}

+ 14 - 0
config.php

@@ -0,0 +1,14 @@
+<?php
+return [
+	'worker_num' => 3, //worker进程的数量
+	'redis_host' => '127.0.0.1', //Redis服务的IP
+	'redis_port' => 6379, //Redis服务的端口
+	'log_path' => __DIR__, //错误日志路径
+	'log_max_size' => 1024*1024*10, //错误日志最大占用空间
+	'log_types' => ['DEBUG', 'WARNING', 'ERROR', 'EXCEPTION'], //错误日志级别列表
+	'php_bin_path' => '/usr/bin/php', //php可执行程序路径
+	'udp_host' => '0.0.0.0', //UDP服务的IP
+	'udp_port' => 8888, //UDP服务的端口
+	'sysvmsg_key' => 0x2000, //Sysvmsg队列的KEY
+	'timeout' => 5,
+];

+ 55 - 0
demo.php

@@ -0,0 +1,55 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+
+include __DIR__ . '/functions.php';
+include __DIR__ . '/class.async.php';
+
+$cfg = [];
+load_config();
+
+// ---- 实际的业务代码 ----
+$send = empty($_GET['send']) ? 'redis' : $_GET['send'];
+VIEW(['ext' => get_loaded_extensions()]);
+
+// ---- 发起异步任务 ----
+switch ($send) {
+	case 'udp': //使用UDP
+		$s = stream_socket_client("udp://{$cfg['udp_host']}:{$cfg['udp_port']}", $errno, $error);
+		if ($s) {
+			$flag = fwrite($s, json_encode(array_merge(['test'], $_GET)));
+			fclose($s);
+		}
+		break;
+
+	case 'sysvmsg': //使用SYSVMSG
+		$q = msg_get_queue($cfg['sysvmsg_key'], 0666);
+		if ($q) {
+			msg_send($q, 1, json_encode(array_merge(['test'], $_GET)), false, false);
+		}
+		break;
+
+	default: //直接使用Redis-Cli
+		$async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
+		$async->pushJob(array_merge(['test'], $_GET));
+		break;
+}
+
+// ---- 其它函数 ----
+function VIEW($data) { ?>
+<!doctype html>
+<html>
+<head>
+<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
+<title>扩展列表</title>
+</head>
+<body>
+<h1>扩展列表</h1>
+<ul>
+	<?php foreach ($data['ext'] as $item) { ?>
+	<li><?php echo $item; ?></li>
+	<?php } ?>
+</ul>
+</body>
+</html>
+<?php
+}

+ 172 - 0
functions.php

@@ -0,0 +1,172 @@
+<?php
+/**
+ * 加载配置
+ */
+function load_config()
+{
+	$GLOBALS['cfg'] = include(CONFIG_FILE);
+}
+
+/**
+ * 写入调试日志
+ * @param string $msg 日志消息
+ * @param string $type 日志级别
+ * @param string $file 文件名
+ * @return bool 操作状态
+ */
+function debug_log($msg, $type = 'DEBUG', $file = 'debug')
+{
+	global $cfg;
+	$type = strtoupper($type);
+	if (!in_array($type, $cfg['log_types'])) {
+		return false;
+	}
+	$msg = is_scalar($msg) ? strval($msg) : json_encode($msg, JSON_UNESCAPED_UNICODE);
+	$file = $cfg['log_path'] . '/' . $file . '.log';
+	clearstatcache(true, $file);
+	$flags = (file_exists($file) && filesize($file) >= $cfg['log_max_size']) ? 0 : FILE_APPEND;
+	return file_put_contents($file, date("[Y-m-d H:i:s] ") . $type . ': ' . $msg . "\n", $flags);
+}
+
+/**
+ * 确保只有一个实例在运行
+ * @param int $workerId WorkerID
+ * @return string 锁文件路径
+ */
+function run_single_instance($workerId)
+{
+	//锁文件路径
+	$lock = __DIR__ . '/' . basename($_SERVER['argv'][0], '.php') . '.' . $workerId . '.pid';
+	//清除缓存,避免 file_exists() 取到缓存的结果
+	clearstatcache(true, $lock);
+	//检查锁文件
+	if (file_exists($lock)) {
+		$saved = trim(file_get_contents($lock));
+		$count = intval(shell_exec("ps -Fp {$saved} | grep '{$_SERVER['argv'][0]}' | grep -v grep | wc -l"));
+		if ($count > 0) {
+			exit(0);
+		}
+		shell_exec("ps -Fp {$saved}| grep '{$_SERVER['argv'][0]}' | grep -v grep | awk '{print $2}' | xargs --no-run-if-empty kill"); //安全起见再kill一次
+	}
+	//保存当前进程ID
+	if (!file_put_contents($lock, getmypid() . "\n")) {
+		echo "Start failed: {$lock} write failed\n";
+		exit(1);
+	}
+	return $lock;
+}
+
+/**
+ * 使用cURL发送POST请求
+ * @param string $url 请求地址
+ * @param array $post POST数据数组
+ * @param array $options HTTP选项数组
+ * @param string $error 用于返回错误信息
+ * @param int $errno 用于返回错误码
+ * @param string $httpCode 用于返回响应的HTTP状态码
+ * @param array $outHeaders 用于获得返回的HTTP头
+ * @return mixed 成功返回请求返回结果,失败返回flase
+ */
+function curl_post($url, $post = [], $options = [], &$error = false, &$errno = false, &$httpCode = false, $isupload = false, &$outHeaders = false)
+{
+    $heads = array();
+    $defaults = [
+        CURLOPT_POST			=> 1,
+        CURLOPT_URL				=> $url,
+        CURLOPT_RETURNTRANSFER	=> 1,
+        CURLOPT_CONNECTTIMEOUT	=> 5,
+        CURLOPT_TIMEOUT			=> 10,
+        CURLOPT_POSTFIELDS		=> ($isupload || is_string($post)) ? $post : http_build_query($post),
+    ];
+    if ($isupload) {
+        $defaults[CURLOPT_SAFE_UPLOAD] = true;
+    }
+    if (is_array($outHeaders)) {
+        $defaults[CURLOPT_HEADERFUNCTION] = function ($c, $v) use(&$outHeaders) {
+            if(!empty(rtrim($v))) {
+                list($field, $value) = explode(':', $v, 2);
+                if ($field && $value) {
+                    $outHeaders[strtolower($field)] = trim($value);
+                }
+            }
+            return strlen($v);
+        };
+    }
+    $ch = curl_init();
+    $result = '';
+    if ($ch) {
+        foreach ($options as $k=>$v) {
+            $defaults[$k] = $v;
+        }
+        curl_setopt_array($ch, $defaults);
+        $result = curl_exec($ch);
+        if ($result === false) {
+            if ($error !== false) {
+                $error = curl_error($ch);
+            }
+            if ($errno !== false) {
+                $errno = curl_errno($ch);
+            }
+        }
+        if ($httpCode !== false) {
+            $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
+        }
+        curl_close($ch);
+    }
+    return $result;
+}
+
+/**
+ * 使用cURL发送GET请求
+ * @param string $url 请求地址
+ * @param array $post GET数据数组
+ * @param array $options HTTP选项数组
+ * @param string $error 用于返回错误信息
+ * @param int $errno 用于返回错误码
+ * @param string $httpCode 用于返回响应的HTTP状态码
+ * @param array $outHeaders 用于获得返回的HTTP头
+ * @return mixed 成功返回请求返回结果,失败返回flase
+ */
+function curl_get($url, $get = [], $options = [], &$error = false, &$errno = false, &$httpCode = false, &$outHeaders = false)
+{
+    $defaults = [
+        CURLOPT_URL				=> $url. (strpos($url, '?') === FALSE ? '?' : '&'). http_build_query($get),
+        CURLOPT_HEADER			=> 0,
+        CURLOPT_RETURNTRANSFER	=> TRUE,
+        CURLOPT_CONNECTTIMEOUT	=> 5,
+        CURLOPT_TIMEOUT			=> 10,
+    ];
+    if (is_array($outHeaders)) {
+        $defaults[CURLOPT_HEADERFUNCTION] = function ($c, $v) use(&$outHeaders) {
+            if (!empty(rtrim($v))) {
+                list($field, $value) = explode(':', $v, 2);
+                if ($field && $value) {
+                    $outHeaders[strtolower($field)] = trim($value);
+                }
+            }
+            return strlen($v);
+        };
+    }
+    $ch = curl_init();
+    $result = '';
+    if ($ch) {
+        foreach ($options as $k=>$v) {
+            $defaults[$k] = $v;
+        }
+        curl_setopt_array($ch, $defaults);
+        $result = curl_exec($ch);
+        if ($result === false) {
+            if ($error !== false) {
+                $error = curl_error($ch);
+            }
+            if ($errno !== false) {
+                $errno = curl_errno($ch);
+            }
+        }
+        if ($httpCode !== false) {
+            $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
+        }
+        curl_close($ch);
+    }
+    return $result;
+}

+ 19 - 0
redis-cmd.php

@@ -0,0 +1,19 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+
+include __DIR__ . '/functions.php';
+include __DIR__ . '/class.async.php';
+
+$cfg = [];
+load_config();
+
+if ($argc < 2) {
+	die("Usage: {$argv[0]} cmd arg...\n");
+}
+
+$args = $argv;
+array_shift($args);
+$method = array_shift($args);
+$async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
+$ret = $async->$method($args);
+echo $ret ? "success\n" : "FAILED\n";

+ 192 - 0
redis-stat.php

@@ -0,0 +1,192 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+
+include __DIR__ . '/functions.php';
+
+$cfg = [];
+load_config();
+
+function stat_redis($host, $port)
+{
+	$stat = null;
+	try {
+		$redis = new Redis();
+		$redis->connect($host, $port);
+		$stat = $redis->info();
+		$stat['server'] = "{$host}:{$port}";
+		if($stat['process_id']) {
+			$item = $redis->config('GET', 'maxmemory');
+			$stat['maxmemory'] = $item['maxmemory'];
+			$item = $redis->config('GET', 'databases');
+			$stat['databases'] = $item['databases'] ? intval($item['databases']) : 16;
+		}
+	} catch (RedisException $e) {
+		$stat = false;
+	}
+	return $stat;
+}
+
+function parse_db_stat($stat)
+{
+	$ret = [];
+	if (empty($stat)) {
+		return $ret;
+	}
+	$arr = explode(',', $stat);
+	foreach ($arr as $item) {
+		list($k, $v) = explode('=', $item, 2);
+		$ret[$k] = $v;
+	}
+	return $ret;
+}
+
+$redisStats  = array();
+$redisStats['default'] = stat_redis($cfg['redis_host'], $cfg['redis_port']);
+?>
+<!doctype html>
+<html>
+<head>
+<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
+<title>缓存状态监控</title>
+<style type="text/css">
+body{
+	font-size: 12px;
+}
+h1{
+	width: 1600px;
+}
+table{
+	border-collapse: collapse;
+}
+th, td{
+	padding: 2px 6px;
+	border: 1px solid #000000;
+}
+tbody tr:hover{
+	background-color: #DDDDFF;
+}
+td{
+	text-align: right;
+}
+td ul{
+	list-style: none;
+	margin: 0px;
+	padding: 0px;
+}
+td li{
+	margin: 0px;
+	padding: 0px;
+}
+.left{
+	text-align: left;
+}
+.right{
+	text-align: right;
+}
+.red{
+	color: #FF0000;
+}
+.blue{
+	color: #0000FF;
+}
+.green{
+	color: #009900;
+}
+.yellow{
+	color: #999900;
+}
+</style>
+</head>
+<body>
+<h1>Redis</h1>
+<table>
+	<thead>
+		<tr>
+			<th>名称</th>
+			<th>地址</th>
+			<th>进程ID</th>
+			<th>版本号</th>
+			<th>上线时长</th>
+			<th>用户CPU时长</th>
+			<th>OPS/QPS</th>
+			<th>总空间</th>
+			<th>已用空间</th>
+			<th>过期数</th>
+			<th>丢弃数</th>
+			<th>当前客户端</th>
+			<th>阻塞客户端</th>
+			<th>总接收连接数</th>
+			<th>总处理指令数</th>
+			<th>拒绝连接数</th>
+			<th>获取命中</th>
+			<th>获取未命中</th>
+			<th>DB0使用</th>
+			<!--
+			<th>最长输出列表</th>
+			<th>最大输入缓冲</th>
+			-->
+		</tr>
+	</thead>
+	<tbody>
+		<?php foreach($redisStats as $name => $stats) { if (!$stats) continue; ?>
+		<tr>
+			<td class="left blue"><?php echo $name; ?></td>
+			<td class="left"><?php echo $stats['server']; ?></td>
+			<td class="<?php echo $stats['process_id'] ? '' : 'red'; ?>"><?php echo $stats['process_id'] ? $stats['process_id'] : '未启动'; ?></td>
+			<td class="left"><?php echo $stats['redis_version']; ?></td>
+			<td><?php echo intval($stats['uptime_in_days']) ; ?>天</td>
+			<td><?php echo floatval($stats['used_cpu_user']); ?>秒</td>
+			<td><?php echo $stats['instantaneous_ops_per_sec']; ?></td>
+			<td class="<?php echo $stats['process_id'] && !$stats['maxmemory'] ? 'red' : ''; ?>"><?php echo $stats['process_id'] && !$stats['maxmemory'] ? '无限' : sprintf("%.2f", intval($stats['maxmemory']) / 1048576) . 'M'; ?></td>
+			<td class="<?php echo $stats['maxmemory'] && $stats['used_memory'] && floatval(intval($stats['used_memory']) / intval($stats['maxmemory'])) > 0.8 ? 'red' : ''; ?>"><?php echo sprintf("%.2f", intval($stats['used_memory']) / 1048576); ?>M</td>
+			<td><?php echo intval($stats['expired_keys']); ?></td>
+			<td class="<?php echo $stats['evicted_keys'] ? 'red' : ''; ?>"><?php echo intval($stats['evicted_keys']); ?></td>
+			<td><?php echo intval($stats['connected_clients']); ?></td>
+			<td class="<?php echo $stats['blocked_clients'] ? 'red' : ''; ?>"><?php echo intval($stats['blocked_clients']); ?></td>
+			<td><?php echo intval($stats['total_connections_received']); ?></td>
+			<td><?php echo intval($stats['total_commands_processed']); ?></td>
+			<td class="<?php echo $stats['rejected_connections'] ? 'red' : ''; ?>"><?php echo intval($stats['rejected_connections']); ?></td>
+			<td><?php echo intval($stats['keyspace_hits']); ?></td>
+			<td><?php echo intval($stats['keyspace_misses']); ?></td>
+			<td class="left"><?php echo $stats['db0'] ? $stats['db0'] : '未使用'; ?></td>
+			<!--
+			<td><?php /* echo intval($stats['client_longest_output_list']); */ ?></td>
+			<td><?php /* echo intval($stats['client_biggest_input_buf']); */ ?></td>
+			-->
+		</tr>
+		<?php } ?>
+	</tbody>
+</table>
+
+<?php foreach($redisStats as $name => $stats) { ?>
+<h1>Redis - <?php echo $name; ?></h1>
+<table>
+	<thead>
+		<tr>
+			<th>DB序号</th>
+			<th>KEY数量</th>
+			<th>过期数</th>
+			<th>平均有效期</th>
+		</tr>
+	</thead>
+	<tbody>
+		<?php if($stats['databases']) { for($i=0; $i<$stats['databases']; $i++) { $dbStat = isset($stats["db{$i}"]) ? parse_db_stat($stats["db{$i}"]) : []; ?>
+		<tr>
+			<td><?php echo $i; ?></td>
+			<?php if (empty($dbStat)) { ?>
+			<td>-</td>
+			<td>-</td>
+			<td>-</td>
+			<?php } else { ?>
+			<td><?php echo $dbStat['keys']; ?></td>
+			<td><?php echo $dbStat['expires']; ?></td>
+			<td><?php echo $dbStat['avg_ttl']; ?></td>
+			<?php } ?>
+		</tr>
+		<?php } } ?>
+	</tbody>
+</table>
+<?php } ?>
+
+</body>
+</html>

+ 18 - 0
start.sh

@@ -0,0 +1,18 @@
+#!/bin/bash
+
+# 可以将下面的代码添加到crontab列表中
+# * * * * * bash <绝对路径>/start.sh
+
+
+WORKER_NUM=3
+WORKER_ID=0
+ROOT_PATH=$(cd $(dirname $0); pwd)
+
+while [ "$WORKER_ID" -lt "$WORKER_NUM" ]
+do
+	php -f $ROOT_PATH/worker.php "$WORKER_ID" 1>/dev/null 2>&1 &
+	((WORKER_ID++))
+done
+
+php -f $ROOT_PATH/transfer1.php 1>/dev/null 2>&1 &
+php -f $ROOT_PATH/transfer2.php '1' 1>/dev/null 2>&1 &

+ 19 - 0
stop.sh

@@ -0,0 +1,19 @@
+#!/bin/bash
+
+cd $(dirname $0)
+
+php -f redis-cmd.php stop
+
+if [ -f ./transfer1.*.pid ]; then
+	kill $(cat ./transfer1.*.pid)
+fi
+
+if [ -f ./transfer1b.*.pid ]; then
+	kill $(cat ./transfer1b.*.pid)
+fi
+
+if [ -f ./transfer2.*.pid ]; then
+	kill $(cat ./transfer2.*.pid)
+fi
+
+echo "done!"

+ 22 - 0
sysvmsg-cmd.php

@@ -0,0 +1,22 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+
+include __DIR__ . '/functions.php';
+
+//加载配置
+$cfg = [];
+load_config();
+
+//检查参数
+if ($argc < 2) {
+	die("Usage: {$argv[0]} CMD ARG...\n");
+}
+
+$queueKey = $cfg['sysvmsg_key'];
+$args = $argv;
+array_shift($args); //php_self
+
+//获取队列
+$q = msg_get_queue($queueKey, 0666);
+$ret = msg_send($q, 1, json_encode($args), false, false);
+var_dump($ret);

+ 111 - 0
sysvmsg-stat.php

@@ -0,0 +1,111 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+
+include __DIR__ . '/functions.php';
+
+//加载配置
+$cfg = [];
+load_config();
+
+$key = $cfg['sysvmsg_key'];
+$q = msg_get_queue($key);
+$stat = msg_stat_queue($q);
+?>
+<!doctype html>
+<html>
+<head>
+<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
+<title>消息队列使用统计</title>
+<style type="text/css">
+body{
+	font-size: 14px;
+}
+table{
+	border-collapse: collapse;
+}
+th, td{
+	padding: 5px 10px;
+	border: 1px solid #000000;
+}
+.center{
+	text-align: center;
+}
+.left{
+	text-align: left;
+}
+.right{
+	text-align: right;
+}
+</style>
+</head>
+<body>
+<h1>消息队列使用统计</h1>
+<table>
+	<tr>
+		<th class="center">KEY</th>
+		<th class="center">描述</th>
+		<th class="center">值</th>
+	</tr>
+	<tr>
+		<td>msg_perm.uid</td>
+		<td>所有者</td>
+		<td class="right"><?php echo shell_exec("id {$stat['msg_perm.uid']}"); ?></td>
+	</tr>
+	<tr>
+		<td>msg_perm.gid</td>
+		<td>所有者组</td>
+		<td class="right"><?php echo $stat['msg_perm.gid']; ?></td>
+	</tr>
+	<tr>
+		<td>msg_perm.mode</td>
+		<td>权限</td>
+		<td class="right">0<?php echo decoct($stat['msg_perm.mode']); ?></td>
+	</tr>
+	<tr>
+		<td>msg_stime</td>
+		<td>最后发送消息时间</td>
+		<td class="right"><?php echo $stat['msg_stime'] ? date('Y-m-d H:i:s', $stat['msg_stime']) . ' (' . $stat['msg_stime'] . ')' : '无'; ?></td>
+	</tr>
+	<tr>
+		<td>msg_rtime</td>
+		<td>最后接收消息时间</td>
+		<td class="right"><?php echo $stat['msg_rtime'] ? date('Y-m-d H:i:s', $stat['msg_rtime']) . ' (' . $stat['msg_rtime'] . ')' : '无'; ?></td>
+	</tr>
+	<tr>
+		<td>msg_ctime</td>
+		<td>创建时间</td>
+		<td class="right"><?php echo date('Y-m-d H:i:s', $stat['msg_ctime']) . ' (' . $stat['msg_ctime'] . ')'; ?></td>
+	</tr>
+	<tr>
+		<td>msg_qnum</td>
+		<td>当前队列中的消息数</td>
+		<td class="right"><?php echo $stat['msg_qnum']; ?></td>
+	</tr>
+	<tr>
+		<td>msgmnb</td>
+		<td>队列空间上限</td>
+		<td class="right"><?php echo $stat['msg_qbytes']; ?> 字节</td>
+	</tr>
+	<tr>
+		<td>msgmni</td>
+		<td>系统队列数量上限</td>
+		<td class="right"><?php echo file_get_contents('/proc/sys/kernel/msgmni'); ?> 个</td>
+	</tr>
+	<tr>
+		<td>msgmax</td>
+		<td>单条消息空间上限</td>
+		<td class="right"><?php echo file_get_contents('/proc/sys/kernel/msgmax'); ?> 字节</td>
+	</tr>
+	<tr>
+		<td>msg_lspid</td>
+		<td>最后发送消息的进程ID</td>
+		<td class="right"><?php echo $stat['msg_lspid']; ?></td>
+	</tr>
+	<tr>
+		<td>msg_lrpid</td>
+		<td>最后接收消息的进程ID</td>
+		<td class="right"><?php echo $stat['msg_lrpid']; ?></td>
+	</tr>
+</table>
+</body>
+</html>

+ 96 - 0
transfer1.php

@@ -0,0 +1,96 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+define('LOG_NAME', 'transfer1');
+define('DGRAM_MAXSIZE', 1000); //理论上限65507,内网建议值1472,公网建议值548
+
+include __DIR__ . '/functions.php';
+include __DIR__ . '/class.async.php';
+
+//加载配置
+$cfg = [];
+load_config();
+
+//检查参数
+$addr = "{$cfg['udp_host']}:{$cfg['udp_port']}";
+$quit = $restart = false;
+
+//保障只运行一个进程
+$lockFile = run_single_instance($cfg['udp_port']);
+
+//设置信号处理函数
+pcntl_signal(SIGTERM, 'sig_handle');
+pcntl_signal(SIGUSR1, 'sig_handle');
+
+//创建UDP服务
+$s = stream_socket_server("udp://{$addr}", $errno, $error, STREAM_SERVER_BIND);
+if (!$s) {
+	debug_log("Transfer@{$addr} stream_socket_server() call failed: #{$errno},{$error}", 'ERROR', LOG_NAME);
+	exit(1);
+}
+
+//接收UDP请求
+try {
+	$async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
+	debug_log("Transfer@{$addr} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
+	while (1) {
+		$rset = $wset = $eset = [];
+		$rset[] = $s;
+		$n = 0;
+		declare (ticks = 1) {
+			$n = stream_select($rset, $wset, $eset, $cfg['timeout']);
+		}
+		if ($n === false) {
+			fclose($s);
+			unlink($lockFile);
+			if ($quit) { //退出
+				debug_log("Transfer@{$addr} quiting", 'WARNING', LOG_NAME);
+				exit(0);
+			} elseif ($restart) { //重启
+				debug_log("Transfer@{$addr} restarting", 'WARNING', LOG_NAME);
+				pcntl_exec($cfg['php_bin_path'], $argv);
+				debug_log("Transfer@{$addr} restart failed", 'ERROR', LOG_NAME);
+				exit(1);
+			} else { //错误
+				debug_log("Transfer@{$addr} stream_select() call failed", 'ERROR', LOG_NAME);
+				exit(1);
+			}
+		} else if ($n === 0) { //超时
+			$async->ping();
+		} else { //有数据
+			$buffer = stream_socket_recvfrom($s, DGRAM_MAXSIZE, 0, $from);
+			if (empty($buffer)) {
+				debug_log("Transfer@{$addr} stream_socket_recvfrom() got empty data from {$from}", 'WARNING', LOG_NAME);
+				continue;
+			}
+			$data = json_decode($buffer, true);
+			if (!$data || !is_array($data) || count($data) < 1) {
+				continue;
+			}
+			$async->pushJob($data);
+		}
+	}
+} catch (RedisException $e) {
+	debug_log("Transfer@{$addr} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
+}
+fclose($s);
+unlink($lockFile);
+
+/**
+ * 信号处理函数
+ * @param int $signo 信号
+ */
+function sig_handle($signo)
+{
+	global $quit, $restart;
+	switch ($signo)
+	{
+		//退出
+		case SIGTERM:
+			$quit = true;
+			break;
+		//重启
+		case SIGUSR1:
+			$restart = true;
+			break;
+	}
+}

+ 91 - 0
transfer1b.php

@@ -0,0 +1,91 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+define('LOG_NAME', 'transfer1b');
+define('DGRAM_MAXSIZE', 1000); //理论上限65507,内网建议值1472,公网建议值548
+
+include __DIR__ . '/functions.php';
+include __DIR__ . '/class.async.php';
+
+//加载配置
+$cfg = [];
+load_config();
+
+//检查参数
+$addr = "{$cfg['udp_host']}:{$cfg['udp_port']}";
+$quit = $restart = false;
+
+//保障只运行一个进程
+$lockFile = run_single_instance($cfg['udp_port']);
+
+//设置信号处理函数
+pcntl_signal(SIGTERM, 'sig_handle', false);
+pcntl_signal(SIGUSR1, 'sig_handle', false);
+pcntl_signal(SIGUSR2, 'sig_handle', false);
+
+//创建UDP服务
+$s = stream_socket_server("udp://{$addr}", $errno, $error, STREAM_SERVER_BIND);
+if (!$s) {
+	debug_log("Transfer@{$addr} stream_socket_server() call failed: #{$errno},{$error}", 'ERROR', LOG_NAME);
+	exit(1);
+}
+
+//接收UDP请求
+try {
+	$async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
+	debug_log("Transfer@{$addr} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
+	while (1) {
+		$buffer = '';
+		declare (ticks = 1) {
+			$buffer = stream_socket_recvfrom($s, DGRAM_MAXSIZE, 0, $from);
+		}
+		if (empty($buffer)) {
+			if ($quit) { //退出
+				debug_log("Transfer@{$addr} quiting", 'WARNING', LOG_NAME);
+				fclose($s);
+				unlink($lockFile);
+				exit(0);
+			} elseif ($restart) { //重启
+				fclose($s);
+				unlink($lockFile);
+				debug_log("Transfer@{$addr} restarting", 'WARNING', LOG_NAME);
+				pcntl_exec($cfg['php_bin_path'], $argv);
+				debug_log("Transfer@{$addr} restart failed", 'ERROR', LOG_NAME);
+				exit(1);
+			} else { //定时器
+				$async->ping();
+			}
+		} else {
+			$data = json_decode($buffer, true);
+			if (!empty($data) && is_array($data)) {
+				$async->pushJob($data);
+			}
+		}
+	}
+} catch (RedisException $e) {
+	debug_log("Transfer@{$addr} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
+}
+fclose($s);
+unlink($lockFile);
+
+/**
+ * 信号处理函数
+ * @param int $signo 信号
+ */
+function sig_handle($signo)
+{
+	global $quit, $restart;
+	switch ($signo)
+	{
+		//退出
+		case SIGTERM:
+			$quit = true;
+			break;
+		//重启
+		case SIGUSR1:
+			$restart = true;
+			break;
+		//定时器
+		case SIGUSR2:
+			break;
+	}
+}

+ 7 - 0
transfer1b.sh

@@ -0,0 +1,7 @@
+#!/bin/bash
+
+while true
+do
+	sleep 5
+	kill -USR2 $(cat ./transfer1b.*.pid)
+done

+ 170 - 0
transfer2.php

@@ -0,0 +1,170 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+define('LOG_NAME', 'transfer2');
+define('MSG_MAX_SIZE', 1024);
+
+include __DIR__ . '/functions.php';
+include __DIR__ . '/class.async.php';
+
+//检查参数
+if ($argc < 2) {
+	die("Usage: {$argv[0]} TRANSFER_ID\n");
+}
+
+//加载配置
+$cfg = [];
+load_config();
+$transferId = intval($argv[1]); //进程的编号
+$queueKey = intval($cfg['sysvmsg_key']); //消息队列的KEY
+
+//保障只运行一个进程
+$lockFile = run_single_instance($transferId);
+
+//创建队列
+$q = msg_get_queue($queueKey, 0666);
+if (!$q) {
+	die("msg_get_queue() failed!\n");
+}
+
+//创建子进程
+$parentId = getmypid();
+$childPid = create_child_process($parentId, $q, $cfg['timeout']);
+
+//修改进程名称
+cli_set_process_title('php -f ' . implode(' ', $argv) . ' [master]');
+
+//设置信号处理函数(在子进程创建之后才设置信号处理函数)
+$quit = $restart = false;
+$async = null;
+pcntl_signal(SIGTERM, 'sig_handle'); //quit
+pcntl_signal(SIGUSR1, 'sig_handle'); //restart
+pcntl_signal(SIGUSR2, 'sig_handle'); //ping
+pcntl_signal(SIGCHLD, 'sig_handle'); //child quit
+
+//从队列中取消息
+try {
+	$async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
+	debug_log("Transfer#{$transferId} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
+	$flag = false;
+	while (1) {
+		declare (ticks = 1) {
+			$flag = msg_receive($q, 0, $msgtype, MSG_MAX_SIZE, $msg, false, 0, $errcode);
+		}
+		if ($flag) {
+			$data = json_decode($msg, true);
+			if (!$data || !is_array($data) || count($data) < 1) {
+				continue;
+			}
+			$async->pushJob($data);
+		} else {
+			if ($quit) {
+				debug_log("Transfer#{$transferId} quiting", 'WARNING', LOG_NAME);
+				unlink($lockFile);
+				if (kill_child_process($transferId, $childPid)) { //先杀子进程
+					exit(0);
+				} else {
+					debug_log("Transfer#{$transferId} quit failed", 'ERROR', LOG_NAME);
+					exit(1);
+				}
+			} elseif ($restart) {
+				debug_log("Transfer#{$transferId} restarting", 'WARNING', LOG_NAME);
+				unlink($lockFile);
+				if (kill_child_process($transferId, $childPid)) { //先杀子进程
+					pcntl_exec($cfg['php_bin_path'], $argv);
+				}
+				debug_log("Transfer#{$transferId} restart failed", 'ERROR', LOG_NAME);
+				exit(1);
+			} else {
+				$async->ping();
+			}
+		}
+	}
+} catch (RedisException $e) {
+	debug_log("Transfer#{$transferId} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
+}
+
+/**
+ * 信号处理函数
+ * @param int $signo 信号值
+ */
+function sig_handle($signo)
+{
+	global $cfg, $q, $parentId, $childPid, $quit, $restart;
+	switch ($signo) {
+		//退出
+		case SIGTERM:
+			$quit = true;
+			pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD]);
+			break;
+		//重启
+		case SIGUSR1:
+			$restart = true;
+			pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD]);
+			break;
+		//保持连接
+		case SIGUSR2:
+			break;
+		//子进程退出
+		case SIGCHLD:
+			if (!$quit && pcntl_waitpid($childPid, $status, WNOHANG) == $childPid) {
+				$childPid = create_child_process($parentId, $q, $cfg['timeout']);
+			}
+			break;
+	}
+}
+
+/**
+ * 创建子进程
+ * @param int $ppid 父进程的ID
+ * @param resource $queue 消息队列的资源ID
+ * @param int $timeout SLEEP的时间
+ * @return int 子进程ID
+ */
+function create_child_process($ppid, $queue, $timeout)
+{
+	global $async;
+
+	$pid = pcntl_fork();
+	//创建失败
+	if ($pid < 0) {
+		die("pcntl_fork() failed!\n");
+	}
+	//父进程
+	if ($pid > 0) {
+		return $pid;
+	}
+	//子进程的主循环
+	pcntl_signal(SIGTERM, SIG_DFL); //quit
+	pcntl_signal(SIGUSR1, SIG_DFL); //restart
+	pcntl_signal(SIGUSR2, SIG_DFL); //ping
+	pcntl_signal(SIGCHLD, SIG_DFL); //child quit
+	cli_set_process_title('php -f ' . implode(' ', $_SERVER['argv']) . ' [timer]');
+	if ($async) {
+		unset($async);
+	}
+	while (1) {
+		sleep($timeout);
+		posix_kill($ppid, SIGUSR2);
+	}
+	exit(0);
+}
+
+/**
+ * 杀死子进程
+ * @param int $pid 子进程ID
+ * @return bool 是否成功
+ */
+function kill_child_process($transferId, $pid)
+{
+	if (!posix_kill($pid, SIGTERM)) {
+		debug_log("Transfer#{$transferId} posix_kill({$pid}) failed", 'ERROR', LOG_NAME);
+		return false;
+	}
+	$returnPid = pcntl_waitpid($pid, $status);
+	if ($returnPid != $pid) {
+		debug_log("Transfer#{$transferId} pcntl_waitpid({$pid}) call failed, return value = {$returnPid}", 'ERROR', LOG_NAME);
+		return false;
+	}
+	return true;
+}
+

+ 25 - 0
udp-cmd.php

@@ -0,0 +1,25 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+
+include __DIR__ . '/functions.php';
+
+//检查参数
+if ($argc < 2) {
+	die("Usage: {$argv[0]} CMD ARG...\n");
+}
+
+//加载配置
+$cfg = [];
+load_config();
+
+$s = stream_socket_client("udp://{$cfg['udp_host']}:{$cfg['udp_port']}", $errno, $error);
+if (!$s) {
+	die("stream_socket_server() call failed: #{$errno},{$error}");
+}
+
+$args = $argv;
+array_shift($args); //php_self
+$flag = fwrite($s, json_encode($args));
+fclose($s);
+
+var_dump($flag);

+ 87 - 0
worker.php

@@ -0,0 +1,87 @@
+<?php
+/**
+ * 运行环境:php7, php_redis, php_pcntl
+ */
+define('CONFIG_FILE', __DIR__ . '/config.php');
+define('LOG_NAME', 'worker');
+
+include __DIR__ . '/functions.php';
+include __DIR__ . '/class.async.php';
+include __DIR__ . '/class.jobs.php';
+
+//检查参数
+if ($argc < 2) {
+	die("Usage: {$argv[0]} worker_id\n");
+}
+$workerId = intval($_SERVER['argv'][1]);
+
+//确保只有一个实例在运行
+$lockFile = run_single_instance($workerId);
+
+//加载配置
+$cfg = [];
+load_config();
+
+try {
+	$async = new Async($cfg['redis_host'], $cfg['redis_port'], $cfg['worker_num']);
+	$jobs = new Jobs();
+	debug_log("Worker#{$workerId} started, pid = " . getmypid(), 'DEBUG', LOG_NAME);
+
+	while (1) {
+		//取数据
+		$data = $async->popJob($workerId, $cfg['timeout']);
+		//超时处理
+		if (!$data) {
+			//如果有使用数据库、其它的Redis,需要在此处对常驻的连接进行ping操作,以保持连接状态
+			continue;
+		}
+		//任务处理
+		$job = array_shift($data);
+		if ($job == Async::CMD_ADMIN) { //管理命令
+			$job = array_shift($data);
+			switch ($job) {
+				//退出
+				case 'stop':
+				case 'quit':
+				case 'exit':
+					debug_log("Worker#{$workerId} quiting", 'WARNING', LOG_NAME);
+					unlink($lockFile);
+					exit(0);
+
+				//重启
+				case 'restart':
+					debug_log("Worker#{$worker} restarting", 'WARNING', LOG_NAME);
+					unlink($lockFile);
+					if (function_exists('pcntl_exec')) { //有php_pcntl扩展
+						pcntl_exec($cfg['php_bin_path'], $argv);
+						debug_log("Worker#{$workerId} restart failed", 'ERROR', LOG_NAME);
+						exit(1);
+					} else { //无php_pcntl扩展
+						shell_exec("{$cfg['php_bin_path']} {$workerId} >/dev/null 2>&1 &");
+						exit(0);
+					}
+
+				//刷新配置
+				case 'reload':
+					load_config();
+					debug_log("Worker#{$workerId} reloaded", 'WARNING', LOG_NAME);
+					break;
+
+				//其它
+				default:
+					debug_log("Worker#{$workerId} discard unknown job {$job}", 'WARNING', LOG_NAME);
+					break;
+			}
+		} else { //异步任务
+			try {
+				$jobs->exec($job, $data);
+			} catch (Exception $e) {
+				debug_log("Worker#{$workerId} catched an Exception during job {$job} executing:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
+			} catch (Error $e) {
+				debug_log("Worker#{$workerId} catched an Error during job {$job} executing" . $e->getMessage(), 'ERROR', LOG_NAME);
+			}
+		}
+	}
+} catch (RedisException $e) {
+	debug_log("Worker#{$workerId} catched an RedisException:" . $e->getMessage(), 'EXCEPTION', LOG_NAME);
+}