Skip to content

Commit 0ee49d6

Browse files
committed
move a-chat tutorial's code to this repo
1 parent dde4b89 commit 0ee49d6

File tree

4 files changed

+247
-2
lines changed

4 files changed

+247
-2
lines changed

docs/src/tutorial/index.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ How do you distribute the messages?
88

99
In this tutorial, we will show you how to write one in `async-std`.
1010

11-
You can also find the tutorial in [our repository](https://github.com/async-rs/a-chat).
12-
11+
You can also find the tutorial in [our repository](https://github.com/async-rs/async-std/blob/master/examples/a-chat).

examples/a-chat/client.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::net::ToSocketAddrs;
2+
3+
use futures::select;
4+
use futures::FutureExt;
5+
6+
use async_std::{
7+
io::{stdin, BufReader},
8+
net::TcpStream,
9+
prelude::*,
10+
task,
11+
};
12+
13+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
14+
15+
pub(crate) fn main() -> Result<()> {
16+
task::block_on(try_main("127.0.0.1:8080"))
17+
}
18+
19+
async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
20+
let stream = TcpStream::connect(addr).await?;
21+
let (reader, mut writer) = (&stream, &stream);
22+
let reader = BufReader::new(reader);
23+
let mut lines_from_server = futures::StreamExt::fuse(reader.lines());
24+
25+
let stdin = BufReader::new(stdin());
26+
let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines());
27+
loop {
28+
select! {
29+
line = lines_from_server.next().fuse() => match line {
30+
Some(line) => {
31+
let line = line?;
32+
println!("{}", line);
33+
},
34+
None => break,
35+
},
36+
line = lines_from_stdin.next().fuse() => match line {
37+
Some(line) => {
38+
let line = line?;
39+
writer.write_all(line.as_bytes()).await?;
40+
writer.write_all(b"\n").await?;
41+
}
42+
None => break,
43+
}
44+
}
45+
}
46+
Ok(())
47+
}

examples/a-chat/main.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
mod client;
2+
mod server;
3+
4+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
5+
6+
fn main() -> Result<()> {
7+
let mut args = std::env::args();
8+
match (args.nth(1).as_ref().map(String::as_str), args.next()) {
9+
(Some("client"), None) => client::main(),
10+
(Some("server"), None) => server::main(),
11+
_ => Err("Usage: a-chat [client|server]")?,
12+
}
13+
}

