Skip to content

Commit 9228f5f

Browse files
RUBY-1813 Discard ServerSessions involved in network errors (#2825)
* RUBY-1813 Discard ServerSessions involved in network errors * no need to test the now-non-existent SessionPool.create method * Tweak from code review Co-authored-by: Dmitry Rybakov <dmitry.rybakov@mongodb.com> * Another tweak from code review Co-authored-by: Dmitry Rybakov <dmitry.rybakov@mongodb.com> --------- Co-authored-by: Dmitry Rybakov <dmitry.rybakov@mongodb.com>
1 parent 69b0ec1 commit 9228f5f

File tree

11 files changed

+507
-93
lines changed

11 files changed

+507
-93
lines changed

lib/mongo/cluster.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
157157
# @sdam_flow_lock covers just the sdam flow. Note it does not apply
158158
# to @topology replacements which are done under @update_lock.
159159
@sdam_flow_lock = Mutex.new
160-
Session::SessionPool.create(self)
160+
@session_pool = Session::SessionPool.new(self)
161161

162162
if seeds.empty? && load_balanced?
163163
raise ArgumentError, 'Load-balanced clusters with no seeds are prohibited'

lib/mongo/collection.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ def inspect
790790
def insert_one(document, opts = {})
791791
QueryCache.clear_namespace(namespace)
792792

793-
client.send(:with_session, opts) do |session|
793+
client.with_session(opts) do |session|
794794
write_concern = if opts[:write_concern]
795795
WriteConcern.get(opts[:write_concern])
796796
else

lib/mongo/operation/shared/executable.rb

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717

18+
require 'mongo/error'
19+
1820
module Mongo
1921
module Operation
2022

@@ -30,40 +32,42 @@ def do_execute(connection, context, options = {})
3032
session&.materialize_if_needed
3133
unpin_maybe(session, connection) do
3234
add_error_labels(connection, context) do
33-
add_server_diagnostics(connection) do
34-
get_result(connection, context, options).tap do |result|
35-
if session
36-
if session.in_transaction? &&
37-
connection.description.load_balancer?
38-
then
39-
if session.pinned_connection_global_id
40-
unless session.pinned_connection_global_id == connection.global_id
41-
raise(
42-
Error::InternalDriverError,
43-
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
44-
)
35+
check_for_network_error do
36+
add_server_diagnostics(connection) do
37+
get_result(connection, context, options).tap do |result|
38+
if session
39+
if session.in_transaction? &&
40+
connection.description.load_balancer?
41+
then
42+
if session.pinned_connection_global_id
43+
unless session.pinned_connection_global_id == connection.global_id
44+
raise(
45+
Error::InternalDriverError,
46+
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
47+
)
48+
end
49+
else
50+
session.pin_to_connection(connection.global_id)
51+
connection.pin
4552
end
46-
else
47-
session.pin_to_connection(connection.global_id)
48-
connection.pin
4953
end
50-
end
5154

52-
if session.snapshot? && !session.snapshot_timestamp
53-
session.snapshot_timestamp = result.snapshot_timestamp
55+
if session.snapshot? && !session.snapshot_timestamp
56+
session.snapshot_timestamp = result.snapshot_timestamp
57+
end
5458
end
55-
end
5659

57-
if result.has_cursor_id? &&
58-
connection.description.load_balancer?
59-
then
60-
if result.cursor_id == 0
61-
connection.unpin
62-
else
63-
connection.pin
60+
if result.has_cursor_id? &&
61+
connection.description.load_balancer?
62+
then
63+
if result.cursor_id == 0
64+
connection.unpin
65+
else
66+
connection.pin
67+
end
6468
end
69+
process_result(result, connection)
6570
end
66-
process_result(result, connection)
6771
end
6872
end
6973
end
@@ -144,6 +148,18 @@ def process_result_for_sdam(result, connection)
144148
connection.server.scan_semaphore.signal
145149
end
146150
end
151+
152+
NETWORK_ERRORS = [
153+
Error::SocketError,
154+
Error::SocketTimeoutError
155+
].freeze
156+
157+
def check_for_network_error
158+
yield
159+
rescue *NETWORK_ERRORS
160+
session&.dirty!
161+
raise
162+
end
147163
end
148164
end
149165
end

lib/mongo/operation/shared/response_handling.rb

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -50,35 +50,33 @@ def validate_result(result, connection, context)
5050
# the operation is performed.
5151
# @param [ Mongo::Operation::Context ] context The operation context.
5252
def add_error_labels(connection, context)
53-
begin
54-
yield
55-
rescue Mongo::Error::SocketError => e
56-
if context.in_transaction? && !context.committing_transaction?
57-
e.add_label('TransientTransactionError')
58-
end
59-
if context.committing_transaction?
60-
e.add_label('UnknownTransactionCommitResult')
61-
end
53+
yield
54+
rescue Mongo::Error::SocketError => e
55+
if context.in_transaction? && !context.committing_transaction?
56+
e.add_label('TransientTransactionError')
57+
end
58+
if context.committing_transaction?
59+
e.add_label('UnknownTransactionCommitResult')
60+
end
6261

63-
maybe_add_retryable_write_error_label!(e, connection, context)
64-
65-
raise e
66-
rescue Mongo::Error::SocketTimeoutError => e
67-
maybe_add_retryable_write_error_label!(e, connection, context)
68-
raise e
69-
rescue Mongo::Error::OperationFailure => e
70-
if context.committing_transaction?
71-
if e.write_retryable? || e.wtimeout? || (e.write_concern_error? &&
72-
!Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code)
73-
) || e.max_time_ms_expired?
74-
e.add_label('UnknownTransactionCommitResult')
75-
end
62+
maybe_add_retryable_write_error_label!(e, connection, context)
63+
64+
raise e
65+
rescue Mongo::Error::SocketTimeoutError => e
66+
maybe_add_retryable_write_error_label!(e, connection, context)
67+
raise e
68+
rescue Mongo::Error::OperationFailure => e
69+
if context.committing_transaction?
70+
if e.write_retryable? || e.wtimeout? || (e.write_concern_error? &&
71+
!Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code)
72+
) || e.max_time_ms_expired?
73+
e.add_label('UnknownTransactionCommitResult')
7674
end
75+
end
7776

