<?php
/**
 * TCP server manager.
 *
 * Owns the listening socket, keeps track of the connected Client objects and
 * drives a stream_select() based event loop. SIGTERM requests a shutdown and
 * SIGUSR1 requests a restart (the process re-executes itself).
 */
class Server
{
    /**
     * Configuration data.
     *
     * Keys read by this class: pid_path, server_name, port, host,
     * php_bin_path and max_read_size.
     *
     * @var array
     */
    protected $cfg;

    /**
     * Listening socket of the server.
     *
     * @var resource
     */
    protected $fd;

    /**
     * Handle of the process lock (pid) file returned by run_single_instance().
     *
     * @var resource
     */
    protected $lockFile;

    /**
     * Connected clients, keyed by the integer id of their socket resource.
     *
     * @var array
     */
    protected $clients;

    /**
     * Sockets watched for readability, keyed by integer resource id.
     *
     * @var array
     */
    protected $reading;

    /**
     * Sockets watched for writability, keyed by integer resource id.
     *
     * @var array
     */
    protected $writing;

    /**
     * Set by the signal handler when the server has been asked to quit.
     *
     * @var bool
     */
    protected $quiting;

    /**
     * Set by the signal handler when the server has been asked to restart.
     *
     * @var bool
     */
    protected $restarting;

    /**
     * Constructor.
     *
     * @param array $cfg Configuration data
     */
    public function __construct($cfg)
    {
        $this->cfg = $cfg;
        $this->clients = [];
        $this->reading = [];
        $this->writing = [];
        $this->quiting = false;
        $this->restarting = false;
    }

    /**
     * Destructor: closes the listening socket if it is still open.
     */
    public function __destruct()
    {
        // is_resource() rather than isset(): $fd is `false` when
        // stream_socket_server() failed and is an already-closed handle after
        // restart(); fclose() on either is an error.
        if (is_resource($this->fd)) {
            fclose($this->fd);
        }
    }

    /**
     * Prepares the server: takes the single-instance lock, renames the
     * process, opens the listening socket and installs the signal handlers.
     *
     * Terminates the process with exit code 1 when the socket cannot be opened.
     */
    public function start()
    {
        // Create the lock file.
        $lockFilePath = "{$this->cfg['pid_path']}/{$this->cfg['server_name']}.{$this->cfg['port']}.pid";
        $this->lockFile = run_single_instance($lockFilePath);

        // Change the process title.
        cli_set_process_title("{$this->cfg['php_bin_path']} -f {$_SERVER['argv'][0]} [{$this->cfg['server_name']}]");

        // Create the listening socket.
        $this->fd = stream_socket_server("tcp://{$this->cfg['host']}:{$this->cfg['port']}", $errno, $error);
        if ($this->fd === false) {
            debug_log("stream_socket_server() error:#{$errno},{$error}", 'ERROR', LOG_NAME);
            exit(1);
        }
        $this->reading[(int)$this->fd] = $this->fd;
        debug_log("Server start at {$this->cfg['host']}:{$this->cfg['port']}", 'DEBUG', LOG_NAME);

        // Install the signal handlers.
        pcntl_signal(SIGTERM, [$this, 'sigHandle']);
        pcntl_signal(SIGUSR1, [$this, 'sigHandle']);
        pcntl_signal(SIGPIPE, SIG_IGN);
    }

    /**
     * Accepts a pending connection and registers it as a new client.
     */
    protected function acceptNewClient()
    {
        // Timeout 0: the socket was just reported readable, so never fall back
        // to the default socket timeout and stall the event loop if the
        // pending connection vanished in the meantime.
        $fd = stream_socket_accept($this->fd, 0);
        if (!$fd) {
            debug_log("stream_socket_accept() error", 'ERROR', LOG_NAME);
            return;
        }
        $cli = new Client($fd, $this->cfg);
        stream_set_blocking($fd, 0);
        $this->clients[(int)$fd] = $cli;
        $this->reading[(int)$fd] = $fd;
        $cli->onConnect();
    }

    /**
     * Removes a connection: notifies its client object, drops it from all
     * bookkeeping lists and closes the socket.
     *
     * Safe to call more than once for the same socket.
     *
     * @param resource $fd Socket of the client connection to remove
     */
    public function deleteClient($fd)
    {
        $id = (int)$fd;
        if (isset($this->clients[$id])) {
            $this->clients[$id]->onClose();
        }
        unset($this->writing[$id], $this->reading[$id], $this->clients[$id]);

        // The same socket can reach this method twice within one loop pass
        // (e.g. dropped while reading, still present in the writable list);
        // closing an already-closed handle is an error, so guard it.
        if (is_resource($fd)) {
            fclose($fd);
        }
    }

