Skip to content

Commit 8c77163

Browse files
Add headers to service to service communication (#60)
* Add headers to service to service communication * Fix little issue in docs
1 parent 65d9321 commit 8c77163

File tree

3 files changed

+40
-9
lines changed

3 files changed

+40
-9
lines changed

src/context/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,7 @@ impl<'ctx> WorkflowContext<'ctx> {
219219
///
220220
/// ## Scheduling Async Tasks
221221
///
222-
/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls][crate::context::ContextClient#delayed-calls].
223-
///
222+
/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after).
224223
///
225224
/// ## Durable sleep
226225
/// To sleep in a Restate application for ten seconds, do the following:

src/context/request.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub struct Request<'a, Req, Res = ()> {
7575
ctx: &'a ContextInternal,
7676
request_target: RequestTarget,
7777
idempotency_key: Option<String>,
78+
headers: Vec<(String, String)>,
7879
req: Req,
7980
res: PhantomData<Res>,
8081
}
@@ -85,11 +86,17 @@ impl<'a, Req, Res> Request<'a, Req, Res> {
8586
ctx,
8687
request_target,
8788
idempotency_key: None,
89+
headers: vec![],
8890
req,
8991
res: PhantomData,
9092
}
9193
}
9294

95+
pub fn header(mut self, key: String, value: String) -> Self {
96+
self.headers.push((key, value));
97+
self
98+
}
99+
93100
/// Add idempotency key to the request
94101
pub fn idempotency_key(mut self, idempotency_key: impl Into<String>) -> Self {
95102
self.idempotency_key = Some(idempotency_key.into());
@@ -102,17 +109,26 @@ impl<'a, Req, Res> Request<'a, Req, Res> {
102109
Req: Serialize + 'static,
103110
Res: Deserialize + 'static,
104111
{
105-
self.ctx
106-
.call(self.request_target, self.idempotency_key, self.req)
112+
self.ctx.call(
113+
self.request_target,
114+
self.idempotency_key,
115+
self.headers,
116+
self.req,
117+
)
107118
}
108119

109120
/// Send the request to the service, without waiting for the response.
110121
pub fn send(self) -> impl InvocationHandle
111122
where
112123
Req: Serialize + 'static,
113124
{
114-
self.ctx
115-
.send(self.request_target, self.idempotency_key, self.req, None)
125+
self.ctx.send(
126+
self.request_target,
127+
self.idempotency_key,
128+
self.headers,
129+
self.req,
130+
None,
131+
)
116132
}
117133

118134
/// Schedule the request to the service, without waiting for the response.
@@ -123,6 +139,7 @@ impl<'a, Req, Res> Request<'a, Req, Res> {
123139
self.ctx.send(
124140
self.request_target,
125141
self.idempotency_key,
142+
self.headers,
126143
self.req,
127144
Some(delay),
128145
)

src/endpoint/context.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use futures::future::{BoxFuture, Either, Shared};
1515
use futures::{FutureExt, TryFutureExt};
1616
use pin_project_lite::pin_project;
1717
use restate_sdk_shared_core::{
18-
CoreVM, DoProgressResponse, Error as CoreError, NonEmptyValue, NotificationHandle, RetryPolicy,
19-
RunExitResult, TakeOutputResult, Target, TerminalFailure, Value, VM,
18+
CoreVM, DoProgressResponse, Error as CoreError, Header, NonEmptyValue, NotificationHandle,
19+
RetryPolicy, RunExitResult, TakeOutputResult, Target, TerminalFailure, Value, VM,
2020
};
2121
use std::borrow::Cow;
2222
use std::collections::HashMap;
@@ -371,12 +371,20 @@ impl ContextInternal {
371371
&self,
372372
request_target: RequestTarget,
373373
idempotency_key: Option<String>,
374+
headers: Vec<(String, String)>,
374375
req: Req,
375376
) -> impl CallFuture<Response = Res> + Send {
376377
let mut inner_lock = must_lock!(self.inner);
377378

378379
let mut target: Target = request_target.into();
379380
target.idempotency_key = idempotency_key;
381+
target.headers = headers
382+
.into_iter()
383+
.map(|(k, v)| Header {
384+
key: k.into(),
385+
value: v.into(),
386+
})
387+
.collect();
380388
let call_result = Req::serialize(&req)
381389
.map_err(|e| Error::serialization("call", e))
382390
.and_then(|input| inner_lock.vm.sys_call(target, input).map_err(Into::into));
@@ -443,14 +451,21 @@ impl ContextInternal {
443451
&self,
444452
request_target: RequestTarget,
445453
idempotency_key: Option<String>,
454+
headers: Vec<(String, String)>,
446455
req: Req,
447456
delay: Option<Duration>,
448457
) -> impl InvocationHandle {
449458
let mut inner_lock = must_lock!(self.inner);
450459

451460
let mut target: Target = request_target.into();
452461
target.idempotency_key = idempotency_key;
453-
462+
target.headers = headers
463+
.into_iter()
464+
.map(|(k, v)| Header {
465+
key: k.into(),
466+
value: v.into(),
467+
})
468+
.collect();
454469
let input = match Req::serialize(&req) {
455470
Ok(b) => b,
456471
Err(e) => {

0 commit comments

Comments
 (0)