Skip to content

Commit fe3d3f0

Browse files
committed
Improve promise cancellation and clean up any garbage references
1 parent 5675206 commit fe3d3f0

File tree

5 files changed

+134
-24
lines changed

5 files changed

+134
-24
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"require": {
2020
"php": ">=5.3",
2121
"react/promise": " ^2.1 || ^1.2.1",
22-
"react/socket": "^1.0 || ^0.8.4",
22+
"react/socket": "^1.1",
2323
"ringcentral/psr7": "^1.2"
2424
},
2525
"require-dev": {

src/ProxyConnector.php

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -136,21 +136,34 @@ public function connect($uri)
136136
$proxyUri .= '#' . $parts['fragment'];
137137
}
138138

139-
$auth = $this->proxyAuth;
139+
$connecting = $this->connector->connect($proxyUri);
140+
141+
$deferred = new Deferred(function ($_, $reject) use ($connecting) {
142+
$reject(new RuntimeException(
143+
'Connection cancelled while waiting for proxy (ECONNABORTED)',
144+
defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103
145+
));
140146

141-
return $this->connector->connect($proxyUri)->then(function (ConnectionInterface $stream) use ($target, $auth) {
142-
$deferred = new Deferred(function ($_, $reject) use ($stream) {
143-
$reject(new RuntimeException('Connection canceled while waiting for response from proxy (ECONNABORTED)', defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103));
147+
// either close active connection or cancel pending connection attempt
148+
$connecting->then(function (ConnectionInterface $stream) {
144149
$stream->close();
145150
});
151+
$connecting->cancel();
152+
});
146153

154+
$auth = $this->proxyAuth;
155+
$connecting->then(function (ConnectionInterface $stream) use ($target, $auth, $deferred) {
147156
// keep buffering data until headers are complete
148157
$buffer = '';
149-
$fn = function ($chunk) use (&$buffer, $deferred, $stream) {
158+
$stream->on('data', $fn = function ($chunk) use (&$buffer, $deferred, $stream, &$fn) {
150159
$buffer .= $chunk;
151160

152161
$pos = strpos($buffer, "\r\n\r\n");
153162
if ($pos !== false) {
163+
// end of headers received => stop buffering
164+
$stream->removeListener('data', $fn);
165+
$fn = null;
166+
154167
// try to parse headers as response message
155168
try {
156169
$response = Psr7\parse_response(substr($buffer, 0, $pos));
@@ -163,11 +176,13 @@ public function connect($uri)
163176
if ($response->getStatusCode() === 407) {
164177
// map status code 407 (Proxy Authentication Required) to EACCES
165178
$deferred->reject(new RuntimeException('Proxy denied connection due to invalid authentication ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (EACCES)', defined('SOCKET_EACCES') ? SOCKET_EACCES : 13));
166-
return $stream->close();
179+
$stream->close();
180+
return;
167181
} elseif ($response->getStatusCode() < 200 || $response->getStatusCode() >= 300) {
168182
// map non-2xx status code to ECONNREFUSED
169183
$deferred->reject(new RuntimeException('Proxy refused connection with HTTP error code ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111));
170-
return $stream->close();
184+
$stream->close();
185+
return;
171186
}
172187

173188
// all okay, resolve with stream instance
@@ -187,8 +202,7 @@ public function connect($uri)
187202
$deferred->reject(new RuntimeException('Proxy must not send more than 8 KiB of headers (EMSGSIZE)', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90));
188203
$stream->close();
189204
}
190-
};
191-
$stream->on('data', $fn);
205+
});
192206

193207
$stream->on('error', function (Exception $e) use ($deferred) {
194208
$deferred->reject(new RuntimeException('Stream error while waiting for response from proxy (EIO)', defined('SOCKET_EIO') ? SOCKET_EIO : 5, $e));
@@ -199,14 +213,28 @@ public function connect($uri)
199213
});
200214

201215
$stream->write("CONNECT " . $target . " HTTP/1.1\r\nHost: " . $target . "\r\n" . $auth . "\r\n");
202-
203-
return $deferred->promise()->then(function (ConnectionInterface $stream) use ($fn) {
204-
// Stop buffering when connection has been established.
205-
$stream->removeListener('data', $fn);
206-
return new Promise\FulfilledPromise($stream);
207-
});
208-
}, function (Exception $e) use ($proxyUri) {
209-
throw new RuntimeException('Unable to connect to proxy (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, $e);
216+
}, function (Exception $e) use ($deferred) {
217+
$deferred->reject($e = new RuntimeException(
218+
'Unable to connect to proxy (ECONNREFUSED)',
219+
defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111,
220+
$e
221+
));
222+
223+
// avoid garbage references by replacing all closures in call stack.
224+
// what a lovely piece of code!
225+
$r = new \ReflectionProperty('Exception', 'trace');
226+
$r->setAccessible(true);
227+
$trace = $r->getValue($e);
228+
foreach ($trace as &$one) {
229+
foreach ($one['args'] as &$arg) {
230+
if ($arg instanceof \Closure) {
231+
$arg = 'Object(' . get_class($arg) . ')';
232+
}
233+
}
234+
}
235+
$r->setValue($e, $trace);
210236
});
237+
238+
return $deferred->promise();
211239
}
212240
}

tests/AbstractTestCase.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@ protected function expectCallableOnceWithExceptionCode($code)
4343
$mock
4444
->expects($this->once())
4545
->method('__invoke')
46-
->with($this->callback(function ($e) use ($code) {
47-
return $e->getCode() === $code;
48-
}));
46+
->with($this->logicalAnd(
47+
$this->isInstanceOf('Exception'),
48+
$this->callback(function ($e) use ($code) {
49+
return $e->getCode() === $code;
50+
})
51+
));
4952

5053
return $mock;
5154
}

tests/FunctionalTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,20 @@ public function testSecureGoogleDoesNotAcceptPlainStream()
7272
$this->setExpectedException('RuntimeException', 'Connection to proxy lost', SOCKET_ECONNRESET);
7373
Block\await($promise, $this->loop, 3.0);
7474
}
75+
76+
/**
77+
* @requires PHP 7
78+
*/
79+
public function testCancelWhileConnectingShouldNotCreateGarbageCycles()
80+
{
81+
$proxy = new ProxyConnector('google.com', $this->dnsConnector);
82+
83+
gc_collect_cycles();
84+
85+
$promise = $proxy->connect('google.com:80');
86+
$promise->cancel();
87+
unset($promise);
88+
89+
$this->assertEquals(0, gc_collect_cycles());
90+
}
7591
}

tests/ProxyConnectorTest.php

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Clue\React\HttpProxy\ProxyConnector;
66
use React\Promise\Promise;
77
use React\Socket\ConnectionInterface;
8+
use React\Promise\Deferred;
89

910
class ProxyConnectorTest extends AbstractTestCase
1011
{
@@ -355,22 +356,84 @@ public function testResolvesIfStreamReturnsSuccessAndEmitsExcessiveData()
355356
$stream->emit('data', array("HTTP/1.1 200 OK\r\n\r\nhello!"));
356357
}
357358

358-
public function testCancelPromiseWillCloseOpenConnectionAndReject()
359+
public function testCancelPromiseWhileConnectionIsReadyWillCloseOpenConnectionAndReject()
359360
{
360361
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock();
361362
$stream->expects($this->once())->method('close');
362363

363-
$promise = \React\Promise\resolve($stream);
364-
$this->connector->expects($this->once())->method('connect')->willReturn($promise);
364+
$deferred = new Deferred();
365+
366+
$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());
365367

366368
$proxy = new ProxyConnector('proxy.example.com', $this->connector);
367369

368370
$promise = $proxy->connect('google.com:80');
369371

372+
$deferred->resolve($stream);
373+
370374
$this->assertInstanceOf('React\Promise\CancellablePromiseInterface', $promise);
371375

372376
$promise->cancel();
373377

374378
$promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED));
375379
}
380+
381+
public function testCancelPromiseDuringConnectionShouldNotCreateGarbageCycles()
382+
{
383+
$pending = new Promise(function () { });
384+
$this->connector->expects($this->once())->method('connect')->willReturn($pending);
385+
386+
gc_collect_cycles();
387+
388+
$proxy = new ProxyConnector('proxy.example.com', $this->connector);
389+
390+
$promise = $proxy->connect('google.com:80');
391+
$promise->cancel();
392+
unset($promise);
393+
394+
$this->assertEquals(0, gc_collect_cycles());
395+
}
396+
397+
public function testCancelPromiseWhileConnectionIsReadyShouldNotCreateGarbageCycles()
398+
{
399+
if (class_exists('React\Promise\When')) {
400+
$this->markTestSkipped('Not supported on legacy Promise v1 API');
401+
}
402+
403+
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock();
404+
405+
$deferred = new Deferred();
406+
407+
$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());
408+
409+
gc_collect_cycles();
410+
411+
$proxy = new ProxyConnector('proxy.example.com', $this->connector);
412+
413+
$promise = $proxy->connect('google.com:80');
414+
$deferred->resolve($stream);
415+
$promise->cancel();
416+
unset($promise);
417+
418+
$this->assertEquals(0, gc_collect_cycles());
419+
}
420+
421+
public function testRejectedConnectionShouldNotCreateGarbageCycles()
422+
{
423+
if (class_exists('React\Promise\When')) {
424+
$this->markTestSkipped('Not supported on legacy Promise v1 API');
425+
}
426+
427+
$rejected = \React\Promise\reject(new \RuntimeException());
428+
$this->connector->expects($this->once())->method('connect')->willReturn($rejected);
429+
430+
gc_collect_cycles();
431+
432+
$proxy = new ProxyConnector('proxy.example.com', $this->connector);
433+
434+
$promise = $proxy->connect('google.com:80');
435+
unset($promise);
436+
437+
$this->assertEquals(0, gc_collect_cycles());
438+
}
376439
}

0 commit comments

Comments
 (0)