78-
maybe_add_retryable_write_error_label!(e, connection, context)
77+
maybe_add_retryable_write_error_label!(e, connection, context)
7978

80-
raise e
81-
end
79+
raise e
8280
end
8381

8482
# Unpins the session and/or the connection if the yielded to block

lib/mongo/session.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,23 @@ def snapshot?
123123
# @since 2.5.0
124124
attr_reader :operation_time
125125

126+
# Sets the dirty state to the given value for the underlying server
127+
# session. If there is no server session, this does nothing.
128+
#
129+
# @param [ true | false ] mark whether to mark the server session as
130+
# dirty, or not.
131+
def dirty!(mark = true)
132+
@server_session&.dirty!(mark)
133+
end
134+
135+
# @return [ true | false | nil ] whether the underlying server session is
136+
# dirty. If no server session exists for this session, returns nil.
137+
#
138+
# @api private
139+
def dirty?
140+
@server_session&.dirty?
141+
end
142+
126143
# @return [ Hash ] The options for the transaction currently being executed
127144
# on this session.
128145
#

lib/mongo/session/server_session.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717

18+
require 'mongo/session/server_session/dirtyable'
19+
1820
module Mongo
1921

2022
class Session
@@ -25,6 +27,7 @@ class Session
2527
#
2628
# @since 2.5.0
2729
class ServerSession
30+
include Dirtyable
2831

2932
# Regex for removing dashes from the UUID string.
3033
#
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright (C) 2024 MongoDB Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
module Mongo
18+
class Session
19+
class ServerSession
20+
# Functionality for manipulating and querying a session's
21+
# "dirty" state, per the last paragraph at
22+
# https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#server-session-pool
23+
#
24+
# If a driver has a server session pool and a network error is
25+
# encountered when executing any command with a ClientSession, the
26+
# driver MUST mark the associated ServerSession as dirty. Dirty server
27+
# sessions are discarded when returned to the server session pool. It is
28+
# valid for a dirty session to be used for subsequent commands (e.g. an
29+
# implicit retry attempt, a later command in a bulk write, or a later
30+
# operation on an explicit session), however, it MUST remain dirty for
31+
# the remainder of its lifetime regardless if later commands succeed.
32+
#
33+
# @api private
34+
module Dirtyable
35+
# Query whether the server session has been marked dirty or not.
36+
#
37+
# @return [ true | false | nil ] the server session's dirty state (nil if it has never been marked)
38+
def dirty?
39+
@dirty
40+
end
41+
42+
# Mark the server session as dirty (the default) or clean.
43+
#
44+
# @param [ true | false ] mark whether the mark the server session
45+
# dirty or not.
46+
def dirty!(mark = true)
47+
@dirty = mark
48+
end
49+
end
50+
end
51+
end
52+
end

