@@ -372,6 +372,228 @@ def test_send_buffer_nowait(self):
372
372
obj [4 :8 ] = b'ham.'
373
373
self .assertEqual (obj , buf )
374
374
375
+ def test_send_cleared_with_subinterpreter (self ):
376
+ def common (rch , sch , unbound = None , presize = 0 ):
377
+ if not unbound :
378
+ extraargs = ''
379
+ elif unbound is channels .UNBOUND :
380
+ extraargs = ', unbound=channels.UNBOUND'
381
+ elif unbound is channels .UNBOUND_ERROR :
382
+ extraargs = ', unbound=channels.UNBOUND_ERROR'
383
+ elif unbound is channels .UNBOUND_REMOVE :
384
+ extraargs = ', unbound=channels.UNBOUND_REMOVE'
385
+ else :
386
+ raise NotImplementedError (repr (unbound ))
387
+ interp = interpreters .create ()
388
+
389
+ _run_output (interp , dedent (f"""
390
+ from test.support.interpreters import channels
391
+ sch = channels.SendChannel({ sch .id } )
392
+ obj1 = b'spam'
393
+ obj2 = b'eggs'
394
+ sch.send_nowait(obj1{ extraargs } )
395
+ sch.send_nowait(obj2{ extraargs } )
396
+ """ ))
397
+ self .assertEqual (
398
+ _channels .get_count (rch .id ),
399
+ presize + 2 ,
400
+ )
401
+
402
+ if presize == 0 :
403
+ obj1 = rch .recv ()
404
+ self .assertEqual (obj1 , b'spam' )
405
+ self .assertEqual (
406
+ _channels .get_count (rch .id ),
407
+ presize + 1 ,
408
+ )
409
+
410
+ return interp
411
+
412
+ with self .subTest ('default' ): # UNBOUND
413
+ rch , sch = channels .create ()
414
+ interp = common (rch , sch )
415
+ del interp
416
+ self .assertEqual (_channels .get_count (rch .id ), 1 )
417
+ obj1 = rch .recv ()
418
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
419
+ self .assertIs (obj1 , channels .UNBOUND )
420
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
421
+ with self .assertRaises (channels .ChannelEmptyError ):
422
+ rch .recv_nowait ()
423
+
424
+ with self .subTest ('UNBOUND' ):
425
+ rch , sch = channels .create ()
426
+ interp = common (rch , sch , channels .UNBOUND )
427
+ del interp
428
+ self .assertEqual (_channels .get_count (rch .id ), 1 )
429
+ obj1 = rch .recv ()
430
+ self .assertIs (obj1 , channels .UNBOUND )
431
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
432
+ with self .assertRaises (channels .ChannelEmptyError ):
433
+ rch .recv_nowait ()
434
+
435
+ with self .subTest ('UNBOUND_ERROR' ):
436
+ rch , sch = channels .create ()
437
+ interp = common (rch , sch , channels .UNBOUND_ERROR )
438
+
439
+ del interp
440
+ self .assertEqual (_channels .get_count (rch .id ), 1 )
441
+ with self .assertRaises (channels .ItemInterpreterDestroyed ):
442
+ rch .recv ()
443
+
444
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
445
+ with self .assertRaises (channels .ChannelEmptyError ):
446
+ rch .recv_nowait ()
447
+
448
+ with self .subTest ('UNBOUND_REMOVE' ):
449
+ rch , sch = channels .create ()
450
+
451
+ interp = common (rch , sch , channels .UNBOUND_REMOVE )
452
+ del interp
453
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
454
+ with self .assertRaises (channels .ChannelEmptyError ):
455
+ rch .recv_nowait ()
456
+
457
+ sch .send_nowait (b'ham' , unbound = channels .UNBOUND_REMOVE )
458
+ self .assertEqual (_channels .get_count (rch .id ), 1 )
459
+ interp = common (rch , sch , channels .UNBOUND_REMOVE , 1 )
460
+ self .assertEqual (_channels .get_count (rch .id ), 3 )
461
+ sch .send_nowait (42 , unbound = channels .UNBOUND_REMOVE )
462
+ self .assertEqual (_channels .get_count (rch .id ), 4 )
463
+ del interp
464
+ self .assertEqual (_channels .get_count (rch .id ), 2 )
465
+ obj1 = rch .recv ()
466
+ obj2 = rch .recv ()
467
+ self .assertEqual (obj1 , b'ham' )
468
+ self .assertEqual (obj2 , 42 )
469
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
470
+ with self .assertRaises (channels .ChannelEmptyError ):
471
+ rch .recv_nowait ()
472
+
473
+ def test_send_cleared_with_subinterpreter_mixed (self ):
474
+ rch , sch = channels .create ()
475
+ interp = interpreters .create ()
476
+
477
+ # If we don't associate the main interpreter with the channel
478
+ # then the channel will be automatically closed when interp
479
+ # is destroyed.
480
+ sch .send_nowait (None )
481
+ rch .recv ()
482
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
483
+
484
+ _run_output (interp , dedent (f"""
485
+ from test.support.interpreters import channels
486
+ sch = channels.SendChannel({ sch .id } )
487
+ sch.send_nowait(1, unbound=channels.UNBOUND)
488
+ sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
489
+ sch.send_nowait(3)
490
+ sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
491
+ sch.send_nowait(5, unbound=channels.UNBOUND)
492
+ """ ))
493
+ self .assertEqual (_channels .get_count (rch .id ), 5 )
494
+
495
+ del interp
496
+ self .assertEqual (_channels .get_count (rch .id ), 4 )
497
+
498
+ obj1 = rch .recv ()
499
+ self .assertIs (obj1 , channels .UNBOUND )
500
+ self .assertEqual (_channels .get_count (rch .id ), 3 )
501
+
502
+ with self .assertRaises (channels .ItemInterpreterDestroyed ):
503
+ rch .recv ()
504
+ self .assertEqual (_channels .get_count (rch .id ), 2 )
505
+
506
+ obj2 = rch .recv ()
507
+ self .assertIs (obj2 , channels .UNBOUND )
508
+ self .assertEqual (_channels .get_count (rch .id ), 1 )
509
+
510
+ obj3 = rch .recv ()
511
+ self .assertIs (obj3 , channels .UNBOUND )
512
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
513
+
514
+ def test_send_cleared_with_subinterpreter_multiple (self ):
515
+ rch , sch = channels .create ()
516
+ interp1 = interpreters .create ()
517
+ interp2 = interpreters .create ()
518
+
519
+ sch .send_nowait (1 )
520
+ _run_output (interp1 , dedent (f"""
521
+ from test.support.interpreters import channels
522
+ rch = channels.RecvChannel({ rch .id } )
523
+ sch = channels.SendChannel({ sch .id } )
524
+ obj1 = rch.recv()
525
+ sch.send_nowait(2, unbound=channels.UNBOUND)
526
+ sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
527
+ """ ))
528
+ _run_output (interp2 , dedent (f"""
529
+ from test.support.interpreters import channels
530
+ rch = channels.RecvChannel({ rch .id } )
531
+ sch = channels.SendChannel({ sch .id } )
532
+ obj2 = rch.recv()
533
+ obj1 = rch.recv()
534
+ """ ))
535
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
536
+ sch .send_nowait (3 )
537
+ _run_output (interp1 , dedent ("""
538
+ sch.send_nowait(4, unbound=channels.UNBOUND)
539
+ # interp closed here
540
+ sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
541
+ sch.send_nowait(6, unbound=channels.UNBOUND)
542
+ """ ))
543
+ _run_output (interp2 , dedent ("""
544
+ sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
545
+ # interp closed here
546
+ sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
547
+ sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
548
+ sch.send_nowait(8, unbound=channels.UNBOUND)
549
+ """ ))
550
+ _run_output (interp1 , dedent ("""
551
+ sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
552
+ sch.send_nowait(10, unbound=channels.UNBOUND)
553
+ """ ))
554
+ self .assertEqual (_channels .get_count (rch .id ), 10 )
555
+
556
+ obj3 = rch .recv ()
557
+ self .assertEqual (obj3 , 3 )
558
+ self .assertEqual (_channels .get_count (rch .id ), 9 )
559
+
560
+ obj4 = rch .recv ()
561
+ self .assertEqual (obj4 , 4 )
562
+ self .assertEqual (_channels .get_count (rch .id ), 8 )
563
+
564
+ del interp1
565
+ self .assertEqual (_channels .get_count (rch .id ), 6 )
566
+
567
+ # obj5 was removed
568
+
569
+ obj6 = rch .recv ()
570
+ self .assertIs (obj6 , channels .UNBOUND )
571
+ self .assertEqual (_channels .get_count (rch .id ), 5 )
572
+
573
+ obj7 = rch .recv ()
574
+ self .assertEqual (obj7 , 7 )
575
+ self .assertEqual (_channels .get_count (rch .id ), 4 )
576
+
577
+ del interp2
578
+ self .assertEqual (_channels .get_count (rch .id ), 3 )
579
+
580
+ # obj1
581
+ with self .assertRaises (channels .ItemInterpreterDestroyed ):
582
+ rch .recv ()
583
+ self .assertEqual (_channels .get_count (rch .id ), 2 )
584
+
585
+ # obj2 was removed
586
+
587
+ obj8 = rch .recv ()
588
+ self .assertIs (obj8 , channels .UNBOUND )
589
+ self .assertEqual (_channels .get_count (rch .id ), 1 )
590
+
591
+ # obj9 was removed
592
+
593
+ obj10 = rch .recv ()
594
+ self .assertIs (obj10 , channels .UNBOUND )
595
+ self .assertEqual (_channels .get_count (rch .id ), 0 )
596
+
375
597
376
598
if __name__ == '__main__' :
377
599
# Test needs to be a package, so we can do relative imports.
0 commit comments