Skip to content

Commit ebca16e

Browse files
committed
Make endpoint handle function independent of other handlers
1 parent cd72f10 commit ebca16e

File tree

1 file changed

+51
-29
lines changed

1 file changed

+51
-29
lines changed

src/endpoint/mod.rs

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -333,49 +333,71 @@ pub struct BidiStreamRunner {
333333

334334
impl BidiStreamRunner {
335335
pub async fn handle(
336-
mut self,
337-
mut input_rx: InputReceiver,
336+
self,
337+
input_rx: InputReceiver,
338338
output_tx: OutputSender,
339339
) -> Result<(), Error> {
340-
Self::init_loop_vm(&mut self.vm, &mut input_rx).await?;
341-
342340
// Retrieve the service from the Arc
343341
let svc = self
344342
.endpoint
345343
.svcs
346344
.get(&self.svc_name)
347345
.expect("service must exist at this point");
348346

349-
// Initialize handler context
350-
let (handler_state_tx, handler_state_rx) = HandlerStateNotifier::new();
351-
let ctx = ContextInternal::new(
347+
handle(
348+
input_rx,
349+
output_tx,
352350
self.vm,
353351
self.svc_name,
354352
self.handler_name,
355-
input_rx,
356-
output_tx,
357-
handler_state_tx,
358-
);
359-
360-
// Start user code
361-
let user_code_fut = InterceptErrorFuture::new(ctx.clone(), svc.handle(ctx.clone()));
362-
363-
// Wrap it in handler state aware future
364-
HandlerStateAwareFuture::new(ctx.clone(), handler_state_rx, user_code_fut).await
353+
svc,
354+
)
355+
.await
365356
}
357+
}
366358

367-
async fn init_loop_vm(vm: &mut CoreVM, input_rx: &mut InputReceiver) -> Result<(), ErrorInner> {
368-
while !vm.is_ready_to_execute().map_err(ErrorInner::VM)? {
369-
match input_rx.recv().await {
370-
Some(Ok(b)) => vm.notify_input(b),
371-
Some(Err(e)) => vm.notify_error(
372-
"Error when reading the body".into(),
373-
e.to_string().into(),
374-
None,
375-
),
376-
None => vm.notify_input_closed(),
377-
}
359+
pub async fn handle<
360+
S: Service<Future = BoxFuture<'static, Result<(), Error>>> + Send + Sync + 'static,
361+
>(
362+
mut input_rx: InputReceiver,
363+
output_tx: OutputSender,
364+
vm: CoreVM,
365+
svc_name: String,
366+
handler_name: String,
367+
svc: &S,
368+
) -> Result<(), Error> {
369+
let mut vm = vm;
370+
init_loop_vm(&mut vm, &mut input_rx).await?;
371+
372+
// Initialize handler context
373+
let (handler_state_tx, handler_state_rx) = HandlerStateNotifier::new();
374+
let ctx = ContextInternal::new(
375+
vm,
376+
svc_name,
377+
handler_name,
378+
input_rx,
379+
output_tx,
380+
handler_state_tx,
381+
);
382+
383+
// Start user code
384+
let user_code_fut = InterceptErrorFuture::new(ctx.clone(), svc.handle(ctx.clone()));
385+
386+
// Wrap it in handler state aware future
387+
HandlerStateAwareFuture::new(ctx.clone(), handler_state_rx, user_code_fut).await
388+
}
389+
390+
async fn init_loop_vm(vm: &mut CoreVM, input_rx: &mut InputReceiver) -> Result<(), ErrorInner> {
391+
while !vm.is_ready_to_execute().map_err(ErrorInner::VM)? {
392+
match input_rx.recv().await {
393+
Some(Ok(b)) => vm.notify_input(b),
394+
Some(Err(e)) => vm.notify_error(
395+
"Error when reading the body".into(),
396+
e.to_string().into(),
397+
None,
398+
),
399+
None => vm.notify_input_closed(),
378400
}
379-
Ok(())
380401
}
402+
Ok(())
381403
}

0 commit comments

Comments
 (0)