Skip to content

Commit 996e3f0

Browse files
committed
Make endpoint handle function independent of other handlers
1 parent cd72f10 commit 996e3f0

File tree

1 file changed

+50
-29
lines changed

1 file changed

+50
-29
lines changed

src/endpoint/mod.rs

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -333,49 +333,70 @@ pub struct BidiStreamRunner {
333333

334334
impl BidiStreamRunner {
335335
pub async fn handle(
336-
mut self,
337-
mut input_rx: InputReceiver,
336+
self,
337+
input_rx: InputReceiver,
338338
output_tx: OutputSender,
339339
) -> Result<(), Error> {
340-
Self::init_loop_vm(&mut self.vm, &mut input_rx).await?;
341-
342340
// Retrieve the service from the Arc
343341
let svc = self
344342
.endpoint
345343
.svcs
346344
.get(&self.svc_name)
347345
.expect("service must exist at this point");
348346

349-
// Initialize handler context
350-
let (handler_state_tx, handler_state_rx) = HandlerStateNotifier::new();
351-
let ctx = ContextInternal::new(
347+
handle(
348+
input_rx,
349+
output_tx,
352350
self.vm,
353351
self.svc_name,
354352
self.handler_name,
355-
input_rx,
356-
output_tx,
357-
handler_state_tx,
358-
);
359-
360-
// Start user code
361-
let user_code_fut = InterceptErrorFuture::new(ctx.clone(), svc.handle(ctx.clone()));
362-
363-
// Wrap it in handler state aware future
364-
HandlerStateAwareFuture::new(ctx.clone(), handler_state_rx, user_code_fut).await
353+
svc,
354+
)
355+
.await
365356
}
357+
}
366358

367-
async fn init_loop_vm(vm: &mut CoreVM, input_rx: &mut InputReceiver) -> Result<(), ErrorInner> {
368-
while !vm.is_ready_to_execute().map_err(ErrorInner::VM)? {
369-
match input_rx.recv().await {
370-
Some(Ok(b)) => vm.notify_input(b),
371-
Some(Err(e)) => vm.notify_error(
372-
"Error when reading the body".into(),
373-
e.to_string().into(),
374-
None,
375-
),
376-
None => vm.notify_input_closed(),
377-
}
359+
#[doc(hidden)]
360+
pub async fn handle<S: Service<Future = BoxFuture<'static, Result<(), Error>>> + Send + Sync>(
361+
mut input_rx: InputReceiver,
362+
output_tx: OutputSender,
363+
vm: CoreVM,
364+
svc_name: String,
365+
handler_name: String,
366+
svc: &S,
367+
) -> Result<(), Error> {
368+
let mut vm = vm;
369+
init_loop_vm(&mut vm, &mut input_rx).await?;
370+
371+
// Initialize handler context
372+
let (handler_state_tx, handler_state_rx) = HandlerStateNotifier::new();
373+
let ctx = ContextInternal::new(
374+
vm,
375+
svc_name,
376+
handler_name,
377+
input_rx,
378+
output_tx,
379+
handler_state_tx,
380+
);
381+
382+
// Start user code
383+
let user_code_fut = InterceptErrorFuture::new(ctx.clone(), svc.handle(ctx.clone()));
384+
385+
// Wrap it in handler state aware future
386+
HandlerStateAwareFuture::new(ctx.clone(), handler_state_rx, user_code_fut).await
387+
}
388+
389+
async fn init_loop_vm(vm: &mut CoreVM, input_rx: &mut InputReceiver) -> Result<(), ErrorInner> {
390+
while !vm.is_ready_to_execute().map_err(ErrorInner::VM)? {
391+
match input_rx.recv().await {
392+
Some(Ok(b)) => vm.notify_input(b),
393+
Some(Err(e)) => vm.notify_error(
394+
"Error when reading the body".into(),
395+
e.to_string().into(),
396+
None,
397+
),
398+
None => vm.notify_input_closed(),
378399
}
379-
Ok(())
380400
}
401+
Ok(())
381402
}

0 commit comments

Comments
 (0)