diff --git a/lib/mongo/retryable.rb b/lib/mongo/retryable.rb
index c6330bf486..2508e13efd 100644
--- a/lib/mongo/retryable.rb
+++ b/lib/mongo/retryable.rb
@@ -46,8 +46,8 @@ module Retryable
     # @api private
     #
     # @return [ Mongo::Server ] A server matching the server preference.
-    def select_server(cluster, server_selector, session)
-      server_selector.select_server(cluster, nil, session)
+    def select_server(cluster, server_selector, session, failed_server = nil)
+      server_selector.select_server(cluster, nil, session, deprioritized: [failed_server].compact)
    end
 
     # Returns the read worker for handling retryable reads.
diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb
index 763fa0f75b..a82e67a051 100644
--- a/lib/mongo/retryable/read_worker.rb
+++ b/lib/mongo/retryable/read_worker.rb
@@ -190,12 +190,13 @@ def deprecated_legacy_read_with_retry(&block)
       #
       # @return [ Result ] The result of the operation.
       def modern_read_with_retry(session, server_selector, &block)
-        yield select_server(cluster, server_selector, session)
+        server = select_server(cluster, server_selector, session)
+        yield server
       rescue *retryable_exceptions, Error::OperationFailure, Auth::Unauthorized, Error::PoolError => e
         e.add_notes('modern retry', 'attempt 1')
         raise e if session.in_transaction?
         raise e if !is_retryable_exception?(e) && !e.write_retryable?
-        retry_read(e, session, server_selector, &block)
+        retry_read(e, session, server_selector, failed_server: server, &block)
       end
 
       # Attempts to do a "legacy" read with retry. The operation will be
@@ -257,12 +258,14 @@ def read_without_retry(session, server_selector, &block)
       #   being run on.
       # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
       #   selector for the operation.
+      # @param [ Mongo::Server ] failed_server The server on which the original
+      #   operation failed.
       # @param [ Proc ] block The block to execute.
       #
       # @return [ Result ] The result of the operation.
-      def retry_read(original_error, session, server_selector, &block)
+      def retry_read(original_error, session, server_selector, failed_server: nil, &block)
         begin
-          server = select_server(cluster, server_selector, session)
+          server = select_server(cluster, server_selector, session, failed_server)
         rescue Error, Error::AuthError => e
           original_error.add_note("later retry failed: #{e.class}: #{e}")
           raise original_error
@@ -289,8 +292,6 @@ def retry_read(original_error, session, server_selector, &block)
           raise original_error
         end
       end
-    end
-
   end
 end
 
diff --git a/lib/mongo/retryable/write_worker.rb b/lib/mongo/retryable/write_worker.rb
index c21265ed56..303ff3af06 100644
--- a/lib/mongo/retryable/write_worker.rb
+++ b/lib/mongo/retryable/write_worker.rb
@@ -240,7 +240,7 @@ def modern_write_with_retry(session, server, context, &block)
 
         # Context#with creates a new context, which is not necessary here
         # but the API is less prone to misuse this way.
-        retry_write(e, txn_num, context: context.with(is_retry: true), &block)
+        retry_write(e, txn_num, context: context.with(is_retry: true), failed_server: server, &block)
       end
 
       # Called after a failed write, this will retry the write no more than
@@ -250,9 +250,11 @@ def modern_write_with_retry(session, server, context, &block)
       #   retry.
       # @param [ Number ] txn_num The transaction number.
       # @param [ Operation::Context ] context The context for the operation.
+      # @param [ Mongo::Server ] failed_server The server on which the original
+      #   operation failed.
       #
       # @return [ Result ] The result of the operation.
-      def retry_write(original_error, txn_num, context:, &block)
+      def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
         session = context.session
 
         # We do not request a scan of the cluster here, because error handling
@@ -260,7 +262,7 @@ def retry_write(original_error, txn_num, context:, &block)
         # server description and/or topology as necessary (specifically,
         # a socket error or a not master error should have marked the respective
         # server unknown). Here we just need to wait for server selection.
-        server = select_server(cluster, ServerSelector.primary, session)
+        server = select_server(cluster, ServerSelector.primary, session, failed_server)
 
         unless server.retry_writes?
           # Do not need to add "modern retry" here, it should already be on
diff --git a/lib/mongo/server_selector/base.rb b/lib/mongo/server_selector/base.rb
index b883785129..10eb478449 100644
--- a/lib/mongo/server_selector/base.rb
+++ b/lib/mongo/server_selector/base.rb
@@ -164,6 +164,10 @@ def ==(other)
     #   for mongos pinning. Added in version 2.10.0.
     # @param [ true | false ] write_aggregation Whether we need a server that
     #   supports writing aggregations (e.g. with $merge/$out) on secondaries.
+    # @param [ Array<Server> ] deprioritized A list of servers that should
+    #   be selected from only if no other servers are available. This is
+    #   used to avoid selecting the same server twice in a row when
+    #   retrying a command.
     #
     # @return [ Mongo::Server ] A server matching the server preference.
     #
@@ -174,8 +178,8 @@ def ==(other)
     #   lint mode is enabled.
     #
     # @since 2.0.0
-    def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
-      select_server_impl(cluster, ping, session, write_aggregation).tap do |server|
+    def select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: [])
+      select_server_impl(cluster, ping, session, write_aggregation, deprioritized).tap do |server|
         if Lint.enabled? && !server.pool.ready?
           raise Error::LintError, 'Server selector returning a server with a pool which is not ready'
         end
@@ -183,7 +187,7 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
     end
 
     # Parameters and return values are the same as for select_server.
-    private def select_server_impl(cluster, ping, session, write_aggregation)
+    private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized)
       if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
         return cluster.servers.first
       end
@@ -266,7 +270,7 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
         end
       end
 
-      server = try_select_server(cluster, write_aggregation: write_aggregation)
+      server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized)
 
       if server
         unless cluster.topology.compatible?
@@ -321,11 +325,15 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
     #   an eligible server.
     # @param [ true | false ] write_aggregation Whether we need a server that
     #   supports writing aggregations (e.g. with $merge/$out) on secondaries.
+    # @param [ Array<Server> ] deprioritized A list of servers that should
+    #   be selected from only if no other servers are available. This is
+    #   used to avoid selecting the same server twice in a row when
+    #   retrying a command.
     #
     # @return [ Server | nil ] A suitable server, if one exists.
     #
     # @api private
-    def try_select_server(cluster, write_aggregation: false)
+    def try_select_server(cluster, write_aggregation: false, deprioritized: [])
       servers = if write_aggregation && cluster.replica_set?
         # 1. Check if ALL servers in cluster support secondary writes.
         is_write_supported = cluster.servers.reduce(true) do |res, server|
@@ -347,7 +355,7 @@ def try_select_server(cluster, write_aggregation: false)
       # by the selector (e.g. for secondary preferred, the first
       # server may be a secondary and the second server may be primary)
       # and we should take the first server here respecting the order
-      server = servers.first
+      server = suitable_server(servers, deprioritized)
 
       if server
         if Lint.enabled?
@@ -418,6 +426,24 @@ def suitable_servers(cluster)
 
     private
 
+    # Returns a server from the list of servers that is suitable for
+    # executing the operation.
+    #
+    # @param [ Array<Server> ] servers The candidate servers.
+    # @param [ Array<Server> ] deprioritized A list of servers that should
+    #   be selected from only if no other servers are available.
+    #
+    # @return [ Server | nil ] The suitable server or nil if no suitable
+    #   server is available.
+    def suitable_server(servers, deprioritized)
+      preferred = servers - deprioritized
+      if preferred.empty?
+        servers.first
+      else
+        preferred.first
+      end
+    end
+
     # Convert this server preference definition into a format appropriate
     # for sending to a MongoDB server (i.e., as a command field).
     #
diff --git a/spec/integration/retryable_reads_errors_spec.rb b/spec/integration/retryable_reads_errors_spec.rb
index 4d662c4bf5..81402c8f4d 100644
--- a/spec/integration/retryable_reads_errors_spec.rb
+++ b/spec/integration/retryable_reads_errors_spec.rb
@@ -20,14 +20,14 @@
 
   let(:failpoint) do
     {
-        configureFailPoint: "failCommand",
-        mode: { times: 1 },
-        data: {
-            failCommands: [ "find" ],
-            errorCode: 91,
-            blockConnection: true,
-            blockTimeMS: 1000
-        }
+      configureFailPoint: "failCommand",
+      mode: { times: 1 },
+      data: {
+        failCommands: [ "find" ],
+        errorCode: 91,
+        blockConnection: true,
+        blockTimeMS: 1000
+      }
     }
   end
 
@@ -107,4 +107,157 @@
       })
     end
   end
+
+  context 'Retries in a sharded cluster' do
+    require_topology :sharded
+    min_server_version '4.2'
+    require_no_auth
+
+    let(:subscriber) { Mrss::EventSubscriber.new }
+
+    let(:find_started_events) do
+      subscriber.started_events.select { |e| e.command_name == "find" }
+    end
+
+    let(:find_failed_events) do
+      subscriber.failed_events.select { |e| e.command_name == "find" }
+    end
+
+    let(:find_succeeded_events) do
+      subscriber.succeeded_events.select { |e| e.command_name == "find" }
+    end
+
+    context 'when another mongos is available' do
+
+      let(:first_mongos) do
+        Mongo::Client.new(
+          [SpecConfig.instance.addresses.first],
+          direct_connection: true,
+          database: 'admin'
+        )
+      end
+
+      let(:second_mongos) do
+        Mongo::Client.new(
+          [SpecConfig.instance.addresses.last],
+          direct_connection: false,
+          database: 'admin'
+        )
+      end
+
+      let(:client) do
+        new_local_client(
+          [
+            SpecConfig.instance.addresses.first,
+            SpecConfig.instance.addresses.last,
+          ],
+          SpecConfig.instance.test_options.merge(retry_reads: true)
+        )
+      end
+
+      let(:expected_servers) do
+        [
+          SpecConfig.instance.addresses.first.to_s,
+          SpecConfig.instance.addresses.last.to_s
+        ].sort
+      end
+
+      before do
+        skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2
+
+        first_mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: {
+            failCommands: %w(find),
+            closeConnection: false,
+            errorCode: 6
+          }
+        )
+
+        second_mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: {
+            failCommands: %w(find),
+            closeConnection: false,
+            errorCode: 6
+          }
+        )
+      end
+
+      after do
+        [first_mongos, second_mongos].each do |admin_client|
+          admin_client.database.command(
+            configureFailPoint: 'failCommand',
+            mode: 'off'
+          )
+          admin_client.close
+        end
+        client.close
+      end
+
+      it 'retries on different mongos' do
+        client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
+        expect { collection.find.first }.to raise_error(Mongo::Error::OperationFailure)
+        expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers)
+        expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers)
+      end
+    end
+
+    context 'when no other mongos is available' do
+      let(:mongos) do
+        Mongo::Client.new(
+          [SpecConfig.instance.addresses.first],
+          direct_connection: true,
+          database: 'admin'
+        )
+      end
+
+      let(:client) do
+        new_local_client(
+          [
+            SpecConfig.instance.addresses.first
+          ],
+          SpecConfig.instance.test_options.merge(retry_reads: true)
+        )
+      end
+
+      before do
+        mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: {
+            failCommands: %w(find),
+            closeConnection: false,
+            errorCode: 6
+          }
+        )
+      end
+
+      after do
+        mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: 'off'
+        )
+        mongos.close
+        client.close
+      end
+
+      it 'retries on the same mongos' do
+        client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
+        expect { collection.find.first }.not_to raise_error
+        expect(find_started_events.map { |e| e.address.to_s }.sort).to eq([
+          SpecConfig.instance.addresses.first.to_s,
+          SpecConfig.instance.addresses.first.to_s
+        ])
+        expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq([
+          SpecConfig.instance.addresses.first.to_s
+        ])
+        expect(find_succeeded_events.map { |e| e.address.to_s }.sort).to eq([
+          SpecConfig.instance.addresses.first.to_s
+        ])
+      end
+    end
+  end
 end
diff --git a/spec/integration/retryable_writes_errors_spec.rb b/spec/integration/retryable_writes_errors_spec.rb
index 2769089b11..25b5cba8aa 100644
--- a/spec/integration/retryable_writes_errors_spec.rb
+++ b/spec/integration/retryable_writes_errors_spec.rb
@@ -189,4 +189,160 @@
       })
     end
   end
+
+  context 'Retries in a sharded cluster' do
+    require_topology :sharded
+    min_server_version '4.2'
+    require_no_auth
+
+    let(:subscriber) { Mrss::EventSubscriber.new }
+
+    let(:insert_started_events) do
+      subscriber.started_events.select { |e| e.command_name == "insert" }
+    end
+
+    let(:insert_failed_events) do
+      subscriber.failed_events.select { |e| e.command_name == "insert" }
+    end
+
+    let(:insert_succeeded_events) do
+      subscriber.succeeded_events.select { |e| e.command_name == "insert" }
+    end
+
+    context 'when another mongos is available' do
+
+      let(:first_mongos) do
+        Mongo::Client.new(
+          [SpecConfig.instance.addresses.first],
+          direct_connection: true,
+          database: 'admin'
+        )
+      end
+
+      let(:second_mongos) do
+        Mongo::Client.new(
+          [SpecConfig.instance.addresses.last],
+          direct_connection: false,
+          database: 'admin'
+        )
+      end
+
+      let(:client) do
+        new_local_client(
+          [
+            SpecConfig.instance.addresses.first,
+            SpecConfig.instance.addresses.last,
+          ],
+          SpecConfig.instance.test_options.merge(retry_writes: true)
+        )
+      end
+
+      let(:expected_servers) do
+        [
+          SpecConfig.instance.addresses.first.to_s,
+          SpecConfig.instance.addresses.last.to_s
+        ].sort
+      end
+
+      before do
+        skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2
+
+        first_mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: {
+            failCommands: %w(insert),
+            closeConnection: false,
+            errorCode: 6,
+            errorLabels: ['RetryableWriteError']
+          }
+        )
+
+        second_mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: {
+            failCommands: %w(insert),
+            closeConnection: false,
+            errorCode: 6,
+            errorLabels: ['RetryableWriteError']
+          }
+        )
+      end
+
+      after do
+        [first_mongos, second_mongos].each do |admin_client|
+          admin_client.database.command(
+            configureFailPoint: 'failCommand',
+            mode: 'off'
+          )
+          admin_client.close
+        end
+        client.close
+      end
+
+      it 'retries on different mongos' do
+        client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
+        expect { collection.insert_one(x: 1) }.to raise_error(Mongo::Error::OperationFailure)
+        expect(insert_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers)
+        expect(insert_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers)
+      end
+    end
+
+    context 'when no other mongos is available' do
+      let(:mongos) do
+        Mongo::Client.new(
+          [SpecConfig.instance.addresses.first],
+          direct_connection: true,
+          database: 'admin'
+        )
+      end
+
+      let(:client) do
+        new_local_client(
+          [
+            SpecConfig.instance.addresses.first
+          ],
+          SpecConfig.instance.test_options.merge(retry_writes: true)
+        )
+      end
+
+      before do
+        mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: {
+            failCommands: %w(insert),
+            closeConnection: false,
+            errorCode: 6,
+            errorLabels: ['RetryableWriteError']
+          }
+        )
+      end
+
+      after do
+        mongos.database.command(
+          configureFailPoint: 'failCommand',
+          mode: 'off'
+        )
+        mongos.close
+        client.close
+      end
+
+      it 'retries on the same mongos' do
+        client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
+        expect { collection.insert_one(x: 1) }.not_to raise_error
+        expect(insert_started_events.map { |e| e.address.to_s }.sort).to eq([
+          SpecConfig.instance.addresses.first.to_s,
+          SpecConfig.instance.addresses.first.to_s
+        ])
+        expect(insert_failed_events.map { |e| e.address.to_s }.sort).to eq([
+          SpecConfig.instance.addresses.first.to_s
+        ])
+        expect(insert_succeeded_events.map { |e| e.address.to_s }.sort).to eq([
+          SpecConfig.instance.addresses.first.to_s
+        ])
+      end
+    end
+  end
 end
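
Note on the selection rule: the behavior this patch adds is concentrated in the suitable_server helper above. The following is a minimal standalone sketch of that logic, with plain strings standing in for Mongo::Server instances (the addresses are invented for illustration; the real method receives server objects from the selector):

    # Prefer candidates that are not deprioritized; when every candidate
    # is deprioritized (e.g. a deployment with a single mongos), fall back
    # to the full list so the retry still gets a server instead of failing
    # server selection outright.
    def suitable_server(servers, deprioritized)
      preferred = servers - deprioritized
      preferred.empty? ? servers.first : preferred.first
    end

    mongoses = ['mongos-a:27017', 'mongos-b:27017']

    # First attempt failed on mongos-a: the retry is routed to mongos-b.
    suitable_server(mongoses, ['mongos-a:27017'])         # => "mongos-b:27017"

    # Only one mongos available: the retry reuses the failed server.
    suitable_server(mongoses.take(1), ['mongos-a:27017']) # => "mongos-a:27017"

These two cases correspond directly to the integration specs: with two mongoses the retried find/insert lands on the other mongos, and with a single mongos the retry goes back to the same address and succeeds once the { times: 1 } failpoint has been consumed.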