Skip to content

RUBY-2748 Retry reads/writes on another mongos #2717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/mongo/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ module Retryable
# @api private
#
# @return [ Mongo::Server ] A server matching the server preference.
def select_server(cluster, server_selector, session)
server_selector.select_server(cluster, nil, session)
def select_server(cluster, server_selector, session, failed_server = nil)
server_selector.select_server(cluster, nil, session, deprioritized: [failed_server].compact)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you just want to squeeze a possible nil out of a single-element array, another idiom I find a little simpler is the Array() conversion method (Kernel#Array):

> Array(nil)
[]
> Array(5)
[5]
> Array([5])
[5]

Ultimately it's personal preference, though -- [failed_server].compact is fine, too.

end

# Returns the read worker for handling retryable reads.
Expand Down
13 changes: 7 additions & 6 deletions lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,13 @@ def deprecated_legacy_read_with_retry(&block)
#
# @return [ Result ] The result of the operation.
def modern_read_with_retry(session, server_selector, &block)
yield select_server(cluster, server_selector, session)
server = select_server(cluster, server_selector, session)
yield server
rescue *retryable_exceptions, Error::OperationFailure, Auth::Unauthorized, Error::PoolError => e
e.add_notes('modern retry', 'attempt 1')
raise e if session.in_transaction?
raise e if !is_retryable_exception?(e) && !e.write_retryable?
retry_read(e, session, server_selector, &block)
retry_read(e, session, server_selector, failed_server: server, &block)
end

# Attempts to do a "legacy" read with retry. The operation will be
Expand Down Expand Up @@ -257,12 +258,14 @@ def read_without_retry(session, server_selector, &block)
# being run on.
# @param [ Mongo::ServerSelector::Selectable ] server_selector Server
# selector for the operation.
# @param [ Mongo::Server ] failed_server The server on which the original
# operation failed.
# @param [ Proc ] block The block to execute.
#
# @return [ Result ] The result of the operation.
def retry_read(original_error, session, server_selector, &block)
def retry_read(original_error, session, server_selector, failed_server: nil, &block)
begin
server = select_server(cluster, server_selector, session)
server = select_server(cluster, server_selector, session, failed_server)
rescue Error, Error::AuthError => e
original_error.add_note("later retry failed: #{e.class}: #{e}")
raise original_error
Expand All @@ -289,8 +292,6 @@ def retry_read(original_error, session, server_selector, &block)
raise original_error
end
end

end

end
end
8 changes: 5 additions & 3 deletions lib/mongo/retryable/write_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def modern_write_with_retry(session, server, context, &block)

# Context#with creates a new context, which is not necessary here
# but the API is less prone to misuse this way.
retry_write(e, txn_num, context: context.with(is_retry: true), &block)
retry_write(e, txn_num, context: context.with(is_retry: true), failed_server: server, &block)
end

# Called after a failed write, this will retry the write no more than
Expand All @@ -250,17 +250,19 @@ def modern_write_with_retry(session, server, context, &block)
# retry.
# @param [ Number ] txn_num The transaction number.
# @param [ Operation::Context ] context The context for the operation.
# @param [ Mongo::Server ] failed_server The server on which the original
# operation failed.
#
# @return [ Result ] The result of the operation.
def retry_write(original_error, txn_num, context:, &block)
def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
session = context.session

# We do not request a scan of the cluster here, because error handling
# for the error which triggered the retry should have updated the
# server description and/or topology as necessary (specifically,
# a socket error or a not master error should have marked the respective
# server unknown). Here we just need to wait for server selection.
server = select_server(cluster, ServerSelector.primary, session)
server = select_server(cluster, ServerSelector.primary, session, failed_server)

unless server.retry_writes?
# Do not need to add "modern retry" here, it should already be on
Expand Down
38 changes: 32 additions & 6 deletions lib/mongo/server_selector/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ def ==(other)
# for mongos pinning. Added in version 2.10.0.
# @param [ true | false ] write_aggregation Whether we need a server that
# supports writing aggregations (e.g. with $merge/$out) on secondaries.
# @param [ Array<Server> ] deprioritized A list of servers that should
# be selected from only if no other servers are available. This is
# used to avoid selecting the same server twice in a row when
# retrying a command.
#
# @return [ Mongo::Server ] A server matching the server preference.
#
Expand All @@ -174,16 +178,16 @@ def ==(other)
# lint mode is enabled.
#
# @since 2.0.0
def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
select_server_impl(cluster, ping, session, write_aggregation).tap do |server|
def select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: [])
select_server_impl(cluster, ping, session, write_aggregation, deprioritized).tap do |server|
if Lint.enabled? && !server.pool.ready?
raise Error::LintError, 'Server selector returning a server with a pool which is not ready'
end
end
end

