@@ -3,7 +3,8 @@ import { createActivitySet } from '../../core';
3
3
import { createAtomActivityFromHandler } from '../atom-activity' ;
4
4
import { createParallelActivity } from './parallel-activity' ;
5
5
import { createWorkflowMachineBuilder } from '../../workflow-machine-builder' ;
6
- import { branchName } from '../results' ;
6
+ import { branchName , interrupt } from '../results' ;
7
+ import { ParallelActivityHandler } from './types' ;
7
8
8
9
interface ParallelTestGlobalState {
9
10
logger : string ;
@@ -78,22 +79,34 @@ function createDefinition0(activeBranchNames: string[]) {
78
79
} ;
79
80
}
80
81
81
- function createTest ( definition : Definition ) {
82
+ function createTest (
83
+ definition : Definition ,
84
+ customParallelActivityHandler ?: ParallelActivityHandler < ParallelStep , ParallelTestGlobalState , unknown >
85
+ ) {
82
86
const activitySet = createActivitySet < ParallelTestGlobalState > ( [
83
87
createAtomActivityFromHandler < LogStep , ParallelTestGlobalState > ( 'log' , async ( step , globalState ) => {
84
88
globalState . logger += `;${ step . id } ;` ;
85
- const delay = Math . ceil ( 20 * Math . random ( ) ) ;
86
- await new Promise ( resolve => setTimeout ( resolve , delay ) ) ;
89
+ const delay = Math . ceil ( 10 * Math . random ( ) ) ;
90
+ await sleep ( delay ) ;
87
91
} ) ,
88
92
createAtomActivityFromHandler < JobStep , ParallelTestGlobalState > ( 'job' , async ( step , globalState ) => {
89
93
globalState . logger += ';job;' ;
90
- if ( step . properties . job === 'fail' ) {
94
+ const { job } = step . properties ;
95
+ if ( job === 'fail' ) {
91
96
throw new Error ( 'Job failed!' ) ;
92
97
}
98
+ if ( job === 'interrupt' ) {
99
+ return interrupt ( ) ;
100
+ }
101
+ if ( job . startsWith ( 'sleep:' ) ) {
102
+ await sleep ( Number ( job . substring ( 6 ) ) ) ;
103
+ return ;
104
+ }
105
+ throw new Error ( 'Unknown job' ) ;
93
106
} ) ,
94
107
createParallelActivity < ParallelStep , ParallelTestGlobalState > ( 'parallel' , {
95
108
init : ( ) => ( { } ) ,
96
- handler : async step => step . properties . activeBranchNames . map ( branchName )
109
+ handler : customParallelActivityHandler ?? ( async step => step . properties . activeBranchNames . map ( branchName ) )
97
110
} )
98
111
] ) ;
99
112
@@ -272,8 +285,87 @@ describe('ParallelActivity', () => {
272
285
} ) ;
273
286
interpreter . start ( ) ;
274
287
} ) ;
288
+
289
+ it ( 'interrupts the execution if a parallel activity handler returns interrupt()' , done => {
290
+ const definition : Definition = {
291
+ sequence : [
292
+ createLogStep ( 'before' ) ,
293
+ createParallelStep ( 'parallel' , [ 'thread0' ] , {
294
+ thread0 : [ ]
295
+ } ) ,
296
+ createLogStep ( 'after' )
297
+ ] ,
298
+ properties : { }
299
+ } ;
300
+
301
+ const interpreter = createTest ( definition , async ( _ , globalState ) => {
302
+ globalState . logger += ';interrupt;' ;
303
+ return interrupt ( ) ;
304
+ } ) ;
305
+
306
+ interpreter . onDone ( ( ) => {
307
+ const snapshot = interpreter . getSnapshot ( ) ;
308
+ const logger = snapshot . globalState . logger ;
309
+
310
+ expect ( logger ) . toBe ( ';before;;interrupt;' ) ;
311
+ expect ( snapshot . isFailed ( ) ) . toEqual ( false ) ;
312
+ expect ( snapshot . isInterrupted ( ) ) . toEqual ( true ) ;
313
+ expect ( snapshot . isFinished ( ) ) . toEqual ( false ) ;
314
+
315
+ done ( ) ;
316
+ } ) ;
317
+ interpreter . start ( ) ;
318
+ } ) ;
319
+
320
+ // TODO: This test may be fragile on slow machines (it uses timeouts). For now I leave it as it is.
321
+ it ( 'interrupts the execution if a step inside a parallel section returns interrupt()' , done => {
322
+ const definition : Definition = {
323
+ sequence : [
324
+ createLogStep ( 'before' ) ,
325
+ createParallelStep ( 'parallel' , [ 'thread0' , 'thread1' ] , {
326
+ thread0 : [
327
+ createLogStep ( 'thread0_0' ) ,
328
+ createJobStep ( 'job' , 'sleep:100' ) ,
329
+ createJobStep ( 'job' , 'interrupt' ) ,
330
+ createLogStep ( 'thread0_1' )
331
+ ] ,
332
+ thread1 : [ createLogStep ( 'thread1_0' ) , createJobStep ( 'job' , 'sleep:300' ) , createLogStep ( 'thread1_1' ) ]
333
+ } ) ,
334
+ createLogStep ( 'after' )
335
+ ] ,
336
+ properties : { }
337
+ } ;
338
+
339
+ const interpreter = createTest ( definition ) ;
340
+
341
+ interpreter . onDone ( ( ) => {
342
+ const snapshot = interpreter . getSnapshot ( ) ;
343
+ const logger = snapshot . globalState . logger ;
344
+
345
+ expect ( logger ) . toContain ( ';before;' ) ;
346
+ expect ( logger ) . toContain ( ';thread0_0;' ) ;
347
+ expect ( logger ) . toContain ( ';thread1_0;' ) ;
348
+ expect ( logger ) . toContain ( ';job;' ) ;
349
+ expect ( snapshot . isFailed ( ) ) . toEqual ( false ) ;
350
+ expect ( snapshot . isInterrupted ( ) ) . toEqual ( true ) ;
351
+ expect ( snapshot . isFinished ( ) ) . toEqual ( false ) ;
352
+
353
+ setTimeout ( ( ) => {
354
+ expect ( logger ) . not . toContain ( 'thread0_1' ) ;
355
+ // We expect the `thread1_1` step won't be executed if the machine is interrupted.
356
+ // The second "thread" should be also interrupted.
357
+ expect ( logger ) . not . toContain ( 'thread1_1' ) ;
358
+ done ( ) ;
359
+ } , 600 ) ;
360
+ } ) ;
361
+ interpreter . start ( ) ;
362
+ } ) ;
275
363
} ) ;
276
364
277
365
function extractBetween ( log : string , start : string , end : string ) {
278
366
return log . split ( start ) [ 1 ] . split ( end ) [ 0 ] ;
279
367
}
368
+
369
+ function sleep ( ms : number ) {
370
+ return new Promise ( resolve => setTimeout ( resolve , ms ) ) ;
371
+ }
0 commit comments