Class Mongo::Pool
In: lib/mongo/util/pool.rb
Parent: Object

Methods

Constants

PING_ATTEMPTS = 6
MAX_PING_TIME = 1_000_000

Attributes

address  [RW] 
checked_out  [RW] 
connection  [RW] 
host  [RW] 
port  [RW] 
safe  [RW] 
size  [RW] 
timeout  [RW] 

Public Class methods

Create a new pool of connections.

[Source]

# File lib/mongo/util/pool.rb, line 27
    def initialize(connection, host, port, opts={})
      @connection  = connection

      @host, @port = host, port

      # A Mongo::Node object.
      @node = opts[:node]

      # The string address
      @address = "#{@host}:#{@port}"

      # Pool size and timeout.
      @size      = opts[:size] || 10000
      @timeout   = opts[:timeout]   || 5.0

      # Mutex for synchronizing pool access
      @connection_mutex = Mutex.new

      # Condition variable for signal and wait
      @queue = ConditionVariable.new

      # Operations to perform on a socket
      @socket_ops = Hash.new { |h, k| h[k] = [] }

      @sockets      = []
      @pids         = {}
      @checked_out  = []
      @threads      = {}
      @ping_time    = nil
      @last_ping    = nil
      @closed       = false
      @last_pruning = Time.now
    end

Public Instance methods

If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket. So we store the apply_authentication method, and this will be applied right before the next use of each socket.

[Source]

# File lib/mongo/util/pool.rb, line 190
    def authenticate_existing
      @connection_mutex.synchronize do
        @sockets.each do |socket|
          @socket_ops[socket] << Proc.new do
            @connection.apply_saved_authentication(:socket => socket)
          end
        end
      end
    end

Return a socket to the pool.

[Source]

# File lib/mongo/util/pool.rb, line 150
    def checkin(socket)
      @connection_mutex.synchronize do
        if @checked_out.delete(socket)
          @queue.signal
        else
          return false
        end
      end
      true
    end

Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.

[Source]

# File lib/mongo/util/pool.rb, line 249
    def checkout
      @connection.connect if !@connection.connected?
      start_time = Time.now
      loop do
        if (Time.now - start_time) > @timeout
            raise ConnectionTimeoutError, "could not obtain connection within " +
              "#{@timeout} seconds. The max pool size is currently #{@size}; " +
              "consider increasing the pool size or timeout."
        end

        @connection_mutex.synchronize do
          if @sockets.size > @size * 1.5
            prune
          end

          socket = if @checked_out.size < @sockets.size
                     checkout_existing_socket
                   else
                     checkout_new_socket
                   end

          if socket
            # This calls all procs, in order, scoped to existing sockets.
            # At the moment, we use this to lazily authenticate and
            # logout existing socket connections.
            @socket_ops[socket].reject! do |op|
              op.call
            end

            return socket
          else
            # Otherwise, wait
            @queue.wait(@connection_mutex)
          end
        end
      end
    end

Checks out the first available socket from the pool.

If the pid has changed, remove the socket and check out new one.

This method is called exclusively from checkout; therefore, it runs within a mutex.

[Source]

# File lib/mongo/util/pool.rb, line 219
    def checkout_existing_socket
      socket = (@sockets - @checked_out).first
      if @pids[socket] != Process.pid
         @pids[socket] = nil
         @sockets.delete(socket)
         socket.close if socket
         checkout_new_socket
      else
        @checked_out << socket
        @threads[socket] = Thread.current.object_id
        socket
      end
    end

Adds a new socket to the pool and checks it out.

This method is called exclusively from checkout; therefore, it runs within a mutex.

[Source]

# File lib/mongo/util/pool.rb, line 165
    def checkout_new_socket
      begin
        socket = self.connection.socket_class.new(@host, @port)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      rescue => ex
        socket.close if socket
        raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
        @node.close if @node
      end

      # If any saved authentications exist, we want to apply those
      # when creating new sockets.
      @connection.apply_saved_authentication(:socket => socket)

      @sockets << socket
      @pids[socket] = Process.pid
      @checked_out << socket
      @threads[socket] = Thread.current.object_id
      socket
    end

Close this pool.

@option opts [Boolean] :soft (false) If true,

  close only those sockets that are not checked out.

[Source]

# File lib/mongo/util/pool.rb, line 65
    def close(opts={})
      @connection_mutex.synchronize do
        if opts[:soft]
          sockets_to_close = @sockets - @checked_out
        else
          sockets_to_close = @sockets
        end
        sockets_to_close.each do |sock|
          begin
            sock.close unless sock.closed?
          rescue IOError => ex
            warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
          end
        end
        @sockets.clear
        @pids.clear
        @checked_out.clear
        @closed = true
      end
    end

[Source]

# File lib/mongo/util/pool.rb, line 86
    def closed?
      @closed
    end

[Source]

# File lib/mongo/util/pool.rb, line 99
    def host_port
      [@host, @port]
    end

[Source]

# File lib/mongo/util/pool.rb, line 95
    def host_string
      "#{@host}:#{@port}"
    end

[Source]

# File lib/mongo/util/pool.rb, line 90
    def inspect
      "#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
        "@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>"
    end

Store the logout op for each existing socket to be applied before the next use of each socket.

[Source]

# File lib/mongo/util/pool.rb, line 202
    def logout_existing(db)
      @connection_mutex.synchronize do
        @sockets.each do |socket|
          @socket_ops[socket] << Proc.new do
            @connection.db(db).issue_logout(:socket => socket)
          end
        end
      end
    end

[Source]

# File lib/mongo/util/pool.rb, line 141
    def ping
      begin
        return self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
      rescue OperationFailure, SocketError, SystemCallError, IOError => ex
        return false
      end
    end

Refresh ping time only if we haven‘t checked within the last five minutes.

[Source]

# File lib/mongo/util/pool.rb, line 105
    def ping_time
      if !@last_ping
        @last_ping = Time.now
        @ping_time = refresh_ping_time
      elsif Time.now - @last_ping > 300
        @last_ping = Time.now
        @ping_time = refresh_ping_time
      else
        @ping_time
      end
    end

If we have more sockets than the soft limit specified by the max pool size, then we should prune those extraneous sockets.

Note: this must be called from within a mutex.

[Source]

# File lib/mongo/util/pool.rb, line 238
    def prune
      idle_sockets = @sockets - @checked_out
      idle_sockets.each do |socket|
        socket.close unless socket.closed?
        @sockets.delete(socket)
      end
    end

Return the time it takes on average to do a round-trip against this node.

[Source]

# File lib/mongo/util/pool.rb, line 119
    def refresh_ping_time
      trials = []
      PING_ATTEMPTS.times do
        t1 = Time.now
        if !self.ping
          return MAX_PING_TIME
        end
        trials << (Time.now - t1) * 1000
      end

      trials.sort!

      # Delete shortest and longest times
      trials.delete_at(trials.length-1)
      trials.delete_at(0)

      total = 0.0
      trials.each { |t| total += t }

      (total / trials.length).ceil
    end

[Validate]