From 422d1388a4bc9120e6f701fa563c41e49d7c861e Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 19 May 2023 11:05:17 +0200 Subject: [PATCH 1/6] First implementation attempt --- lib/mongo/retryable.rb | 4 +- lib/mongo/retryable/read_worker.rb | 9 +-- lib/mongo/server_selector/base.rb | 21 ++++-- .../retryable_reads_errors_spec.rb | 70 +++++++++++++++++++ 4 files changed, 92 insertions(+), 12 deletions(-) diff --git a/lib/mongo/retryable.rb b/lib/mongo/retryable.rb index c6330bf486..2508e13efd 100644 --- a/lib/mongo/retryable.rb +++ b/lib/mongo/retryable.rb @@ -46,8 +46,8 @@ module Retryable # @api private # # @return [ Mongo::Server ] A server matching the server preference. - def select_server(cluster, server_selector, session) - server_selector.select_server(cluster, nil, session) + def select_server(cluster, server_selector, session, failed_server = nil) + server_selector.select_server(cluster, nil, session, deprioritized: [failed_server].compact) end # Returns the read worker for handling retryable reads. diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb index 763fa0f75b..2686921400 100644 --- a/lib/mongo/retryable/read_worker.rb +++ b/lib/mongo/retryable/read_worker.rb @@ -190,12 +190,13 @@ def deprecated_legacy_read_with_retry(&block) # # @return [ Result ] The result of the operation. def modern_read_with_retry(session, server_selector, &block) - yield select_server(cluster, server_selector, session) + server = select_server(cluster, server_selector, session) + yield server rescue *retryable_exceptions, Error::OperationFailure, Auth::Unauthorized, Error::PoolError => e e.add_notes('modern retry', 'attempt 1') raise e if session.in_transaction? raise e if !is_retryable_exception?(e) && !e.write_retryable? - retry_read(e, session, server_selector, &block) + retry_read(e, session, server_selector, server, &block) end # Attempts to do a "legacy" read with retry. The operation will be @@ -260,9 +261,9 @@ def read_without_retry(session, server_selector, &block) # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. - def retry_read(original_error, session, server_selector, &block) + def retry_read(original_error, session, server_selector, failed_server, &block) begin - server = select_server(cluster, server_selector, session) + server = select_server(cluster, server_selector, session, failed_server) rescue Error, Error::AuthError => e original_error.add_note("later retry failed: #{e.class}: #{e}") raise original_error diff --git a/lib/mongo/server_selector/base.rb b/lib/mongo/server_selector/base.rb index b883785129..62b2127d82 100644 --- a/lib/mongo/server_selector/base.rb +++ b/lib/mongo/server_selector/base.rb @@ -174,8 +174,8 @@ def ==(other) # lint mode is enabled. # # @since 2.0.0 - def select_server(cluster, ping = nil, session = nil, write_aggregation: false) - select_server_impl(cluster, ping, session, write_aggregation).tap do |server| + def select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: []) + select_server_impl(cluster, ping, session, write_aggregation, deprioritized).tap do |server| if Lint.enabled? && !server.pool.ready? raise Error::LintError, 'Server selector returning a server with a pool which is not ready' end @@ -183,7 +183,7 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false) end # Parameters and return values are the same as for select_server. - private def select_server_impl(cluster, ping, session, write_aggregation) + private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized) if cluster.topology.is_a?(Cluster::Topology::LoadBalanced) return cluster.servers.first end @@ -266,7 +266,7 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false) end end - server = try_select_server(cluster, write_aggregation: write_aggregation) + server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized) if server unless cluster.topology.compatible? @@ -325,7 +325,7 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false) # @return [ Server | nil ] A suitable server, if one exists. # # @api private - def try_select_server(cluster, write_aggregation: false) + def try_select_server(cluster, write_aggregation: false, deprioritized: []) servers = if write_aggregation && cluster.replica_set? # 1. Check if ALL servers in cluster support secondary writes. is_write_supported = cluster.servers.reduce(true) do |res, server| @@ -347,7 +347,7 @@ def try_select_server(cluster, write_aggregation: false) # by the selector (e.g. for secondary preferred, the first # server may be a secondary and the second server may be primary) # and we should take the first server here respecting the order - server = servers.first + server = suitable_server(servers, deprioritized) if server if Lint.enabled? @@ -418,6 +418,15 @@ def suitable_servers(cluster) private + def suitable_server(servers, deprioritized) + preferred = servers - deprioritized + if preferred.empty? + servers.first + else + preferred.first + end + end + # Convert this server preference definition into a format appropriate # for sending to a MongoDB server (i.e., as a command field). # diff --git a/spec/integration/retryable_reads_errors_spec.rb b/spec/integration/retryable_reads_errors_spec.rb index 4d662c4bf5..843e939df0 100644 --- a/spec/integration/retryable_reads_errors_spec.rb +++ b/spec/integration/retryable_reads_errors_spec.rb @@ -107,4 +107,74 @@ }) end end + + context 'retry on different mongos' do + require_topology :sharded + + let(:first_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.first], + direct_connection: true, + database: 'admin' + ) + end + + let(:second_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.last], + direct_connection: false, + database: 'admin' + ) + end + + let(:client) { authorized_client.with(retry_reads: true) } + + before do + skip 'This test requires two mongos' unless SpecConfig.instance.addresses.length == 2 + + first_mongos.database.command( + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: %w(find), + closeConnection: false, + errorCode: 11600, + errorLabels: ['RetryableWriteError'] + }, + ) + + second_mongos.database.command( + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: %w(find), + closeConnection: false, + errorCode: 11600, + errorLabels: ['RetryableWriteError'] + }, + ) + end + + let(:subscriber) { Mrss::EventSubscriber.new } + + let(:find_started_events) do + subscriber.started_events.select { |e| e.command_name == "find" } + end + + let(:find_failed_events) do + subscriber.failed_events.select { |e| e.command_name == "find" } + end + + after do + first_mongos.close + second_mongos.close + end + + it 'retries on different mongos' do + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + expect { collection.find.first }.to raise_error + expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(SpecConfig.instance.addresses.map { |a| a.to_s }.sort) + expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(SpecConfig.instance.addresses.map { |a| a.to_s }.sort) + end + end end From 7ac9d0ba473d2ae4a944709e1f5f3c21d20bf9d9 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Mon, 22 May 2023 17:13:20 +0200 Subject: [PATCH 2/6] Adjust spec requirement --- spec/integration/retryable_reads_errors_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/integration/retryable_reads_errors_spec.rb b/spec/integration/retryable_reads_errors_spec.rb index 843e939df0..91aa31fec4 100644 --- a/spec/integration/retryable_reads_errors_spec.rb +++ b/spec/integration/retryable_reads_errors_spec.rb @@ -110,6 +110,8 @@ context 'retry on different mongos' do require_topology :sharded + min_server_version '4.2' + require_no_auth let(:first_mongos) do Mongo::Client.new( From 5ff0f85407ddd43247ac58c6a45e4e6367c47759 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 26 May 2023 15:05:19 +0200 Subject: [PATCH 3/6] Improve specs --- .../retryable_reads_errors_spec.rb | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/spec/integration/retryable_reads_errors_spec.rb b/spec/integration/retryable_reads_errors_spec.rb index 91aa31fec4..c6a657cfa0 100644 --- a/spec/integration/retryable_reads_errors_spec.rb +++ b/spec/integration/retryable_reads_errors_spec.rb @@ -132,7 +132,7 @@ let(:client) { authorized_client.with(retry_reads: true) } before do - skip 'This test requires two mongos' unless SpecConfig.instance.addresses.length == 2 + skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2 first_mongos.database.command( configureFailPoint: 'failCommand', @@ -140,9 +140,8 @@ data: { failCommands: %w(find), closeConnection: false, - errorCode: 11600, - errorLabels: ['RetryableWriteError'] - }, + errorCode: 11600 + } ) second_mongos.database.command( @@ -151,9 +150,8 @@ data: { failCommands: %w(find), closeConnection: false, - errorCode: 11600, - errorLabels: ['RetryableWriteError'] - }, + errorCode: 11600 + } ) end @@ -167,6 +165,10 @@ subscriber.failed_events.select { |e| e.command_name == "find" } end + let(:expected_servers) do + SpecConfig.instance.addresses.map { |a| a.to_s }.sort + end + after do first_mongos.close second_mongos.close @@ -175,8 +177,8 @@ it 'retries on different mongos' do client.subscribe(Mongo::Monitoring::COMMAND, subscriber) expect { collection.find.first }.to raise_error - expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(SpecConfig.instance.addresses.map { |a| a.to_s }.sort) - expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(SpecConfig.instance.addresses.map { |a| a.to_s }.sort) + expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) end end end From 86c85daaac93d85b689a5e83439709b2b8efd9da Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 26 May 2023 15:56:02 +0200 Subject: [PATCH 4/6] Add write spec --- lib/mongo/retryable/read_worker.rb | 4 +- lib/mongo/retryable/write_worker.rb | 6 +- .../retryable_writes_errors_spec.rb | 76 +++++++++++++++++++ 3 files changed, 81 insertions(+), 5 deletions(-) diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb index 2686921400..2a54b70013 100644 --- a/lib/mongo/retryable/read_worker.rb +++ b/lib/mongo/retryable/read_worker.rb @@ -196,7 +196,7 @@ def modern_read_with_retry(session, server_selector, &block) e.add_notes('modern retry', 'attempt 1') raise e if session.in_transaction? raise e if !is_retryable_exception?(e) && !e.write_retryable? - retry_read(e, session, server_selector, server, &block) + retry_read(e, session, server_selector, failed_server: server, &block) end # Attempts to do a "legacy" read with retry. The operation will be @@ -261,7 +261,7 @@ def read_without_retry(session, server_selector, &block) # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. - def retry_read(original_error, session, server_selector, failed_server, &block) + def retry_read(original_error, session, server_selector, failed_server: nil, &block) begin server = select_server(cluster, server_selector, session, failed_server) rescue Error, Error::AuthError => e diff --git a/lib/mongo/retryable/write_worker.rb b/lib/mongo/retryable/write_worker.rb index c21265ed56..c75f002657 100644 --- a/lib/mongo/retryable/write_worker.rb +++ b/lib/mongo/retryable/write_worker.rb @@ -240,7 +240,7 @@ def modern_write_with_retry(session, server, context, &block) # Context#with creates a new context, which is not necessary here # but the API is less prone to misuse this way. - retry_write(e, txn_num, context: context.with(is_retry: true), &block) + retry_write(e, txn_num, context: context.with(is_retry: true), failed_server: server, &block) end # Called after a failed write, this will retry the write no more than @@ -252,7 +252,7 @@ def modern_write_with_retry(session, server, context, &block) # @param [ Operation::Context ] context The context for the operation. # # @return [ Result ] The result of the operation. - def retry_write(original_error, txn_num, context:, &block) + def retry_write(original_error, txn_num, context:, failed_server: nil, &block) session = context.session # We do not request a scan of the cluster here, because error handling @@ -260,7 +260,7 @@ def retry_write(original_error, txn_num, context:, &block) # server description and/or topology as necessary (specifically, # a socket error or a not master error should have marked the respective # server unknown). Here we just need to wait for server selection. - server = select_server(cluster, ServerSelector.primary, session) + server = select_server(cluster, ServerSelector.primary, session, failed_server) unless server.retry_writes? # Do not need to add "modern retry" here, it should already be on diff --git a/spec/integration/retryable_writes_errors_spec.rb b/spec/integration/retryable_writes_errors_spec.rb index 1c149e080e..6e8adee3d9 100644 --- a/spec/integration/retryable_writes_errors_spec.rb +++ b/spec/integration/retryable_writes_errors_spec.rb @@ -189,4 +189,80 @@ }) end end + + context 'retry on different mongos' do + require_topology :sharded + min_server_version '4.2' + require_no_auth + + let(:first_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.first], + direct_connection: true, + database: 'admin' + ) + end + + let(:second_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.last], + direct_connection: false, + database: 'admin' + ) + end + + let(:client) { authorized_client.with(retry_reads: true) } + + before do + skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2 + + first_mongos.database.command( + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: %w(insert), + closeConnection: false, + errorCode: 11600, + errorLabels: ['RetryableWriteError'] + } + ) + + second_mongos.database.command( + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: %w(insert), + closeConnection: false, + errorCode: 11600, + errorLabels: ['RetryableWriteError'] + } + ) + end + + let(:subscriber) { Mrss::EventSubscriber.new } + + let(:find_started_events) do + subscriber.started_events.select { |e| e.command_name == "insert" } + end + + let(:find_failed_events) do + subscriber.failed_events.select { |e| e.command_name == "insert" } + end + + let(:expected_servers) do + SpecConfig.instance.addresses.map { |a| a.to_s }.sort + end + + after do + first_mongos.close + second_mongos.close + end + + it 'retries on different mongos' do + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + expect { collection.insert_one(a: 1) }.to raise_error + expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + end + end end From 138296f8eb63b622e9bc69af364d027340d6095c Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 3 Aug 2023 21:14:34 +0200 Subject: [PATCH 5/6] Add documenting comments --- lib/mongo/retryable/read_worker.rb | 2 ++ lib/mongo/retryable/write_worker.rb | 2 ++ lib/mongo/server_selector/base.rb | 17 +++++++++++++++++ 3 files changed, 21 insertions(+) diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb index 2a54b70013..31add12170 100644 --- a/lib/mongo/retryable/read_worker.rb +++ b/lib/mongo/retryable/read_worker.rb @@ -258,6 +258,8 @@ def read_without_retry(session, server_selector, &block) # being run on. # @param [ Mongo::ServerSelector::Selectable ] server_selector Server # selector for the operation. + # @param [ Mongo::Server ] failed_server The server on which the original + # operation failed. # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. diff --git a/lib/mongo/retryable/write_worker.rb b/lib/mongo/retryable/write_worker.rb index c75f002657..303ff3af06 100644 --- a/lib/mongo/retryable/write_worker.rb +++ b/lib/mongo/retryable/write_worker.rb @@ -250,6 +250,8 @@ def modern_write_with_retry(session, server, context, &block) # retry. # @param [ Number ] txn_num The transaction number. # @param [ Operation::Context ] context The context for the operation. + # @param [ Mongo::Server ] failed_server The server on which the original + # operation failed. # # @return [ Result ] The result of the operation. def retry_write(original_error, txn_num, context:, failed_server: nil, &block) diff --git a/lib/mongo/server_selector/base.rb b/lib/mongo/server_selector/base.rb index 62b2127d82..10eb478449 100644 --- a/lib/mongo/server_selector/base.rb +++ b/lib/mongo/server_selector/base.rb @@ -164,6 +164,10 @@ def ==(other) # for mongos pinning. Added in version 2.10.0. # @param [ true | false ] write_aggregation Whether we need a server that # supports writing aggregations (e.g. with $merge/$out) on secondaries. + # @param [ Array ] deprioritized A list of servers that should + # be selected from only if no other servers are available. This is + # used to avoid selecting the same server twice in a row when + # retrying a command. # # @return [ Mongo::Server ] A server matching the server preference. # @@ -321,6 +325,10 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false, # an eligible server. # @param [ true | false ] write_aggregation Whether we need a server that # supports writing aggregations (e.g. with $merge/$out) on secondaries. + # @param [ Array ] deprioritized A list of servers that should + # be selected from only if no other servers are available. This is + # used to avoid selecting the same server twice in a row when + # retrying a command. # # @return [ Server | nil ] A suitable server, if one exists. # @@ -418,6 +426,15 @@ def suitable_servers(cluster) private + # Returns a server from the list of servers that is suitable for + # executing the operation. + # + # @param [ Array ] servers The candidate servers. + # @param [ Array ] deprioritized A list of servers that should + # be selected from only if no other servers are available. + # + # @return [ Server | nil ] The suitable server or nil if no suitable + # server is available. def suitable_server(servers, deprioritized) preferred = servers - deprioritized if preferred.empty? From bd4629d585ec748406ca83b2e2cd4abe87b47bfe Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 18 Aug 2023 15:17:57 +0200 Subject: [PATCH 6/6] Use proper error codes in prose tests --- lib/mongo/retryable/read_worker.rb | 2 - .../retryable_reads_errors_spec.rb | 181 +++++++++++++----- .../retryable_writes_errors_spec.rb | 166 +++++++++++----- 3 files changed, 253 insertions(+), 96 deletions(-) diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb index 31add12170..a82e67a051 100644 --- a/lib/mongo/retryable/read_worker.rb +++ b/lib/mongo/retryable/read_worker.rb @@ -292,8 +292,6 @@ def retry_read(original_error, session, server_selector, failed_server: nil, &bl raise original_error end end - end - end end diff --git a/spec/integration/retryable_reads_errors_spec.rb b/spec/integration/retryable_reads_errors_spec.rb index c6a657cfa0..81402c8f4d 100644 --- a/spec/integration/retryable_reads_errors_spec.rb +++ b/spec/integration/retryable_reads_errors_spec.rb @@ -20,14 +20,14 @@ let(:failpoint) do { - configureFailPoint: "failCommand", - mode: { times: 1 }, - data: { - failCommands: [ "find" ], - errorCode: 91, - blockConnection: true, - blockTimeMS: 1000 - } + configureFailPoint: "failCommand", + mode: { times: 1 }, + data: { + failCommands: [ "find" ], + errorCode: 91, + blockConnection: true, + blockTimeMS: 1000 + } } end @@ -108,77 +108,156 @@ end end - context 'retry on different mongos' do + context 'Retries in a sharded cluster' do require_topology :sharded min_server_version '4.2' require_no_auth - let(:first_mongos) do - Mongo::Client.new( - [SpecConfig.instance.addresses.first], - direct_connection: true, - database: 'admin' - ) + let(:subscriber) { Mrss::EventSubscriber.new } + + let(:find_started_events) do + subscriber.started_events.select { |e| e.command_name == "find" } end - let(:second_mongos) do - Mongo::Client.new( - [SpecConfig.instance.addresses.last], - direct_connection: false, - database: 'admin' - ) + let(:find_failed_events) do + subscriber.failed_events.select { |e| e.command_name == "find" } end - let(:client) { authorized_client.with(retry_reads: true) } + let(:find_succeeded_events) do + subscriber.succeeded_events.select { |e| e.command_name == "find" } + end - before do - skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2 + context 'when another mongos is available' do - first_mongos.database.command( - configureFailPoint: 'failCommand', + let(:first_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.first], + direct_connection: true, + database: 'admin' + ) + end + + let(:second_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.last], + direct_connection: false, + database: 'admin' + ) + end + + let(:client) do + new_local_client( + [ + SpecConfig.instance.addresses.first, + SpecConfig.instance.addresses.last, + ], + SpecConfig.instance.test_options.merge(retry_reads: true) + ) + end + + let(:expected_servers) do + [ + SpecConfig.instance.addresses.first.to_s, + SpecConfig.instance.addresses.last.to_s + ].sort + end + + before do + skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2 + + first_mongos.database.command( + configureFailPoint: 'failCommand', mode: { times: 1 }, data: { failCommands: %w(find), closeConnection: false, - errorCode: 11600 + errorCode: 6 } - ) + ) - second_mongos.database.command( - configureFailPoint: 'failCommand', + second_mongos.database.command( + configureFailPoint: 'failCommand', mode: { times: 1 }, data: { failCommands: %w(find), closeConnection: false, - errorCode: 11600 + errorCode: 6 } - ) - end + ) + end - let(:subscriber) { Mrss::EventSubscriber.new } + after do + [first_mongos, second_mongos].each do |admin_client| + admin_client.database.command( + configureFailPoint: 'failCommand', + mode: 'off' + ) + admin_client.close + end + client.close + end - let(:find_started_events) do - subscriber.started_events.select { |e| e.command_name == "find" } + it 'retries on different mongos' do + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + expect { collection.find.first }.to raise_error(Mongo::Error::OperationFailure) + expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + end end - let(:find_failed_events) do - subscriber.failed_events.select { |e| e.command_name == "find" } - end + context 'when no other mongos is available' do + let(:mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.first], + direct_connection: true, + database: 'admin' + ) + end - let(:expected_servers) do - SpecConfig.instance.addresses.map { |a| a.to_s }.sort - end + let(:client) do + new_local_client( + [ + SpecConfig.instance.addresses.first + ], + SpecConfig.instance.test_options.merge(retry_reads: true) + ) + end - after do - first_mongos.close - second_mongos.close - end + before do + mongos.database.command( + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: %w(find), + closeConnection: false, + errorCode: 6 + } + ) + end - it 'retries on different mongos' do - client.subscribe(Mongo::Monitoring::COMMAND, subscriber) - expect { collection.find.first }.to raise_error - expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) - expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + after do + mongos.database.command( + configureFailPoint: 'failCommand', + mode: 'off' + ) + mongos.close + client.close + end + + it 'retries on the same mongos' do + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + expect { collection.find.first }.not_to raise_error + expect(find_started_events.map { |e| e.address.to_s }.sort).to eq([ + SpecConfig.instance.addresses.first.to_s, + SpecConfig.instance.addresses.first.to_s + ]) + expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq([ + SpecConfig.instance.addresses.first.to_s + ]) + expect(find_succeeded_events.map { |e| e.address.to_s }.sort).to eq([ + SpecConfig.instance.addresses.first.to_s + ]) + end end end end diff --git a/spec/integration/retryable_writes_errors_spec.rb b/spec/integration/retryable_writes_errors_spec.rb index 6352d9031a..25b5cba8aa 100644 --- a/spec/integration/retryable_writes_errors_spec.rb +++ b/spec/integration/retryable_writes_errors_spec.rb @@ -190,79 +190,159 @@ end end - context 'retry on different mongos' do + context 'Retries in a sharded cluster' do require_topology :sharded min_server_version '4.2' require_no_auth - let(:first_mongos) do - Mongo::Client.new( - [SpecConfig.instance.addresses.first], - direct_connection: true, - database: 'admin' - ) + let(:subscriber) { Mrss::EventSubscriber.new } + + let(:insert_started_events) do + subscriber.started_events.select { |e| e.command_name == "insert" } + end + + let(:insert_failed_events) do + subscriber.failed_events.select { |e| e.command_name == "insert" } end - let(:second_mongos) do - Mongo::Client.new( - [SpecConfig.instance.addresses.last], - direct_connection: false, - database: 'admin' - ) + let(:insert_succeeded_events) do + subscriber.succeeded_events.select { |e| e.command_name == "insert" } end - let(:client) { authorized_client.with(retry_reads: true) } + context 'when another mongos is available' do - before do - skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2 + let(:first_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.first], + direct_connection: true, + database: 'admin' + ) + end + + let(:second_mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.last], + direct_connection: false, + database: 'admin' + ) + end - first_mongos.database.command( - configureFailPoint: 'failCommand', + let(:client) do + new_local_client( + [ + SpecConfig.instance.addresses.first, + SpecConfig.instance.addresses.last, + ], + SpecConfig.instance.test_options.merge(retry_writes: true) + ) + end + + let(:expected_servers) do + [ + SpecConfig.instance.addresses.first.to_s, + SpecConfig.instance.addresses.last.to_s + ].sort + end + + before do + skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2 + + first_mongos.database.command( + configureFailPoint: 'failCommand', mode: { times: 1 }, data: { failCommands: %w(insert), closeConnection: false, - errorCode: 11600, + errorCode: 6, errorLabels: ['RetryableWriteError'] } - ) + ) - second_mongos.database.command( - configureFailPoint: 'failCommand', + second_mongos.database.command( + configureFailPoint: 'failCommand', mode: { times: 1 }, data: { failCommands: %w(insert), closeConnection: false, - errorCode: 11600, + errorCode: 6, errorLabels: ['RetryableWriteError'] } - ) - end + ) + end - let(:subscriber) { Mrss::EventSubscriber.new } + after do + [first_mongos, second_mongos].each do |admin_client| + admin_client.database.command( + configureFailPoint: 'failCommand', + mode: 'off' + ) + admin_client.close + end + client.close + end - let(:find_started_events) do - subscriber.started_events.select { |e| e.command_name == "insert" } + it 'retries on different mongos' do + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + expect { collection.insert_one(x: 1) }.to raise_error(Mongo::Error::OperationFailure) + expect(insert_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + expect(insert_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + end end - let(:find_failed_events) do - subscriber.failed_events.select { |e| e.command_name == "insert" } - end + context 'when no other mongos is available' do + let(:mongos) do + Mongo::Client.new( + [SpecConfig.instance.addresses.first], + direct_connection: true, + database: 'admin' + ) + end - let(:expected_servers) do - SpecConfig.instance.addresses.map { |a| a.to_s }.sort - end + let(:client) do + new_local_client( + [ + SpecConfig.instance.addresses.first + ], + SpecConfig.instance.test_options.merge(retry_writes: true) + ) + end - after do - first_mongos.close - second_mongos.close - end + before do + mongos.database.command( + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: %w(insert), + closeConnection: false, + errorCode: 6, + errorLabels: ['RetryableWriteError'] + } + ) + end - it 'retries on different mongos' do - client.subscribe(Mongo::Monitoring::COMMAND, subscriber) - expect { collection.insert_one(a: 1) }.to raise_error - expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) - expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers) + after do + mongos.database.command( + configureFailPoint: 'failCommand', + mode: 'off' + ) + mongos.close + client.close + end + + it 'retries on the same mongos' do + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + expect { collection.insert_one(x: 1) }.not_to raise_error + expect(insert_started_events.map { |e| e.address.to_s }.sort).to eq([ + SpecConfig.instance.addresses.first.to_s, + SpecConfig.instance.addresses.first.to_s + ]) + expect(insert_failed_events.map { |e| e.address.to_s }.sort).to eq([ + SpecConfig.instance.addresses.first.to_s + ]) + expect(insert_succeeded_events.map { |e| e.address.to_s }.sort).to eq([ + SpecConfig.instance.addresses.first.to_s + ]) + end end end end