@@ -333,49 +333,72 @@ pub struct BidiStreamRunner {
333
333
334
334
impl BidiStreamRunner {
335
335
pub async fn handle (
336
- mut self ,
337
- mut input_rx : InputReceiver ,
336
+ self ,
337
+ input_rx : InputReceiver ,
338
338
output_tx : OutputSender ,
339
339
) -> Result < ( ) , Error > {
340
- Self :: init_loop_vm ( & mut self . vm , & mut input_rx) . await ?;
341
-
342
340
// Retrieve the service from the Arc
343
341
let svc = self
344
342
. endpoint
345
343
. svcs
346
344
. get ( & self . svc_name )
347
345
. expect ( "service must exist at this point" ) ;
348
346
349
- // Initialize handler context
350
- let ( handler_state_tx , handler_state_rx ) = HandlerStateNotifier :: new ( ) ;
351
- let ctx = ContextInternal :: new (
347
+ handle (
348
+ input_rx ,
349
+ output_tx ,
352
350
self . vm ,
353
351
self . svc_name ,
354
352
self . handler_name ,
355
- input_rx,
356
- output_tx,
357
- handler_state_tx,
358
- ) ;
359
-
360
- // Start user code
361
- let user_code_fut = InterceptErrorFuture :: new ( ctx. clone ( ) , svc. handle ( ctx. clone ( ) ) ) ;
362
-
363
- // Wrap it in handler state aware future
364
- HandlerStateAwareFuture :: new ( ctx. clone ( ) , handler_state_rx, user_code_fut) . await
353
+ svc,
354
+ )
355
+ . await
365
356
}
357
+ }
366
358
367
- async fn init_loop_vm ( vm : & mut CoreVM , input_rx : & mut InputReceiver ) -> Result < ( ) , ErrorInner > {
368
- while !vm. is_ready_to_execute ( ) . map_err ( ErrorInner :: VM ) ? {
369
- match input_rx. recv ( ) . await {
370
- Some ( Ok ( b) ) => vm. notify_input ( b) ,
371
- Some ( Err ( e) ) => vm. notify_error (
372
- "Error when reading the body" . into ( ) ,
373
- e. to_string ( ) . into ( ) ,
374
- None ,
375
- ) ,
376
- None => vm. notify_input_closed ( ) ,
377
- }
359
+ #[ doc( hidden) ]
360
+ pub async fn handle <
361
+ S : Service < Future = BoxFuture < ' static , Result < ( ) , Error > > > + Send + Sync ,
362
+ > (
363
+ mut input_rx : InputReceiver ,
364
+ output_tx : OutputSender ,
365
+ vm : CoreVM ,
366
+ svc_name : String ,
367
+ handler_name : String ,
368
+ svc : & S ,
369
+ ) -> Result < ( ) , Error > {
370
+ let mut vm = vm;
371
+ init_loop_vm ( & mut vm, & mut input_rx) . await ?;
372
+
373
+ // Initialize handler context
374
+ let ( handler_state_tx, handler_state_rx) = HandlerStateNotifier :: new ( ) ;
375
+ let ctx = ContextInternal :: new (
376
+ vm,
377
+ svc_name,
378
+ handler_name,
379
+ input_rx,
380
+ output_tx,
381
+ handler_state_tx,
382
+ ) ;
383
+
384
+ // Start user code
385
+ let user_code_fut = InterceptErrorFuture :: new ( ctx. clone ( ) , svc. handle ( ctx. clone ( ) ) ) ;
386
+
387
+ // Wrap it in handler state aware future
388
+ HandlerStateAwareFuture :: new ( ctx. clone ( ) , handler_state_rx, user_code_fut) . await
389
+ }
390
+
391
+ async fn init_loop_vm ( vm : & mut CoreVM , input_rx : & mut InputReceiver ) -> Result < ( ) , ErrorInner > {
392
+ while !vm. is_ready_to_execute ( ) . map_err ( ErrorInner :: VM ) ? {
393
+ match input_rx. recv ( ) . await {
394
+ Some ( Ok ( b) ) => vm. notify_input ( b) ,
395
+ Some ( Err ( e) ) => vm. notify_error (
396
+ "Error when reading the body" . into ( ) ,
397
+ e. to_string ( ) . into ( ) ,
398
+ None ,
399
+ ) ,
400
+ None => vm. notify_input_closed ( ) ,
378
401
}
379
- Ok ( ( ) )
380
402
}
403
+ Ok ( ( ) )
381
404
}
0 commit comments