From fe3d3f05ffbaaeb17135bf1c9bd629116cef336e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Mon, 22 Oct 2018 10:02:47 +0200 Subject: [PATCH] Improve promise cancellation and clean up any garbage references --- composer.json | 2 +- src/ProxyConnector.php | 62 +++++++++++++++++++++++--------- tests/AbstractTestCase.php | 9 +++-- tests/FunctionalTest.php | 16 +++++++++ tests/ProxyConnectorTest.php | 69 ++++++++++++++++++++++++++++++++++-- 5 files changed, 134 insertions(+), 24 deletions(-) diff --git a/composer.json b/composer.json index 0b70f02..72e4a8f 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,7 @@ "require": { "php": ">=5.3", "react/promise": " ^2.1 || ^1.2.1", - "react/socket": "^1.0 || ^0.8.4", + "react/socket": "^1.1", "ringcentral/psr7": "^1.2" }, "require-dev": { diff --git a/src/ProxyConnector.php b/src/ProxyConnector.php index dfb5490..6b84f21 100644 --- a/src/ProxyConnector.php +++ b/src/ProxyConnector.php @@ -136,21 +136,34 @@ public function connect($uri) $proxyUri .= '#' . $parts['fragment']; } - $auth = $this->proxyAuth; + $connecting = $this->connector->connect($proxyUri); + + $deferred = new Deferred(function ($_, $reject) use ($connecting) { + $reject(new RuntimeException( + 'Connection cancelled while waiting for proxy (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); - return $this->connector->connect($proxyUri)->then(function (ConnectionInterface $stream) use ($target, $auth) { - $deferred = new Deferred(function ($_, $reject) use ($stream) { - $reject(new RuntimeException('Connection canceled while waiting for response from proxy (ECONNABORTED)', defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103)); + // either close active connection or cancel pending connection attempt + $connecting->then(function (ConnectionInterface $stream) { $stream->close(); }); + $connecting->cancel(); + }); + $auth = $this->proxyAuth; + $connecting->then(function (ConnectionInterface $stream) use ($target, $auth, $deferred) { // keep buffering data until headers are complete $buffer = ''; - $fn = function ($chunk) use (&$buffer, $deferred, $stream) { + $stream->on('data', $fn = function ($chunk) use (&$buffer, $deferred, $stream, &$fn) { $buffer .= $chunk; $pos = strpos($buffer, "\r\n\r\n"); if ($pos !== false) { + // end of headers received => stop buffering + $stream->removeListener('data', $fn); + $fn = null; + // try to parse headers as response message try { $response = Psr7\parse_response(substr($buffer, 0, $pos)); @@ -163,11 +176,13 @@ public function connect($uri) if ($response->getStatusCode() === 407) { // map status code 407 (Proxy Authentication Required) to EACCES $deferred->reject(new RuntimeException('Proxy denied connection due to invalid authentication ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (EACCES)', defined('SOCKET_EACCES') ? SOCKET_EACCES : 13)); - return $stream->close(); + $stream->close(); + return; } elseif ($response->getStatusCode() < 200 || $response->getStatusCode() >= 300) { // map non-2xx status code to ECONNREFUSED $deferred->reject(new RuntimeException('Proxy refused connection with HTTP error code ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111)); - return $stream->close(); + $stream->close(); + return; } // all okay, resolve with stream instance @@ -187,8 +202,7 @@ public function connect($uri) $deferred->reject(new RuntimeException('Proxy must not send more than 8 KiB of headers (EMSGSIZE)', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90)); $stream->close(); } - }; - $stream->on('data', $fn); + }); $stream->on('error', function (Exception $e) use ($deferred) { $deferred->reject(new RuntimeException('Stream error while waiting for response from proxy (EIO)', defined('SOCKET_EIO') ? SOCKET_EIO : 5, $e)); @@ -199,14 +213,28 @@ public function connect($uri) }); $stream->write("CONNECT " . $target . " HTTP/1.1\r\nHost: " . $target . "\r\n" . $auth . "\r\n"); - - return $deferred->promise()->then(function (ConnectionInterface $stream) use ($fn) { - // Stop buffering when connection has been established. - $stream->removeListener('data', $fn); - return new Promise\FulfilledPromise($stream); - }); - }, function (Exception $e) use ($proxyUri) { - throw new RuntimeException('Unable to connect to proxy (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, $e); + }, function (Exception $e) use ($deferred) { + $deferred->reject($e = new RuntimeException( + 'Unable to connect to proxy (ECONNREFUSED)', + defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, + $e + )); + + // avoid garbage references by replacing all closures in call stack. + // what a lovely piece of code! + $r = new \ReflectionProperty('Exception', 'trace'); + $r->setAccessible(true); + $trace = $r->getValue($e); + foreach ($trace as &$one) { + foreach ($one['args'] as &$arg) { + if ($arg instanceof \Closure) { + $arg = 'Object(' . get_class($arg) . ')'; + } + } + } + $r->setValue($e, $trace); }); + + return $deferred->promise(); } } diff --git a/tests/AbstractTestCase.php b/tests/AbstractTestCase.php index 632b314..78d96d9 100644 --- a/tests/AbstractTestCase.php +++ b/tests/AbstractTestCase.php @@ -43,9 +43,12 @@ protected function expectCallableOnceWithExceptionCode($code) $mock ->expects($this->once()) ->method('__invoke') - ->with($this->callback(function ($e) use ($code) { - return $e->getCode() === $code; - })); + ->with($this->logicalAnd( + $this->isInstanceOf('Exception'), + $this->callback(function ($e) use ($code) { + return $e->getCode() === $code; + }) + )); return $mock; } diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index 23273cf..673b12f 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -72,4 +72,20 @@ public function testSecureGoogleDoesNotAcceptPlainStream() $this->setExpectedException('RuntimeException', 'Connection to proxy lost', SOCKET_ECONNRESET); Block\await($promise, $this->loop, 3.0); } + + /** + * @requires PHP 7 + */ + public function testCancelWhileConnectingShouldNotCreateGarbageCycles() + { + $proxy = new ProxyConnector('google.com', $this->dnsConnector); + + gc_collect_cycles(); + + $promise = $proxy->connect('google.com:80'); + $promise->cancel(); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } } diff --git a/tests/ProxyConnectorTest.php b/tests/ProxyConnectorTest.php index 32b7d66..3147d92 100644 --- a/tests/ProxyConnectorTest.php +++ b/tests/ProxyConnectorTest.php @@ -5,6 +5,7 @@ use Clue\React\HttpProxy\ProxyConnector; use React\Promise\Promise; use React\Socket\ConnectionInterface; +use React\Promise\Deferred; class ProxyConnectorTest extends AbstractTestCase { @@ -355,22 +356,84 @@ public function testResolvesIfStreamReturnsSuccessAndEmitsExcessiveData() $stream->emit('data', array("HTTP/1.1 200 OK\r\n\r\nhello!")); } - public function testCancelPromiseWillCloseOpenConnectionAndReject() + public function testCancelPromiseWhileConnectionIsReadyWillCloseOpenConnectionAndReject() { $stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock(); $stream->expects($this->once())->method('close'); - $promise = \React\Promise\resolve($stream); - $this->connector->expects($this->once())->method('connect')->willReturn($promise); + $deferred = new Deferred(); + + $this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); $proxy = new ProxyConnector('proxy.example.com', $this->connector); $promise = $proxy->connect('google.com:80'); + $deferred->resolve($stream); + $this->assertInstanceOf('React\Promise\CancellablePromiseInterface', $promise); $promise->cancel(); $promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED)); } + + public function testCancelPromiseDuringConnectionShouldNotCreateGarbageCycles() + { + $pending = new Promise(function () { }); + $this->connector->expects($this->once())->method('connect')->willReturn($pending); + + gc_collect_cycles(); + + $proxy = new ProxyConnector('proxy.example.com', $this->connector); + + $promise = $proxy->connect('google.com:80'); + $promise->cancel(); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } + + public function testCancelPromiseWhileConnectionIsReadyShouldNotCreateGarbageCycles() + { + if (class_exists('React\Promise\When')) { + $this->markTestSkipped('Not supported on legacy Promise v1 API'); + } + + $stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock(); + + $deferred = new Deferred(); + + $this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); + + gc_collect_cycles(); + + $proxy = new ProxyConnector('proxy.example.com', $this->connector); + + $promise = $proxy->connect('google.com:80'); + $deferred->resolve($stream); + $promise->cancel(); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } + + public function testRejectedConnectionShouldNotCreateGarbageCycles() + { + if (class_exists('React\Promise\When')) { + $this->markTestSkipped('Not supported on legacy Promise v1 API'); + } + + $rejected = \React\Promise\reject(new \RuntimeException()); + $this->connector->expects($this->once())->method('connect')->willReturn($rejected); + + gc_collect_cycles(); + + $proxy = new ProxyConnector('proxy.example.com', $this->connector); + + $promise = $proxy->connect('google.com:80'); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } }