Skip to content

Rename server functions to follow *_loop convention #139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions docs/src/tutorial/accept_loop.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ Now we can write the server's accept loop:
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { // 1

let listener = TcpListener::bind(addr).await?; // 2
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await { // 3
Expand All @@ -44,7 +45,7 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
}
```

1. We mark the `server` function as `async`, which allows us to use `.await` syntax inside.
1. We mark the `accept_loop` function as `async`, which allows us to use `.await` syntax inside.
2. `TcpListener::bind` call returns a future, which we `.await` to extract the `Result`, and then `?` to get a `TcpListener`.
Note how `.await` and `?` work nicely together.
This is exactly how `std::net::TcpListener` works, but with `.await` added.
Expand Down Expand Up @@ -72,7 +73,7 @@ Finally, let's add main:
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
# async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
# async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { // 1
# let listener = TcpListener::bind(addr).await?; // 2
# let mut incoming = listener.incoming();
# while let Some(stream) = incoming.next().await { // 3
Expand All @@ -83,7 +84,7 @@ Finally, let's add main:
#
// main
fn run() -> Result<()> {
let fut = server("127.0.0.1:8080");
let fut = accept_loop("127.0.0.1:8080");
task::block_on(fut)
}
```
Expand Down
20 changes: 10 additions & 10 deletions docs/src/tutorial/all_together.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Receiver<T> = mpsc::UnboundedReceiver<T>;

// main
fn run() -> Result<()> {
task::block_on(server("127.0.0.1:8080"))
task::block_on(accept_loop("127.0.0.1:8080"))
}

fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
Expand All @@ -39,21 +39,21 @@ where
})
}

async fn server(addr: impl ToSocketAddrs) -> Result<()> {
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;

let (broker_sender, broker_receiver) = mpsc::unbounded(); // 1
let _broker_handle = task::spawn(broker(broker_receiver));
let _broker_handle = task::spawn(broker_loop(broker_receiver));
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
spawn_and_log_error(client(broker_sender.clone(), stream));
spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
}
Ok(())
}

async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
let stream = Arc::new(stream); // 2
let reader = BufReader::new(&*stream);
let mut lines = reader.lines();
Expand Down Expand Up @@ -83,7 +83,7 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
Ok(())
}

async fn client_writer(
async fn connection_writer_loop(
mut messages: Receiver<String>,
stream: Arc<TcpStream>,
) -> Result<()> {
Expand All @@ -107,7 +107,7 @@ enum Event {
},
}

async fn broker(mut events: Receiver<Event>) -> Result<()> {
async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
let mut peers: HashMap<String, Sender<String>> = HashMap::new();

while let Some(event) = events.next().await {
Expand All @@ -126,7 +126,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
Entry::Vacant(entry) => {
let (client_sender, client_receiver) = mpsc::unbounded();
entry.insert(client_sender); // 4
spawn_and_log_error(client_writer(client_receiver, stream)); // 5
spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
}
}
}
Expand All @@ -136,8 +136,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
}
```

1. Inside the `server`, we create the broker's channel and `task`.
2. Inside `client`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `client_writer`.
1. Inside the `accept_loop`, we create the broker's channel and `task`.
2. Inside `connection_loop`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `connection_writer_loop`.
3. On login, we notify the broker.
Note that we `.unwrap` on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well.
4. Similarly, we forward parsed messages to the broker, assuming that it is alive.
20 changes: 10 additions & 10 deletions docs/src/tutorial/clean_shutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Let's add waiting to the server:
# }
#
#
# async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
# async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
# let stream = Arc::new(stream); // 2
# let reader = BufReader::new(&*stream);
# let mut lines = reader.lines();
Expand Down Expand Up @@ -83,7 +83,7 @@ Let's add waiting to the server:
# Ok(())
# }
#
# async fn client_writer(
# async fn connection_writer_loop(
# mut messages: Receiver<String>,
# stream: Arc<TcpStream>,
# ) -> Result<()> {
Expand All @@ -107,7 +107,7 @@ Let's add waiting to the server:
# },
# }
#
# async fn broker(mut events: Receiver<Event>) -> Result<()> {
# async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
# let mut peers: HashMap<String, Sender<String>> = HashMap::new();
#
# while let Some(event) = events.next().await {
Expand All @@ -126,7 +126,7 @@ Let's add waiting to the server:
# Entry::Vacant(entry) => {
# let (client_sender, client_receiver) = mpsc::unbounded();
# entry.insert(client_sender); // 4
# spawn_and_log_error(client_writer(client_receiver, stream)); // 5
# spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
# }
# }
# }
Expand All @@ -135,16 +135,16 @@ Let's add waiting to the server:
# Ok(())
# }
#
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;

let (broker_sender, broker_receiver) = mpsc::unbounded();
let broker_handle = task::spawn(broker(broker_receiver));
let broker_handle = task::spawn(broker_loop(broker_receiver));
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
spawn_and_log_error(client(broker_sender.clone(), stream));
spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
}
drop(broker_sender); // 1
broker_handle.await?; // 5
Expand Down Expand Up @@ -175,7 +175,7 @@ And to the broker:
# type Sender<T> = mpsc::UnboundedSender<T>;
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
#
# async fn client_writer(
# async fn connection_writer_loop(
# mut messages: Receiver<String>,
# stream: Arc<TcpStream>,
# ) -> Result<()> {
Expand Down Expand Up @@ -210,7 +210,7 @@ And to the broker:
# },
# }
#
async fn broker(mut events: Receiver<Event>) -> Result<()> {
async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
let mut writers = Vec::new();
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
while let Some(event) = events.next().await { // 2
Expand All @@ -229,7 +229,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
Entry::Vacant(entry) => {
let (client_sender, client_receiver) = mpsc::unbounded();
entry.insert(client_sender);
let handle = spawn_and_log_error(client_writer(client_receiver, stream));
let handle = spawn_and_log_error(connection_writer_loop(client_receiver, stream));
writers.push(handle); // 4
}
}
Expand Down
8 changes: 4 additions & 4 deletions docs/src/tutorial/connecting_readers_and_writers.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

## Connecting Readers and Writers

So how do we make sure that messages read in `client` flow into the relevant `client_writer`?
So how do we make sure that messages read in `connection_loop` flow into the relevant `connection_writer_loop`?
We should somehow maintain an `peers: HashMap<String, Sender<String>>` map which allows a client to find destination channels.
However, this map would be a bit of shared mutable state, so we'll have to wrap an `RwLock` over it and answer tough questions of what should happen if the client joins at the same moment as it receives a message.

Expand All @@ -28,7 +28,7 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined
# type Sender<T> = mpsc::UnboundedSender<T>;
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
#
# async fn client_writer(
# async fn connection_writer_loop(
# mut messages: Receiver<String>,
# stream: Arc<TcpStream>,
# ) -> Result<()> {
Expand Down Expand Up @@ -65,7 +65,7 @@ enum Event { // 1
},
}

async fn broker(mut events: Receiver<Event>) -> Result<()> {
async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
let mut peers: HashMap<String, Sender<String>> = HashMap::new(); // 2

while let Some(event) = events.next().await {
Expand All @@ -84,7 +84,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
Entry::Vacant(entry) => {
let (client_sender, client_receiver) = mpsc::unbounded();
entry.insert(client_sender); // 4
spawn_and_log_error(client_writer(client_receiver, stream)); // 5
spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
}
}
}
Expand Down
26 changes: 13 additions & 13 deletions docs/src/tutorial/handling_disconnection.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ There's a more minimal solution however, which makes clever use of RAII.
Closing a channel is a synchronization event, so we don't need to send a shutdown message, we can just drop the sender.
This way, we statically guarantee that we issue shutdown exactly once, even if we early return via `?` or panic.

First, let's add a shutdown channel to the `client`:
First, let's add a shutdown channel to the `connection_loop`:

```rust,edition2018
# extern crate async_std;
Expand Down Expand Up @@ -47,7 +47,7 @@ enum Event {
},
}

