class.async.php 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. <?php
  2. /**
  3. * 异步任务队列类
  4. */
  5. class Async
  6. {
  7. /**
  8. * Redis的服务地址
  9. * @var string
  10. */
  11. protected $host;
  12. /**
  13. * Redis的服务端口
  14. * @var int
  15. */
  16. protected $port;
  17. /**
  18. * Worker的数量
  19. * @var int
  20. */
  21. protected $workerNum;
  22. /**
  23. * Redis对象
  24. * @var Redis
  25. */
  26. protected $redis;
  27. /**
  28. * 管理命令列表
  29. */
  30. protected $adminCmdList = ['stop', 'restart', 'reload', 'quit', 'exit'];
  31. /**
  32. * 异步任务数据的队列KEY
  33. * @const string
  34. */
  35. const KEY_QUEUE = 'ASYNC_Q';
  36. /**
  37. * 发送管理命令的Redis队列KEY
  38. * @const string
  39. */
  40. const KEY_ADMIN = 'ADMIN_Q';
  41. /**
  42. * 管理命令的标识
  43. * @const string
  44. */
  45. const CMD_ADMIN = '!';
  46. /**
  47. * 构造函数
  48. * @param string $host Redis服务地址
  49. * @param string $port Redis服务端口
  50. * @param int $workerNum Worker的数量
  51. */
  52. public function __construct($host, $port, $workerNum)
  53. {
  54. $this->host = $host;
  55. $this->port = $port;
  56. $this->workerNum = $workerNum;
  57. $this->redis = new Redis();
  58. $this->redis->connect($host, $port, 3);
  59. }
  60. /**
  61. * 添加一条管理命令
  62. * @param int $workerId 指定操作的目标workerId
  63. * @param string $cmd 管理命令(stop,restart,reload,quit,exit)
  64. * @return bool 是否成功
  65. */
  66. public function admin($cmd, $workerId = -1)
  67. {
  68. if (!in_array($cmd, $this->adminCmdList)) {
  69. return false;
  70. }
  71. if ($workerId > -1) {
  72. $flag = $this->redis->rPush(self::KEY_ADMIN . $workerId, $cmd);
  73. return $flag !== false;
  74. } else {
  75. for ($i = 0; $i < $this->workerNum; $i++) {
  76. $flag = $this->redis->rPush(self::KEY_ADMIN . $i, $cmd);
  77. if ($flag === false) {
  78. return false;
  79. }
  80. }
  81. return true;
  82. }
  83. }
  84. /**
  85. * 向任务队列中添加一条任务数据
  86. * @param string $name 任务名
  87. * @param mixed $arg1 参数1
  88. * ...
  89. * @return bool|int 成功返回队列中的消息数量,失败返回false
  90. */
  91. public function pushJob()
  92. {
  93. $argc = func_num_args();
  94. $args = func_get_args();
  95. if ($argc < 1) {
  96. return false;
  97. }
  98. if ($argc == 1 && is_array($args)) {
  99. $args = $args[0];
  100. }
  101. return $this->redis->rPush(self::KEY_QUEUE, json_encode($args));
  102. }
  103. /**
  104. * 从任务队列中取一条任务数据出来(阻塞读取)
  105. * @param int $workerId 当前workerId,用于提取管理命令
  106. * @param int $timeout 超时时间
  107. * @return bool|array 成功返回任务数据,失败返回false
  108. */
  109. public function popJob($workerId = -1, $timeout = 5)
  110. {
  111. $keys = [self::KEY_QUEUE];
  112. if ($workerId > -1) {
  113. $keys[] = self::KEY_ADMIN . $workerId;
  114. }
  115. $ret = $this->redis->blPop($keys, $timeout);
  116. if (!$ret || empty($ret)) {
  117. return false;
  118. }
  119. $key = $ret[0];
  120. $value = $ret[1];
  121. if ($key == self::KEY_QUEUE) {
  122. return json_decode($value, true);
  123. } else {
  124. return [self::CMD_ADMIN, $value];
  125. }
  126. }
  127. /**
  128. * ping操作,用于保持连接不断开
  129. */
  130. public function ping()
  131. {
  132. return $this->redis->ping();
  133. }
  134. /**
  135. * 魔术方法
  136. * @param string $method 被调用的方法名
  137. * @param array $args 传入的参数
  138. * @return mixed
  139. */
  140. public function __call($method, $args)
  141. {
  142. if (empty($method) || preg_match('/\W/', $method)) {
  143. return false;
  144. }
  145. if (in_array($method, $this->adminCmdList)) {
  146. return $this->admin($method);
  147. } else {
  148. array_unshift($args, $method);
  149. return $this->pushJob($args);
  150. }
  151. }
  152. }