    /**
     * Reads data from the given connection and passes it to its client.
     *
     * The connection is removed on a read error or when an empty string is read.
     *
     * @param resource $fd Socket of the client connection to read from
     */
    protected function readClient($fd)
    {
        if (!isset($this->clients[(int)$fd])) {
            $this->deleteClient($fd);
            return;
        }
        $cli = $this->clients[(int)$fd];
        $data = fread($fd, $this->cfg['max_read_size']);
        if ($data === false) {
            // Build the message before the socket is closed, in case peer()
            // needs the live connection.
            $message = "socket_read({$fd}) error, client:" . $cli->peer();
            $this->deleteClient($fd);
            debug_log($message, 'ERROR', LOG_NAME);
        } elseif ($data === '') {
            $this->deleteClient($fd);
        } else {
            $cli->onRead($data);
        }
    }

    /**
     * Writes the client's pending data to the given connection.
     *
     * When the client has nothing to send (prepareWrite() returns false) the
     * socket is taken off the writable watch list.
     *
     * @param resource $fd Socket of the client connection to write to
     */
    protected function writeClient($fd)
    {
        if (!isset($this->clients[(int)$fd])) {
            $this->deleteClient($fd);
            return;
        }
        $cli = $this->clients[(int)$fd];
        $data = $cli->prepareWrite();
        if ($data === false) {
            unset($this->writing[(int)$fd]);
            return;
        }

        $n = fwrite($fd, $data);
        if ($n === false) {
            // Build the message before the socket is closed, in case peer()
            // needs the live connection.
            $message = "socket_write({$fd}) error, client:" . $cli->peer();
            $this->deleteClient($fd);
            debug_log($message, 'ERROR', LOG_NAME);
            return;
        }

        // NOTE(review): looks like leftover debug output written straight to
        // stdout; kept as-is, consider routing it through debug_log().
        echo "{$fd} write:" . $data . "\n";
        $cli->onWrite($n);
    }

    /**
     * Marks the given client connection as having data to write.
     *
     * @param resource $fd Socket of the client connection
     */
    public function setClientWriting($fd)
    {
        $this->writing[(int)$fd] = $fd;
    }

    /**
     * Shuts the server down: closes every client, releases the lock file and
     * exits with code 0.
     */
    public function quit()
    {
        debug_log("Server is quiting", 'WARNING', LOG_NAME);
        $this->closeAllClients();
        $this->releaseLock();
        exit(0);
    }

    /**
     * Restarts the server by closing everything and re-executing the PHP
     * binary with the original arguments. Exits with code 1 if that fails.
     */
    public function restart()
    {
        debug_log("Server is restarting", 'WARNING', LOG_NAME);
        $this->closeAllClients();
        if (is_resource($this->fd)) {
            fclose($this->fd);
        }
        $this->releaseLock();
        // On success pcntl_exec() does not return.
        pcntl_exec($this->cfg['php_bin_path'], $_SERVER['argv']);
        debug_log("Server restart failed", 'ERROR', LOG_NAME);
        exit(1);
    }

    /**
     * Runs the event loop. Does not return; the process ends through quit(),
     * restart() or a stream_select() failure.
     */
    public function run()
    {
        while (true) {
            // stream_select() rewrites its array arguments, so pass copies.
            $rList = array_values($this->reading);
            $wList = array_values($this->writing);
            $eList = [];
            // NOTE(review): the 5th argument of stream_select() is in
            // microseconds - confirm the unit returned by getWaitTime().
            list($sec, $msec) = GET_TIMER()->getWaitTime();
            // Ticks are enabled only around the select call so that pending
            // signals are handed to sigHandle() here.
            declare (ticks = 1) {
                $n = stream_select($rList, $wList, $eList, $sec, $msec);
            }

            // Honour a requested shutdown/restart whether or not the select
            // call itself was interrupted; otherwise a signal handled while
            // select returned normally would go unnoticed.
            if ($this->quiting) {
                $this->quit();
            }
            if ($this->restarting) {
                $this->restart();
            }

            if ($n === false) {
                debug_log("stream_select() error", 'ERROR', LOG_NAME);
                exit(1);
            }

            if ($n > 0) {
                foreach ($rList as $fd) {
                    if ($fd === $this->fd) {
                        $this->acceptNewClient();
                    } else {
                        $this->readClient($fd);
                    }
                }
                foreach ($wList as $fd) {
                    $this->writeClient($fd);
                }
            }

            // Periodic timer check.
            GET_TIMER()->interval();
        }
    }

    /**
     * Signal callback: records the request so the event loop can act on it.
     *
     * @param int $signo Number of the received signal
     */
    public function sigHandle($signo)
    {
        switch ($signo) {
            // Quit
            case SIGTERM:
                $this->quiting = true;
                break;
            // Restart
            case SIGUSR1:
                $this->restarting = true;
                break;
        }
    }

    /**
     * Closes every client connection currently registered.
     */
    private function closeAllClients()
    {
        // foreach iterates over a snapshot, so removing entries is safe here.
        foreach ($this->clients as $cli) {
            $this->deleteClient($cli->fd());
        }
    }

    /**
     * Unlocks and closes the lock file if it is open.
     */
    private function releaseLock()
    {
        if (is_resource($this->lockFile)) {
            flock($this->lockFile, LOCK_UN);
            fclose($this->lockFile);
        }
    }
}