diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 509a66f2e3..eac6d6229d 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -157,7 +157,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) # @sdam_flow_lock covers just the sdam flow. Note it does not apply # to @topology replacements which are done under @update_lock. @sdam_flow_lock = Mutex.new - Session::SessionPool.create(self) + @session_pool = Session::SessionPool.new(self) if seeds.empty? && load_balanced? raise ArgumentError, 'Load-balanced clusters with no seeds are prohibited' diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index 58d56e1b94..a2b2076b7d 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -790,7 +790,7 @@ def inspect def insert_one(document, opts = {}) QueryCache.clear_namespace(namespace) - client.send(:with_session, opts) do |session| + client.with_session(opts) do |session| write_concern = if opts[:write_concern] WriteConcern.get(opts[:write_concern]) else diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 229479e24b..9b61476631 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/error' + module Mongo module Operation @@ -30,40 +32,42 @@ def do_execute(connection, context, options = {}) session&.materialize_if_needed unpin_maybe(session, connection) do add_error_labels(connection, context) do - add_server_diagnostics(connection) do - get_result(connection, context, options).tap do |result| - if session - if session.in_transaction? && - connection.description.load_balancer? - then - if session.pinned_connection_global_id - unless session.pinned_connection_global_id == connection.global_id - raise( - Error::InternalDriverError, - "Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}" - ) + check_for_network_error do + add_server_diagnostics(connection) do + get_result(connection, context, options).tap do |result| + if session + if session.in_transaction? && + connection.description.load_balancer? + then + if session.pinned_connection_global_id + unless session.pinned_connection_global_id == connection.global_id + raise( + Error::InternalDriverError, + "Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}" + ) + end + else + session.pin_to_connection(connection.global_id) + connection.pin end - else - session.pin_to_connection(connection.global_id) - connection.pin end - end - if session.snapshot? && !session.snapshot_timestamp - session.snapshot_timestamp = result.snapshot_timestamp + if session.snapshot? && !session.snapshot_timestamp + session.snapshot_timestamp = result.snapshot_timestamp + end end - end - if result.has_cursor_id? && - connection.description.load_balancer? - then - if result.cursor_id == 0 - connection.unpin - else - connection.pin + if result.has_cursor_id? && + connection.description.load_balancer? + then + if result.cursor_id == 0 + connection.unpin + else + connection.pin + end end + process_result(result, connection) end - process_result(result, connection) end end end @@ -144,6 +148,18 @@ def process_result_for_sdam(result, connection) connection.server.scan_semaphore.signal end end + + NETWORK_ERRORS = [ + Error::SocketError, + Error::SocketTimeoutError + ].freeze + + def check_for_network_error + yield + rescue *NETWORK_ERRORS + session&.dirty! + raise + end end end end diff --git a/lib/mongo/operation/shared/response_handling.rb b/lib/mongo/operation/shared/response_handling.rb index af4c83ca75..799721a8de 100644 --- a/lib/mongo/operation/shared/response_handling.rb +++ b/lib/mongo/operation/shared/response_handling.rb @@ -50,35 +50,33 @@ def validate_result(result, connection, context) # the operation is performed. # @param [ Mongo::Operation::Context ] context The operation context. def add_error_labels(connection, context) - begin - yield - rescue Mongo::Error::SocketError => e - if context.in_transaction? && !context.committing_transaction? - e.add_label('TransientTransactionError') - end - if context.committing_transaction? - e.add_label('UnknownTransactionCommitResult') - end + yield + rescue Mongo::Error::SocketError => e + if context.in_transaction? && !context.committing_transaction? + e.add_label('TransientTransactionError') + end + if context.committing_transaction? + e.add_label('UnknownTransactionCommitResult') + end - maybe_add_retryable_write_error_label!(e, connection, context) - - raise e - rescue Mongo::Error::SocketTimeoutError => e - maybe_add_retryable_write_error_label!(e, connection, context) - raise e - rescue Mongo::Error::OperationFailure => e - if context.committing_transaction? - if e.write_retryable? || e.wtimeout? || (e.write_concern_error? && - !Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code) - ) || e.max_time_ms_expired? - e.add_label('UnknownTransactionCommitResult') - end + maybe_add_retryable_write_error_label!(e, connection, context) + + raise e + rescue Mongo::Error::SocketTimeoutError => e + maybe_add_retryable_write_error_label!(e, connection, context) + raise e + rescue Mongo::Error::OperationFailure => e + if context.committing_transaction? + if e.write_retryable? || e.wtimeout? || (e.write_concern_error? && + !Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code) + ) || e.max_time_ms_expired? + e.add_label('UnknownTransactionCommitResult') end + end - maybe_add_retryable_write_error_label!(e, connection, context) + maybe_add_retryable_write_error_label!(e, connection, context) - raise e - end + raise e end # Unpins the session and/or the connection if the yielded to block diff --git a/lib/mongo/session.rb b/lib/mongo/session.rb index bc12896af0..d3725adfba 100644 --- a/lib/mongo/session.rb +++ b/lib/mongo/session.rb @@ -123,6 +123,23 @@ def snapshot? # @since 2.5.0 attr_reader :operation_time + # Sets the dirty state to the given value for the underlying server + # session. If there is no server session, this does nothing. + # + # @param [ true | false ] mark whether to mark the server session as + # dirty, or not. + def dirty!(mark = true) + @server_session&.dirty!(mark) + end + + # @return [ true | false | nil ] whether the underlying server session is + # dirty. If no server session exists for this session, returns nil. + # + # @api private + def dirty? + @server_session&.dirty? + end + # @return [ Hash ] The options for the transaction currently being executed # on this session. # diff --git a/lib/mongo/session/server_session.rb b/lib/mongo/session/server_session.rb index 6d0410903a..6f7283c79e 100644 --- a/lib/mongo/session/server_session.rb +++ b/lib/mongo/session/server_session.rb @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/session/server_session/dirtyable' + module Mongo class Session @@ -25,6 +27,7 @@ class Session # # @since 2.5.0 class ServerSession + include Dirtyable # Regex for removing dashes from the UUID string. # diff --git a/lib/mongo/session/server_session/dirtyable.rb b/lib/mongo/session/server_session/dirtyable.rb new file mode 100644 index 0000000000..0df262c85b --- /dev/null +++ b/lib/mongo/session/server_session/dirtyable.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +# Copyright (C) 2024 MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Session + class ServerSession + # Functionality for manipulating and querying a session's + # "dirty" state, per the last paragraph at + # https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#server-session-pool + # + # If a driver has a server session pool and a network error is + # encountered when executing any command with a ClientSession, the + # driver MUST mark the associated ServerSession as dirty. Dirty server + # sessions are discarded when returned to the server session pool. It is + # valid for a dirty session to be used for subsequent commands (e.g. an + # implicit retry attempt, a later command in a bulk write, or a later + # operation on an explicit session), however, it MUST remain dirty for + # the remainder of its lifetime regardless if later commands succeed. + # + # @api private + module Dirtyable + # Query whether the server session has been marked dirty or not. + # + # @return [ true | false ] the server session's dirty state + def dirty? + @dirty + end + + # Mark the server session as dirty (the default) or clean. + # + # @param [ true | false ] mark whether the mark the server session + # dirty or not. + def dirty!(mark = true) + @dirty = mark + end + end + end + end +end diff --git a/lib/mongo/session/session_pool.rb b/lib/mongo/session/session_pool.rb index e89d634c31..4fb832a813 100644 --- a/lib/mongo/session/session_pool.rb +++ b/lib/mongo/session/session_pool.rb @@ -25,21 +25,6 @@ class Session # # @since 2.5.0 class SessionPool - - # Create a SessionPool. - # - # @example - # SessionPool.create(cluster) - # - # @param [ Mongo::Cluster ] cluster The cluster that will be associated with this - # session pool. - # - # @since 2.5.0 - def self.create(cluster) - pool = new(cluster) - cluster.instance_variable_set(:@session_pool, pool) - end - # Initialize a SessionPool. # # @example @@ -105,9 +90,7 @@ def checkin(session) @mutex.synchronize do prune! - unless about_to_expire?(session) - @queue.unshift(session) - end + @queue.unshift(session) if return_to_queue?(session) end end @@ -136,6 +119,17 @@ def end_sessions private + # Query whether the given session is okay to return to the + # pool's queue. + # + # @param [ Session::ServerSession ] session the session to query + # + # @return [ true | false ] whether to return the session to the + # queue. + def return_to_queue?(session) + !session.dirty? && !about_to_expire?(session) + end + def about_to_expire?(session) if session.nil? raise ArgumentError, 'session cannot be nil' diff --git a/spec/mongo/session/session_pool_spec.rb b/spec/mongo/session/session_pool_spec.rb index 1dedebed25..caac0117be 100644 --- a/spec/mongo/session/session_pool_spec.rb +++ b/spec/mongo/session/session_pool_spec.rb @@ -17,21 +17,6 @@ end end - describe '.create' do - - let!(:pool) do - described_class.create(cluster) - end - - it 'creates a session pool' do - expect(pool).to be_a(Mongo::Session::SessionPool) - end - - it 'adds the pool as an instance variable on the cluster' do - expect(cluster.session_pool).to eq(pool) - end - end - describe '#initialize' do let(:pool) do @@ -181,7 +166,7 @@ describe '#end_sessions' do let(:pool) do - described_class.create(client.cluster) + client.cluster.session_pool end let!(:session_a) do diff --git a/spec/runners/unified/support_operations.rb b/spec/runners/unified/support_operations.rb index f8b4e53af3..a99d310958 100644 --- a/spec/runners/unified/support_operations.rb +++ b/spec/runners/unified/support_operations.rb @@ -70,8 +70,7 @@ def assert_session_dirty(op) consume_test_runner(op) use_arguments(op) do |args| session = entities.get(:session, args.use!('session')) - # https://jira.mongodb.org/browse/RUBY-1813 - true + session.dirty? || raise(Error::ResultMismatch, 'expected session to be dirty') end end @@ -79,8 +78,7 @@ def assert_session_not_dirty(op) consume_test_runner(op) use_arguments(op) do |args| session = entities.get(:session, args.use!('session')) - # https://jira.mongodb.org/browse/RUBY-1813 - true + session.dirty? && raise(Error::ResultMismatch, 'expected session to be not dirty') end end @@ -92,7 +90,7 @@ def assert_same_lsid_on_last_two_commands(op, expected: true) unless subscriber.started_events.length >= 2 raise Error::ResultMismatch, "Must have at least 2 events, have #{subscriber.started_events.length}" end - lsids = subscriber.started_events[-2...-1].map do |cmd| + lsids = subscriber.started_events[-2..-1].map do |cmd| cmd.command.fetch('lsid') end if expected diff --git a/spec/spec_tests/data/sessions_unified/driver-sessions-dirty-session-errors.yml b/spec/spec_tests/data/sessions_unified/driver-sessions-dirty-session-errors.yml new file mode 100644 index 0000000000..b7f2917efc --- /dev/null +++ b/spec/spec_tests/data/sessions_unified/driver-sessions-dirty-session-errors.yml @@ -0,0 +1,351 @@ +description: "driver-sessions-dirty-session-errors" + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "4.0" + topologies: [ replicaset ] + - minServerVersion: "4.1.8" + topologies: [ sharded ] + +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeEvents: [ commandStartedEvent ] + - database: + id: &database0 database0 + client: *client0 + databaseName: &database0Name session-tests + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: &collection0Name test + - session: + id: &session0 session0 + client: *client0 + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1 } + +tests: + - description: "Dirty explicit session is discarded (insert)" + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ insert ] + closeConnection: true + - name: assertSessionNotDirty + object: testRunner + arguments: + session: *session0 + - name: insertOne + object: *collection0 + arguments: + session: *session0 + document: { _id: 2 } + expectResult: { $$unsetOrMatches: { insertedId: { $$unsetOrMatches: 2 } } } + - name: assertSessionDirty + object: testRunner + arguments: + session: *session0 + - name: insertOne + object: *collection0 + arguments: + session: *session0 + document: { _id: 3 } + expectResult: { $$unsetOrMatches: { insertedId: { $$unsetOrMatches: 3 } } } + - name: assertSessionDirty + object: testRunner + arguments: + session: *session0 + - name: endSession + object: *session0 + - &find_with_implicit_session + name: find + object: *collection0 + arguments: + filter: { _id: -1 } + expectResult: [] + - name: assertDifferentLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client0 + expectEvents: + - client: *client0 + events: + - commandStartedEvent: &insert_attempt + command: + insert: *collection0Name + documents: + - { _id: 2 } + ordered: true + lsid: { $$sessionLsid: *session0 } + txnNumber: 1 + commandName: insert + databaseName: *database0Name + - commandStartedEvent: *insert_attempt + - commandStartedEvent: + command: + insert: *collection0Name + documents: + - { _id: 3 } + ordered: true + lsid: { $$sessionLsid: *session0 } + txnNumber: 2 + commandName: insert + databaseName: *database0Name + - commandStartedEvent: &find_with_implicit_session_event + command: + find: *collection0Name + filter: { _id: -1 } + # There is no explicit session to use with $$sessionLsid, so + # just assert an arbitrary lsid document + lsid: { $$type: object } + commandName: find + databaseName: *database0Name + outcome: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1 } + - { _id: 2 } + - { _id: 3 } + + - description: "Dirty explicit session is discarded (findAndModify)" + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ findAndModify ] + closeConnection: true + - name: assertSessionNotDirty + object: testRunner + arguments: + session: *session0 + - name: findOneAndUpdate + object: *collection0 + arguments: + session: *session0 + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: Before + expectResult: { _id: 1 } + - name: assertSessionDirty + object: testRunner + arguments: + session: *session0 + - name: endSession + object: *session0 + - *find_with_implicit_session + - name: assertDifferentLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client0 + expectEvents: + - client: *client0 + events: + - commandStartedEvent: &findAndModify_attempt + command: + findAndModify: *collection0Name + query: { _id: 1 } + update: { $inc: { x: 1 } } + new: false + lsid: { $$sessionLsid: *session0 } + txnNumber: 1 + readConcern: { $$exists: false } + writeConcern: { $$exists: false } + commandName: findAndModify + databaseName: *database0Name + - commandStartedEvent: *findAndModify_attempt + - commandStartedEvent: *find_with_implicit_session_event + outcome: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 1 } + + - description: "Dirty implicit session is discarded (insert)" + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ insert ] + closeConnection: true + - name: insertOne + object: *collection0 + arguments: + document: { _id: 2 } + expectResult: { $$unsetOrMatches: { insertedId: { $$unsetOrMatches: 2 } } } + - *find_with_implicit_session + - name: assertDifferentLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client0 + expectEvents: + - client: *client0 + events: + - commandStartedEvent: &insert_attempt + command: + insert: *collection0Name + documents: + - { _id: 2 } + ordered: true + lsid: { $$type: object } + txnNumber: 1 + commandName: insert + databaseName: *database0Name + - commandStartedEvent: *insert_attempt + - commandStartedEvent: *find_with_implicit_session_event + outcome: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1 } + - { _id: 2 } + + - description: "Dirty implicit session is discarded (findAndModify)" + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ findAndModify ] + closeConnection: true + - name: findOneAndUpdate + object: *collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: Before + expectResult: { _id: 1 } + - *find_with_implicit_session + - name: assertDifferentLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client0 + expectEvents: + - client: *client0 + events: + - commandStartedEvent: &findAndModify_attempt + command: + findAndModify: *collection0Name + query: { _id: 1 } + update: { $inc: { x: 1 } } + new: false + lsid: { $$type: object } + txnNumber: 1 + readConcern: { $$exists: false } + writeConcern: { $$exists: false } + commandName: findAndModify + databaseName: *database0Name + - commandStartedEvent: *findAndModify_attempt + - commandStartedEvent: *find_with_implicit_session_event + outcome: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 1 } + + - description: "Dirty implicit session is discarded (read returning cursor)" + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ aggregate ] + closeConnection: true + - name: aggregate + object: *collection0 + arguments: + pipeline: [ { $project: { _id: 1 } } ] + expectResult: [ { _id: 1 } ] + - *find_with_implicit_session + - name: assertDifferentLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client0 + expectEvents: + - client: *client0 + events: + - commandStartedEvent: &aggregate_attempt + command: + aggregate: *collection0Name + pipeline: [ { $project: { _id: 1 } } ] + lsid: { $$type: object } + commandName: aggregate + databaseName: *database0Name + - commandStartedEvent: *aggregate_attempt + - commandStartedEvent: *find_with_implicit_session_event + outcome: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1 } + + - description: "Dirty implicit session is discarded (read not returning cursor)" + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ aggregate ] + closeConnection: true + - name: countDocuments + object: *collection0 + arguments: + filter: {} + expectResult: 1 + - *find_with_implicit_session + - name: assertDifferentLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client0 + expectEvents: + - client: *client0 + events: + - commandStartedEvent: &countDocuments_attempt + command: + aggregate: *collection0Name + pipeline: [ { $match: {} }, { $group: { _id: 1, n: { $sum: 1 } } } ] + lsid: { $$type: object } + commandName: aggregate + databaseName: *database0Name + - commandStartedEvent: *countDocuments_attempt + - commandStartedEvent: *find_with_implicit_session_event + outcome: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1 }