Class Mongo::Pool
In: lib/mongo/util/pool.rb
Parent: Object

Methods

Constants

PING_ATTEMPTS = 6
MAX_PING_TIME = 1_000_000
PRUNE_INTERVAL = 10_000

Attributes

address  [RW] 
checked_out  [RW] 
connection  [RW] 
host  [RW] 
port  [RW] 
safe  [RW] 
size  [RW] 
timeout  [RW] 

Public Class methods

Create a new pool of connections.

[Source]

# File lib/mongo/util/pool.rb, line 28
    def initialize(connection, host, port, opts={})
      @connection  = connection

      @host, @port = host, port

      # A Mongo::Node object.
      @node = opts[:node]

      # The string address
      @address = "#{@host}:#{@port}"

      # Pool size and timeout.
      @size    = opts.fetch(:size, 20)
      @timeout = opts.fetch(:timeout, 30)

      # Mutex for synchronizing pool access
      @connection_mutex = Mutex.new

      # Condition variable for signal and wait
      @queue = ConditionVariable.new

      # Operations to perform on a socket
      @socket_ops = Hash.new { |h, k| h[k] = [] }

      @sockets      = []
      @pids         = {}
      @checked_out  = []
      @ping_time    = nil
      @last_ping    = nil
      @closed       = false
      @threads_to_sockets = {}
      @checkout_counter   = 0
    end

Public Instance methods

If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket. So we store the apply_authentication method, and this will be applied right before the next use of each socket.

[Source]

# File lib/mongo/util/pool.rb, line 182
    def authenticate_existing
      @connection_mutex.synchronize do
        @sockets.each do |socket|
          @socket_ops[socket] << Proc.new do
            @connection.apply_saved_authentication(:socket => socket)
          end
        end
      end
    end

Return a socket to the pool.

[Source]

# File lib/mongo/util/pool.rb, line 142
    def checkin(socket)
      @connection_mutex.synchronize do
        if @checked_out.delete(socket)
          @queue.signal
        else
          return false
        end
      end
      true
    end

Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.

[Source]

# File lib/mongo/util/pool.rb, line 241
    def checkout
      @connection.connect if !@connection.connected?
      start_time = Time.now
      loop do
        if (Time.now - start_time) > @timeout
          raise ConnectionTimeoutError, "could not obtain connection within " +
            "#{@timeout} seconds. The max pool size is currently #{@size}; " +
            "consider increasing the pool size or timeout."
        end

        @connection_mutex.synchronize do
          if @checkout_counter > PRUNE_INTERVAL
            @checkout_counter = 0
            prune_thread_socket_hash
          else
            @checkout_counter += 1
          end

          if socket_for_thread = @threads_to_sockets[Thread.current]
            if !@checked_out.include?(socket_for_thread)
              socket = checkout_existing_socket(socket_for_thread)
            end
          else # First checkout for this thread
            thread_length = @threads_to_sockets.keys.length
            if (thread_length <= @sockets.size) && (@sockets.size < @size)
              socket = checkout_new_socket
            elsif @checked_out.size < @sockets.size
              socket = checkout_existing_socket
            elsif @sockets.size < @size
              socket = checkout_new_socket
            end
          end

          if socket
            # This calls all procs, in order, scoped to existing sockets.
            # At the moment, we use this to lazily authenticate and
            # logout existing socket connections.
            @socket_ops[socket].reject! do |op|
              op.call
            end

            if socket.closed?
              @checked_out.delete(socket)
              @sockets.delete(socket)
              @threads_to_sockets.each do |k,v|
                if v == socket
                  @threads_to_sockets.delete(k)
                end
              end

              socket = checkout_new_socket
            end
            
            return socket
          else
            # Otherwise, wait
            @queue.wait(@connection_mutex)
          end
        end
      end
    end

Checks out the first available socket from the pool.

If the pid has changed, remove the socket and check out new one.

