Browse Source

添加代码

weicky 4 years ago
commit
2c01d6f759
19 changed files with 1329 additions and 0 deletions
  1. 1 0
      cli/bc.txt
  2. 149 0
      cli/chat.php
  3. 3 0
      cli/cmd.txt
  4. 1 0
      cli/talk.txt
  5. 201 0
      client.php
  6. 17 0
      config.php
  7. 183 0
      lib/client_base.php
  8. 169 0
      lib/functions.php
  9. 86 0
      lib/message.php
  10. 286 0
      lib/server.php
  11. 106 0
      lib/timerman.php
  12. 54 0
      main.php
  13. 2 0
      restart.sh
  14. 4 0
      start.sh
  15. 2 0
      stop.sh
  16. BIN
      test.jpg
  17. 3 0
      test/kill-test.sh
  18. 52 0
      test/test.php
  19. 10 0
      test/test.sh

+ 1 - 0
cli/bc.txt

@@ -0,0 +1 @@
+{"cmd":"bc","data":"%s"}\n

+ 149 - 0
cli/chat.php

@@ -0,0 +1,149 @@
+<?php
+/**
+ * TCP Client DEMO
+ */
+define('CONFIG_FILE', __DIR__ . '/../config.php');
+
+include __DIR__ . '/../lib/functions.php';
+include __DIR__ . '/../lib/message.php';
+
+if ($argc < 2) {
+	die("Usage: {$argv[0]} <name> <input file>\n");
+}
+
+if ($argc > 2) {
+	if (!file_exists($argv[2])) {
+		die("file {$argv[2]} is not exists!\n");
+	}
+	$intype = filetype($argv[2]);
+	if (!in_array($intype, ['fifo', 'file'])) {
+		die("file {$argv[2]} is not a fifo file or a regular file!\n");
+	}
+	$infile = fopen($argv[2], 'r+');
+	if (!$infile) {
+		die("open file {$argv[2]} failed!\n");
+	}
+} else {
+	$intype = 'stdin';
+	$infile = STDIN;
+}
+
+$cfg = [];
+load_config();
+
+$myid = 0;
+$users = [];
+
+//创建TCP连接
+$c = stream_socket_client("tcp://{$cfg['host']}:{$cfg['port']}", $errno, $error);
+if (!$c) {
+	die("#{$errno}: {$error}\n");
+}
+
+//发送登录请求
+$msg = new Message();
+$msg->set('cmd', 'login');
+$msg->set('name', $argv[1]);
+fwrite($c, $msg->toBuffer());
+
+//循环接收来自标准输入和服务器端的数据
+$buffer = '';
+while (1) {
+	if (feof($infile)) {
+		if ($intype == 'stdin') {
+			break;
+		} else {
+			fclose($infile);
+			$intype = 'stdin';
+			$infile = STDIN;
+		}
+	}
+	$rlist = [$infile, $c];
+	$wlist = [];
+	$elist = [];
+	if (!stream_select($rlist, $wlist, $elist, NULL)) {
+		break;
+	}
+	foreach ($rlist as $fd) {
+		if ($fd == $infile) {
+			$line = trim(fgets($infile));
+			if (empty($line)) {
+				continue;
+			}
+			$data = json_decode($line, true);
+			if ($data) {
+				$msg = new Message();
+				foreach ($data as $k => $v) {
+					$msg->set($k, $v);
+				}
+				fwrite($c, $msg->toBuffer());
+			} else {
+				echo "输入的内容不是JSON格式的:{$line}\n";
+			}
+		} elseif ($fd == $c) {
+			$readbuff = fread($c, 1024);
+			if (empty($readbuff)) { //接收消息失败
+				echo "网络断开……\n";
+				break 2;
+			} else { //成功接收到消息
+				$buffer .= $readbuff;
+				while (1) {
+					$msg = Message::parse($buffer);
+					if ($msg) {
+						onMessage($msg);
+					} else {
+						break 2;
+					}
+				}
+			}
+		}
+	}
+}
+fclose($c);
+fclose($infile);
+
+//服务端返回结果的处理
+function onMessage($msg)
+{
+	global $myid, $users;
+
+	$cmd = $msg->get('cmd');
+	$id = $msg->get('id');
+	$name = $msg->get('name');
+	//根据不同的命令进行对应的处理
+	switch ($cmd) {
+		//登录成功
+		case 'login':
+			$myid = $id;
+			$users = $msg->get('data');
+			echo "当前用户列表:\n";
+			foreach ($users as $id => $name) {
+				echo "\t#{$id} => {$name}\n";
+			}
+			break;
+		//用户上线
+		case 'enter':
+			echo "[#{$id} {$name}]上线了\n";
+			$users[$id] = $name;
+			break;
+		//用户下线
+		case 'leave':
+			echo "[#{$id} {$name}]已下线\n";
+			break;
+		//广播
+		case 'bc':
+			$data = $msg->get('data');
+			echo "[#{$id} {$name}]广播:{$data}\n";
+			break;
+		//私信
+		case 'talk':
+			$to = $msg->get('to');
+			$data = $msg->get('data');
+			if ($to == $myid) {
+				echo "#{$id} {$name}对您说:{$data}\n";
+			} else {
+				echo "您对 #{$to} {$users[$to]} 说:{$data}\n";
+			}
+			break;
+	}
+}

