123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- <?php
- /**
- * 客户抽象类
- */
- abstract class Client_Base
- {
- /**
- * 单次Socket写入的大小
- * @const int
- */
- const WRITE_MAX_SIZE = 4096;
- /**
- * 当前的客户ID
- */
- protected static $currentId = 0;
- /**
- * 客户ID
- */
- protected $id;
- /**
- * 配置数据
- */
- protected $cfg;
- /**
- * 客户连接的socket资源
- * @var resource
- */
- protected $fd;
- /**
- * 接收消息缓冲区
- * @var string
- */
- protected $readBuffer;
- /**
- * 发送消息队列
- */
- protected $writeQueue;
- /**
- * 连接方信息
- */
- protected $peer;
- /**
- * 客户连接远程主机地址
- * @var string
- */
- protected $host;
- /**
- * 客户连接远程端口
- * @var string
- */
- protected $port;
- /**
- * 构造函数
- * @param resouce $fd 客户连接的socket资源
- * @param array $cfg 配置数据
- */
- public function __construct($fd, $cfg)
- {
- $this->id = ++self::$currentId;
- $this->fd = $fd;
- $this->cfg = $cfg;
- $this->readBuffer = '';
- $this->writeQueue = [];
- $this->peer = stream_socket_get_name($this->fd, true);
- list($this->host, $this->port) = explode(':', $this->peer);
- }
- /**
- * 返回客户连接的Socket资源
- * @return resource Socket资源
- */
- public function fd()
- {
- return $this->fd;
- }
- /**
- * 返回客户连接的远程名称
- * @return string 名称(主机地址:端口号)
- */
- public function peer()
- {
- return $this->peer;
- }
- /**
- * 连接成功后的回调
- */
- public function onConnect()
- {
- }
- /**
- * 连接断开前的回调
- */
- public function onClose()
- {
- }
- /**
- * 读取消息后的回调
- * @param string $data 读到的数据
- */
- public function onRead($data)
- {
- echo $this->id . " read:" . $data . "\n";
- $this->readBuffer .= $data;
- while (!empty($this->readBuffer)) {
- $msg = Message::parse($this->readBuffer);
- if ($msg) {
- $this->onRequest($msg);
- } else {
- break;
- }
- }
- }
- /**
- * 写数据完成后的回调
- * @param int $n 写了多少数据
- */
- public function onWrite($n)
- {
- if (empty($this->writeQueue)) {
- return false;
- }
- $this->writeQueue[0][1] += $n;
- if ($this->writeQueue[0][1] >= $this->writeQueue[0][2]) {
- $first = array_shift($this->writeQueue);
- if ($first[3] != null) {
- call_user_func($first[3]);
- }
- }
- }
- /**
- * 从当前的发送消息队列中返回头一条消息的数据
- * @return string 要发送的数据
- */
- public function prepareWrite()
- {
- if (empty($this->writeQueue)) {
- return false;
- }
- $first = $this->writeQueue[0];
- return substr($first[0], $first[1], $this->cfg['max_write_size']);
- }
- /**
- * 关闭当前连接
- */
- public function close()
- {
- GET_SERVER()->deleteClient($this->fd);
- }
- /**
- * 向客户返回一条消息
- * @param Message|string 要发送的消息或原始数据
- */
- public function response($msg, $cb=null)
- {
- $buffer = is_a($msg, 'Message') ? $msg->toBuffer() : strval($msg);
- $this->writeQueue[] = [$buffer, 0, strlen($buffer), $cb];
- GET_SERVER()->setClientWriting($this->fd);
- }
- /**
- * 处理来自客户的请求
- * @param Message $msg 消息
- */
- abstract public function onRequest($msg);
- }
|