Skip to content

Commit 2d66934

Browse files
committed
Add InnerClient::unpipelined_send() method.
Needed for CopyBoth (streaming replication) mode, where the client may send new messages that are part of the existing request.
1 parent 015b29e commit 2d66934

File tree

2 files changed

+19
-5
lines changed

2 files changed

+19
-5
lines changed

tokio-postgres/src/client.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub struct InnerClient {
7070
impl InnerClient {
7171
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
7272
let (sender, receiver) = mpsc::channel(1);
73-
let request = Request { messages, sender };
73+
let request = Request { messages: messages, sender: Some(sender) };
7474
self.sender
7575
.unbounded_send(request)
7676
.map_err(|_| Error::closed())?;
@@ -81,6 +81,18 @@ impl InnerClient {
8181
})
8282
}
8383

84+
// Send a message for the existing entry in the pipeline; don't
85+
// create a new entry in the pipeline. This is needed for CopyBoth
86+
// mode (i.e. streaming replication), where the client may send a
87+
// new message that is part of the existing request.
88+
pub fn unpipelined_send(&self, messages: RequestMessages) -> Result<(), Error> {
89+
let request = Request { messages: messages, sender: None };
90+
self.sender
91+
.unbounded_send(request)
92+
.map_err(|_| Error::closed())?;
93+
Ok(())
94+
}
95+
8496
pub fn typeinfo(&self) -> Option<Statement> {
8597
self.state.lock().typeinfo.clone()
8698
}

tokio-postgres/src/connection.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub enum RequestMessages {
2525

2626
pub struct Request {
2727
pub messages: RequestMessages,
28-
pub sender: mpsc::Sender<BackendMessages>,
28+
pub sender: Option<mpsc::Sender<BackendMessages>>,
2929
}
3030

3131
pub struct Response {
@@ -183,9 +183,11 @@ where
183183
match self.receiver.poll_next_unpin(cx) {
184184
Poll::Ready(Some(request)) => {
185185
trace!("polled new request");
186-
self.responses.push_back(Response {
187-
sender: request.sender,
188-
});
186+
if let Some(sender) = request.sender {
187+
self.responses.push_back(Response {
188+
sender: sender,
189+
});
190+
}
189191
Poll::Ready(Some(request.messages))
190192
}
191193
Poll::Ready(None) => Poll::Ready(None),

0 commit comments

Comments
 (0)