Skip to content

Commit 9507ce0

Browse files
authored
Split for_each in sync and async modes (#76)
* Split for_each in sync and async ways * Updated readme version 0.13 * Added debug to NodeEvent when Signal implement it.
1 parent 3398591 commit 9507ce0

File tree

6 files changed

+79
-35
lines changed

6 files changed

+79
-35
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## Release 0.13.0
4+
- Updated `NodeListener::for_each` to works fully synchronous. `Send` trait of the event callback parameter has been removed to works fine with objects like `Rc` `Cell` or references.
5+
This function no longer returns a `NodeTask`.
6+
- New method `NodeListener::for_each_async` has been added to support the previous behaviour of
7+
`for_each`.
8+
- Added `Debug` to `NodeEvent`.
9+
310
## Release 0.12.2
411
- Reduced *WebSocket* latency.
512

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "message-io"
3-
version = "0.12.2"
3+
version = "0.13.0"
44
authors = ["lemunozm <lemunozm@gmail.com>"]
55
edition = "2018"
66
readme = "README.md"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ You could change the transport of your application in literally one line.
6666
Add to your `Cargo.toml` (all the transports included by default):
6767
```toml
6868
[dependencies]
69-
message-io = "0.12"
69+
message-io = "0.13"
7070
```
7171
If you **only** want to use a subset of the available transport battery,
7272
you can select them by their associated features `tcp`, `udp`, and `websocket`.
7373
For example, in order to include only *TCP* and *UDP*, add to your `Cargo.toml`:
7474
```toml
7575
[dependencies]
76-
message-io = { version = "0.12", default-features = false, features = ["tcp", "udp"] }
76+
message-io = { version = "0.13", default-features = false, features = ["tcp", "udp"] }
7777
```
7878

7979
**Warning**: Version **0.12** comes with important API changes ([changelog](CHANGELOG.md))

examples/throughput/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn throughput_message_io(transport: Transport, packet_size: usize) {
5555
let mut received_bytes = 0;
5656
let handler = handler.clone();
5757

58-
listener.for_each(move |event| match event.network() {
58+
listener.for_each_async(move |event| match event.network() {
5959
NetEvent::Connected(_, _) => {
6060
t_ready.send(()).unwrap();
6161
}

src/node.rs

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ lazy_static::lazy_static! {
1313
static ref SAMPLING_TIMEOUT: Duration = Duration::from_millis(50);
1414
}
1515

16-
/// Event returned by [`NodeListener::for_each()`] when some network or signal is received.
16+
/// Event returned by [`NodeListener::for_each()`] and [`NodeListener::for_each_async()`]
17+
/// when some network event or signal is received.
1718
pub enum NodeEvent<'a, S> {
1819
/// The `NodeEvent` is an event that comes from the network.
1920
/// See [`NetEvent`] to know about the different network events.
@@ -26,6 +27,15 @@ pub enum NodeEvent<'a, S> {
2627
Signal(S),
2728
}
2829

30+
impl<'a, S: std::fmt::Debug> std::fmt::Debug for NodeEvent<'a, S> {
31+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32+
match self {
33+
NodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({:?})", net_event),
34+
NodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({:?})", signal),
35+
}
36+
}
37+
}
38+
2939
impl<'a, S> NodeEvent<'a, S> {
3040
/// Assume the event is a [`NodeEvent::Network`], panics if not.
3141
pub fn network(self) -> NetEvent<'a> {
@@ -171,6 +181,19 @@ impl StoredNetEvent {
171181
}
172182
}
173183

184+
struct SendableEventCallback<S>(Arc<Mutex<dyn FnMut(NodeEvent<S>)>>);
185+
186+
// This struct is used to allow passing no Sendable objects into the listener jobs.
187+
// Although it is unsafe, it is safely handled by the for_each / for_each_async functions.
188+
// (see its internal comments)
189+
unsafe impl<S> Send for SendableEventCallback<S> {}
190+
191+
impl<S> Clone for SendableEventCallback<S> {
192+
fn clone(&self) -> Self {
193+
Self(self.0.clone())
194+
}
195+
}
196+
174197
/// Main entity to manipulates the network and signal events easily.
175198
/// The node run asynchronously.
176199
pub struct NodeListener<S: Send + 'static> {
@@ -209,15 +232,10 @@ impl<S: Send + 'static> NodeListener<S> {
209232

210233
/// Iterate indefinitely over all generated `NetEvent`.
211234
/// This function will work until [`NodeHandler::stop`] was called.
212-
/// A `NodeTask` representing the asynchronous job is returned.
213-
/// Destroying this object will result in blocking the current thread until
214-
/// [`NodeHandler::stop`] was called.
215235
///
216-
/// In order to allow the node working asynchronously, you can move the `NodeTask` to a
217-
/// an object with a longer lifetime.
218-
///
219-
/// # Examples
220-
/// **Synchronous** usage:
236+
/// Note that any events generated before calling this function (e.g. some connection was done)
237+
/// will be storage and offered once you call `for_each()`.
238+
/// # Example
221239
/// ```
222240
/// use message_io::node::{self, NodeEvent};
223241
/// use message_io::network::Transport;
@@ -230,12 +248,27 @@ impl<S: Send + 'static> NodeListener<S> {
230248
/// NodeEvent::Network(net_event) => { /* Your logic here */ },
231249
/// NodeEvent::Signal(_) => handler.stop(),
232250
/// });
233-
/// // Blocked here until handler.stop() was called (1 sec) because the returned value
234-
/// // of for_each() is not used (it is dropped just after called the method).
251+
/// // Blocked here until handler.stop() was called (1 sec).
235252
/// println!("Node is stopped");
236253
/// ```
254+
pub fn for_each(self, event_callback: impl FnMut(NodeEvent<S>) + 'static) {
255+
let sendable_callback = SendableEventCallback(Arc::new(Mutex::new(event_callback)));
256+
let mut task = self.for_each_impl(sendable_callback);
257+
258+
// Although the event_callback is not sync, we ensure with this wait() that no more events
259+
// will be processed when the control is returned to the user.
260+
task.wait();
261+
}
262+
263+
/// Similar to [`NodeListener::for_each()`] but it returns the control to the user
264+
/// after call it. The events would be processed asynchronously.
265+
/// A `NodeTask` representing this asynchronous job is returned.
266+
/// Destroying this object will result in blocking the current thread until
267+
/// [`NodeHandler::stop`] was called.
268+
///
269+
/// In order to allow the node working asynchronously, you can move the `NodeTask` to a
270+
/// an object with a longer lifetime.
237271
///
238-
/// **Asynchronous** usage:
239272
/// ```
240273
/// use message_io::node::{self, NodeEvent};
241274
/// use message_io::network::Transport;
@@ -248,32 +281,36 @@ impl<S: Send + 'static> NodeListener<S> {
248281
/// NodeEvent::Network(net_event) => { /* Your logic here */ },
249282
/// NodeEvent::Signal(_) => handler.stop(),
250283
/// });
251-
/// // for_each() will act asynchronous during 'task' lifetime.
284+
/// // for_each_async() will act asynchronous during 'task' lifetime.
252285
///
253286
/// // ...
254287
/// println!("Node is running");
255288
/// // ...
256289
///
257290
/// drop(task); // Blocked here until handler.stop() was called (1 sec).
258-
/// //also task.wait(); can be called doing the same (but taking a mutable reference).
291+
/// // Also task.wait(); can be called doing the same (but taking a mutable reference).
259292
///
260293
/// println!("Node is stopped");
261294
/// ```
262-
/// Note that any events generated before calling this function will be storage
263-
/// and offered once you call `for_each()`.
264-
pub fn for_each(
265-
mut self,
295+
pub fn for_each_async(
296+
self,
266297
event_callback: impl FnMut(NodeEvent<S>) + Send + 'static,
267298
) -> NodeTask {
299+
let sendable_callback = SendableEventCallback(Arc::new(Mutex::new(event_callback)));
300+
301+
// The signature of this functions add the `Send` to the `event_callback` that
302+
// `SendableEventCallback` removed, so the usage is safe.
303+
self.for_each_impl(sendable_callback)
304+
}
305+
306+
fn for_each_impl(mut self, multiplexed: SendableEventCallback<S>) -> NodeTask {
268307
// Stop cache events
269308
self.cache_running.store(false, Ordering::Relaxed);
270309
let (mut network_processor, mut cache) = self.network_cache_thread.join();
271310

272-
let multiplexed = Arc::new(Mutex::new(event_callback));
273-
274311
// To avoid processing stops while the node is configuring,
275312
// the user callback locked until the function ends.
276-
let _locked = multiplexed.lock().expect(OTHER_THREAD_ERR);
313+
let _locked = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
277314

278315
let network_thread = {
279316
let multiplexed = multiplexed.clone();
@@ -282,7 +319,7 @@ impl<S: Send + 'static> NodeListener<S> {
282319
NamespacedThread::spawn("node-network-thread", move || {
283320
// Dispatch the catched events first.
284321
while let Some(event) = cache.pop_front() {
285-
let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
322+
let mut event_callback = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
286323
let net_event = event.borrow();
287324
log::trace!("Read from cache {:?}", net_event);
288325
event_callback(NodeEvent::Network(net_event));
@@ -293,7 +330,7 @@ impl<S: Send + 'static> NodeListener<S> {
293330

294331
while running.load(Ordering::Relaxed) {
295332
network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
296-
let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
333+
let mut event_callback = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
297334
if running.load(Ordering::Relaxed) {
298335
event_callback(NodeEvent::Network(net_event));
299336
}
@@ -310,7 +347,7 @@ impl<S: Send + 'static> NodeListener<S> {
310347
NamespacedThread::spawn("node-signal-thread", move || {
311348
while running.load(Ordering::Relaxed) {
312349
if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT) {
313-
let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
350+
let mut event_callback = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
314351
if running.load(Ordering::Relaxed) {
315352
event_callback(NodeEvent::Signal(signal));
316353
}
@@ -329,11 +366,11 @@ impl<S: Send + 'static> Drop for NodeListener<S> {
329366
}
330367
}
331368

332-
/// Entity used to ensure the lifetime of [`NodeListener::for_each()`] call.
369+
/// Entity used to ensure the lifetime of [`NodeListener::for_each_async()`] call.
333370
/// The node will process events asynchronously while this entity lives.
334371
/// The destruction of this entity will block until the task is finished.
335-
/// If you want to "unblock" the thread that drops this entity call to:
336-
/// [`NodeHandler::stop()`]
372+
/// If you want to "unblock" the thread that drops this entity call to
373+
/// [`NodeHandler::stop()`] before or from another thread.
337374
pub struct NodeTask {
338375
network_thread: NamespacedThread<()>,
339376
signal_thread: NamespacedThread<()>,
@@ -385,7 +422,7 @@ mod tests {
385422
let checked = Arc::new(AtomicBool::new(false));
386423
let inner_checked = checked.clone();
387424
let inner_handler = handler.clone();
388-
let _node_task = listener.for_each(move |event| match event.signal() {
425+
let _node_task = listener.for_each_async(move |event| match event.signal() {
389426
"stop" => inner_handler.stop(),
390427
"check" => inner_checked.store(true, Ordering::Relaxed),
391428
_ => unreachable!(),
@@ -406,7 +443,7 @@ mod tests {
406443
handler.signals().send_with_timer((), Duration::from_millis(1000));
407444

408445
let inner_handler = handler.clone();
409-
listener.for_each(move |_| inner_handler.stop()).wait();
446+
listener.for_each_async(move |_| inner_handler.stop()).wait();
410447

411448
assert!(!handler.is_running());
412449
}
@@ -418,7 +455,7 @@ mod tests {
418455
handler.signals().send_with_timer((), Duration::from_millis(1000));
419456

420457
let inner_handler = handler.clone();
421-
let mut task = listener.for_each(move |_| inner_handler.stop());
458+
let mut task = listener.for_each_async(move |_| inner_handler.stop());
422459
assert!(handler.is_running());
423460
task.wait();
424461
assert!(!handler.is_running());

0 commit comments

Comments
 (0)