client_base.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. <?php
  2. /**
  3. * 客户抽象类
  4. */
  5. abstract class Client_Base
  6. {
  7. /**
  8. * 单次Socket写入的大小
  9. * @const int
  10. */
  11. const WRITE_MAX_SIZE = 4096;
  12. /**
  13. * 当前的客户ID
  14. */
  15. protected static $currentId = 0;
  16. /**
  17. * 客户ID
  18. */
  19. protected $id;
  20. /**
  21. * 配置数据
  22. */
  23. protected $cfg;
  24. /**
  25. * 客户连接的socket资源
  26. * @var resource
  27. */
  28. protected $fd;
  29. /**
  30. * 接收消息缓冲区
  31. * @var string
  32. */
  33. protected $readBuffer;
  34. /**
  35. * 发送消息队列
  36. */
  37. protected $writeQueue;
  38. /**
  39. * 连接方信息
  40. */
  41. protected $peer;
  42. /**
  43. * 客户连接远程主机地址
  44. * @var string
  45. */
  46. protected $host;
  47. /**
  48. * 客户连接远程端口
  49. * @var string
  50. */
  51. protected $port;
  52. /**
  53. * 构造函数
  54. * @param resouce $fd 客户连接的socket资源
  55. * @param array $cfg 配置数据
  56. */
  57. public function __construct($fd, $cfg)
  58. {
  59. $this->id = ++self::$currentId;
  60. $this->fd = $fd;
  61. $this->cfg = $cfg;
  62. $this->readBuffer = '';
  63. $this->writeQueue = [];
  64. $this->peer = stream_socket_get_name($this->fd, true);
  65. list($this->host, $this->port) = explode(':', $this->peer);
  66. }
  67. /**
  68. * 返回客户连接的Socket资源
  69. * @return resource Socket资源
  70. */
  71. public function fd()
  72. {
  73. return $this->fd;
  74. }
  75. /**
  76. * 返回客户连接的远程名称
  77. * @return string 名称(主机地址:端口号)
  78. */
  79. public function peer()
  80. {
  81. return $this->peer;
  82. }
  83. /**
  84. * 连接成功后的回调
  85. */
  86. public function onConnect()
  87. {
  88. }
  89. /**
  90. * 连接断开前的回调
  91. */
  92. public function onClose()
  93. {
  94. }
  95. /**
  96. * 读取消息后的回调
  97. * @param string $data 读到的数据
  98. */
  99. public function onRead($data)
  100. {
  101. echo $this->id . " read:" . $data . "\n";
  102. $this->readBuffer .= $data;
  103. while (!empty($this->readBuffer)) {
  104. $msg = Message::parse($this->readBuffer);
  105. if ($msg) {
  106. $this->onRequest($msg);
  107. } else {
  108. break;
  109. }
  110. }
  111. }
  112. /**
  113. * 写数据完成后的回调
  114. * @param int $n 写了多少数据
  115. */
  116. public function onWrite($n)
  117. {
  118. if (empty($this->writeQueue)) {
  119. return false;
  120. }
  121. $this->writeQueue[0][1] += $n;
  122. if ($this->writeQueue[0][1] >= $this->writeQueue[0][2]) {
  123. $first = array_shift($this->writeQueue);
  124. if ($first[3] != null) {
  125. call_user_func($first[3]);
  126. }
  127. }
  128. }
  129. /**
  130. * 从当前的发送消息队列中返回头一条消息的数据
  131. * @return string 要发送的数据
  132. */
  133. public function prepareWrite()
  134. {
  135. if (empty($this->writeQueue)) {
  136. return false;
  137. }
  138. $first = $this->writeQueue[0];
  139. return substr($first[0], $first[1], $this->cfg['max_write_size']);
  140. }
  141. /**
  142. * 关闭当前连接
  143. */
  144. public function close()
  145. {
  146. GET_SERVER()->deleteClient($this->fd);
  147. }
  148. /**
  149. * 向客户返回一条消息
  150. * @param Message|string 要发送的消息或原始数据
  151. */
  152. public function response($msg, $cb=null)
  153. {
  154. $buffer = is_a($msg, 'Message') ? $msg->toBuffer() : strval($msg);
  155. $this->writeQueue[] = [$buffer, 0, strlen($buffer), $cb];
  156. GET_SERVER()->setClientWriting($this->fd);
  157. }
  158. /**
  159. * 处理来自客户的请求
  160. * @param Message $msg 消息
  161. */
  162. abstract public function onRequest($msg);
  163. }