+ 3 - 0
cli/cmd.txt

@@ -0,0 +1,3 @@
+{"cmd":"bc","data":"测试"}
+{"cmd":"bc","data":"测试"}
+{"cmd":"bc","data":"测试"}

+ 1 - 0
cli/talk.txt

@@ -0,0 +1 @@
+{"cmd":"talk","to":%d,"data":"%s"}\n

+ 201 - 0
client.php

@@ -0,0 +1,201 @@
+<?php
+/**
+ * Demo的客户连接类
+ */
+class Client extends Client_Base
+{
+	/**
+	 * 是否登录
+	 */
+	protected $logined = false;
+
+	/**
+	 * 定时器的ID
+	 */
+	protected $timerId;
+
+	/**
+	 * 连接后的回调
+	 */
+	public function onConnect()
+	{
+		//为了方便测试,注释掉验证登录的逻辑
+		//$this->timerId = GET_TIMER()->add(3000, [$this, 'checkLogin']);
+	}
+
+	/**
+	 * 连接断开前的回调
+	 */
+	public function onClose()
+	{
+		if ($this->logined) {
+			unset($GLOBALS['users'][$this->id]);
+			//广播离线通知给其他客户端
+			$msg = new Message();
+			$msg->set('cmd', 'leave');
+			$msg->set('id', $this->id);
+			$msg->set('name', $this->name);
+			BC($msg);
+		}
+	}
+
+	/**
+	 * 检查登录状态
+	 */
+	public function checkLogin()
+	{
+		if (!$this->logined)
+		{
+			$msg = new Message();
+			$msg->set('cmd', 'logout');
+			$msg->set('data', 'byebye!');
+			$this->response($msg, [$this, 'close']);
+		}
+	}
+
+	/**
+	 * 实现的分发请求消息的接口
+	 */
+	public function onRequest($msg)
+	{
+		$cmd = $msg->get('cmd');
+		if (empty($cmd)) {
+			return;
+		}
+		switch ($cmd) {
+			case 'time': //请求服务器时间
+				$this->onTime();
+				break;
+			case 'test':
+				$this->onTest();
+				break;
+			case 'dl': //下载图片
+				$this->onDownload();
+				break;
+			case 'login': //登录
+				$this->onLogin($msg);
+				break;
+			case 'bc': //广播
+				$this->onBC($msg);
+				break;
+			case 'talk': //私聊
+				$this->onTalk($msg);
+				break;
+		}
+	}
+
+	/**
+	 * 返回服务器时间
+	 */
+	protected function onTime()
+	{
+		$msg = new Message();
+		$msg->set('cmd', 'time');
+		$msg->set('data', time());
+		$this->response($msg);
+	}
+
+	/**
+	 * 测试接口
+	 */
+	protected function onTest()
+	{
+		$msg = new Message();
+		$msg->set('cmd', 'time');
+		$msg->set('data', 'Hello!');
+		$this->response($msg);
+	}
+
+	protected $cache = '';
+
+	/**
+	 * 测试接口
+	 */
+	protected function onDownload()
+	{
+		if (empty($this->cache)) {
+			$this->cache = base64_encode(file_get_contents(__DIR__ . '/test.jpg'));
+		}
+		$msg = new Message();
+		$msg->set('cmd', 'download');
+		$msg->set('data', $this->cache);
+		$this->response($msg);
+	}
+
+	/**
+	 * 当前客户的名称
+	 * @var string
+	 */
+	protected $name;
+
+	/**
+	 * 返回当前客户的名称
+	 * @return string 客户的名称
+	 */
+	public function getName()
+	{
+		return $this->name;
+	}
+
+	/**
+	 * 客户端登录处理
+	 * @param Message $msg 请求消息
+	 */
+	protected function onLogin($msg)
+	{
+		$name = $msg->get('name');
+		if (empty($name)) {
+			return;
+		}
+		$this->name = $name;
+		$this->logined = true;
+		if ($this->timerId) {
+			GET_TIMER()->clear($this->timerId);
+			$this->timerId = 0;
+		}
+		$GLOBALS['users'][$this->id] = $this;
+		//---- login ----
+		$data = [];
+		foreach ($GLOBALS['users'] as $id => $cli) {
+			$data[$id] = $cli->getName();
+		}
+		$msg = new Message();
+		$msg->set('cmd', 'login');
+		$msg->set('id', $this->id);
+		$msg->set('data', $data);
+		$this->response($msg);
+		//---- enter ----
+		$bcMsg = new Message();
+		$bcMsg->set('cmd', 'enter');
+		$bcMsg->set('id', $this->id);
+		$bcMsg->set('name', $name);
+		BC($bcMsg);
+	}
+
+	/**
+	 * 客户端广播处理
+	 * @param Message $msg 请求消息
+	 */
+	protected function onBC($msg)
+	{
+		$msg->set('id', $this->id);
+		$msg->set('name', $this->name);
+		BC($msg);
+	}
+
+	/**
+	 * 客户端私聊处理
+	 * @param Message $msg 请求消息
+	 */
+	protected function onTalk($msg)
+	{
+		$to = $msg->get('to');
+		if (isset($GLOBALS['users'][$to])) {
+			$msg->set('id', $this->id);
+			$msg->set('name', $this->name);
+			$msg->set('to_name', $GLOBALS['users'][$to]->getName());
+			$this->response($msg);
+			$GLOBALS['users'][$to]->response($msg);
+		}
+	}
+}