lib/mongo/session/session_pool.rb

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,6 @@ class Session
2525
#
2626
# @since 2.5.0
2727
class SessionPool
28-
29-
# Create a SessionPool.
30-
#
31-
# @example
32-
# SessionPool.create(cluster)
33-
#
34-
# @param [ Mongo::Cluster ] cluster The cluster that will be associated with this
35-
# session pool.
36-
#
37-
# @since 2.5.0
38-
def self.create(cluster)
39-
pool = new(cluster)
40-
cluster.instance_variable_set(:@session_pool, pool)
41-
end
42-
4328
# Initialize a SessionPool.
4429
#
4530
# @example
@@ -105,9 +90,7 @@ def checkin(session)
10590

10691
@mutex.synchronize do
10792
prune!
108-
unless about_to_expire?(session)
109-
@queue.unshift(session)
110-
end
93+
@queue.unshift(session) if return_to_queue?(session)
11194
end
11295
end
11396

@@ -136,6 +119,17 @@ def end_sessions
136119

137120
private
138121

122+
# Query whether the given session is okay to return to the
123+
# pool's queue.
124+
#
125+
# @param [ Session::ServerSession ] session the session to query
126+
#
127+
# @return [ true | false ] whether to return the session to the
128+
# queue.
129+
def return_to_queue?(session)
130+
!session.dirty? && !about_to_expire?(session)
131+
end
132+
139133
def about_to_expire?(session)
140134
if session.nil?
141135
raise ArgumentError, 'session cannot be nil'

spec/mongo/session/session_pool_spec.rb

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,6 @@
1717
end
1818
end
1919

20-
describe '.create' do
21-
22-
let!(:pool) do
23-
described_class.create(cluster)
24-
end
25-
26-
it 'creates a session pool' do
27-
expect(pool).to be_a(Mongo::Session::SessionPool)
28-
end
29-
30-
it 'adds the pool as an instance variable on the cluster' do
31-
expect(cluster.session_pool).to eq(pool)
32-
end
33-
end
34-
3520
describe '#initialize' do
3621

3722
let(:pool) do
@@ -181,7 +166,7 @@
181166
describe '#end_sessions' do
182167

183168
let(:pool) do
184-
described_class.create(client.cluster)
169+
client.cluster.session_pool
185170
end
186171

187172
let!(:session_a) do

spec/runners/unified/support_operations.rb

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,15 @@ def assert_session_dirty(op)
7070
consume_test_runner(op)
7171
use_arguments(op) do |args|
7272
session = entities.get(:session, args.use!('session'))
73-
# https://jira.mongodb.org/browse/RUBY-1813
74-
true
73+
session.dirty? || raise(Error::ResultMismatch, 'expected session to be dirty')
7574
end
7675
end
7776

7877
def assert_session_not_dirty(op)
7978
consume_test_runner(op)
8079
use_arguments(op) do |args|
8180
session = entities.get(:session, args.use!('session'))
82-
# https://jira.mongodb.org/browse/RUBY-1813
83-
true
81+
session.dirty? && raise(Error::ResultMismatch, 'expected session to be not dirty')
8482
end
8583
end
8684

@@ -92,7 +90,7 @@ def assert_same_lsid_on_last_two_commands(op, expected: true)
9290
unless subscriber.started_events.length >= 2
9391
raise Error::ResultMismatch, "Must have at least 2 events, have #{subscriber.started_events.length}"
9492
end
95-
lsids = subscriber.started_events[-2...-1].map do |cmd|
93+
lsids = subscriber.started_events[-2..-1].map do |cmd|
9694
cmd.command.fetch('lsid')
9795
end
9896
if expected

0 commit comments

Comments
 (0)