redis_monitor.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "strings"
  6. "time"
  7. "redisdog/model"
  8. )
  // Default monitor intervals, in seconds, used when the corresponding syscfg
// key is not set.
const (
	// MONITOR_DEFAULT_LOOP_INTERVAL is the pause between two monitoring
	// passes (syscfg key monitor_loop_interval).
	MONITOR_DEFAULT_LOOP_INTERVAL = 30

	// MONITOR_DEFAULT_MAIL_INTERVAL is the minimum gap between two batches
	// of alarm mails for the same Redis instance, i.e. one hour
	// (syscfg key monitor_mail_interval).
	MONITOR_DEFAULT_MAIL_INTERVAL = 60 * 60
)
  13. type RedisMonitor struct {
  14. redisCfg *model.RedisCfg
  15. monitorLogger *model.MonitorLog
  16. warnLogger *model.WarnLog
  17. processLogger *model.ProcessLog
  18. logs map[int64]*model.MonitorLogRow
  19. }
  20. func NewRedisMonitor() *RedisMonitor {
  21. return &RedisMonitor{
  22. redisCfg: model.NewRedisCfg(Db),
  23. monitorLogger: model.NewMonitorLog(Db),
  24. warnLogger: model.NewWarnLog(Db),
  25. processLogger: model.NewProcessLog(Db),
  26. logs: make(map[int64]*model.MonitorLogRow),
  27. }
  28. }
  29. func (m *RedisMonitor) Loop() {
  30. interval := MONITOR_DEFAULT_LOOP_INTERVAL
  31. for {
  32. if str, ok := SysCfg.Get("monitor_loop_interval"); ok {
  33. if num, err := strconv.Atoi(str); err == nil {
  34. interval = num
  35. } else {
  36. SYSLOG("DEBUG", fmt.Sprintf("表syscfg记录cfg_key=monitor_loop_interval的值不是数字:%s", err.Error()))
  37. }
  38. }
  39. time.Sleep(time.Second * time.Duration(interval))
  40. m.loopStep()
  41. }
  42. }
  43. func (m *RedisMonitor) loopStep() {
  44. interval := MONITOR_DEFAULT_MAIL_INTERVAL
  45. rows, err := m.redisCfg.GetAll(0)
  46. if err != nil {
  47. SYSLOG("WARN", fmt.Sprintf("函数model.RedisCfg.GetAll(0)调用失败:%s", err.Error()))
  48. return
  49. }
  50. for _, row := range rows {
  51. now, mails := time.Now(), make([]*MailRequest, 0)
  52. ts, tstr := now.Unix(), now.String()
  53. //取配置
  54. if str, ok := SysCfg.Get("monitor_mail_interval"); ok {
  55. if num, err := strconv.Atoi(str); err == nil {
  56. interval = num
  57. } else {
  58. SYSLOG("DEBUG", fmt.Sprintf("表syscfg记录cfg_key=monitor_mail_interval的值不是数字:%s", err.Error()))
  59. }
  60. }
  61. //初始化检测数据
  62. if _, ok := m.logs[row.Id]; !ok {
  63. m.logs[row.Id] = &model.MonitorLogRow{RedisId: row.Id}
  64. }
  65. m.logs[row.Id].LogTime = ts
  66. //查询Redis的状态信息
  67. info := queryRedisInfo(row)
  68. if info.Error != "" {
  69. //更新检测数据
  70. m.logs[row.Id].QueryStatus = false
  71. m.logs[row.Id].FailedCount++
  72. //判断状态异常检查是否连接出错次数达到上限
  73. if m.logs[row.Id].FailedCount >= row.MaxStatusFailed {
  74. mails = append(mails, &MailRequest{
  75. sendto: strings.Split(row.MailList, ";"),
  76. title: fmt.Sprintf("报警:Redis[%s]状态检测失败", row.Address),
  77. content: fmt.Sprintf(`<p>时间:%s</p><p>Redis#%d[%s]状态检测失败:<b>%s</b>,已连续失败:<b>%d</b>次</p>`, tstr, row.Id, row.Address, info.Error, m.logs[row.Id].FailedCount),
  78. })
  79. }
  80. //记录日志
  81. SYSLOG("WARN", fmt.Sprintf("Redis#%d[%s]状态查询失败:%s,连接第%d次", row.Id, row.Address, info.Error, m.logs[row.Id].FailedCount))
  82. } else {
  83. var usedMemory, maxMemory, systemMemory, connections, qps, evictedKeys, eviIncreased int
  84. //已用内存
  85. if usedMemory, err = strconv.Atoi(info.Data["used_memory"]); err != nil {
  86. SYSLOG("WARN", fmt.Sprintf("Redis#%d[%s]状态字段used_memory值无效:%s", row.Id, row.Address, err.Error()))
  87. }
  88. //最大可分配内存
  89. if maxMemory, err = strconv.Atoi(info.Data["maxmemory"]); err != nil {
  90. SYSLOG("WARN", fmt.Sprintf("Redis#%d[%s]状态字段maxmemory值无效:%s", row.Id, row.Address, err.Error()))
  91. }
  92. //系统内存
  93. if systemMemory, err = strconv.Atoi(info.Data["total_system_memory"]); err != nil {
  94. SYSLOG("WARN", fmt.Sprintf("Redis#%d[%s]状态字段total_system_memory值无效:%s", row.Id, row.Address, err.Error()))
  95. }
  96. //连接数
  97. if connections, err = strconv.Atoi(info.Data["connected_clients"]); err != nil {
  98. SYSLOG("WARN", fmt.Sprintf("Redis#%d[%s]状态字段connected_clients值无效:%s", row.Id, row.Address, err.Error()))
  99. }
  100. //QPS
  101. if qps, err = strconv.Atoi(info.Data["instantaneous_ops_per_sec"]); err != nil {
  102. SYSLOG("WARN", fmt.Sprintf("Redis#%d[%s]状态字段instantaneous_ops_per_sec值无效:%s", row.Id, row.Address, err.Error()))
  103. }
  104. //淘汰数
  105. if evictedKeys, err = strconv.Atoi(info.Data["evicted_keys"]); err != nil {
  106. SYSLOG("WARN", fmt.Sprintf("Redis#%d[%s]状态字段evicted_keys值无效:%s", row.Id, row.Address, err.Error()))
  107. }
  108. //新增淘汰数
  109. eviIncreased = evictedKeys - int(m.logs[row.Id].EvictedKeys)
  110. //检查连接的客户端数量是否达到上限
  111. if connections >= int(row.MaxConnection) {
  112. mails = append(mails, &MailRequest{
  113. sendto: strings.Split(row.MailList, ";"),
  114. title: fmt.Sprintf("报警:Redis[%s]连接数达到上限", row.Address),
  115. content: fmt.Sprintf(`<p>时间:%s</p><p>Redis#%d[%s]连接数已达到:<b>%d</b>,上限为:%d</p>`, tstr, row.Id, row.Address, connections, row.MaxConnection),
  116. })
  117. }
  118. //检查QPS是否达到上限
  119. if qps >= int(row.MaxQPS) {
  120. mails = append(mails, &MailRequest{
  121. sendto: strings.Split(row.MailList, ";"),
  122. title: fmt.Sprintf("报警:Redis[%s]QPS达到上限", row.Address),
  123. content: fmt.Sprintf(`<p>时间:%s</p><p>Redis#%d[%s]QPS已达到:<span color="red">%d</span>,上限为:%d</p>`, tstr, row.Id, row.Address, qps, row.MaxQPS),
  124. })
  125. }
  126. //检查新增淘汰数是否达到上限
  127. if eviIncreased >= int(row.MaxEviIncreased) {
  128. mails = append(mails, &MailRequest{
  129. sendto: strings.Split(row.MailList, ";"),
  130. title: fmt.Sprintf("报警:Redis[%s]新增淘汰记录数达到上限", row.Address),
  131. content: fmt.Sprintf(`<p>时间:%s</p><p>Redis#%d[%s]新增淘汰记录数已达到:<span color="red">%d</span>,上限为:%d</p>`, tstr, row.Id, row.Address, eviIncreased, row.MaxEviIncreased),
  132. })
  133. }
  134. //有内存使用限制,并且允许自动扩容
  135. if maxMemory > 0 {
  136. flag := false
  137. //检查当前使用内存是否达到了预警值
  138. freeMemory := maxMemory - usedMemory
  139. if freeMemory < int(row.MinMemoryFree) {
  140. mails = append(mails, &MailRequest{
  141. sendto: strings.Split(row.MailList, ";"),
  142. title: fmt.Sprintf("报警:Redis[%s]可用内存不足", row.Address),
  143. content: fmt.Sprintf(`<p>时间:%s</p><p>Redis#%d[%s]可用内存不足,分配:<b>%s</b>,剩余:<b>%s</b>,要求剩余:<b>%s</b></p>`, tstr, row.Id, row.Address, info.Data["maxmemory_human"], number2size(float64(freeMemory)), number2size(float64(row.MinMemoryFree))),
  144. })
  145. flag = true
  146. }
  147. //判断是否需要自动扩容
  148. if flag && row.StepMemoryIncrease > 0 && maxMemory < int(row.MaxMemoryUsage) {
  149. newMaxMemory := int64(maxMemory) + row.StepMemoryIncrease
  150. if newMaxMemory > row.MaxMemoryUsage {
  151. newMaxMemory = row.MaxMemoryUsage
  152. }
  153. ret, err := resetRedisConfig(row, newMaxMemory)
  154. //判断扩容结果
  155. if err != nil {
  156. SYSLOG("ERROR", fmt.Sprintf("Redis#%d[%s]扩容失败:%s", row.Id, row.Address, err.Error()))
  157. } else {
  158. if ret {
  159. mails = append(mails, &MailRequest{
  160. sendto: strings.Split(row.MailList, ";"),
  161. title: fmt.Sprintf("通知:Redis[%s]已自动扩容", row.Address),
  162. content: fmt.Sprintf(`<p>时间:%s</p><p>Redis#%d[%s]最大可用内存限制已由<b>%s</b>调整为<b>%s</b></p>`, tstr, row.Id, row.Address, info.Data["maxmemory_human"], number2size(float64(newMaxMemory))),
  163. })
  164. if _, err = m.processLogger.Add(&model.ProcessLogRow{RedisId: row.Id, MaxMemoryBefore: int64(maxMemory), MaxMemoryAfter: newMaxMemory}); err != nil {
  165. SYSLOG("ERROR", fmt.Sprintf("Redis#%d[%s]扩容日志写入DB失败:%s", row.Id, row.Address, err.Error()))
  166. }
  167. } else {
  168. SYSLOG("ERROR", fmt.Sprintf("Redis#%d[%s]扩容失败:无改变", row.Id, row.Address))
  169. }
  170. }
  171. }
  172. }
  173. //更新检测数据
  174. m.logs[row.Id].QueryStatus = true
  175. m.logs[row.Id].FailedCount = 0
  176. m.logs[row.Id].UsedMemory = int64(usedMemory)
  177. m.logs[row.Id].MaxMemory = int64(maxMemory)
  178. m.logs[row.Id].SystemMemory = int64(systemMemory)
  179. m.logs[row.Id].Connection = int64(connections)
  180. m.logs[row.Id].QPS = int64(qps)
  181. m.logs[row.Id].EvictedKeys = int64(evictedKeys)
  182. m.logs[row.Id].EviIncreased = int64(eviIncreased)
  183. }
  184. //添加任务到邮件发送队列
  185. if len(mails) > 0 && m.logs[row.Id].LastMailTime < ts-int64(interval) {
  186. m.logs[row.Id].LastMailTime = ts
  187. for _, mail := range mails {
  188. Sender.Push(mail)
  189. if _, err = m.warnLogger.Add(&model.WarnLogRow{RedisId: row.Id, WarnMsg: mail.content}); err != nil {
  190. SYSLOG("ERROR", fmt.Sprintf("Redis#%d[%s]报警日志写入DB失败:%s", row.Id, row.Address, err.Error()))
  191. }
  192. }
  193. }
  194. //记录检查日志
  195. if _, err = m.monitorLogger.Add(m.logs[row.Id]); err != nil {
  196. SYSLOG("ERROR", fmt.Sprintf("Redis#%d[%s]监控日志写入DB失败:%s", row.Id, row.Address, err.Error()))
  197. }
  198. }
  199. }