diff --git a/docs/reference/create-client.txt b/docs/reference/create-client.txt index b71a866910..f54805651a 100644 --- a/docs/reference/create-client.txt +++ b/docs/reference/create-client.txt @@ -575,6 +575,11 @@ Ruby Options - ``Object`` - ``Logger`` + * - ``:max_connecting`` + - The maximum number of connections that the connection pool will try to establish in parallel. + - ``Integer`` + - 2 + * - ``:max_idle_time`` - The maximum time, in seconds, that a connection can be idle before it is closed by the connection pool. @@ -1018,6 +1023,9 @@ URI options are explained in detail in the :manual:`Connection URI reference * - localThresholdMS=Integer - ``:local_threshold => Float`` + * - maxConnecting=Integer + - ``:max_connecting => Integer`` + * - maxIdleTimeMS=Integer - ``:max_idle_time => Float`` @@ -1551,6 +1559,14 @@ maximum size, the thread waits for a connection to be returned to the pool by another thread. If ``max_pool_size`` is set to zero, there is no limit for the maximum number of connections in the pool. +Each pool has a limit on the number of connections that can be concurrently +connecting to a server. This limit is called ``max_connecting`` and defaults to +2. If the number of connections that are currently connecting to a server +reaches this limit, the pool will wait for a connection attempt to succeed or +fail before attempting to create a new connection. If your application +has a large number of threads, you may want to increase ``max_connecting`` to avoid +having threads wait for a connection to be established. + The number of seconds the thread will wait for a connection to become available is configurable. This setting, called ``wait_queue_timeout``, is defined in seconds. If this timeout is reached, a ``Timeout::Error`` is raised. The @@ -1595,6 +1611,14 @@ process, increase ``max_pool_size``: client = Mongo::Client.new(["localhost:27017"], max_pool_size: 200) +To support extremely high numbers of threads that share the same client +within one process, increase ``max_connecting``: + +.. code-block:: ruby + + client = Mongo::Client.new(["localhost:27017"], max_pool_size: 200, max_connecting: 10) + + Any number of threads are allowed to wait for connections to become available, and they can wait the default (1 second) or the ``wait_queue_timeout`` setting: @@ -1627,7 +1651,7 @@ such as Unicorn, Puma or Passenger, or when the application otherwise forks, each process should generally each have their own ``Mongo::Client`` instances. This is because: -1. The background threads remain in the parent process and are not transfered +1. The background threads remain in the parent process and are not transferred to the child process. 2. File descriptors like network sockets are shared between parent and child processes. diff --git a/docs/release-notes.txt b/docs/release-notes.txt index bb8739c977..f8eebf104a 100644 --- a/docs/release-notes.txt +++ b/docs/release-notes.txt @@ -28,6 +28,12 @@ now supports Ruby 3.2. Ruby 2.5 and 2.6 are now deprecated. This release includes the following new features: +- The driver now limits the number of connections established by a connection + pool simultaneously. By default the limit is 2. The limit can be configured + with the ``:max_connecting`` option of the ``Mongo::Client`` constructor. + The default should be sufficient for most applications. However, if your + application is using a large number of threads, you may need to increase + the limit. - Added support for automatic AWS credentials retrieval and authentication with temporary credentials when AWS KMS is used for client side encryption. - Added support for automatic GCP credentials retrieval when Google Cloud Key diff --git a/lib/mongo/client.rb b/lib/mongo/client.rb index 42de028ca5..e06231d4ad 100644 --- a/lib/mongo/client.rb +++ b/lib/mongo/client.rb @@ -71,6 +71,7 @@ class Client :local_threshold, :logger, :log_prefix, + :max_connecting, :max_idle_time, :max_pool_size, :max_read_retries, @@ -266,6 +267,12 @@ def hash # @option options [ String ] :log_prefix A custom log prefix to use when # logging. This option is experimental and subject to change in a future # version of the driver. + # @option options [ Integer ] :max_connecting The maximum number of + # connections that can be connecting simultaneously. The default is 2. + # This option should be increased if there are many threads that share + # the same client and the application is experiencing timeouts + # while waiting for connections to be established. + # selecting a server for an operation. The default is 2. # @option options [ Integer ] :max_idle_time The maximum seconds a socket can remain idle # since it has been checked in to the pool. # @option options [ Integer ] :max_pool_size The maximum size of the @@ -1318,6 +1325,7 @@ def validate_new_options!(opts) key = k.to_sym if VALID_OPTIONS.include?(key) validate_max_min_pool_size!(key, opts) + validate_max_connecting!(key, opts) validate_read!(key, opts) if key == :compressors compressors = valid_compressors(v) @@ -1580,6 +1588,23 @@ def validate_max_min_pool_size!(option, opts) true end + # Validates whether the max_connecting option is valid. + # + # @param [ Symbol ] option The option to validate. + # @param [ Hash ] opts The client options. + # + # @return [ true ] If the option is valid. + # @raise [ Error::InvalidMaxConnecting ] If the option is invalid. + def validate_max_connecting!(option, opts) + if option == :max_connecting && opts.key?(:max_connecting) + max_connecting = opts[:max_connecting] || Server::ConnectionPool::DEFAULT_MAX_CONNECTING + if max_connecting <= 0 + raise Error::InvalidMaxConnecting.new(opts[:max_connecting]) + end + end + true + end + def validate_read!(option, opts) if option == :read && opts.has_key?(:read) read = opts[:read] diff --git a/lib/mongo/error.rb b/lib/mongo/error.rb index 883fbee9a5..36296735d7 100644 --- a/lib/mongo/error.rb +++ b/lib/mongo/error.rb @@ -159,6 +159,7 @@ def write_concern_error_labels require 'mongo/error/invalid_document' require 'mongo/error/invalid_file' require 'mongo/error/invalid_file_revision' +require 'mongo/error/invalid_max_connecting' require 'mongo/error/invalid_min_pool_size' require 'mongo/error/invalid_read_option' require 'mongo/error/invalid_application_name' diff --git a/lib/mongo/error/invalid_max_connecting.rb b/lib/mongo/error/invalid_max_connecting.rb new file mode 100644 index 0000000000..759d4c1732 --- /dev/null +++ b/lib/mongo/error/invalid_max_connecting.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +# Copyright (C) 2014-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Error + # Exception that is raised when trying to create a client with an invalid + # max_connecting option. + class InvalidMaxConnecting < Error + # Instantiate the new exception. + def initialize(max_connecting) + super("Invalid max_connecting: #{max_connecting}. Please ensure that it is greater than zero. ") + end + end + end +end diff --git a/lib/mongo/server/connection_pool.rb b/lib/mongo/server/connection_pool.rb index 543ac6abde..dfbc718522 100644 --- a/lib/mongo/server/connection_pool.rb +++ b/lib/mongo/server/connection_pool.rb @@ -29,12 +29,16 @@ class ConnectionPool # The default max size for the connection pool. # # @since 2.9.0 - DEFAULT_MAX_SIZE = 20.freeze + DEFAULT_MAX_SIZE = 20 # The default min size for the connection pool. # # @since 2.9.0 - DEFAULT_MIN_SIZE = 0.freeze + DEFAULT_MIN_SIZE = 0 + + # The default maximum number of connections that can be connecting at + # any given time. + DEFAULT_MAX_CONNECTING = 2 # The default timeout, in seconds, to wait for a connection. # @@ -61,6 +65,11 @@ class ConnectionPool # # @option options [ Integer ] :max_size The maximum pool size. Setting # this option to zero creates an unlimited connection pool. + # @option options [ Integer ] :max_connecting The maximum number of + # connections that can be connecting simultaneously. The default is 2. + # This option should be increased if there are many threads that share + # same connection pool and the application is experiencing timeouts + # while waiting for connections to be established. # @option options [ Integer ] :max_pool_size Deprecated. # The maximum pool size. If max_size is also given, max_size and # max_pool_size must be identical. @@ -89,6 +98,7 @@ class ConnectionPool # any connections created by the pool. # # @since 2.0.0, API changed in 2.9.0 + def initialize(server, options = {}) unless server.is_a?(Server) raise ArgumentError, 'First argument must be a Server instance' @@ -146,7 +156,7 @@ def initialize(server, options = {}) # Condition variable to enforce the first check in check_out: max_pool_size. # This condition variable should be signaled when the number of # unavailable connections decreases (pending + pending_connections + - # available_connections). + # checked_out_connections). @size_cv = Mongo::ConditionVariable.new(@lock) # This represents the number of threads that have made it past the size_cv # gate but have not acquired a connection to add to the pending_connections @@ -157,7 +167,7 @@ def initialize(server, options = {}) # Thei condition variable should be signaled when the number of pending # connections decreases. @max_connecting_cv = Mongo::ConditionVariable.new(@lock) - @max_connecting = options.fetch(:max_connecting, 2) + @max_connecting = options.fetch(:max_connecting, DEFAULT_MAX_CONNECTING) ObjectSpace.define_finalizer(self, self.class.finalize(@available_connections, @pending_connections, @populator)) @@ -1074,7 +1084,8 @@ def raise_check_out_timeout!(connection_global_id) "from pool for #{@server.address}#{connection_global_id_msg} after #{wait_timeout} sec. " + "Connections in pool: #{@available_connections.length} available, " + "#{@checked_out_connections.length} checked out, " + - "#{@pending_connections.length + @connection_requests} pending " + + "#{@pending_connections.length} pending, " + + "#{@connection_requests} connections requests " + "(max size: #{max_size})" raise Error::ConnectionCheckOutTimeout.new(msg, address: @server.address) end @@ -1170,7 +1181,6 @@ def valid_available_connection?(connection, pid, connection_global_id) # one. If no connection exists and the pool is at max size, wait until # a connection is checked back into the pool. # - # @param [ Float ] deadline The deadline to get the connection. # @param [ Integer ] pid The current process id. # @param [ Integer ] connection_global_id The global id for the # connection to check out. @@ -1180,17 +1190,22 @@ def valid_available_connection?(connection, pid, connection_global_id) # @raise [ Error::PoolClosedError ] If the pool has been closed. # @raise [ Timeout::Error ] If the connection pool is at maximum size # and remains so for longer than the wait timeout. - def get_connection(deadline, pid, connection_global_id) + def get_connection(pid, connection_global_id) if connection = next_available_connection(connection_global_id) unless valid_available_connection?(connection, pid, connection_global_id) return nil end + # We've got a connection, so we decrement the number of connection + # requests. + # We do not need to signal condition variable here, because + # because the execution will continue, and we signal later. + @connection_requests -= 1 + # If the connection is connected, it's not considered a # "pending connection". The pending_connections list represents # the set of connections that are awaiting connection. unless connection.connected? - @connection_requests -= 1 @pending_connections << connection end return connection @@ -1207,6 +1222,9 @@ def get_connection(deadline, pid, connection_global_id) Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR ), ) + # We're going to raise, so we need to decrement the number of + # connection requests. + decrement_connection_requests_and_signal raise Error::MissingConnection.new end end @@ -1237,7 +1255,6 @@ def retrieve_and_connect_connection(connection_global_id) connection = nil @lock.synchronize do - # The first gate to checking out a connection. Make sure the number of # unavailable connections is less than the max pool size. until max_size == 0 || unavailable_connections < max_size @@ -1247,60 +1264,97 @@ def retrieve_and_connect_connection(connection_global_id) raise_if_not_ready! end @connection_requests += 1 + connection = wait_for_connection(connection_global_id, deadline) + end - while connection.nil? - # The second gate to checking out a connection. Make sure 1) there - # exists an available connection and 2) we are under max_connecting. - until @available_connections.any? || @pending_connections.length < @max_connecting - wait = deadline - Utils.monotonic_time - raise_check_out_timeout!(connection_global_id) if wait <= 0 - @max_connecting_cv.wait(wait) - raise_if_not_ready! - end + connect_or_raise(connection) unless connection.connected? - connection = get_connection(deadline, Process.pid, connection_global_id) - wait = deadline - Utils.monotonic_time - raise_check_out_timeout!(connection_global_id) if connection.nil? && wait <= 0 + @lock.synchronize do + @checked_out_connections << connection + if @pending_connections.include?(connection) + @pending_connections.delete(connection) end + @max_connecting_cv.signal + # no need to signal size_cv here since the number of unavailable + # connections is unchanged. end - begin - connect_connection(connection) - rescue Exception - # Handshake or authentication failed - @lock.synchronize do - if @pending_connections.include?(connection) - @pending_connections.delete(connection) - else - @connection_requests -= 1 + connection + end + + # Waits for a connection to become available, or raises is no connection + # becomes available before the timeout. + # @param [ Integer ] connection_global_id The global id for the + # connection to check out. + # @param [ Float ] deadline The time at which to stop waiting. + # + # @return [ Mongo::Server::Connection ] The checked out connection. + def wait_for_connection(connection_global_id, deadline) + connection = nil + while connection.nil? + # The second gate to checking out a connection. Make sure 1) there + # exists an available connection and 2) we are under max_connecting. + until @available_connections.any? || @pending_connections.length < @max_connecting + wait = deadline - Utils.monotonic_time + if wait <= 0 + # We are going to raise a timeout error, so the connection + # request is not going to be fulfilled. Decrement the counter + # here. + decrement_connection_requests_and_signal + raise_check_out_timeout!(connection_global_id) end - @max_connecting_cv.signal - @size_cv.signal + @max_connecting_cv.wait(wait) + # We do not need to decrement the connection_requests counter + # or signal here because the pool is not ready yet. + raise_if_not_ready! end - @populate_semaphore.signal - publish_cmap_event( - Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( - @server.address, - Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR - ), - ) - raise + connection = get_connection(Process.pid, connection_global_id) + wait = deadline - Utils.monotonic_time + if connection.nil? && wait <= 0 + # connection is nil here, it means that get_connection method + # did not create a new connection; therefore, it did not decrease + # the connection_requests counter. We need to do it here. + decrement_connection_requests_and_signal + raise_check_out_timeout!(connection_global_id) + end end + connection + end + + # Connects a connection and raises an exception if the connection + # cannot be connected. + # This method also publish corresponding event and ensures that counters + # and condition variables are updated. + def connect_or_raise(connection) + connect_connection(connection) + rescue Exception + # Handshake or authentication failed @lock.synchronize do - @checked_out_connections << connection if @pending_connections.include?(connection) @pending_connections.delete(connection) - else - @connection_requests -= 1 end @max_connecting_cv.signal - # no need to signal size_cv here since the number of unavailable - # connections is unchanged. + @size_cv.signal end + @populate_semaphore.signal + publish_cmap_event( + Monitoring::Event::Cmap::ConnectionCheckOutFailed.new( + @server.address, + Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR + ), + ) + raise + end - connection + + # Decrement connection requests counter and signal the condition + # variables that the number of unavailable connections has decreased. + def decrement_connection_requests_and_signal + @connection_requests -= 1 + @max_connecting_cv.signal + @size_cv.signal end end end diff --git a/lib/mongo/uri/options_mapper.rb b/lib/mongo/uri/options_mapper.rb index b32149f84b..bc7e8f16dc 100644 --- a/lib/mongo/uri/options_mapper.rb +++ b/lib/mongo/uri/options_mapper.rb @@ -285,6 +285,7 @@ def self.uri_option(uri_key, name, **extra) uri_option 'maxStalenessSeconds', :max_staleness, group: :read, type: :max_staleness # Pool options + uri_option 'maxConnecting', :max_connecting, type: :integer uri_option 'minPoolSize', :min_pool_size, type: :integer uri_option 'maxPoolSize', :max_pool_size, type: :integer uri_option 'waitQueueTimeoutMS', :wait_queue_timeout, type: :ms diff --git a/lib/mongo/version.rb b/lib/mongo/version.rb index 80f916063a..eb0246b5af 100644 --- a/lib/mongo/version.rb +++ b/lib/mongo/version.rb @@ -20,5 +20,5 @@ module Mongo # The current version of the driver. # # @since 2.0.0 - VERSION = '2.19.0'.freeze + VERSION = '2.19.1'.freeze end diff --git a/spec/lite_spec_helper.rb b/spec/lite_spec_helper.rb index 0394b97f3b..7bd6675a70 100644 --- a/spec/lite_spec_helper.rb +++ b/spec/lite_spec_helper.rb @@ -16,7 +16,11 @@ TRANSACTIONS_TESTS = Dir.glob("#{CURRENT_PATH}/spec_tests/data/transactions/*.yml").sort TRANSACTIONS_API_TESTS = Dir.glob("#{CURRENT_PATH}/spec_tests/data/transactions_api/*.yml").sort CHANGE_STREAMS_TESTS = Dir.glob("#{CURRENT_PATH}/spec_tests/data/change_streams/*.yml").sort -CMAP_TESTS = Dir.glob("#{CURRENT_PATH}/spec_tests/data/cmap/*.yml").sort +CMAP_TESTS = Dir.glob("#{CURRENT_PATH}/spec_tests/data/cmap/*.yml").sort.select do |f| + # Skip tests that are flaky on JRuby. + # https://jira.mongodb.org/browse/RUBY-3292 + !defined?(JRUBY_VERSION) || !f.include?('pool-checkout-minPoolSize-connection-maxConnecting.yml') +end AUTH_TESTS = Dir.glob("#{CURRENT_PATH}/spec_tests/data/auth/*.yml").sort CLIENT_SIDE_ENCRYPTION_TESTS = Dir.glob("#{CURRENT_PATH}/spec_tests/data/client_side_encryption/*.yml").sort diff --git a/spec/mongo/client_construction_spec.rb b/spec/mongo/client_construction_spec.rb index 0ea5715e8c..442c0480b3 100644 --- a/spec/mongo/client_construction_spec.rb +++ b/spec/mongo/client_construction_spec.rb @@ -797,6 +797,32 @@ end end + context 'when max_connecting is provided' do + let(:client) do + new_local_client_nmio(SINGLE_CLIENT, options) + end + + context 'when max_connecting is a positive integer' do + let(:options) do + { max_connecting: 5 } + end + + it 'sets the max connecting' do + expect(client.options[:max_connecting]).to eq(options[:max_connecting]) + end + end + + context 'when max_connecting is a negative integer' do + let(:options) do + { max_connecting: -5 } + end + + it 'raises an exception' do + expect { client }.to raise_error(Mongo::Error::InvalidMaxConnecting) + end + end + end + context 'when min_pool_size is provided' do let(:client) { new_local_client_nmio(SINGLE_CLIENT, options) } @@ -998,6 +1024,28 @@ expect(client.options).to eq(expected_options) end + context 'when max_connecting is provided' do + context 'when max_connecting is a positive integer' do + let(:uri) do + 'mongodb://127.0.0.1:27017/?maxConnecting=10' + end + + it 'sets the max connecting' do + expect(client.options[:max_connecting]).to eq(10) + end + end + + context 'when max_connecting is a negative integer' do + let(:uri) do + 'mongodb://127.0.0.1:27017/?maxConnecting=0' + end + + it 'raises an exception' do + expect { client }.to raise_error(Mongo::Error::InvalidMaxConnecting) + end + end + end + context 'when min_pool_size is provided' do context 'when max_pool_size is provided' do context 'when the min_pool_size is greater than the max_pool_size' do diff --git a/spec/mongo/uri/srv_protocol_spec.rb b/spec/mongo/uri/srv_protocol_spec.rb index af21a43161..3efc41f910 100644 --- a/spec/mongo/uri/srv_protocol_spec.rb +++ b/spec/mongo/uri/srv_protocol_spec.rb @@ -1018,6 +1018,18 @@ include_examples "roundtrips string" end + context 'when providing maxConnecting' do + + let(:max_connecting) { 10 } + let(:options) { "maxConnecting=#{max_connecting}" } + + it 'sets the max connecting option' do + expect(uri.uri_options[:max_connecting]).to eq(max_connecting) + end + + include_examples "roundtrips string" + end + context 'when providing maxPoolSize' do let(:max_pool_size) { 10 } diff --git a/spec/spec_tests/data/cmap/pool-checkout-minPoolSize-connection-maxConnecting.yml b/spec/spec_tests/data/cmap/pool-checkout-minPoolSize-connection-maxConnecting.yml index 9c7c4cd43b..11acf17e76 100644 --- a/spec/spec_tests/data/cmap/pool-checkout-minPoolSize-connection-maxConnecting.yml +++ b/spec/spec_tests/data/cmap/pool-checkout-minPoolSize-connection-maxConnecting.yml @@ -1,7 +1,6 @@ version: 1 style: integration description: threads blocked by maxConnecting check out minPoolSize connections -# Remove the topology runOn requirement when cmap specs are adjusted for lbs runOn: - # required for blockConnection in fail point @@ -14,7 +13,7 @@ failPoint: failCommands: ["isMaster","hello"] closeConnection: false blockConnection: true - blockTimeMS: 500 + blockTimeMS: 1000 poolOptions: # allows both thread1 and the background thread to start opening connections concurrently minPoolSize: 2 @@ -35,9 +34,8 @@ operations: # - it is long enough to make sure that the background thread opens a connection before thread1 releases its permit; # - it is short enough to allow thread2 to become blocked acquiring a permit to open a connection, and then grab the connection # opened by the background thread, before the background thread releases its permit. - # This wait is not necessary in ruby since the background thread runs immediately. - name: wait - ms: 0 + ms: 200 - name: checkOut thread: thread1 - name: waitForEvent