async fn client(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()> {
async fn connection_loop(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()> {
// ...
# let name: String = unimplemented!();
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); // 3
Expand All @@ -65,7 +65,7 @@ async fn client(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()>
2. We pass the shutdown channel to the writer task
3. In the reader, we create a `_shutdown_sender` whose only purpose is to get dropped.

In the `client_writer`, we now need to choose between shutdown and message channels.
In the `connection_writer_loop`, we now need to choose between shutdown and message channels.
We use the `select` macro for this purpose:

```rust,edition2018
Expand All @@ -84,7 +84,7 @@ use futures_util::{select, FutureExt, StreamExt};
# #[derive(Debug)]
# enum Void {} // 1

async fn client_writer(
async fn connection_writer_loop(
messages: &mut Receiver<String>,
stream: Arc<TcpStream>,
shutdown: Receiver<Void>, // 1
Expand Down Expand Up @@ -112,7 +112,7 @@ async fn client_writer(
2. Because of `select`, we can't use a `while let` loop, so we desugar it further into a `loop`.
3. In the shutdown case we use `match void {}` as a statically-checked `unreachable!()`.

Another problem is that between the moment we detect disconnection in `client_writer` and the moment when we actually remove the peer from the `peers` map, new messages might be pushed into the peer's channel.
Another problem is that between the moment we detect disconnection in `connection_writer_loop` and the moment when we actually remove the peer from the `peers` map, new messages might be pushed into the peer's channel.
To not lose these messages completely, we'll return the messages channel back to the broker.
This also allows us to establish a useful invariant that the message channel strictly outlives the peer in the `peers` map, and makes the broker itself infailable.

Expand Down Expand Up @@ -146,25 +146,25 @@ enum Void {}

// main
fn run() -> Result<()> {
task::block_on(server("127.0.0.1:8080"))
task::block_on(accept_loop("127.0.0.1:8080"))
}

async fn server(addr: impl ToSocketAddrs) -> Result<()> {
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let (broker_sender, broker_receiver) = mpsc::unbounded();
let broker_handle = task::spawn(broker(broker_receiver));
let broker_handle = task::spawn(broker_loop(broker_receiver));
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
spawn_and_log_error(client(broker_sender.clone(), stream));
spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
}
drop(broker_sender);
broker_handle.await;
Ok(())
}

async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
let stream = Arc::new(stream);
let reader = BufReader::new(&*stream);
let mut lines = reader.lines();
Expand Down Expand Up @@ -199,7 +199,7 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
Ok(())
}

async fn client_writer(
async fn connection_writer_loop(
messages: &mut Receiver<String>,
stream: Arc<TcpStream>,
shutdown: Receiver<Void>,
Expand Down Expand Up @@ -236,7 +236,7 @@ enum Event {
},
}

async fn broker(events: Receiver<Event>) {
async fn broker_loop(events: Receiver<Event>) {
let (disconnect_sender, mut disconnect_receiver) = // 1
mpsc::unbounded::<(String, Receiver<String>)>();
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
Expand Down Expand Up @@ -271,7 +271,7 @@ async fn broker(events: Receiver<Event>) {
entry.insert(client_sender);
let mut disconnect_sender = disconnect_sender.clone();
spawn_and_log_error(async move {
let res = client_writer(&mut client_receiver, stream, shutdown).await;
let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await;
disconnect_sender.send((name, client_receiver)).await // 4
.unwrap();
res
Expand Down
14 changes: 7 additions & 7 deletions docs/src/tutorial/receiving_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ We need to:
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
let _handle = task::spawn(client(stream)); // 1
let _handle = task::spawn(connection_loop(stream)); // 1
}
Ok(())
}

async fn client(stream: TcpStream) -> Result<()> {
async fn connection_loop(stream: TcpStream) -> Result<()> {
let reader = BufReader::new(&stream); // 2
let mut lines = reader.lines();

Expand All @@ -53,7 +53,7 @@ async fn client(stream: TcpStream) -> Result<()> {
```

1. We use `task::spawn` function to spawn an independent task for working with each client.
That is, after accepting the client the `server` loop immediately starts waiting for the next one.
That is, after accepting the client the `accept_loop` immediately starts waiting for the next one.
This is the core benefit of event-driven architecture: we serve many clients concurrently, without spending many hardware threads.

2. Luckily, the "split byte stream into lines" functionality is already implemented.
Expand All @@ -67,7 +67,7 @@ async fn client(stream: TcpStream) -> Result<()> {

## Managing Errors

One serious problem in the above solution is that, while we correctly propagate errors in the `client`, we just drop the error on the floor afterwards!
One serious problem in the above solution is that, while we correctly propagate errors in the `connection_loop`, we just drop the error on the floor afterwards!
That is, `task::spawn` does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined.
We can "fix" it by waiting for the task to be joined, like this:

Expand All @@ -83,7 +83,7 @@ We can "fix" it by waiting for the task to be joined, like this:
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
# async fn client(stream: TcpStream) -> Result<()> {
# async fn connection_loop(stream: TcpStream) -> Result<()> {
# let reader = BufReader::new(&stream); // 2
# let mut lines = reader.lines();
#
Expand All @@ -106,7 +106,7 @@ We can "fix" it by waiting for the task to be joined, like this:
# }
#
# async move |stream| {
let handle = task::spawn(client(stream));
let handle = task::spawn(connection_loop(stream));
handle.await
# };
```
Expand Down
Loading