Skip to content

Commit cd72f10

Browse files
Example with periodic task (#28)
1 parent e430fec commit cd72f10

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed

examples/cron.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use restate_sdk::prelude::*;
2+
use std::time::Duration;
3+
4+
/// This example shows how to implement a periodic task, by invoking itself in a loop.
5+
///
6+
/// The `start()` handler schedules the first call to `run()`, and then each `run()` will re-schedule itself.
7+
///
8+
/// To "break" the loop, we use a flag we persist in state, which is removed when `stop()` is invoked.
9+
/// Its presence determines whether the task is active or not.
10+
///
11+
/// To start it:
12+
///
13+
/// ```shell
14+
/// $ curl -v http://localhost:8080/PeriodicTask/my-periodic-task/start
15+
/// ```
16+
#[restate_sdk::object]
17+
trait PeriodicTask {
18+
/// Schedules the periodic task to start
19+
async fn start() -> Result<(), TerminalError>;
20+
/// Stops the periodic task
21+
async fn stop() -> Result<(), TerminalError>;
22+
/// Business logic of the periodic task
23+
async fn run() -> Result<(), TerminalError>;
24+
}
25+
26+
struct PeriodicTaskImpl;
27+
28+
const ACTIVE: &str = "active";
29+
30+
impl PeriodicTask for PeriodicTaskImpl {
31+
async fn start(&self, context: ObjectContext<'_>) -> Result<(), TerminalError> {
32+
if context
33+
.get::<bool>(ACTIVE)
34+
.await?
35+
.is_some_and(|enabled| enabled)
36+
{
37+
// If it's already activated, just do nothing
38+
return Ok(());
39+
}
40+
41+
// Schedule the periodic task
42+
PeriodicTaskImpl::schedule_next(&context);
43+
44+
// Mark the periodic task as active
45+
context.set(ACTIVE, true);
46+
47+
Ok(())
48+
}
49+
50+
async fn stop(&self, context: ObjectContext<'_>) -> Result<(), TerminalError> {
51+
// Remove the active flag
52+
context.clear(ACTIVE);
53+
54+
Ok(())
55+
}
56+
57+
async fn run(&self, context: ObjectContext<'_>) -> Result<(), TerminalError> {
58+
if context.get::<bool>(ACTIVE).await?.is_none() {
59+
// Task is inactive, do nothing
60+
return Ok(());
61+
}
62+
63+
// --- Periodic task business logic!
64+
println!("Triggered the periodic task!");
65+
66+
// Schedule the periodic task
67+
PeriodicTaskImpl::schedule_next(&context);
68+
69+
Ok(())
70+
}
71+
}
72+
73+
impl PeriodicTaskImpl {
74+
fn schedule_next(context: &ObjectContext<'_>) {
75+
// To schedule, create a client to the callee handler (in this case, we're calling ourselves)
76+
context
77+
.object_client::<PeriodicTaskClient>(context.key())
78+
.run()
79+
// And send with a delay
80+
.send_with_delay(Duration::from_secs(10));
81+
}
82+
}
83+
84+
#[tokio::main]
85+
async fn main() {
86+
tracing_subscriber::fmt::init();
87+
HttpServer::new(Endpoint::builder().bind(PeriodicTaskImpl.serve()).build())
88+
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
89+
.await;
90+
}

0 commit comments

Comments
 (0)