# Parameters and return values are the same as for select_server.
private def select_server_impl(cluster, ping, session, write_aggregation)
private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized)
if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
return cluster.servers.first
end
Expand Down Expand Up @@ -266,7 +270,7 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
end
end

server = try_select_server(cluster, write_aggregation: write_aggregation)
server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized)

if server
unless cluster.topology.compatible?
Expand Down Expand Up @@ -321,11 +325,15 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
# an eligible server.
# @param [ true | false ] write_aggregation Whether we need a server that
# supports writing aggregations (e.g. with $merge/$out) on secondaries.
# @param [ Array<Server> ] deprioritized A list of servers that should
# be selected from only if no other servers are available. This is
# used to avoid selecting the same server twice in a row when
# retrying a command.
#
# @return [ Server | nil ] A suitable server, if one exists.
#
# @api private
def try_select_server(cluster, write_aggregation: false)
def try_select_server(cluster, write_aggregation: false, deprioritized: [])
servers = if write_aggregation && cluster.replica_set?
# 1. Check if ALL servers in cluster support secondary writes.
is_write_supported = cluster.servers.reduce(true) do |res, server|
Expand All @@ -347,7 +355,7 @@ def try_select_server(cluster, write_aggregation: false)
# by the selector (e.g. for secondary preferred, the first
# server may be a secondary and the second server may be primary)
# and we should take the first server here respecting the order
server = servers.first
server = suitable_server(servers, deprioritized)

if server
if Lint.enabled?
Expand Down Expand Up @@ -418,6 +426,24 @@ def suitable_servers(cluster)

private

# Picks the server to run the operation on, steering away from
# deprioritized servers whenever an alternative exists.
#
# @param [ Array<Server> ] servers The candidate servers, in the order
#   in which they should be considered.
# @param [ Array<Server> ] deprioritized Servers to fall back on only
#   when every candidate is deprioritized.
#
# @return [ Server | nil ] The first candidate that is not deprioritized,
#   otherwise the first candidate, or nil when there are no candidates.
def suitable_server(servers, deprioritized)
  not_deprioritized = servers - deprioritized
  # When every candidate is deprioritized, use the full list rather than
  # returning nothing.
  pool = not_deprioritized.empty? ? servers : not_deprioritized
  pool.first
end

# Convert this server preference definition into a format appropriate
# for sending to a MongoDB server (i.e., as a command field).
#
Expand Down
169 changes: 161 additions & 8 deletions spec/integration/retryable_reads_errors_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

let(:failpoint) do
{
configureFailPoint: "failCommand",
mode: { times: 1 },
data: {
failCommands: [ "find" ],
errorCode: 91,
blockConnection: true,
blockTimeMS: 1000
}
configureFailPoint: "failCommand",
mode: { times: 1 },
data: {
failCommands: [ "find" ],
errorCode: 91,
blockConnection: true,
blockTimeMS: 1000
}
}
end

Expand Down Expand Up @@ -107,4 +107,157 @@
})
end
end

context 'Retries in a sharded cluster' do
require_topology :sharded
min_server_version '4.2'
require_no_auth

# Event subscriber whose recorded started/failed/succeeded events are
# inspected by the examples below.
let(:subscriber) do
  Mrss::EventSubscriber.new
end

# Keeps only the events whose command name is "find".
def only_find_commands(events)
  events.select do |event|
    event.command_name == "find"
  end
end

# "find" events recorded by the subscriber, split by event kind.
let(:find_started_events) { only_find_commands(subscriber.started_events) }

let(:find_failed_events) { only_find_commands(subscriber.failed_events) }

let(:find_succeeded_events) { only_find_commands(subscriber.succeeded_events) }