+ 17 - 0
config.php

@@ -0,0 +1,17 @@
+<?php
+return [
+	'server_name' => 'PHP_DEMO_TCP_SERVER',
+	'pid_path' => '/tmp',
+	'host' => '0.0.0.0',
+	'port' => 8890,
+	'backlog' => 5,
+	'timeout_s' => 0,
+	'timeout_us' => 10000,
+	'max_read_size' => 4096,
+	'max_write_size' => 4096,
+	'log_path' => __DIR__, //错误日志路径
+	'log_max_size' => 1024*1024*10, //错误日志最大占用空间
+	'log_types' => ['DEBUG', 'WARNING', 'ERROR', 'EXCEPTION'], //错误日志级别列表
+	'php_bin_path' => '/usr/bin/php', //php可执行程序路径
+	'sysvmsg_key' => 0x2001,
+];

+ 183 - 0
lib/client_base.php

@@ -0,0 +1,183 @@
+<?php
+/**
+ * 客户抽象类
+ */
+abstract class Client_Base
+{
+	/**
+	 * 单次Socket写入的大小
+	 * @const int
+	 */
+	const WRITE_MAX_SIZE = 4096;
+
+	/**
+	 * 当前的客户ID
+	 */
+	protected static $currentId = 0;
+
+	/**
+	 * 客户ID
+	 */
+	protected $id;
+
+	/**
+	 * 配置数据
+	 */
+	protected $cfg;
+
+	/**
+	 * 客户连接的socket资源
+	 * @var resource
+	 */
+	protected $fd;
+
+	/**
+	 * 接收消息缓冲区
+	 * @var string
+	 */
+	protected $readBuffer;
+
+	/**
+	 * 发送消息队列
+	 */
+	protected $writeQueue;
+
+	/**
+	 * 连接方信息
+	 */
+	protected $peer;
+
+	/**
+	 * 客户连接远程主机地址
+	 * @var string
+	 */
+	protected $host;
+
+	/**
+	 * 客户连接远程端口
+	 * @var string
+	 */
+	protected $port;
+
+	/**
+	 * 构造函数
+	 * @param resouce $fd 客户连接的socket资源
+	 * @param array $cfg 配置数据
+	 */
+	public function __construct($fd, $cfg)
+	{
+		$this->id = ++self::$currentId;
+		$this->fd = $fd;
+		$this->cfg = $cfg;
+		$this->readBuffer = '';
+		$this->writeQueue = [];
+		$this->peer = stream_socket_get_name($this->fd, true);
+		list($this->host, $this->port) = explode(':', $this->peer);
+	}
+
+	/**
+	 * 返回客户连接的Socket资源
+	 * @return resource Socket资源
+	 */
+	public function fd()
+	{
+		return $this->fd;
+	}
+
+	/**
+	 * 返回客户连接的远程名称
+	 * @return string 名称(主机地址:端口号)
+	 */
+	public function peer()
+	{
+		return $this->peer;
+	}
+
+	/**
+	 * 连接成功后的回调
+	 */
+	public function onConnect()
+	{
+	}
+
+	/**
+	 * 连接断开前的回调
+	 */
+	public function onClose()
+	{
+	}
+
+	/**
+	 * 读取消息后的回调
+	 * @param string $data 读到的数据
+	 */
+	public function onRead($data)
+	{
+		echo $this->id . " read:" . $data . "\n";
+		$this->readBuffer .= $data;
+		while (!empty($this->readBuffer)) {
+			$msg = Message::parse($this->readBuffer);
+			if ($msg) {
+				$this->onRequest($msg);
+			} else {
+				break;
+			}
+		}
+	}
+
+	/**
+	 * 写数据完成后的回调
+	 * @param int $n 写了多少数据
+	 */
+	public function onWrite($n)
+	{
+		if (empty($this->writeQueue)) {
+			return false;
+		}
+		$this->writeQueue[0][1] += $n;
+		if ($this->writeQueue[0][1] >= $this->writeQueue[0][2]) {
+			$first = array_shift($this->writeQueue);
+			if ($first[3] != null) {
+				call_user_func($first[3]);
+			}
+		}
+	}
+
+	/**
+	 * 从当前的发送消息队列中返回头一条消息的数据
+	 * @return string 要发送的数据
+	 */
+	public function prepareWrite()
+	{
+		if (empty($this->writeQueue)) {
+			return false;
+		}
+		$first = $this->writeQueue[0];
+		return substr($first[0], $first[1], $this->cfg['max_write_size']);
+	}
+
+	/**
+	 * 关闭当前连接
+	 */
+	public function close()
+	{
+		GET_SERVER()->deleteClient($this->fd);
+	}
+
+	/**
+	 * 向客户返回一条消息
+	 * @param Message|string 要发送的消息或原始数据
+	 */
+	public function response($msg, $cb=null)
+	{
+		$buffer = is_a($msg, 'Message') ? $msg->toBuffer() : strval($msg);
+		$this->writeQueue[] = [$buffer, 0, strlen($buffer), $cb];
+		GET_SERVER()->setClientWriting($this->fd);
+	}
+
+	/**
+	 * 处理来自客户的请求
+	 * @param Message $msg 消息
+	 */
+	abstract public function onRequest($msg);
+}

