From 1be499da0846198099ce12a65296c03a2640a4ef Mon Sep 17 00:00:00 2001 From: Oleksii Kachaiev Date: Mon, 2 Sep 2019 19:18:16 -0700 Subject: [PATCH 1/5] Rename: server -> accept_loop, client -> connection_loop, client_writer -> connection_writer_loop --- docs/src/tutorial/accept_loop.md | 6 ++--- docs/src/tutorial/all_together.md | 20 +++++++------- docs/src/tutorial/clean_shutdown.md | 10 +++---- .../connecting_readers_and_writers.md | 6 ++--- docs/src/tutorial/handling_disconnection.md | 26 +++++++++---------- docs/src/tutorial/receiving_messages.md | 12 ++++----- docs/src/tutorial/sending_messages.md | 10 +++---- 7 files changed, 45 insertions(+), 45 deletions(-) diff --git a/docs/src/tutorial/accept_loop.md b/docs/src/tutorial/accept_loop.md index c46019df3..d83fe0092 100644 --- a/docs/src/tutorial/accept_loop.md +++ b/docs/src/tutorial/accept_loop.md @@ -31,7 +31,7 @@ type Result = std::result::Result Now we can write the server's accept loop: ```rust -async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1 +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { // 1 let listener = TcpListener::bind(addr).await?; // 2 let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { // 3 @@ -41,7 +41,7 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1 } ``` -1. We mark the `server` function as `async`, which allows us to use `.await` syntax inside. +1. We mark the `accept_loop` function as `async`, which allows us to use `.await` syntax inside. 2. `TcpListener::bind` call returns a future, which we `.await` to extract the `Result`, and then `?` to get a `TcpListener`. Note how `.await` and `?` work nicely together. This is exactly how `std::net::TcpListener` works, but with `.await` added. @@ -62,7 +62,7 @@ Finally, let's add main: ```rust fn main() -> Result<()> { - let fut = server("127.0.0.1:8080"); + let fut = accept_loop("127.0.0.1:8080"); task::block_on(fut) } ``` diff --git a/docs/src/tutorial/all_together.md b/docs/src/tutorial/all_together.md index 64fba7111..876b5ecdf 100644 --- a/docs/src/tutorial/all_together.md +++ b/docs/src/tutorial/all_together.md @@ -28,24 +28,24 @@ type Receiver = mpsc::UnboundedReceiver; fn main() -> Result<()> { - task::block_on(server("127.0.0.1:8080")) + task::block_on(accept_loop("127.0.0.1:8080")) } -async fn server(addr: impl ToSocketAddrs) -> Result<()> { +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; let (broker_sender, broker_receiver) = mpsc::unbounded(); // 1 - let _broker_handle = task::spawn(broker(broker_receiver)); + let _broker_handle = task::spawn(broker_loop(broker_receiver)); let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { let stream = stream?; println!("Accepting from: {}", stream.peer_addr()?); - spawn_and_log_error(client(broker_sender.clone(), stream)); + spawn_and_log_error(connection_loop(broker_sender.clone(), stream)); } Ok(()) } -async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { +async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result<()> { let stream = Arc::new(stream); // 2 let reader = BufReader::new(&*stream); let mut lines = reader.lines(); @@ -75,7 +75,7 @@ async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { Ok(()) } -async fn client_writer( +async fn connection_writer_loop( mut messages: Receiver, stream: Arc, ) -> Result<()> { @@ -99,7 +99,7 @@ enum Event { }, } -async fn broker(mut events: Receiver) -> Result<()> { +async fn broker_loop(mut events: Receiver) -> Result<()> { let mut peers: HashMap> = HashMap::new(); while let Some(event) = events.next().await { @@ -117,7 +117,7 @@ async fn broker(mut events: Receiver) -> Result<()> { Entry::Vacant(entry) => { let (client_sender, client_receiver) = mpsc::unbounded(); entry.insert(client_sender); // 4 - spawn_and_log_error(client_writer(client_receiver, stream)); // 5 + spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5 } } } @@ -127,8 +127,8 @@ async fn broker(mut events: Receiver) -> Result<()> { } ``` -1. Inside the `server`, we create the broker's channel and `task`. -2. Inside `client`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `client_writer`. +1. Inside the `accept_loop`, we create the broker's channel and `task`. +2. Inside `connection_loop`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `connection_writer_loop`. 3. On login, we notify the broker. Note that we `.unwrap` on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well. 4. Similarly, we forward parsed messages to the broker, assuming that it is alive. diff --git a/docs/src/tutorial/clean_shutdown.md b/docs/src/tutorial/clean_shutdown.md index 0f54dac6a..7d3f4f8e4 100644 --- a/docs/src/tutorial/clean_shutdown.md +++ b/docs/src/tutorial/clean_shutdown.md @@ -21,16 +21,16 @@ However, we never wait for broker and writers, which might cause some messages t Let's add waiting to the server: ```rust -async fn server(addr: impl ToSocketAddrs) -> Result<()> { +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; let (broker_sender, broker_receiver) = mpsc::unbounded(); - let broker = task::spawn(broker(broker_receiver)); + let broker = task::spawn(broker_loop(broker_receiver)); let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { let stream = stream?; println!("Accepting from: {}", stream.peer_addr()?); - spawn_and_log_error(client(broker_sender.clone(), stream)); + spawn_and_log_error(connection_loop(broker_sender.clone(), stream)); } drop(broker_sender); // 1 broker.await?; // 5 @@ -41,7 +41,7 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> { And to the broker: ```rust -async fn broker(mut events: Receiver) -> Result<()> { +async fn broker_loop(mut events: Receiver) -> Result<()> { let mut writers = Vec::new(); let mut peers: HashMap> = HashMap::new(); @@ -60,7 +60,7 @@ async fn broker(mut events: Receiver) -> Result<()> { Entry::Vacant(entry) => { let (client_sender, client_receiver) = mpsc::unbounded(); entry.insert(client_sender); - let handle = spawn_and_log_error(client_writer(client_receiver, stream)); + let handle = spawn_and_log_error(connection_writer_loop(client_receiver, stream)); writers.push(handle); // 4 } } diff --git a/docs/src/tutorial/connecting_readers_and_writers.md b/docs/src/tutorial/connecting_readers_and_writers.md index 531656b38..0c2aa4787 100644 --- a/docs/src/tutorial/connecting_readers_and_writers.md +++ b/docs/src/tutorial/connecting_readers_and_writers.md @@ -1,7 +1,7 @@ ## Connecting Readers and Writers -So how do we make sure that messages read in `client` flow into the relevant `client_writer`? +So how do we make sure that messages read in `connection_loop` flow into the relevant `connection_writer_loop`? We should somehow maintain an `peers: HashMap>` map which allows a client to find destination channels. However, this map would be a bit of shared mutable state, so we'll have to wrap an `RwLock` over it and answer tough questions of what should happen if the client joins at the same moment as it receives a message. @@ -24,7 +24,7 @@ enum Event { // 1 }, } -async fn broker(mut events: Receiver) -> Result<()> { +async fn broker_loop(mut events: Receiver) -> Result<()> { let mut peers: HashMap> = HashMap::new(); // 2 while let Some(event) = events.next().await { @@ -42,7 +42,7 @@ async fn broker(mut events: Receiver) -> Result<()> { Entry::Vacant(entry) => { let (client_sender, client_receiver) = mpsc::unbounded(); entry.insert(client_sender); // 4 - spawn_and_log_error(client_writer(client_receiver, stream)); // 5 + spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5 } } } diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 4cda69fd8..c6a96bba3 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -15,7 +15,7 @@ There's a more minimal solution however, which makes clever use of RAII. Closing a channel is a synchronization event, so we don't need to send a shutdown message, we can just drop the sender. This way, we statically guarantee that we issue shutdown exactly once, even if we early return via `?` or panic. -First, let's add a shutdown channel to the `client`: +First, let's add a shutdown channel to the `connection_loop`: ```rust #[derive(Debug)] @@ -35,7 +35,7 @@ enum Event { }, } -async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { +async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result<()> { // ... let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); // 3 @@ -53,14 +53,14 @@ async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { 2. We pass the shutdown channel to the writer task 3. In the reader, we create a `_shutdown_sender` whose only purpose is to get dropped. -In the `client_writer`, we now need to choose between shutdown and message channels. +In the `connection_writer_loop`, we now need to choose between shutdown and message channels. We use the `select` macro for this purpose: ```rust use futures::select; use futures::FutureExt; -async fn client_writer( +async fn connection_writer_loop( messages: &mut Receiver, stream: Arc, mut shutdown: Receiver, // 1 @@ -86,7 +86,7 @@ async fn client_writer( 2. Because of `select`, we can't use a `while let` loop, so we desugar it further into a `loop`. 3. In the shutdown case we use `match void {}` as a statically-checked `unreachable!()`. -Another problem is that between the moment we detect disconnection in `client_writer` and the moment when we actually remove the peer from the `peers` map, new messages might be pushed into the peer's channel. +Another problem is that between the moment we detect disconnection in `connection_writer_loop` and the moment when we actually remove the peer from the `peers` map, new messages might be pushed into the peer's channel. To not lose these messages completely, we'll return the messages channel back to the broker. This also allows us to establish a useful invariant that the message channel strictly outlives the peer in the `peers` map, and makes the broker itself infailable. @@ -123,26 +123,26 @@ type Receiver = mpsc::UnboundedReceiver; enum Void {} fn main() -> Result<()> { - task::block_on(server("127.0.0.1:8080")) + task::block_on(accept_loop("127.0.0.1:8080")) } -async fn server(addr: impl ToSocketAddrs) -> Result<()> { +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; let (broker_sender, broker_receiver) = mpsc::unbounded(); - let broker = task::spawn(broker(broker_receiver)); + let broker = task::spawn(broker_loop(broker_receiver)); let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { let stream = stream?; println!("Accepting from: {}", stream.peer_addr()?); - spawn_and_log_error(client(broker_sender.clone(), stream)); + spawn_and_log_error(connection_loop(broker_sender.clone(), stream)); } drop(broker_sender); broker.await; Ok(()) } -async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { +async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result<()> { let stream = Arc::new(stream); let reader = BufReader::new(&*stream); let mut lines = reader.lines(); @@ -177,7 +177,7 @@ async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { Ok(()) } -async fn client_writer( +async fn connection_writer_loop( messages: &mut Receiver, stream: Arc, mut shutdown: Receiver, @@ -212,7 +212,7 @@ enum Event { }, } -async fn broker(mut events: Receiver) { +async fn broker_loop(mut events: Receiver) { let (disconnect_sender, mut disconnect_receiver) = // 1 mpsc::unbounded::<(String, Receiver)>(); let mut peers: HashMap> = HashMap::new(); @@ -246,7 +246,7 @@ async fn broker(mut events: Receiver) { entry.insert(client_sender); let mut disconnect_sender = disconnect_sender.clone(); spawn_and_log_error(async move { - let res = client_writer(&mut client_receiver, stream, shutdown).await; + let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await; disconnect_sender.send((name, client_receiver)).await // 4 .unwrap(); res diff --git a/docs/src/tutorial/receiving_messages.md b/docs/src/tutorial/receiving_messages.md index 3dc1903ca..e5662f698 100644 --- a/docs/src/tutorial/receiving_messages.md +++ b/docs/src/tutorial/receiving_messages.md @@ -12,18 +12,18 @@ use async_std::io::BufReader; use async_std::net::TcpStream; use async_std::io::BufReader; -async fn server(addr: impl ToSocketAddrs) -> Result<()> { +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { let stream = stream?; println!("Accepting from: {}", stream.peer_addr()?); - let _handle = task::spawn(client(stream)); // 1 + let _handle = task::spawn(connection_loop(stream)); // 1 } Ok(()) } -async fn client(stream: TcpStream) -> Result<()> { +async fn connection_loop(stream: TcpStream) -> Result<()> { let reader = BufReader::new(&stream); // 2 let mut lines = reader.lines(); @@ -47,7 +47,7 @@ async fn client(stream: TcpStream) -> Result<()> { ``` 1. We use `task::spawn` function to spawn an independent task for working with each client. - That is, after accepting the client the `server` loop immediately starts waiting for the next one. + That is, after accepting the client the `accept_loop` immediately starts waiting for the next one. This is the core benefit of event-driven architecture: we serve many clients concurrently, without spending many hardware threads. 2. Luckily, the "split byte stream into lines" functionality is already implemented. @@ -61,12 +61,12 @@ async fn client(stream: TcpStream) -> Result<()> { ## Managing Errors -One serious problem in the above solution is that, while we correctly propagate errors in the `client`, we just drop the error on the floor afterwards! +One serious problem in the above solution is that, while we correctly propagate errors in the `connection_loop`, we just drop the error on the floor afterwards! That is, `task::spawn` does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined. We can "fix" it by waiting for the task to be joined, like this: ```rust -let handle = task::spawn(client(stream)); +let handle = task::spawn(connection_loop(stream)); handle.await? ``` diff --git a/docs/src/tutorial/sending_messages.md b/docs/src/tutorial/sending_messages.md index 0722bc014..80c3353ed 100644 --- a/docs/src/tutorial/sending_messages.md +++ b/docs/src/tutorial/sending_messages.md @@ -1,13 +1,13 @@ ## Sending Messages Now it's time to implement the other half -- sending messages. -A most obvious way to implement sending is to give each `client` access to the write half of `TcpStream` of each other clients. +A most obvious way to implement sending is to give each `connection_loop` access to the write half of `TcpStream` of each other clients. That way, a client can directly `.write_all` a message to recipients. However, this would be wrong: if Alice sends `bob: foo`, and Charley sends `bob: bar`, Bob might actually receive `fobaor`. Sending a message over a socket might require several syscalls, so two concurrent `.write_all`'s might interfere with each other! As a rule of thumb, only a single task should write to each `TcpStream`. -So let's create a `client_writer` task which receives messages over a channel and writes them to the socket. +So let's create a `connection_writer_loop` task which receives messages over a channel and writes them to the socket. This task would be the point of serialization of messages. if Alice and Charley send two messages to Bob at the same time, Bob will see the messages in the same order as they arrive in the channel. @@ -19,7 +19,7 @@ use std::sync::Arc; type Sender = mpsc::UnboundedSender; // 2 type Receiver = mpsc::UnboundedReceiver; -async fn client_writer( +async fn connection_writer_loop( mut messages: Receiver, stream: Arc, // 3 ) -> Result<()> { @@ -33,5 +33,5 @@ async fn client_writer( 1. We will use channels from the `futures` crate. 2. For simplicity, we will use `unbounded` channels, and won't be discussing backpressure in this tutorial. -3. As `client` and `client_writer` share the same `TcpStream`, we need to put it into an `Arc`. - Note that because `client` only reads from the stream and `client_writer` only writes to the stream, we don't get a race here. +3. As `connection_loop` and `connection_writer_loop` share the same `TcpStream`, we need to put it into an `Arc`. + Note that because `client` only reads from the stream and `connection_writer_loop` only writes to the stream, we don't get a race here. From d41f350370b77147ec169f882207fd286c20a745 Mon Sep 17 00:00:00 2001 From: Oleksii Kachaiev Date: Thu, 19 Sep 2019 08:45:44 -0700 Subject: [PATCH 2/5] Use edition2018 for rust code blocks --- docs/src/tutorial/accept_loop.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/tutorial/accept_loop.md b/docs/src/tutorial/accept_loop.md index 4322bdf1f..9d4a662ce 100644 --- a/docs/src/tutorial/accept_loop.md +++ b/docs/src/tutorial/accept_loop.md @@ -25,7 +25,7 @@ type Result = std::result::Result Now we can write the server's accept loop: -```rust +```rust,edition2018 # extern crate async_std; # use async_std::{ # net::{TcpListener, ToSocketAddrs}, From e7dc3f94d5c0867485628b5ef942b310723af71d Mon Sep 17 00:00:00 2001 From: Oleksii Kachaiev Date: Thu, 19 Sep 2019 14:23:29 -0700 Subject: [PATCH 3/5] Rename fn server -> accept loop in comments --- docs/src/tutorial/accept_loop.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/tutorial/accept_loop.md b/docs/src/tutorial/accept_loop.md index 9d4a662ce..058663fcd 100644 --- a/docs/src/tutorial/accept_loop.md +++ b/docs/src/tutorial/accept_loop.md @@ -73,7 +73,7 @@ Finally, let's add main: # # type Result = std::result::Result>; # -# async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1 +# async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { // 1 # let listener = TcpListener::bind(addr).await?; // 2 # let mut incoming = listener.incoming(); # while let Some(stream) = incoming.next().await { // 3 From afd53a17fbef92151050a6c23bf16e1f7e835f74 Mon Sep 17 00:00:00 2001 From: Oleksii Kachaiev Date: Thu, 19 Sep 2019 16:18:09 -0700 Subject: [PATCH 4/5] Rename client_writer to connection_writer_loop --- docs/src/tutorial/connecting_readers_and_writers.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/tutorial/connecting_readers_and_writers.md b/docs/src/tutorial/connecting_readers_and_writers.md index 360580562..51520f5ad 100644 --- a/docs/src/tutorial/connecting_readers_and_writers.md +++ b/docs/src/tutorial/connecting_readers_and_writers.md @@ -28,7 +28,7 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined # type Sender = mpsc::UnboundedSender; # type Receiver = mpsc::UnboundedReceiver; # -# async fn client_writer( +# async fn connection_writer_loop( # mut messages: Receiver, # stream: Arc, # ) -> Result<()> { From ee2dbf1ec66eeeb99d5a4710f868b9e288dd16a8 Mon Sep 17 00:00:00 2001 From: Oleksii Kachaiev Date: Thu, 19 Sep 2019 22:05:18 -0700 Subject: [PATCH 5/5] Rename client_writer to connection_writer_loop --- docs/src/tutorial/clean_shutdown.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/src/tutorial/clean_shutdown.md b/docs/src/tutorial/clean_shutdown.md index 1b906b3e7..234067a3a 100644 --- a/docs/src/tutorial/clean_shutdown.md +++ b/docs/src/tutorial/clean_shutdown.md @@ -83,7 +83,7 @@ Let's add waiting to the server: # Ok(()) # } # -# async fn client_writer_loop( +# async fn connection_writer_loop( # mut messages: Receiver, # stream: Arc, # ) -> Result<()> { @@ -126,7 +126,7 @@ Let's add waiting to the server: # Entry::Vacant(entry) => { # let (client_sender, client_receiver) = mpsc::unbounded(); # entry.insert(client_sender); // 4 -# spawn_and_log_error(client_writer_loop(client_receiver, stream)); // 5 +# spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5 # } # } # } @@ -175,7 +175,7 @@ And to the broker: # type Sender = mpsc::UnboundedSender; # type Receiver = mpsc::UnboundedReceiver; # -# async fn client_writer_loop( +# async fn connection_writer_loop( # mut messages: Receiver, # stream: Arc, # ) -> Result<()> {