Skip to content

Commit dea3586

Browse files
committed
move a-chat tutorial's code to this repo
1 parent dde4b89 commit dea3586

File tree

6 files changed

+238
-2
lines changed

6 files changed

+238
-2
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ matrix:
4242
script:
4343
- cargo check --features unstable --all --benches --bins --examples --tests
4444
- cargo test --features unstable --all
45+
- cd examples/a-chat && cargo check

docs/src/tutorial/index.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ How do you distribute the messages?
88

99
In this tutorial, we will show you how to write one in `async-std`.
1010

11-
You can also find the tutorial in [our repository](https://github.com/async-rs/a-chat).
12-
11+
You can also find the tutorial in [our repository](https://github.com/async-rs/async-std/blob/master/examples/a-chat.rs).

examples/a-chat/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target
2+
**/*.rs.bk

examples/a-chat/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "a-chat"
3+
version = "0.1.0"
4+
authors = ["Aleksey Kladov <aleksey.kladov@gmail.com>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
futures-preview = { version = "0.3.0-alpha.17", features = [ "async-await", "nightly" ] }
9+
async-std = { version = "0.99", path = "../../" }

examples/a-chat/src/bin/client.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::net::ToSocketAddrs;
2+
3+
use futures::select;
4+
use futures::FutureExt;
5+
6+
use async_std::{
7+
prelude::*,
8+
net::TcpStream,
9+
task,
10+
io::{stdin, BufReader},
11+
};
12+
13+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
14+
15+
16+
fn main() -> Result<()> {
17+
task::block_on(try_main("127.0.0.1:8080"))
18+
}
19+
20+
async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
21+
let stream = TcpStream::connect(addr).await?;
22+
let (reader, mut writer) = (&stream, &stream);
23+
let reader = BufReader::new(reader);
24+
let mut lines_from_server = futures::StreamExt::fuse(reader.lines());
25+
26+
let stdin = BufReader::new(stdin());
27+
let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines());
28+
loop {
29+
select! {
30+
line = lines_from_server.next().fuse() => match line {
31+
Some(line) => {
32+
let line = line?;
33+
println!("{}", line);
34+
},
35+
None => break,
36+
},
37+
line = lines_from_stdin.next().fuse() => match line {
38+
Some(line) => {
39+
let line = line?;
40+
writer.write_all(line.as_bytes()).await?;
41+
writer.write_all(b"\n").await?;
42+
}
43+
None => break,
44+
}
45+
}
46+
}
47+
Ok(())
48+
}

examples/a-chat/src/bin/server.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use std::{
2+
net::ToSocketAddrs,
3+
sync::Arc,
4+
collections::hash_map::{HashMap, Entry},
5+
};
6+
7+
use futures::{
8+
channel::mpsc,
9+
SinkExt,
10+
FutureExt,
11+
select,
12+
};
13+
14+
use async_std::{
15+
io::BufReader,
16+
prelude::*,
17+
task,
18+
net::{TcpListener, TcpStream},
19+
};
20+
21+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
22+
type Sender<T> = mpsc::UnboundedSender<T>;
23+
type Receiver<T> = mpsc::UnboundedReceiver<T>;
24+
25+
#[derive(Debug)]
26+
enum Void {}
27+
28+
fn main() -> Result<()> {
29+
task::block_on(accept_loop("127.0.0.1:8080"))
30+
}
31+
32+
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
33+
let listener = TcpListener::bind(addr).await?;
34+
35+
let (broker_sender, broker_receiver) = mpsc::unbounded();
36+
let broker = task::spawn(broker_loop(broker_receiver));
37+
let mut incoming = listener.incoming();
38+
while let Some(stream) = incoming.next().await {
39+
let stream = stream?;
40+
println!("Accepting from: {}", stream.peer_addr()?);
41+
spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
42+
}
43+
drop(broker_sender);
44+
broker.await;
45+
Ok(())
46+
}
47+
48+
async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
49+
let stream = Arc::new(stream);
50+
let reader = BufReader::new(&*stream);
51+
let mut lines = reader.lines();
52+
53+
let name = match lines.next().await {
54+
None => Err("peer disconnected immediately")?,
55+
Some(line) => line?,
56+
};
57+
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
58+
broker.send(Event::NewPeer {
59+
name: name.clone(),
60+
stream: Arc::clone(&stream),
61+
shutdown: shutdown_receiver,
62+
}).await.unwrap();
63+
64+
while let Some(line) = lines.next().await {
65+
let line = line?;
66+
let (dest, msg) = match line.find(':') {
67+
None => continue,
68+
Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
69+
};
70+
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
71+
let msg: String = msg.trim().to_string();
72+
73+
broker.send(Event::Message {
74+
from: name.clone(),
75+
to: dest,
76+
msg,
77+
}).await.unwrap();
78+
}
79+
80+
Ok(())
81+
}
82+
83+
async fn connection_writer_loop(
84+
messages: &mut Receiver<String>,
85+
stream: Arc<TcpStream>,
86+
mut shutdown: Receiver<Void>,
87+
) -> Result<()> {
88+
let mut stream = &*stream;
89+
loop {
90+
select! {
91+
msg = messages.next().fuse() => match msg {
92+
Some(msg) => stream.write_all(msg.as_bytes()).await?,
93+
None => break,
94+
},
95+
void = shutdown.next().fuse() => match void {
96+
Some(void) => match void {},
97+
None => break,
98+
}
99+
}
100+
}
101+
Ok(())
102+
}
103+
104+
#[derive(Debug)]
105+
enum Event {
106+
NewPeer {
107+
name: String,
108+
stream: Arc<TcpStream>,
109+
shutdown: Receiver<Void>,
110+
},
111+
Message {
112+
from: String,
113+
to: Vec<String>,
114+
msg: String,
115+
},
116+
}
117+
118+
async fn broker_loop(mut events: Receiver<Event>) {
119+
let (disconnect_sender, mut disconnect_receiver) =
120+
mpsc::unbounded::<(String, Receiver<String>)>();
121+
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
122+
123+
loop {
124+
let event = select! {
125+
event = events.next().fuse() => match event {
126+
None => break,
127+
Some(event) => event,
128+
},
129+
disconnect = disconnect_receiver.next().fuse() => {
130+
let (name, _pending_messages) = disconnect.unwrap();
131+
assert!(peers.remove(&name).is_some());
132+
continue;
133+
},
134+
};
135+
match event {
136+
Event::Message { from, to, msg } => {
137+
for addr in to {
138+
if let Some(peer) = peers.get_mut(&addr) {
139+
peer.send(format!("from {}: {}\n", from, msg)).await
140+
.unwrap()
141+
}
142+
}
143+
}
144+
Event::NewPeer { name, stream, shutdown } => {
145+
match peers.entry(name.clone()) {
146+
Entry::Occupied(..) => (),
147+
Entry::Vacant(entry) => {
148+
let (client_sender, mut client_receiver) = mpsc::unbounded();
149+
entry.insert(client_sender);
150+
let mut disconnect_sender = disconnect_sender.clone();
151+
spawn_and_log_error(async move {
152+
let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await;
153+
disconnect_sender.send((name, client_receiver)).await
154+
.unwrap();
155+
res
156+
});
157+
}
158+
}
159+
}
160+
}
161+
}
162+
drop(peers);
163+
drop(disconnect_sender);
164+
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {
165+
}
166+
}
167+
168+
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
169+
where
170+
F: Future<Output = Result<()>> + Send + 'static,
171+
{
172+
task::spawn(async move {
173+
if let Err(e) = fut.await {
174+
eprintln!("{}", e)
175+
}
176+
})
177+
}

0 commit comments

Comments
 (0)