+ 169 - 0
lib/functions.php

@@ -0,0 +1,169 @@
+<?php
+/**
+ * 加载配置
+ */
+function load_config()
+{
+	$GLOBALS['cfg'] = include(CONFIG_FILE);
+}
+
+/**
+ * 写入调试日志
+ * @param string $msg 日志消息
+ * @param string $type 日志级别
+ * @param string $file 文件名
+ * @return bool 操作状态
+ */
+function debug_log($msg, $type = 'DEBUG', $file = 'debug')
+{
+	global $cfg;
+	$type = strtoupper($type);
+	if (!in_array($type, $cfg['log_types'])) {
+		return false;
+	}
+	$msg = is_scalar($msg) ? strval($msg) : json_encode($msg, JSON_UNESCAPED_UNICODE);
+	$file = $cfg['log_path'] . '/' . $file . '.log';
+	clearstatcache(true, $file);
+	$flags = (file_exists($file) && filesize($file) >= $cfg['log_max_size']) ? 0 : FILE_APPEND;
+	return file_put_contents($file, date("[Y-m-d H:i:s] ") . $type . ': ' . $msg . "\n", $flags);
+}
+
+/**
+ * 确保只有一个实例在运行
+ * @param int $lock 锁文件路径
+ * @return resource 锁文件句柄
+ */
+function run_single_instance($lock)
+{
+	//打开文件(不存在创建,存在以写入方式打开,指针位于文件头,)
+	$fh = fopen($lock, 'c');
+	if (!$fh) {
+		exit(1);
+	}
+	//尝试上锁
+	if (!flock($fh, LOCK_EX | LOCK_NB)) {
+		exit(0);
+	}
+	//截断文件
+	ftruncate($fh, 0);
+	//写入进程ID
+	fwrite($fh, getmypid());
+	fflush($fh);
+	//返回
+	return $fh;
+}
+
+/**
+ * 使用cURL发送POST请求
+ * @param string $url 请求地址
+ * @param array $post POST数据数组
+ * @param array $options HTTP选项数组
+ * @param string $error 用于返回错误信息
+ * @param int $errno 用于返回错误码
+ * @param string $httpCode 用于返回响应的HTTP状态码
+ * @param array $outHeaders 用于获得返回的HTTP头
+ * @return mixed 成功返回请求返回结果,失败返回flase
+ */
+function curl_post($url, $post = [], $options = [], &$error = false, &$errno = false, &$httpCode = false, $isupload = false, &$outHeaders = false)
+{
+    $heads = array();
+    $defaults = [
+        CURLOPT_POST			=> 1,
+        CURLOPT_URL				=> $url,
+        CURLOPT_RETURNTRANSFER	=> 1,
+        CURLOPT_CONNECTTIMEOUT	=> 5,
+        CURLOPT_TIMEOUT			=> 10,
+        CURLOPT_POSTFIELDS		=> ($isupload || is_string($post)) ? $post : http_build_query($post),
+    ];
+    if ($isupload) {
+        $defaults[CURLOPT_SAFE_UPLOAD] = true;
+    }
+    if (is_array($outHeaders)) {
+        $defaults[CURLOPT_HEADERFUNCTION] = function ($c, $v) use(&$outHeaders) {
+            if(!empty(rtrim($v))) {
+                list($field, $value) = explode(':', $v, 2);
+                if ($field && $value) {
+                    $outHeaders[strtolower($field)] = trim($value);
+                }
+            }
+            return strlen($v);
+        };
+    }
+    $ch = curl_init();
+    $result = '';
+    if ($ch) {
+        foreach ($options as $k=>$v) {
+            $defaults[$k] = $v;
+        }
+        curl_setopt_array($ch, $defaults);
+        $result = curl_exec($ch);
+        if ($result === false) {
+            if ($error !== false) {
+                $error = curl_error($ch);
+            }
+            if ($errno !== false) {
+                $errno = curl_errno($ch);
+            }
+        }
+        if ($httpCode !== false) {
+            $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
+        }
+        curl_close($ch);
+    }
+    return $result;
+}
+
+/**
+ * 使用cURL发送GET请求
+ * @param string $url 请求地址
+ * @param array $post GET数据数组
+ * @param array $options HTTP选项数组
+ * @param string $error 用于返回错误信息
+ * @param int $errno 用于返回错误码
+ * @param string $httpCode 用于返回响应的HTTP状态码
+ * @param array $outHeaders 用于获得返回的HTTP头
+ * @return mixed 成功返回请求返回结果,失败返回flase
+ */
+function curl_get($url, $get = [], $options = [], &$error = false, &$errno = false, &$httpCode = false, &$outHeaders = false)
+{
+    $defaults = [
+        CURLOPT_URL				=> $url. (strpos($url, '?') === FALSE ? '?' : '&'). http_build_query($get),
+        CURLOPT_HEADER			=> 0,
+        CURLOPT_RETURNTRANSFER	=> TRUE,
+        CURLOPT_CONNECTTIMEOUT	=> 5,
+        CURLOPT_TIMEOUT			=> 10,
+    ];
+    if (is_array($outHeaders)) {
+        $defaults[CURLOPT_HEADERFUNCTION] = function ($c, $v) use(&$outHeaders) {
+            if (!empty(rtrim($v))) {
+                list($field, $value) = explode(':', $v, 2);
+                if ($field && $value) {
+                    $outHeaders[strtolower($field)] = trim($value);
+                }
+            }
+            return strlen($v);
+        };
+    }
+    $ch = curl_init();
+    $result = '';
+    if ($ch) {
+        foreach ($options as $k=>$v) {
+            $defaults[$k] = $v;
+        }
+        curl_setopt_array($ch, $defaults);
+        $result = curl_exec($ch);
+        if ($result === false) {
+            if ($error !== false) {
+                $error = curl_error($ch);
+            }
+            if ($errno !== false) {
+                $errno = curl_errno($ch);
+            }
+        }
+        if ($httpCode !== false) {
+            $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
+        }
+        curl_close($ch);
+    }
+    return $result;
+}

