Skip to content

Commit f144620

Browse files
RUST-1553 Add support for document sequences (OP_MSG payload type 1) (#1009)
1 parent 6f48d65 commit f144620

File tree

28 files changed

+388
-991
lines changed

28 files changed

+388
-991
lines changed

src/client/auth/sasl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl SaslStart {
4242
body.insert("options", doc! { "skipEmptyExchange": true });
4343
}
4444

45-
let mut command = Command::new("saslStart".into(), self.source, body);
45+
let mut command = Command::new("saslStart", self.source, body);
4646
if let Some(server_api) = self.server_api {
4747
command.set_server_api(&server_api);
4848
}
@@ -81,7 +81,7 @@ impl SaslContinue {
8181
"payload": Binary { subtype: BinarySubtype::Generic, bytes: self.payload },
8282
};
8383

84-
let mut command = Command::new("saslContinue".into(), self.source, body);
84+
let mut command = Command::new("saslContinue", self.source, body);
8585
if let Some(server_api) = self.server_api {
8686
command.set_server_api(&server_api);
8787
}

src/client/auth/x509.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub(crate) fn build_client_first(
2525
auth_command_doc.insert("username", username);
2626
}
2727

28-
let mut command = Command::new("authenticate".into(), "$external".into(), auth_command_doc);
28+
let mut command = Command::new("authenticate", "$external", auth_command_doc);
2929
if let Some(server_api) = server_api {
3030
command.set_server_api(server_api);
3131
}

src/client/executor.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ use crate::{
2323
WatchArgs,
2424
},
2525
cmap::{
26-
conn::PinnedConnectionHandle,
26+
conn::{
27+
wire::{next_request_id, Message},
28+
PinnedConnectionHandle,
29+
},
2730
Connection,
2831
ConnectionPool,
29-
RawCommand,
3032
RawCommandResponse,
3133
},
3234
cursor::{session::SessionCursor, Cursor, CursorSpecification},
@@ -573,51 +575,43 @@ impl Client {
573575

574576
let connection_info = connection.info();
575577
let service_id = connection.service_id();
576-
let request_id = crate::cmap::conn::next_request_id();
578+
let request_id = next_request_id();
577579

578580
if let Some(ref server_api) = self.inner.options.server_api {
579581
cmd.set_server_api(server_api);
580582
}
581583

582584
let should_redact = cmd.should_redact();
585+
let should_compress = cmd.should_compress();
583586

584587
let cmd_name = cmd.name.clone();
585588
let target_db = cmd.target_db.clone();
586589

587-
let serialized = op.serialize_command(cmd)?;
590+
#[allow(unused_mut)]
591+
let mut message = Message::from_command(cmd, Some(request_id))?;
588592
#[cfg(feature = "in-use-encryption-unstable")]
589-
let serialized = {
593+
{
590594
let guard = self.inner.csfle.read().await;
591595
if let Some(ref csfle) = *guard {
592596
if csfle.opts().bypass_auto_encryption != Some(true) {
593-
self.auto_encrypt(csfle, RawDocument::from_bytes(&serialized)?, &target_db)
594-
.await?
595-
.into_bytes()
596-
} else {
597-
serialized
597+
let encrypted_payload = self
598+
.auto_encrypt(csfle, &message.document_payload, &target_db)
599+
.await?;
600+
message.document_payload = encrypted_payload;
598601
}
599-
} else {
600-
serialized
601602
}
602-
};
603-
let raw_cmd = RawCommand {
604-
name: cmd_name.clone(),
605-
target_db,
606-
exhaust_allowed: false,
607-
bytes: serialized,
608-
};
603+
}
609604

610605
self.emit_command_event(|| {
611606
let command_body = if should_redact {
612607
Document::new()
613608
} else {
614-
Document::from_reader(raw_cmd.bytes.as_slice())
615-
.unwrap_or_else(|e| doc! { "serialization error": e.to_string() })
609+
message.get_command_document()
616610
};
617611
CommandEvent::Started(CommandStartedEvent {
618612
command: command_body,
619-
db: raw_cmd.target_db.clone(),
620-
command_name: raw_cmd.name.clone(),
613+
db: target_db.clone(),
614+
command_name: cmd_name.clone(),
621615
request_id,
622616
connection: connection_info.clone(),
623617
service_id,
@@ -626,7 +620,7 @@ impl Client {
626620
.await;
627621

628622
let start_time = Instant::now();
629-
let command_result = match connection.send_raw_command(raw_cmd, request_id).await {
623+
let command_result = match connection.send_message(message, should_compress).await {
630624
Ok(response) => {
631625
async fn handle_response<T: Operation>(
632626
client: &Client,

src/cmap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use derivative::Derivative;
1515

1616
pub use self::conn::ConnectionInfo;
1717
pub(crate) use self::{
18-
conn::{Command, Connection, RawCommand, RawCommandResponse, StreamDescription},
18+
conn::{Command, Connection, RawCommandResponse, StreamDescription},
1919
status::PoolGenerationSubscriber,
2020
worker::PoolGeneration,
2121
};

src/cmap/conn.rs

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod command;
22
mod stream_description;
3-
mod wire;
3+
pub(crate) mod wire;
44

55
use std::{
66
sync::Arc,
@@ -33,9 +33,8 @@ use crate::{
3333
options::ServerAddress,
3434
runtime::AsyncStream,
3535
};
36-
pub(crate) use command::{Command, RawCommand, RawCommandResponse};
36+
pub(crate) use command::{Command, RawCommandResponse};
3737
pub(crate) use stream_description::StreamDescription;
38-
pub(crate) use wire::next_request_id;
3938

4039
/// User-facing information about a connection to the database.
4140
#[derive(Clone, Debug, Serialize)]
@@ -273,7 +272,7 @@ impl Connection {
273272
}
274273
}
275274

276-
async fn send_message(
275+
pub(crate) async fn send_message(
277276
&mut self,
278277
message: Message,
279278
to_compress: bool,
@@ -318,7 +317,10 @@ impl Connection {
318317
let response_message = response_message_result?;
319318
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);
320319

321-
RawCommandResponse::new(self.address.clone(), response_message)
320+
Ok(RawCommandResponse::new(
321+
self.address.clone(),
322+
response_message,
323+
))
322324
}
323325

324326
/// Executes a `Command` and returns a `CommandResponse` containing the result from the server.
@@ -332,23 +334,7 @@ impl Connection {
332334
request_id: impl Into<Option<i32>>,
333335
) -> Result<RawCommandResponse> {
334336
let to_compress = command.should_compress();
335-
let message = Message::with_command(command, request_id.into())?;
336-
self.send_message(message, to_compress).await
337-
}
338-
339-
/// Executes a `RawCommand` and returns a `CommandResponse` containing the result from the
340-
/// server.
341-
///
342-
/// An `Ok(...)` result simply means the server received the command and that the driver
343-
/// received the response; it does not imply anything about the success of the command
344-
/// itself.
345-
pub(crate) async fn send_raw_command(
346-
&mut self,
347-
command: RawCommand,
348-
request_id: impl Into<Option<i32>>,
349-
) -> Result<RawCommandResponse> {
350-
let to_compress = command.should_compress();
351-
let message = Message::with_raw_command(command, request_id.into());
337+
let message = Message::from_command(command, request_id.into())?;
352338
self.send_message(message, to_compress).await
353339
}
354340

@@ -379,7 +365,10 @@ impl Connection {
379365
let response_message = response_message_result?;
380366
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);
381367

382-
RawCommandResponse::new(self.address.clone(), response_message)
368+
Ok(RawCommandResponse::new(
369+
self.address.clone(),
370+
response_message,
371+
))
383372
}
384373

385374
/// Gets the connection's StreamDescription.

src/cmap/conn/command.rs

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use bson::{RawDocument, RawDocumentBuf};
22
use serde::{de::DeserializeOwned, Deserialize, Serialize};
33

4-
use super::wire::Message;
4+
use super::wire::{message::DocumentSequence, Message};
55
use crate::{
6-
bson::{rawdoc, Document},
7-
bson_util::extend_raw_document_buf,
8-
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
6+
bson::Document,
7+
client::{options::ServerApi, ClusterTime},
98
error::{Error, ErrorKind, Result},
109
hello::{HelloCommandResponse, HelloReply},
1110
operation::{CommandErrorBody, CommandResponse},
@@ -14,28 +13,14 @@ use crate::{
1413
ClientSession,
1514
};
1615

17-
/// A command that has been serialized to BSON.
18-
#[derive(Debug)]
19-
pub(crate) struct RawCommand {
20-
pub(crate) name: String,
21-
pub(crate) target_db: String,
22-
/// Whether or not the server may respond to this command multiple times via the moreToComeBit.
23-
pub(crate) exhaust_allowed: bool,
24-
pub(crate) bytes: Vec<u8>,
25-
}
26-
27-
impl RawCommand {
28-
pub(crate) fn should_compress(&self) -> bool {
29-
let name = self.name.to_lowercase();
30-
!REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
31-
}
32-
}
33-
3416
/// Driver-side model of a database command.
3517
#[serde_with::skip_serializing_none]
3618
#[derive(Clone, Debug, Serialize, Default)]
3719
#[serde(rename_all = "camelCase")]
38-
pub(crate) struct Command<T = Document> {
20+
pub(crate) struct Command<T = Document>
21+
where
22+
T: Serialize,
23+
{
3924
#[serde(skip)]
4025
pub(crate) name: String,
4126

@@ -45,6 +30,9 @@ pub(crate) struct Command<T = Document> {
4530
#[serde(flatten)]
4631
pub(crate) body: T,
4732

33+
#[serde(skip)]
34+
pub(crate) document_sequences: Vec<DocumentSequence>,
35+
4836
#[serde(rename = "$db")]
4937
pub(crate) target_db: String,
5038

@@ -70,13 +58,17 @@ pub(crate) struct Command<T = Document> {
7058
recovery_token: Option<Document>,
7159
}
7260

73-
impl<T> Command<T> {
74-
pub(crate) fn new(name: String, target_db: String, body: T) -> Self {
61+
impl<T> Command<T>
62+
where
63+
T: Serialize,
64+
{
65+
pub(crate) fn new(name: impl ToString, target_db: impl ToString, body: T) -> Self {
7566
Self {
76-
name,
77-
target_db,
67+
name: name.to_string(),
68+
target_db: target_db.to_string(),
7869
exhaust_allowed: false,
7970
body,
71+
document_sequences: Vec::new(),
8072
lsid: None,
8173
cluster_time: None,
8274
server_api: None,
@@ -100,6 +92,7 @@ impl<T> Command<T> {
10092
target_db,
10193
exhaust_allowed: false,
10294
body,
95+
document_sequences: Vec::new(),
10396
lsid: None,
10497
cluster_time: None,
10598
server_api: None,
@@ -112,6 +105,17 @@ impl<T> Command<T> {
112105
}
113106
}
114107

108+
pub(crate) fn add_document_sequence(
109+
&mut self,
110+
identifier: impl ToString,
111+
documents: Vec<RawDocumentBuf>,
112+
) {
113+
self.document_sequences.push(DocumentSequence {
114+
identifier: identifier.to_string(),
115+
documents,
116+
});
117+
}
118+
115119
pub(crate) fn set_session(&mut self, session: &ClientSession) {
116120
self.lsid = Some(session.id().clone())
117121
}
@@ -178,19 +182,6 @@ impl<T> Command<T> {
178182
}
179183
}
180184

181-
impl Command<RawDocumentBuf> {
182-
pub(crate) fn into_bson_bytes(mut self) -> Result<Vec<u8>> {
183-
let mut command = self.body;
184-
185-
// Clear the body of the command to avoid re-serializing.
186-
self.body = rawdoc! {};
187-
let rest_of_command = bson::to_raw_document_buf(&self)?;
188-
189-
extend_raw_document_buf(&mut command, rest_of_command)?;
190-
Ok(command.into_bytes())
191-
}
192-
}
193-
194185
#[derive(Debug, Clone)]
195186
pub(crate) struct RawCommandResponse {
196187
pub(crate) source: ServerAddress,
@@ -220,9 +211,8 @@ impl RawCommandResponse {
220211
)
221212
}
222213

223-
pub(crate) fn new(source: ServerAddress, message: Message) -> Result<Self> {
224-
let raw = message.single_document_response()?;
225-
Ok(Self::new_raw(source, RawDocumentBuf::from_bytes(raw)?))
214+
pub(crate) fn new(source: ServerAddress, message: Message) -> Self {
215+
Self::new_raw(source, message.document_payload)
226216
}
227217

228218
pub(crate) fn new_raw(source: ServerAddress, raw: RawDocumentBuf) -> Self {

src/cmap/conn/stream_description.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl StreamDescription {
9999
max_bson_object_size: 16 * 1024 * 1024,
100100
max_write_batch_size: 100_000,
101101
hello_ok: false,
102-
max_message_size_bytes: Default::default(),
102+
max_message_size_bytes: 48_000_000,
103103
service_id: None,
104104
}
105105
}

src/cmap/conn/wire.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
mod header;
2-
mod message;
2+
pub(crate) mod message;
33
mod util;
44

55
pub(crate) use self::{

0 commit comments

Comments
 (0)