|
21 | 21 | from unittest import mock
|
22 | 22 | import weakref
|
23 | 23 |
|
| 24 | +if sys.platform != 'win32': |
| 25 | + import tty |
24 | 26 |
|
25 | 27 | import asyncio
|
26 | 28 | from asyncio import proactor_events
|
@@ -1626,6 +1628,79 @@ def reader(data):
|
1626 | 1628 | self.loop.run_until_complete(proto.done)
|
1627 | 1629 | self.assertEqual('CLOSED', proto.state)
|
1628 | 1630 |
|
| 1631 | + @unittest.skipUnless(sys.platform != 'win32', |
| 1632 | + "Don't support pipes for Windows") |
| 1633 | + # select, poll and kqueue don't support character devices (PTY) on Mac OS X |
| 1634 | + # older than 10.6 (Snow Leopard) |
| 1635 | + @support.requires_mac_ver(10, 6) |
| 1636 | + def test_bidirectional_pty(self): |
| 1637 | + master, read_slave = os.openpty() |
| 1638 | + write_slave = os.dup(read_slave) |
| 1639 | + tty.setraw(read_slave) |
| 1640 | + |
| 1641 | + slave_read_obj = io.open(read_slave, 'rb', 0) |
| 1642 | + read_proto = MyReadPipeProto(loop=self.loop) |
| 1643 | + read_connect = self.loop.connect_read_pipe(lambda: read_proto, |
| 1644 | + slave_read_obj) |
| 1645 | + read_transport, p = self.loop.run_until_complete(read_connect) |
| 1646 | + self.assertIs(p, read_proto) |
| 1647 | + self.assertIs(read_transport, read_proto.transport) |
| 1648 | + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) |
| 1649 | + self.assertEqual(0, read_proto.nbytes) |
| 1650 | + |
| 1651 | + |
| 1652 | + slave_write_obj = io.open(write_slave, 'wb', 0) |
| 1653 | + write_proto = MyWritePipeProto(loop=self.loop) |
| 1654 | + write_connect = self.loop.connect_write_pipe(lambda: write_proto, |
| 1655 | + slave_write_obj) |
| 1656 | + write_transport, p = self.loop.run_until_complete(write_connect) |
| 1657 | + self.assertIs(p, write_proto) |
| 1658 | + self.assertIs(write_transport, write_proto.transport) |
| 1659 | + self.assertEqual('CONNECTED', write_proto.state) |
| 1660 | + |
| 1661 | + data = bytearray() |
| 1662 | + def reader(data): |
| 1663 | + chunk = os.read(master, 1024) |
| 1664 | + data += chunk |
| 1665 | + return len(data) |
| 1666 | + |
| 1667 | + write_transport.write(b'1') |
| 1668 | + test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) |
| 1669 | + self.assertEqual(b'1', data) |
| 1670 | + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) |
| 1671 | + self.assertEqual('CONNECTED', write_proto.state) |
| 1672 | + |
| 1673 | + os.write(master, b'a') |
| 1674 | + test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, |
| 1675 | + timeout=10) |
| 1676 | + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) |
| 1677 | + self.assertEqual(1, read_proto.nbytes) |
| 1678 | + self.assertEqual('CONNECTED', write_proto.state) |
| 1679 | + |
| 1680 | + write_transport.write(b'2345') |
| 1681 | + test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) |
| 1682 | + self.assertEqual(b'12345', data) |
| 1683 | + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) |
| 1684 | + self.assertEqual('CONNECTED', write_proto.state) |
| 1685 | + |
| 1686 | + os.write(master, b'bcde') |
| 1687 | + test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, |
| 1688 | + timeout=10) |
| 1689 | + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) |
| 1690 | + self.assertEqual(5, read_proto.nbytes) |
| 1691 | + self.assertEqual('CONNECTED', write_proto.state) |
| 1692 | + |
| 1693 | + os.close(master) |
| 1694 | + |
| 1695 | + read_transport.close() |
| 1696 | + self.loop.run_until_complete(read_proto.done) |
| 1697 | + self.assertEqual( |
| 1698 | + ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state) |
| 1699 | + |
| 1700 | + write_transport.close() |
| 1701 | + self.loop.run_until_complete(write_proto.done) |
| 1702 | + self.assertEqual('CLOSED', write_proto.state) |
| 1703 | + |
1629 | 1704 | def test_prompt_cancellation(self):
|
1630 | 1705 | r, w = test_utils.socketpair()
|
1631 | 1706 | r.setblocking(False)
|
|
0 commit comments