examples/a-chat/server.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
use std::{
2+
collections::hash_map::{Entry, HashMap},
3+
net::ToSocketAddrs,
4+
sync::Arc,
5+
};
6+
7+
use futures::{channel::mpsc, select, FutureExt, SinkExt};
8+
9+
use async_std::{
10+
io::BufReader,
11+
net::{TcpListener, TcpStream},
12+
prelude::*,
13+
task,
14+
};
15+
16+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
17+
type Sender<T> = mpsc::UnboundedSender<T>;
18+
type Receiver<T> = mpsc::UnboundedReceiver<T>;
19+
20+
#[derive(Debug)]
21+
enum Void {}
22+
23+
pub(crate) fn main() -> Result<()> {
24+
task::block_on(accept_loop("127.0.0.1:8080"))
25+
}
26+
27+
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
28+
let listener = TcpListener::bind(addr).await?;
29+
30+
let (broker_sender, broker_receiver) = mpsc::unbounded();
31+
let broker = task::spawn(broker_loop(broker_receiver));
32+
let mut incoming = listener.incoming();
33+
while let Some(stream) = incoming.next().await {
34+
let stream = stream?;
35+
println!("Accepting from: {}", stream.peer_addr()?);
36+
spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
37+
}
38+
drop(broker_sender);
39+
broker.await;
40+
Ok(())
41+
}
42+
43+
async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
44+
let stream = Arc::new(stream);
45+
let reader = BufReader::new(&*stream);
46+
let mut lines = reader.lines();
47+
48+
let name = match lines.next().await {
49+
None => Err("peer disconnected immediately")?,
50+
Some(line) => line?,
51+
};
52+
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
53+
broker
54+
.send(Event::NewPeer {
55+
name: name.clone(),
56+
stream: Arc::clone(&stream),
57+
shutdown: shutdown_receiver,
58+
})
59+
.await
60+
.unwrap();
61+
62+
while let Some(line) = lines.next().await {
63+
let line = line?;
64+
let (dest, msg) = match line.find(':') {
65+
None => continue,
66+
Some(idx) => (&line[..idx], line[idx + 1..].trim()),
67+
};
68+
let dest: Vec<String> = dest
69+
.split(',')
70+
.map(|name| name.trim().to_string())
71+
.collect();
72+
let msg: String = msg.trim().to_string();
73+
74+
broker
75+
.send(Event::Message {
76+
from: name.clone(),
77+
to: dest,
78+
msg,
79+
})
80+
.await
81+
.unwrap();
82+
}
83+
84+
Ok(())
85+
}
86+
87+
async fn connection_writer_loop(
88+
messages: &mut Receiver<String>,
89+
stream: Arc<TcpStream>,
90+
mut shutdown: Receiver<Void>,
91+
) -> Result<()> {
92+
let mut stream = &*stream;
93+
loop {
94+
select! {
95+
msg = messages.next().fuse() => match msg {
96+
Some(msg) => stream.write_all(msg.as_bytes()).await?,
97+
None => break,
98+
},
99+
void = shutdown.next().fuse() => match void {
100+
Some(void) => match void {},
101+
None => break,
102+
}
103+
}
104+
}
105+
Ok(())
106+
}
107+
108+
#[derive(Debug)]
109+
enum Event {
110+
NewPeer {
111+
name: String,
112+
stream: Arc<TcpStream>,
113+
shutdown: Receiver<Void>,
114+
},
115+
Message {
116+
from: String,
117+
to: Vec<String>,
118+
msg: String,
119+
},
120+
}
121+
122+
async fn broker_loop(mut events: Receiver<Event>) {
123+
let (disconnect_sender, mut disconnect_receiver) =
124+
mpsc::unbounded::<(String, Receiver<String>)>();
125+
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
126+
127+
loop {
128+
let event = select! {
129+
event = events.next().fuse() => match event {
130+
None => break,
131+
Some(event) => event,
132+
},
133+
disconnect = disconnect_receiver.next().fuse() => {
134+
let (name, _pending_messages) = disconnect.unwrap();
135+
assert!(peers.remove(&name).is_some());
136+
continue;
137+
},
138+
};
139+
match event {
140+
Event::Message { from, to, msg } => {
141+
for addr in to {
142+
if let Some(peer) = peers.get_mut(&addr) {
143+
peer.send(format!("from {}: {}\n", from, msg))
144+
.await
145+
.unwrap()
146+
}
147+
}
148+
}
149+
Event::NewPeer {
150+
name,
151+
stream,
152+
shutdown,
153+
} => match peers.entry(name.clone()) {
154+
Entry::Occupied(..) => (),
155+
Entry::Vacant(entry) => {
156+
let (client_sender, mut client_receiver) = mpsc::unbounded();
157+
entry.insert(client_sender);
158+
let mut disconnect_sender = disconnect_sender.clone();
159+
spawn_and_log_error(async move {
160+
let res =
161+
connection_writer_loop(&mut client_receiver, stream, shutdown).await;
162+
disconnect_sender
163+
.send((name, client_receiver))
164+
.await
165+
.unwrap();
166+
res
167+
});
168+
}
169+
},
170+
}
171+
}
172+
drop(peers);
173+
drop(disconnect_sender);
174+
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {}
175+
}
176+
177+
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
178+
where
179+
F: Future<Output = Result<()>> + Send + 'static,
180+
{
181+
task::spawn(async move {
182+
if let Err(e) = fut.await {
183+
eprintln!("{}", e)
184+
}
185+
})
186+
}

0 commit comments

Comments
 (0)