Skip to content

Commit 2b2b266

Browse files
committed
SERVER-36963 SessionsCollectionRS maintains a RemoteCommandTargeter
1 parent ef9f9b4 commit 2b2b266

File tree

2 files changed

+113
-118
lines changed

2 files changed

+113
-118
lines changed

src/mongo/db/sessions_collection_rs.cpp

Lines changed: 83 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <memory>
3636
#include <utility>
3737

38+
#include "mongo/bson/bsonobj.h"
3839
#include "mongo/client/authenticate.h"
3940
#include "mongo/client/connection_string.h"
4041
#include "mongo/client/query.h"
@@ -48,106 +49,79 @@
4849
#include "mongo/rpc/get_status_from_command_result.h"
4950

5051
namespace mongo {
51-
namespace {
5252

53-
BSONObj lsidQuery(const LogicalSessionId& lsid) {
54-
return BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON());
55-
}
53+
auto SessionsCollectionRS::_makePrimaryConnection(OperationContext* opCtx) {
54+
// Find the primary
55+
if (stdx::lock_guard lk(_mutex); !_targeter) {
56+
// There is an assumption here that for the lifetime of a given process, the
57+
// ReplicationCoordiation will only return configs for a single replica set
58+
auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
59+
auto config = coord->getConfig();
60+
uassert(ErrorCodes::NotYetInitialized,
61+
"Replication has not yet been configured",
62+
config.isInitialized());
5663

57-
Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbConnection>* conn) {
58-
auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
59-
auto config = coord->getConfig();
60-
if (!config.isInitialized()) {
61-
return {ErrorCodes::NotYetInitialized, "Replication has not yet been configured"};
64+
RemoteCommandTargeterFactoryImpl factory;
65+
_targeter = factory.create(config.getConnectionString());
6266
}
6367

64-
// Find the primary
65-
RemoteCommandTargeterFactoryImpl factory;
66-
auto targeter = factory.create(config.getConnectionString());
67-
auto res = targeter->findHost(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly));
68-
if (!res.isOK()) {
69-
return res.getStatus();
70-
}
68+
auto res = uassertStatusOK(
69+
_targeter->findHost(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
7170

72-
auto hostname = res.getValue().toString();
71+
auto conn = std::make_unique<ScopedDbConnection>(res.toString());
7372

7473
// Make a connection to the primary, auth, then send
75-
try {
76-
conn->emplace(hostname);
77-
if (auth::isInternalAuthSet()) {
78-
uassertStatusOK((*conn)->get()->authenticateInternalUser());
79-
}
80-
return Status::OK();
81-
} catch (...) {
82-
return exceptionToStatus();
74+
if (auth::isInternalAuthSet()) {
75+
uassertStatusOK(conn->get()->authenticateInternalUser());
8376
}
84-
}
8577

86-
template <typename Callback>
87-
auto runIfStandaloneOrPrimary(const NamespaceString& ns, OperationContext* opCtx, Callback callback)
88-
-> boost::optional<decltype(std::declval<Callback>()())> {
89-
bool isStandaloneOrPrimary;
90-
{
91-
Lock::DBLock lk(opCtx, ns.db(), MODE_IS);
92-
Lock::CollectionLock lock(opCtx, NamespaceString::kLogicalSessionsNamespace, MODE_IS);
78+
return conn;
79+
}
9380

94-
auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
81+
bool SessionsCollectionRS::_isStandaloneOrPrimary(const NamespaceString& ns,
82+
OperationContext* opCtx) {
83+
Lock::DBLock lk(opCtx, ns.db(), MODE_IS);
84+
Lock::CollectionLock lock(opCtx, NamespaceString::kLogicalSessionsNamespace, MODE_IS);
9585

96-
// There is a window here where we may transition from Primary to
97-
// Secondary after we release the locks we take above. In this case,
98-
// the callback we run below may return a NotMaster error, or a stale
99-
// read. However, this is preferable to running the callback while
100-
// we hold locks, since that can lead to a deadlock.
101-
isStandaloneOrPrimary = coord->canAcceptWritesForDatabase(opCtx, ns.db());
102-
}
103-
104-
if (isStandaloneOrPrimary) {
105-
return callback();
106-
}
86+
auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
10787

108-
return boost::none;
88+
return coord->canAcceptWritesForDatabase(opCtx, ns.db());
10989
}
11090

111-
template <typename Callback>
112-
auto sendToPrimary(OperationContext* opCtx, Callback callback)
113-
-> decltype(std::declval<Callback>()(static_cast<DBClientBase*>(nullptr))) {
114-
boost::optional<ScopedDbConnection> conn;
115-
auto res = makePrimaryConnection(opCtx, &conn);
116-
if (!res.isOK()) {
117-
return res;
91+
template <typename LocalCallback, typename RemoteCallback>
92+
auto SessionsCollectionRS::_dispatch(const NamespaceString& ns,
93+
OperationContext* opCtx,
94+
LocalCallback&& localCallback,
95+
RemoteCallback&& remoteCallback)
96+
-> CommonResultT<LocalCallback, RemoteCallback> {
97+
if (_isStandaloneOrPrimary(ns, opCtx)) {
98+
return std::forward<LocalCallback>(localCallback)();
11899
}
119100

120-
auto val = callback(conn->get());
101+
try {
102+
// There is a window here where we may transition from Primary to Secondary after we release
103+
// the locks we take in _isStandaloneOrPrimary(). In this case, the callback we run below
104+
// may throw a NotMaster error, or a stale read. However, this is preferable to running the
105+
// callback while we hold locks, since that can lead to a deadlock.
106+
107+
auto conn = _makePrimaryConnection(opCtx);
108+
DBClientBase* client = conn->get();
109+
110+
auto sosw = std::forward<RemoteCallback>(remoteCallback)(client);
111+
if (!sosw.isOK()) {
112+
conn->kill();
113+
return sosw;
114+
}
121115

122-
if (val.isOK()) {
123116
conn->done();
124-
} else {
125-
conn->kill();
126-
}
127-
128-
return std::move(val);
129-
}
130-
131-
template <typename LocalCallback, typename RemoteCallback>
132-
auto dispatch(const NamespaceString& ns,
133-
OperationContext* opCtx,
134-
LocalCallback localCallback,
135-
RemoteCallback remoteCallback)
136-
-> decltype(std::declval<RemoteCallback>()(static_cast<DBClientBase*>(nullptr))) {
137-
// If we are the primary, write directly to ourself.
138-
auto result = runIfStandaloneOrPrimary(ns, opCtx, [&] { return localCallback(); });
139-
140-
if (result) {
141-
return *result;
117+
return sosw;
118+
} catch (...) {
119+
return exceptionToStatus();
142120
}
143-
144-
return sendToPrimary(opCtx, remoteCallback);
145121
}
146122

147-
} // namespace
148-
149123
Status SessionsCollectionRS::setupSessionsCollection(OperationContext* opCtx) {
150-
return dispatch(
124+
return _dispatch(
151125
NamespaceString::kLogicalSessionsNamespace,
152126
opCtx,
153127
[&] {
@@ -209,49 +183,49 @@ Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
209183
const LogicalSessionRecordSet& sessions) {
210184
const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end());
211185

212-
return dispatch(NamespaceString::kLogicalSessionsNamespace,
213-
opCtx,
214-
[&] {
215-
DBDirectClient client(opCtx);
216-
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
217-
sessionsVector,
218-
makeSendFnForBatchWrite(
219-
NamespaceString::kLogicalSessionsNamespace, &client));
220-
},
221-
[&](DBClientBase* client) {
222-
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
223-
sessionsVector,
224-
makeSendFnForBatchWrite(
225-
NamespaceString::kLogicalSessionsNamespace, client));
226-
});
186+
return _dispatch(NamespaceString::kLogicalSessionsNamespace,
187+
opCtx,
188+
[&] {
189+
DBDirectClient client(opCtx);
190+
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
191+
sessionsVector,
192+
makeSendFnForBatchWrite(
193+
NamespaceString::kLogicalSessionsNamespace, &client));
194+
},
195+
[&](DBClientBase* client) {
196+
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
197+
sessionsVector,
198+
makeSendFnForBatchWrite(
199+
NamespaceString::kLogicalSessionsNamespace, client));
200+
});
227201
}
228202

229203
Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
230204
const LogicalSessionIdSet& sessions) {
231205
const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
232206

233-
return dispatch(NamespaceString::kLogicalSessionsNamespace,
234-
opCtx,
235-
[&] {
236-
DBDirectClient client(opCtx);
237-
return doRemove(NamespaceString::kLogicalSessionsNamespace,
238-
sessionsVector,
239-
makeSendFnForBatchWrite(
240-
NamespaceString::kLogicalSessionsNamespace, &client));
241-
},
242-
[&](DBClientBase* client) {
243-
return doRemove(NamespaceString::kLogicalSessionsNamespace,
244-
sessionsVector,
245-
makeSendFnForBatchWrite(
246-
NamespaceString::kLogicalSessionsNamespace, client));
247-
});
207+
return _dispatch(NamespaceString::kLogicalSessionsNamespace,
208+
opCtx,
209+
[&] {
210+
DBDirectClient client(opCtx);
211+
return doRemove(NamespaceString::kLogicalSessionsNamespace,
212+
sessionsVector,
213+
makeSendFnForBatchWrite(
214+
NamespaceString::kLogicalSessionsNamespace, &client));
215+
},
216+
[&](DBClientBase* client) {
217+
return doRemove(NamespaceString::kLogicalSessionsNamespace,
218+
sessionsVector,
219+
makeSendFnForBatchWrite(
220+
NamespaceString::kLogicalSessionsNamespace, client));
221+
});
248222
}
249223

250224
StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
251225
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
252226
const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
253227

254-
return dispatch(
228+
return _dispatch(
255229
NamespaceString::kLogicalSessionsNamespace,
256230
opCtx,
257231
[&] {

src/mongo/db/sessions_collection_rs.h

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,30 +30,25 @@
3030
#pragma once
3131

3232
#include <memory>
33+
#include <type_traits>
3334

34-
#include "mongo/client/connection_string.h"
3535
#include "mongo/client/connpool.h"
3636
#include "mongo/client/remote_command_targeter.h"
3737
#include "mongo/db/logical_session_id.h"
38+
#include "mongo/db/namespace_string.h"
3839
#include "mongo/db/sessions_collection.h"
40+
#include "mongo/stdx/mutex.h"
3941
#include "mongo/util/time_support.h"
4042

4143
namespace mongo {
4244

43-
class DBDirectClient;
4445
class OperationContext;
45-
class RemoteCommandTargeter;
4646

4747
/**
4848
* Accesses the sessions collection for replica set members.
4949
*/
50-
class SessionsCollectionRS : public SessionsCollection {
50+
class SessionsCollectionRS final : public SessionsCollection {
5151
public:
52-
/**
53-
* Constructs a new SessionsCollectionRS.
54-
*/
55-
SessionsCollectionRS() = default;
56-
5752
/**
5853
* Ensures that the sessions collection exists and has the proper indexes.
5954
*/
@@ -89,6 +84,32 @@ class SessionsCollectionRS : public SessionsCollection {
8984
*/
9085
StatusWith<LogicalSessionIdSet> findRemovedSessions(
9186
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
87+
88+
private:
89+
auto _makePrimaryConnection(OperationContext* opCtx);
90+
91+
bool _isStandaloneOrPrimary(const NamespaceString& ns, OperationContext* opCtx);
92+
93+
template <typename LocalCallback, typename RemoteCallback>
94+
struct CommonResult {
95+
using LocalReturnType = std::invoke_result_t<LocalCallback>;
96+
using RemoteReturnType = std::invoke_result_t<RemoteCallback, DBClientBase*>;
97+
static_assert(std::is_same_v<LocalReturnType, RemoteReturnType>,
98+
"LocalCallback and RemoteCallback must have the same return type");
99+
100+
using Type = RemoteReturnType;
101+
};
102+
template <typename LocalCallback, typename RemoteCallback>
103+
using CommonResultT = typename CommonResult<LocalCallback, RemoteCallback>::Type;
104+
105+
template <typename LocalCallback, typename RemoteCallback>
106+
CommonResultT<LocalCallback, RemoteCallback> _dispatch(const NamespaceString& ns,
107+
OperationContext* opCtx,
108+
LocalCallback&& localCallback,
109+
RemoteCallback&& remoteCallback);
110+
111+
stdx::mutex _mutex;
112+
std::unique_ptr<RemoteCommandTargeter> _targeter;
92113
};
93114

94115
} // namespace mongo

0 commit comments

Comments
 (0)