context 'when another mongos is available' do
  # Configures a one-shot failCommand fail point through the given admin
  # client: the next "find" fails with error code 6 and the connection is
  # left open. A fresh command document is built on every call.
  def fail_next_find(admin_client)
    admin_client.database.command(
      configureFailPoint: 'failCommand',
      mode: { times: 1 },
      data: {
        failCommands: %w(find),
        closeConnection: false,
        errorCode: 6
      }
    )
  end

  # Admin client for the first configured address.
  let(:first_mongos) do
    address = SpecConfig.instance.addresses.first
    Mongo::Client.new([address], direct_connection: true, database: 'admin')
  end

  # Admin client for the last configured address.
  # NOTE(review): direct_connection is false here but true for
  # first_mongos -- confirm whether the difference is intentional.
  let(:second_mongos) do
    address = SpecConfig.instance.addresses.last
    Mongo::Client.new([address], direct_connection: false, database: 'admin')
  end

  # Client under test, seeded with both addresses and with retryable
  # reads enabled.
  let(:client) do
    seeds = [
      SpecConfig.instance.addresses.first,
      SpecConfig.instance.addresses.last,
    ]
    new_local_client(seeds, SpecConfig.instance.test_options.merge(retry_reads: true))
  end

  # Both addresses as strings, sorted so the comparison does not depend
  # on which address was tried first.
  let(:expected_servers) do
    [
      SpecConfig.instance.addresses.first,
      SpecConfig.instance.addresses.last
    ].map(&:to_s).sort
  end

  before do
    if SpecConfig.instance.addresses.length < 2
      skip 'This test requires at least two mongos'
    end

    # Arm the fail point on each address so that the initial attempt and
    # the retry both fail.
    fail_next_find(first_mongos)
    fail_next_find(second_mongos)
  end

  after do
    # Switch the fail points off again and release the admin clients.
    [first_mongos, second_mongos].each do |admin_client|
      admin_client.database.command(configureFailPoint: 'failCommand', mode: 'off')
      admin_client.close
    end
    client.close
  end

  it 'retries on different mongos' do
    client.subscribe(Mongo::Monitoring::COMMAND, subscriber)

    expect do
      collection.find.first
    end.to raise_error(Mongo::Error::OperationFailure)

    # One started and one failed "find" is expected per address.
    started_on = find_started_events.map { |event| event.address.to_s }.sort
    expect(started_on).to eq(expected_servers)

    failed_on = find_failed_events.map { |event| event.address.to_s }.sort
    expect(failed_on).to eq(expected_servers)
  end
end

context 'when no other mongos is available' do
  # Sorted string addresses of the given monitoring events.
  def sorted_addresses(events)
    events.map { |event| event.address.to_s }.sort
  end

  # Admin client for the first configured address, used to manage the
  # fail point.
  let(:mongos) do
    seed = SpecConfig.instance.addresses.first
    Mongo::Client.new([seed], direct_connection: true, database: 'admin')
  end

  # Client under test, seeded with only the first address and with
  # retryable reads enabled.
  let(:client) do
    seeds = [SpecConfig.instance.addresses.first]
    new_local_client(seeds, SpecConfig.instance.test_options.merge(retry_reads: true))
  end

  before do
    # Fail only the next "find" (error code 6, connection left open).
    mongos.database.command(
      configureFailPoint: 'failCommand',
      mode: { times: 1 },
      data: { failCommands: %w(find), closeConnection: false, errorCode: 6 }
    )
  end

  after do
    mongos.database.command(configureFailPoint: 'failCommand', mode: 'off')
    mongos.close
    client.close
  end

  it 'retries on the same mongos' do
    client.subscribe(Mongo::Monitoring::COMMAND, subscriber)

    expect do
      collection.find.first
    end.not_to raise_error

    only_address = SpecConfig.instance.addresses.first.to_s

    # Two attempts on the single address: one fails, one succeeds.
    expect(sorted_addresses(find_started_events)).to eq([only_address, only_address])
    expect(sorted_addresses(find_failed_events)).to eq([only_address])
    expect(sorted_addresses(find_succeeded_events)).to eq([only_address])
  end
end
end
end
Loading