+ 86 - 0
lib/message.php

@@ -0,0 +1,86 @@
+<?php
+/**
+ * 消息类
+ */
+class Message
+{
+	/**
+	 * 消息最大长度
+	 */
+	const MAX_SIZE = 1048576;
+
+	/**
+	 * 保存的数据
+	 */
+	protected $value;
+
+	/**
+	 * 构造函数
+	 */
+	public function __construct()
+	{
+		$this->value = [];
+	}
+
+	/**
+	 * 从原始数据尝试创建消息
+	 * @param string $buffer 待解析的数据,成功后会修改该数据的内容
+	 * @return Message|false 成功返回消息对象,失败返回false
+	 */
+	public static function parse(&$buffer)
+	{
+		$len = strlen($buffer);
+		if ($len < 4) {
+			return false;
+		}
+		$arr = unpack('N', substr($buffer, 0, 4));
+		$datalen = $arr[1];
+		if ($datalen > self::MAX_SIZE) {
+			return false;
+		}
+		if ($len < $datalen + 4) {
+			return false;
+		}
+		$body = substr($buffer, 4, $datalen);
+		$buffer = substr($buffer, 4 + $datalen);
+		$msg = new self();
+		$msg->set(json_decode($body, true));
+		return $msg;
+	}
+
+	/**
+	 * 取数据
+	 * @param string $key 要取的字段名
+	 * @return mixed 字段的值
+	 */
+	public function get($key = '')
+	{
+		return empty($key) ? $this->value : (is_array($this->value) && isset($this->value[$key]) ? $this->value[$key] : null);
+	}
+
+	/**
+	 * 设置数据
+	 * @param string|mixed $key 要设置的字段名,或完整的数据数组
+	 * @param mixed $value 要设置的值
+	 */
+	public function set($key, $value = null)
+	{
+		if ($value === null) {
+			$this->value = $key;
+		} elseif (is_array($this->value)) {
+			$this->value[$key] = $value;
+		} else {
+			$this->value = [$key => $value];
+		}
+	}
+
+	/**
+	 * 将保存的数据转成原始数据
+	 * @return string 数据
+	 */
+	public function toBuffer()
+	{
+		$buffer = empty($this->value) ? '' : json_encode($this->value);
+		return pack('N', strlen($buffer)) . $buffer;
+	}
+}

+ 286 - 0
lib/server.php

