diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index c539328..90f907a 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -333,12 +333,10 @@ pub struct BidiStreamRunner { impl BidiStreamRunner { pub async fn handle( - mut self, - mut input_rx: InputReceiver, + self, + input_rx: InputReceiver, output_tx: OutputSender, ) -> Result<(), Error> { - Self::init_loop_vm(&mut self.vm, &mut input_rx).await?; - // Retrieve the service from the Arc let svc = self .endpoint @@ -346,36 +344,59 @@ impl BidiStreamRunner { .get(&self.svc_name) .expect("service must exist at this point"); - // Initialize handler context - let (handler_state_tx, handler_state_rx) = HandlerStateNotifier::new(); - let ctx = ContextInternal::new( + handle( + input_rx, + output_tx, self.vm, self.svc_name, self.handler_name, - input_rx, - output_tx, - handler_state_tx, - ); - - // Start user code - let user_code_fut = InterceptErrorFuture::new(ctx.clone(), svc.handle(ctx.clone())); - - // Wrap it in handler state aware future - HandlerStateAwareFuture::new(ctx.clone(), handler_state_rx, user_code_fut).await + svc, + ) + .await } +} - async fn init_loop_vm(vm: &mut CoreVM, input_rx: &mut InputReceiver) -> Result<(), ErrorInner> { - while !vm.is_ready_to_execute().map_err(ErrorInner::VM)? { - match input_rx.recv().await { - Some(Ok(b)) => vm.notify_input(b), - Some(Err(e)) => vm.notify_error( - "Error when reading the body".into(), - e.to_string().into(), - None, - ), - None => vm.notify_input_closed(), - } +#[doc(hidden)] +pub async fn handle>> + Send + Sync>( + mut input_rx: InputReceiver, + output_tx: OutputSender, + vm: CoreVM, + svc_name: String, + handler_name: String, + svc: &S, +) -> Result<(), Error> { + let mut vm = vm; + init_loop_vm(&mut vm, &mut input_rx).await?; + + // Initialize handler context + let (handler_state_tx, handler_state_rx) = HandlerStateNotifier::new(); + let ctx = ContextInternal::new( + vm, + svc_name, + handler_name, + input_rx, + output_tx, + handler_state_tx, + ); + + // Start user code + let user_code_fut = InterceptErrorFuture::new(ctx.clone(), svc.handle(ctx.clone())); + + // Wrap it in handler state aware future + HandlerStateAwareFuture::new(ctx.clone(), handler_state_rx, user_code_fut).await +} + +async fn init_loop_vm(vm: &mut CoreVM, input_rx: &mut InputReceiver) -> Result<(), ErrorInner> { + while !vm.is_ready_to_execute().map_err(ErrorInner::VM)? { + match input_rx.recv().await { + Some(Ok(b)) => vm.notify_input(b), + Some(Err(e)) => vm.notify_error( + "Error when reading the body".into(), + e.to_string().into(), + None, + ), + None => vm.notify_input_closed(), } - Ok(()) } + Ok(()) }