This method is called exclusively from checkout; therefore, it runs within a mutex.

[Source]

# File lib/mongo/util/pool.rb, line 211
    def checkout_existing_socket(socket=nil)
      if !socket
        socket = (@sockets - @checked_out).first
      end

      if @pids[socket] != Process.pid
        @pids[socket] = nil
        @sockets.delete(socket)
        if socket
          socket.close unless socket.closed?
        end
        checkout_new_socket
      else
        @checked_out << socket
        @threads_to_sockets[Thread.current] = socket
        socket
      end
    end

Adds a new socket to the pool and checks it out.

This method is called exclusively from checkout; therefore, it runs within a mutex.

[Source]

# File lib/mongo/util/pool.rb, line 157
    def checkout_new_socket
      begin
        socket = @connection.socket_class.new(@host, @port, @connection.op_timeout)
        socket.pool = self
      rescue => ex
        socket.close if socket
        raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
        @node.close if @node
      end

      # If any saved authentications exist, we want to apply those
      # when creating new sockets.
      @connection.apply_saved_authentication(:socket => socket)

      @sockets << socket
      @pids[socket] = Process.pid
      @checked_out << socket
      @threads_to_sockets[Thread.current] = socket
      socket
    end

Close this pool.

@option opts [Boolean] :soft (false) If true,

  close only those sockets that are not checked out.

[Source]

# File lib/mongo/util/pool.rb, line 66
    def close(opts={})
      @connection_mutex.synchronize do
        if opts[:soft] && !@checked_out.empty?
          @closing = true
          close_sockets(@sockets - @checked_out)
        else
          close_sockets(@sockets)
          @closed = true
        end
      end
    end

[Source]

# File lib/mongo/util/pool.rb, line 78
    def closed?
      @closed
    end

[Source]

# File lib/mongo/util/pool.rb, line 91
    def host_port
      [@host, @port]
    end

[Source]

# File lib/mongo/util/pool.rb, line 87
    def host_string
      "#{@host}:#{@port}"
    end

[Source]

# File lib/mongo/util/pool.rb, line 82
    def inspect
      "#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
        "@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>"
    end

Store the logout op for each existing socket to be applied before the next use of each socket.

[Source]

# File lib/mongo/util/pool.rb, line 194
    def logout_existing(db)
      @connection_mutex.synchronize do
        @sockets.each do |socket|
          @socket_ops[socket] << Proc.new do
            @connection.db(db).issue_logout(:socket => socket)
          end
        end
      end
    end

[Source]

# File lib/mongo/util/pool.rb, line 133
    def ping
      begin
        return self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
      rescue OperationFailure, SocketError, SystemCallError, IOError
        return false
      end
    end

Refresh ping time only if we haven‘t checked within the last five minutes.

[Source]

# File lib/mongo/util/pool.rb, line 97
    def ping_time
      if !@last_ping
        @last_ping = Time.now
        @ping_time = refresh_ping_time
      elsif Time.now - @last_ping > 300
        @last_ping = Time.now
        @ping_time = refresh_ping_time
      else
        @ping_time
      end
    end

[Source]

# File lib/mongo/util/pool.rb, line 230
    def prune_thread_socket_hash
      current_threads = Set[*Thread.list]

      @threads_to_sockets.delete_if do |thread, socket|
        !current_threads.include?(thread)
      end
    end

Return the time it takes on average to do a round-trip against this node.

[Source]

# File lib/mongo/util/pool.rb, line 111
    def refresh_ping_time
      trials = []
      PING_ATTEMPTS.times do
        t1 = Time.now
        if !self.ping
          return MAX_PING_TIME
        end
        trials << (Time.now - t1) * 1000
      end

      trials.sort!

      # Delete shortest and longest times
      trials.delete_at(trials.length-1)
      trials.delete_at(0)

      total = 0.0
      trials.each { |t| total += t }

      (total / trials.length).ceil
    end

[Validate]