@@ -0,0 +1,286 @@
+<?php
+/**
+ * 服务管理类
+ */
+class Server
+{
+	/**
+	 * 配置数据
+	 */
+	protected $cfg;
+
+	/**
+	 * 服务器的Socket资源句柄
+	 * @var resource
+	 */
+	protected $fd;
+
+	/**
+	 * 进程锁文件句柄
+	 * @var resource
+	 */
+	protected $lockFile;
+
+	/**
+	 * 客户列表
+	 * @var array
+	 */
+	protected $clients;
+
+	/**
+	 * select的读文件句柄列表
+	 * @var array
+	 */
+	protected $reading;
+
+	/**
+	 * select的写文件句柄列表
+	 * @var array
+	 */
+	protected $writing;
+
+	/**
+	 * 需要退出的标志
+	 * @var bool
+	 */
+	protected $quiting;
+
+	/**
+	 * 需要重启的标志
+	 * @var bool
+	 */
+	protected $restarting;
+
+	/**
+	 * 构造函数
+	 * @param array $cfg 配置数据
+	 */
+	public function __construct($cfg)
+	{
+		$this->cfg = $cfg;
+		$this->clients = [];
+		$this->reading = [];
+		$this->writing = [];
+		$this->quiting = false;
+		$this->restarting = false;
+	}
+
+	/**
+	 * 析构函数
+	 */
+	public function __destruct()
+	{
+		if (isset($this->fd)) {
+			fclose($this->fd);
+		}
+	}
+
+	/**
+	 * 开始服务
+	 */
+	public function start()
+	{
+		//创建锁文件
+		$lockFilePath = "{$this->cfg['pid_path']}/{$this->cfg['server_name']}.{$this->cfg['port']}.pid";
+		$this->lockFile = run_single_instance($lockFilePath);
+		//修改进程名
+		cli_set_process_title("{$this->cfg['php_bin_path']} -f {$_SERVER['argv'][0]} [{$this->cfg['server_name']}]");
+		//创建socket
+		$this->fd = stream_socket_server("tcp://{$this->cfg['host']}:{$this->cfg['port']}", $errno, $error);
+		if ($this->fd === false) {
+			debug_log("stream_socket_server() error:#{$errno},{$error}", 'ERROR', LOG_NAME);
+			exit(1);
+		}
+		$this->reading[(int)$this->fd] = $this->fd;
+		debug_log("Server start at {$this->cfg['host']}:{$this->cfg['port']}", 'DEBUG', LOG_NAME);
+		//设置信号回调
+		pcntl_signal(SIGTERM, [$this, 'sigHandle']);
+		pcntl_signal(SIGUSR1, [$this, 'sigHandle']);
+		pcntl_signal(SIGPIPE, SIG_IGN);
+	}
+
+	/**
+	 * 接收新连接
+	 */
+	protected function acceptNewClient()
+	{
+		$fd = stream_socket_accept($this->fd);
+		if (!$fd) {
+			debug_log("stream_socket_accept() error", 'ERROR', LOG_NAME);
+			return;
+		}
+		$cli = new Client($fd, $this->cfg);
+		stream_set_blocking($fd, 0);
+		$this->clients[(int)$fd] = $cli;
+		$this->reading[(int)$fd] = $fd;
+		$cli->onConnect();
+	}
+
+	/**
+	 * 删除连接
+	 */
+	public function deleteClient($fd)
+	{
+		$cli = isset($this->clients[(int)$fd]) ? $this->clients[(int)$fd] : null;
+		if ($cli != null) {
+			$cli->onClose();
+		}
+		if (isset($this->writing[(int)$fd])) {
+			unset($this->writing[(int)$fd]);
+		}
+		if (isset($this->reading[(int)$fd])) {
+			unset($this->reading[(int)$fd]);
+		}
+		if (isset($this->clients[(int)$fd])) {
+			unset($this->clients[(int)$fd]);
+		}
+		fclose($fd);
+	}
+
+	/**
+	 * 从指定连接读数据
+	 * @param resource $fd 要读取的客户连接socket资源
+	 */
+	protected function readClient($fd)
+	{
+		if (!isset($this->clients[(int)$fd])) {
+			$this->deleteClient($fd);
+			return;
+		}
+		$cli = $this->clients[(int)$fd];
+		$data = fread($fd, $this->cfg['max_read_size']);
+		if ($data === false) {
+			$this->deleteClient($fd);
+			debug_log("socket_read({$fd}) error, client:" . $cli->peer(), 'ERROR', LOG_NAME);
+		} elseif ($data === '') {
+			$this->deleteClient($fd);
+		} else {
+			$cli->onRead($data);
+		}
+	}
+
+	/**
+	 * 写消息给指定的客户连接
+	 * @param resource $fd 要写消息的客户连接socket资源
+	 */
+	protected function writeClient($fd)
+	{
+		if (!isset($this->clients[(int)$fd])) {
+			$this->deleteClient($fd);
+			return;
+		}
+		$cli = $this->clients[(int)$fd];
+		$data = $cli->prepareWrite();
+		if ($data === false) {
+			unset($this->writing[(int)$fd]);
+		} else {
+			$n = fwrite($fd, $data);
+			if ($n === false) {
+				$this->deleteClient($fd);
+				debug_log("socket_write({$fd}) error, client:" . $cli->peer(), 'ERROR', LOG_NAME);
+			} else {
+				echo "{$fd} write:" . $data . "\n";
+				$cli->onWrite($n);
+			}
+		}
+	}
+
+	/**
+	 * 将指定的客户连接设置为需要写数据的状态
+	 */
+	public function setClientWriting($fd)
+	{
+		$this->writing[(int)$fd] = $fd;
+	}
+
+	/**
+	 * 退出
+	 */
+	public function quit()
+	{
+		debug_log("Server is quiting", 'WARNING', LOG_NAME);
+		foreach ($this->clients as $cli) {
+			$this->deleteClient($cli->fd());
+		}
+		flock($this->lockFile, LOCK_UN);
+		fclose($this->lockFile);
+		exit(0);
+	}
+
+	/**
+	 * 重启
+	 */
+	public function restart()
+	{
+		debug_log("Server is restarting", 'WARNING', LOG_NAME);
+		foreach ($this->clients as $cli) {
+			$this->deleteClient($cli->fd());
+		}
+		fclose($this->fd);
+		flock($this->lockFile, LOCK_UN);
+		fclose($this->lockFile);
+		pcntl_exec($this->cfg['php_bin_path'], $_SERVER['argv']);
+		debug_log("Server restart failed", 'ERROR', LOG_NAME);
+		exit(1);
+	}
+
+	/**
+	 * 开始服务
+	 */
+	public function run()
+	{
+		while (true) {
+			$rList = array_values($this->reading); //必须复制一份
+			$wList = array_values($this->writing); //必须复制一份
+			$eList = [];
+			$n = 0;
+			list($sec, $msec) = GET_TIMER()->getWaitTime();
+			declare (ticks = 1) {
+				$n = stream_select($rList, $wList, $eList, $sec, $msec);
+			}
+			if ($n === false) {
+				if ($this->quiting) {
+					$this->quit();
+				} elseif ($this->restarting) {
+					$this->restart();
+				} else {
+					debug_log("stream_select() error", 'ERROR', LOG_NAME);
+					exit(1);
+				}
+			} elseif ($n > 0) {
+				foreach ($rList as $fd) {
+					if ($fd == $this->fd) {
+						$this->acceptNewClient();
+					} else {
+						$this->readClient($fd);
+					}
+				}
+				foreach ($wList as $fd) {
+					$this->writeClient($fd);
+				}
+			} else {
+				
+			}
+			//定时器的循环检测代码
+			GET_TIMER()->interval();
+		}
+	}
+
+	/**
+	 * 信号回调
+	 */
+	public function sigHandle($signo)
+	{
+		switch ($signo)
+		{
+			//退出
+			case SIGTERM:
+				$this->quiting = true;
+				break;
+			//重启
+			case SIGUSR1:
+				$this->restarting = true;
+				break;
+		}
+	}
+}

