123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562 |
- // Copyright 2012 Gary Burd
- //
- // Licensed under the Apache License, Version 2.0 (the "License"): you may
- // not use this file except in compliance with the License. You may obtain
- // a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- // License for the specific language governing permissions and limitations
- // under the License.
- package redis
- import (
- "bytes"
- "crypto/rand"
- "crypto/sha1"
- "errors"
- "io"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
- "github.com/gomodule/redigo/internal"
- )
- var (
- _ ConnWithTimeout = (*activeConn)(nil)
- _ ConnWithTimeout = (*errorConn)(nil)
- )
- var nowFunc = time.Now // for testing
- // ErrPoolExhausted is returned from a pool connection method (Do, Send,
- // Receive, Flush, Err) when the maximum number of database connections in the
- // pool has been reached.
- var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")
- var (
- errPoolClosed = errors.New("redigo: connection pool closed")
- errConnClosed = errors.New("redigo: connection closed")
- )
- // Pool maintains a pool of connections. The application calls the Get method
- // to get a connection from the pool and the connection's Close method to
- // return the connection's resources to the pool.
- //
- // The following example shows how to use a pool in a web application. The
- // application creates a pool at application startup and makes it available to
- // request handlers using a package level variable. The pool configuration used
- // here is an example, not a recommendation.
- //
- // func newPool(addr string) *redis.Pool {
- // return &redis.Pool{
- // MaxIdle: 3,
- // IdleTimeout: 240 * time.Second,
- // Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
- // }
- // }
- //
- // var (
- // pool *redis.Pool
- // redisServer = flag.String("redisServer", ":6379", "")
- // )
- //
- // func main() {
- // flag.Parse()
- // pool = newPool(*redisServer)
- // ...
- // }
- //
- // A request handler gets a connection from the pool and closes the connection
- // when the handler is done:
- //
- // func serveHome(w http.ResponseWriter, r *http.Request) {
- // conn := pool.Get()
- // defer conn.Close()
- // ...
- // }
- //
- // Use the Dial function to authenticate connections with the AUTH command or
- // select a database with the SELECT command:
- //
- // pool := &redis.Pool{
- // // Other pool configuration not shown in this example.
- // Dial: func () (redis.Conn, error) {
- // c, err := redis.Dial("tcp", server)
- // if err != nil {
- // return nil, err
- // }
- // if _, err := c.Do("AUTH", password); err != nil {
- // c.Close()
- // return nil, err
- // }
- // if _, err := c.Do("SELECT", db); err != nil {
- // c.Close()
- // return nil, err
- // }
- // return c, nil
- // },
- // }
- //
- // Use the TestOnBorrow function to check the health of an idle connection
- // before the connection is returned to the application. This example PINGs
- // connections that have been idle more than a minute:
- //
- // pool := &redis.Pool{
- // // Other pool configuration not shown in this example.
- // TestOnBorrow: func(c redis.Conn, t time.Time) error {
- // if time.Since(t) < time.Minute {
- // return nil
- // }
- // _, err := c.Do("PING")
- // return err
- // },
- // }
- //
- type Pool struct {
- // Dial is an application supplied function for creating and configuring a
- // connection.
- //
- // The connection returned from Dial must not be in a special state
- // (subscribed to pubsub channel, transaction started, ...).
- Dial func() (Conn, error)
- // TestOnBorrow is an optional application supplied function for checking
- // the health of an idle connection before the connection is used again by
- // the application. Argument t is the time that the connection was returned
- // to the pool. If the function returns an error, then the connection is
- // closed.
- TestOnBorrow func(c Conn, t time.Time) error
- // Maximum number of idle connections in the pool.
- MaxIdle int
- // Maximum number of connections allocated by the pool at a given time.
- // When zero, there is no limit on the number of connections in the pool.
- MaxActive int
- // Close connections after remaining idle for this duration. If the value
- // is zero, then idle connections are not closed. Applications should set
- // the timeout to a value less than the server's timeout.
- IdleTimeout time.Duration
- // If Wait is true and the pool is at the MaxActive limit, then Get() waits
- // for a connection to be returned to the pool before returning.
- Wait bool
- // Close connections older than this duration. If the value is zero, then
- // the pool does not close connections based on age.
- MaxConnLifetime time.Duration
- chInitialized uint32 // set to 1 when field ch is initialized
- mu sync.Mutex // mu protects the following fields
- closed bool // set to true when the pool is closed.
- active int // the number of open connections in the pool
- ch chan struct{} // limits open connections when p.Wait is true
- idle idleList // idle connections
- }
- // NewPool creates a new pool.
- //
- // Deprecated: Initialize the Pool directory as shown in the example.
- func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
- return &Pool{Dial: newFn, MaxIdle: maxIdle}
- }
- // Get gets a connection. The application must close the returned connection.
- // This method always returns a valid connection so that applications can defer
- // error handling to the first use of the connection. If there is an error
- // getting an underlying connection, then the connection Err, Do, Send, Flush
- // and Receive methods return that error.
- func (p *Pool) Get() Conn {
- pc, err := p.get(nil)
- if err != nil {
- return errorConn{err}
- }
- return &activeConn{p: p, pc: pc}
- }
- // PoolStats contains pool statistics.
- type PoolStats struct {
- // ActiveCount is the number of connections in the pool. The count includes
- // idle connections and connections in use.
- ActiveCount int
- // IdleCount is the number of idle connections in the pool.
- IdleCount int
- }
- // Stats returns pool's statistics.
- func (p *Pool) Stats() PoolStats {
- p.mu.Lock()
- stats := PoolStats{
- ActiveCount: p.active,
- IdleCount: p.idle.count,
- }
- p.mu.Unlock()
- return stats
- }
- // ActiveCount returns the number of connections in the pool. The count
- // includes idle connections and connections in use.
- func (p *Pool) ActiveCount() int {
- p.mu.Lock()
- active := p.active
- p.mu.Unlock()
- return active
- }
- // IdleCount returns the number of idle connections in the pool.
- func (p *Pool) IdleCount() int {
- p.mu.Lock()
- idle := p.idle.count
- p.mu.Unlock()
- return idle
- }
- // Close releases the resources used by the pool.
- func (p *Pool) Close() error {
- p.mu.Lock()
- if p.closed {
- p.mu.Unlock()
- return nil
- }
- p.closed = true
- p.active -= p.idle.count
- pc := p.idle.front
- p.idle.count = 0
- p.idle.front, p.idle.back = nil, nil
- if p.ch != nil {
- close(p.ch)
- }
- p.mu.Unlock()
- for ; pc != nil; pc = pc.next {
- pc.c.Close()
- }
- return nil
- }
- func (p *Pool) lazyInit() {
- // Fast path.
- if atomic.LoadUint32(&p.chInitialized) == 1 {
- return
- }
- // Slow path.
- p.mu.Lock()
- if p.chInitialized == 0 {
- p.ch = make(chan struct{}, p.MaxActive)
- if p.closed {
- close(p.ch)
- } else {
- for i := 0; i < p.MaxActive; i++ {
- p.ch <- struct{}{}
- }
- }
- atomic.StoreUint32(&p.chInitialized, 1)
- }
- p.mu.Unlock()
- }
- // get prunes stale connections and returns a connection from the idle list or
- // creates a new connection.
- func (p *Pool) get(ctx interface {
- Done() <-chan struct{}
- Err() error
- }) (*poolConn, error) {
- // Handle limit for p.Wait == true.
- if p.Wait && p.MaxActive > 0 {
- p.lazyInit()
- if ctx == nil {
- <-p.ch
- } else {
- select {
- case <-p.ch:
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- }
- p.mu.Lock()
- // Prune stale connections at the back of the idle list.
- if p.IdleTimeout > 0 {
- n := p.idle.count
- for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
- pc := p.idle.back
- p.idle.popBack()
- p.mu.Unlock()
- pc.c.Close()
- p.mu.Lock()
- p.active--
- }
- }
- // Get idle connection from the front of idle list.
- for p.idle.front != nil {
- pc := p.idle.front
- p.idle.popFront()
- p.mu.Unlock()
- if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
- (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
- return pc, nil
- }
- pc.c.Close()
- p.mu.Lock()
- p.active--
- }
- // Check for pool closed before dialing a new connection.
- if p.closed {
- p.mu.Unlock()
- return nil, errors.New("redigo: get on closed pool")
- }
- // Handle limit for p.Wait == false.
- if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
- p.mu.Unlock()
- return nil, ErrPoolExhausted
- }
- p.active++
- p.mu.Unlock()
- c, err := p.Dial()
- if err != nil {
- c = nil
- p.mu.Lock()
- p.active--
- if p.ch != nil && !p.closed {
- p.ch <- struct{}{}
- }
- p.mu.Unlock()
- }
- return &poolConn{c: c, created: nowFunc()}, err
- }
- func (p *Pool) put(pc *poolConn, forceClose bool) error {
- p.mu.Lock()
- if !p.closed && !forceClose {
- pc.t = nowFunc()
- p.idle.pushFront(pc)
- if p.idle.count > p.MaxIdle {
- pc = p.idle.back
- p.idle.popBack()
- } else {
- pc = nil
- }
- }
- if pc != nil {
- p.mu.Unlock()
- pc.c.Close()
- p.mu.Lock()
- p.active--
- }
- if p.ch != nil && !p.closed {
- p.ch <- struct{}{}
- }
- p.mu.Unlock()
- return nil
- }
- type activeConn struct {
- p *Pool
- pc *poolConn
- state int
- }
- var (
- sentinel []byte
- sentinelOnce sync.Once
- )
- func initSentinel() {
- p := make([]byte, 64)
- if _, err := rand.Read(p); err == nil {
- sentinel = p
- } else {
- h := sha1.New()
- io.WriteString(h, "Oops, rand failed. Use time instead.")
- io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10))
- sentinel = h.Sum(nil)
- }
- }
- func (ac *activeConn) Close() error {
- pc := ac.pc
- if pc == nil {
- return nil
- }
- ac.pc = nil
- if ac.state&internal.MultiState != 0 {
- pc.c.Send("DISCARD")
- ac.state &^= (internal.MultiState | internal.WatchState)
- } else if ac.state&internal.WatchState != 0 {
- pc.c.Send("UNWATCH")
- ac.state &^= internal.WatchState
- }
- if ac.state&internal.SubscribeState != 0 {
- pc.c.Send("UNSUBSCRIBE")
- pc.c.Send("PUNSUBSCRIBE")
- // To detect the end of the message stream, ask the server to echo
- // a sentinel value and read until we see that value.
- sentinelOnce.Do(initSentinel)
- pc.c.Send("ECHO", sentinel)
- pc.c.Flush()
- for {
- p, err := pc.c.Receive()
- if err != nil {
- break
- }
- if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
- ac.state &^= internal.SubscribeState
- break
- }
- }
- }
- pc.c.Do("")
- ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
- return nil
- }
- func (ac *activeConn) Err() error {
- pc := ac.pc
- if pc == nil {
- return errConnClosed
- }
- return pc.c.Err()
- }
- func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
- pc := ac.pc
- if pc == nil {
- return nil, errConnClosed
- }
- ci := internal.LookupCommandInfo(commandName)
- ac.state = (ac.state | ci.Set) &^ ci.Clear
- return pc.c.Do(commandName, args...)
- }
- func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
- pc := ac.pc
- if pc == nil {
- return nil, errConnClosed
- }
- cwt, ok := pc.c.(ConnWithTimeout)
- if !ok {
- return nil, errTimeoutNotSupported
- }
- ci := internal.LookupCommandInfo(commandName)
- ac.state = (ac.state | ci.Set) &^ ci.Clear
- return cwt.DoWithTimeout(timeout, commandName, args...)
- }
- func (ac *activeConn) Send(commandName string, args ...interface{}) error {
- pc := ac.pc
- if pc == nil {
- return errConnClosed
- }
- ci := internal.LookupCommandInfo(commandName)
- ac.state = (ac.state | ci.Set) &^ ci.Clear
- return pc.c.Send(commandName, args...)
- }
- func (ac *activeConn) Flush() error {
- pc := ac.pc
- if pc == nil {
- return errConnClosed
- }
- return pc.c.Flush()
- }
- func (ac *activeConn) Receive() (reply interface{}, err error) {
- pc := ac.pc
- if pc == nil {
- return nil, errConnClosed
- }
- return pc.c.Receive()
- }
- func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
- pc := ac.pc
- if pc == nil {
- return nil, errConnClosed
- }
- cwt, ok := pc.c.(ConnWithTimeout)
- if !ok {
- return nil, errTimeoutNotSupported
- }
- return cwt.ReceiveWithTimeout(timeout)
- }
- type errorConn struct{ err error }
- func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
- func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
- return nil, ec.err
- }
- func (ec errorConn) Send(string, ...interface{}) error { return ec.err }
- func (ec errorConn) Err() error { return ec.err }
- func (ec errorConn) Close() error { return nil }
- func (ec errorConn) Flush() error { return ec.err }
- func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }
- func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
- type idleList struct {
- count int
- front, back *poolConn
- }
- type poolConn struct {
- c Conn
- t time.Time
- created time.Time
- next, prev *poolConn
- }
- func (l *idleList) pushFront(pc *poolConn) {
- pc.next = l.front
- pc.prev = nil
- if l.count == 0 {
- l.back = pc
- } else {
- l.front.prev = pc
- }
- l.front = pc
- l.count++
- return
- }
- func (l *idleList) popFront() {
- pc := l.front
- l.count--
- if l.count == 0 {
- l.front, l.back = nil, nil
- } else {
- pc.next.prev = nil
- l.front = pc.next
- }
- pc.next, pc.prev = nil, nil
- }
- func (l *idleList) popBack() {
- pc := l.back
- l.count--
- if l.count == 0 {
- l.front, l.back = nil, nil
- } else {
- pc.prev.next = nil
- l.back = pc.prev
- }
- pc.next, pc.prev = nil, nil
- }
|