+ 106 - 0
lib/timerman.php

@@ -0,0 +1,106 @@
+<?php
+/**
+ * 定时器管理类
+ */
+class TimerMan
+{
+	/**
+	 * 定时器列表
+	 * @var array
+	 */
+	protected $timers;
+
+	/**
+	 * ID值
+	 * @int
+	 */
+	protected $id;
+
+	/**
+	 * 定时器到期时间
+	 */
+	protected $nextTimeout;
+
+	/**
+	 * 构造函数
+	 */
+	public function __construct()
+	{
+		$this->timers = [];
+		$this->nextTimeout = 0;
+	}
+
+	/**
+	 * 添加定时器
+	 * @param int $ms 毫秒数
+	 * @param mixed $handle 回调函数
+	 * @param mixed $args 回调函数参数
+	 * @return int 定时器ID
+	 */
+	public function add($ms, $handle, $args = [])
+	{
+		$now = floor(microtime(true) * 1000);
+		$to = $now + $ms;
+		if (!$this->nextTimeout || $to < $this->nextTimeout) {
+			$this->nextTimeout = $to;
+		}
+		$this->timers[++$this->id] = [
+			$to,
+			$handle,
+			is_array($args) ? $args : [$args],
+		];
+		return $this->id;
+	}
+
+	/**
+	 * 清除定时器
+	 * @param int $id 定时器ID
+	 */
+	public function clear($id)
+	{
+		if (isset($this->timers[$id])) {
+			unset($this->timers[$id]);
+		}
+	}
+
+	/**
+	 * 取select应当等待的时长
+	 */
+	public function getWaitTime()
+	{
+		if (!$this->nextTimeout) {
+			return [NULL, NULL];
+		}
+		$now = floor(microtime(true) * 1000);
+		if ($now >= $this->nextTimeout) {
+			return [NULL, NULL];
+		}
+		$ms = $this->nextTimeout - $now;
+		if ($ms < 10) {
+			return [0, 10];
+		}
+		return [floor($ms/1000), $ms%1000];
+	}
+
+	/**
+	 * 定时器的循环检测代码
+	 */
+	public function interval()
+	{
+		$cnt = 0;
+		$now = floor(microtime(true) * 1000) + 10; //误差10
+		$timeouted = [];
+		$min = 0;
+		foreach ($this->timers as $id => $item) {
+			if ($item[0] <= $now) {
+				$timeouted[$id] = $item;
+				unset($this->timers[$id]);
+			}
+			$min = $min == 0 ? $item[0] : min($min, $item[0]);
+		}
+		foreach ($timeouted as $id => $item) {
+			call_user_func_array($item[1], $item[2]);
+		}
+		$this->nextTimeout = $min;
+	}
+}

+ 54 - 0
main.php

@@ -0,0 +1,54 @@
+<?php
+define('CONFIG_FILE', __DIR__ . '/config.php');
+define('LOG_NAME', 'tcpserver');
+
+include __DIR__ . '/lib/functions.php';
+include __DIR__ . '/lib/message.php';
+include __DIR__ . '/lib/server.php';
+include __DIR__ . '/lib/client_base.php';
+include __DIR__ . '/lib/timerman.php';
+include __DIR__ . '/client.php';
+
+//配置
+$cfg = [];
+load_config();
+
+//登录后的客户端列表
+$users = [];
+
+//定时管理器
+$timerman = new TimerMan();
+
+//Server对象
+$srv = new Server($cfg);
+$srv->start();
+$srv->run();
+
+/**
+ * 取全局Server对象的函数(避免在Client对象中保存Server的对象引用,从而导致循环指向的问题)
+ * @return Server
+ */
+function GET_SERVER()
+{
+	return $GLOBALS['srv'];
+}
+
+/**
+ * 取全局定时器管理对象
+ * @return TimerMan
+ */
+function GET_TIMER()
+{
+	return $GLOBALS['timerman'];
+}
+
+/**
+ * 广播消息
+ * @param Message $msg
+ */
+function BC($msg)
+{
+	foreach ($GLOBALS['users'] as $cli) {
+		$cli->response($msg);
+	}
+}

+ 2 - 0
restart.sh

@@ -0,0 +1,2 @@
+#!/bin/bash
+pkill -USR1 -f 'PHP_DEMO_TCP_SERVER'

+ 4 - 0
start.sh

@@ -0,0 +1,4 @@
+#!/bin/bash
+
+cd $(dirname $0)
+php -f main.php > /dev/null 2>&1 &

+ 2 - 0
stop.sh

@@ -0,0 +1,2 @@
+#!/bin/bash
+pkill -f 'PHP_DEMO_TCP_SERVER'

BIN
test.jpg


+ 3 - 0
test/kill-test.sh

@@ -0,0 +1,3 @@
+#!/bin/bash
+ps -eF | grep test | grep php | grep -v grep | awk '{print $2}' | xargs kill
+rm *.txt

+ 52 - 0
test/test.php

@@ -0,0 +1,52 @@
+<?php
+/**
+ * TCP Client DEMO
+ */
+if ($argc < 4) {
+	die("Usage: {$argv[0]} SERVER_IP SERVER_PORT CMD COUNT DELAY_MS\n");
+}
+
+include __DIR__ . '/../lib/message.php';
+
+$cnt = $argc > 4 ? intval($argv[4]) : 1;
+$delay = ($argc > 5 ? intval($argv[5]) : 100) * 1000;
+$service = "tcp://{$argv[1]}:{$argv[2]}";
+$s = stream_socket_client($service, $errno, $error);
+if (!$s) {
+	die("stream_socket_client({$service}) error:#{$errno},{$error}");
+}
+
+for ($i = 0; $i < $cnt; $i++) {
+	$req = new Message();
+	$req->set('cmd', $argv[3]);
+	if (!fwrite($s, $req->toBuffer())) {
+		echo "fwrite() error!\n";
+		break;
+	}
+	$data = fread($s, 4);
+	if ($data) {
+		if (strlen($data) != 4) {
+			die("fread() return " . bin2hex($data) . "\n");
+		}
+		$arr = unpack('N', $data);
+		$len = $arr[1];
+		$data = '';
+		$rest = $len;
+		do {
+			$ndata = fread($s, $rest);
+			if (!$ndata) {
+				break;
+			}
+			$data .= $ndata;
+			$rest -= strlen($ndata);
+		} while ($rest > 0);
+		if ($len > 1024) {
+			echo "[{$len}] --> " . substr($data, 0, 1024) . " ...\n";
+		} else {
+			echo "[{$len}] --> {$data}\n";
+		}
+	}
+	usleep($delay);
+}
+
+fclose($s);

+ 10 - 0
test/test.sh

@@ -0,0 +1,10 @@
+#!/bin/bash
+
+I=0
+N=100
+
+while [ "$I" -lt "$N" ]
+do
+	php -f ./test.php 127.0.0.1 8890 'time' 10000 10 > ./$I.txt 2>/dev/null &
+	((I++))
+done