diff --git a/src/Client.php b/src/Client.php index 628ef55..21d3389 100644 --- a/src/Client.php +++ b/src/Client.php @@ -14,7 +14,6 @@ use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamInterface; use React\EventLoop\LoopInterface; -use React\Promise\Deferred; use React\HttpClient\Client as ReactClient; use React\HttpClient\Request as ReactRequest; use React\HttpClient\Response as ReactResponse; @@ -93,45 +92,42 @@ public function sendRequest(RequestInterface $request) public function sendAsyncRequest(RequestInterface $request) { $reactRequest = $this->buildReactRequest($request); - $deferred = new Deferred(); + $promise = new Promise($this->loop); - $reactRequest->on('error', function (\Exception $error) use ($deferred, $request) { - $deferred->reject(new RequestException( + $reactRequest->on('error', function (\Exception $error) use ($promise, $request) { + $promise->reject(new RequestException( $error->getMessage(), $request, $error )); }); - $reactRequest->on('response', function (ReactResponse $reactResponse = null) use ($deferred, $reactRequest, $request) { + $reactRequest->on('response', function (ReactResponse $reactResponse = null) use ($promise, $request) { $bodyStream = $this->streamFactory->createStream(); $reactResponse->on('data', function ($data) use (&$bodyStream) { $bodyStream->write((string) $data); }); - $reactResponse->on('end', function (\Exception $error = null) use ($deferred, $request, $reactResponse, &$bodyStream) { + $reactResponse->on('end', function (\Exception $error = null) use ($promise, $request, $reactResponse, &$bodyStream) { $response = $this->buildResponse( $reactResponse, $bodyStream ); if (null !== $error) { - $deferred->reject(new HttpException( + $promise->reject(new HttpException( $error->getMessage(), $request, $response, $error )); } else { - $deferred->resolve($response); + $promise->resolve($response); } }); }); $reactRequest->end((string) $request->getBody()); - $promise = new Promise($deferred->promise()); - $promise->setLoop($this->loop); - return $promise; } diff --git a/src/Promise.php b/src/Promise.php index 0a05459..a70d8b8 100644 --- a/src/Promise.php +++ b/src/Promise.php @@ -3,7 +3,6 @@ namespace Http\Adapter\React; use React\EventLoop\LoopInterface; -use React\Promise\PromiseInterface as ReactPromise; use Http\Client\Exception; use Http\Promise\Promise as HttpPromise; use Psr\Http\Message\ResponseInterface; @@ -12,8 +11,10 @@ * React promise adapter implementation. * * @author Stéphane Hulard + * + * @internal */ -class Promise implements HttpPromise +final class Promise implements HttpPromise { /** * Promise status. @@ -22,13 +23,6 @@ class Promise implements HttpPromise */ private $state = HttpPromise::PENDING; - /** - * Adapted React promise. - * - * @var ReactPromise - */ - private $promise; - /** * PSR7 received response. * @@ -43,6 +37,16 @@ class Promise implements HttpPromise */ private $exception; + /** + * @var callable|null + */ + private $onFulfilled; + + /** + * @var callable|null + */ + private $onRejected; + /** * React Event Loop used for synchronous processing. * @@ -50,24 +54,9 @@ class Promise implements HttpPromise */ private $loop; - /** - * Initialize the promise. - * - * @param ReactPromise $promise - */ - public function __construct(ReactPromise $promise) + public function __construct(LoopInterface $loop) { - $promise->then( - function (ResponseInterface $response) { - $this->state = HttpPromise::FULFILLED; - $this->response = $response; - }, - function (Exception $error) { - $this->state = HttpPromise::REJECTED; - $this->exception = $error; - } - ); - $this->promise = $promise; + $this->loop = $loop; } /** @@ -80,39 +69,103 @@ function (Exception $error) { */ public function then(callable $onFulfilled = null, callable $onRejected = null) { - $this->promise->then(function () use ($onFulfilled) { - if (null !== $onFulfilled) { - call_user_func($onFulfilled, $this->response); + $newPromise = new self($this->loop); + + $onFulfilled = $onFulfilled !== null ? $onFulfilled : function (ResponseInterface $response) { + return $response; + }; + + $onRejected = $onRejected !== null ? $onRejected : function (Exception $exception) { + throw $exception; + }; + + $this->onFulfilled = function (ResponseInterface $response) use ($onFulfilled, $newPromise) { + try { + $newPromise->resolve($onFulfilled($response)); + } catch (Exception $exception) { + $newPromise->reject($exception); } - }, function () use ($onRejected) { - if (null !== $onRejected) { - call_user_func($onRejected, $this->exception); + }; + + $this->onRejected = function (Exception $exception) use ($onRejected, $newPromise) { + try { + $newPromise->resolve($onRejected($exception)); + } catch (Exception $exception) { + $newPromise->reject($exception); } - }); + }; - return $this; + if ($this->state === HttpPromise::FULFILLED) { + $this->doResolve($this->response); + } + + if ($this->state === HttpPromise::REJECTED) { + $this->doReject($this->exception); + } + + return $newPromise; } /** - * {@inheritdoc} + * Resolve this promise. + * + * @param ResponseInterface $response + * + * @internal */ - public function getState() + public function resolve(ResponseInterface $response) { - return $this->state; + if ($this->state !== HttpPromise::PENDING) { + throw new \RuntimeException('Promise is already resolved'); + } + + $this->state = HttpPromise::FULFILLED; + $this->response = $response; + $this->doResolve($response); + } + + private function doResolve(ResponseInterface $response) + { + $onFulfilled = $this->onFulfilled; + + if (null !== $onFulfilled) { + $onFulfilled($response); + } } /** - * Set EventLoop used for synchronous processing. + * Reject this promise. * - * @param LoopInterface $loop + * @param Exception $exception * - * @return Promise + * @internal */ - public function setLoop(LoopInterface $loop) + public function reject(Exception $exception) { - $this->loop = $loop; + if ($this->state !== HttpPromise::PENDING) { + throw new \RuntimeException('Promise is already resolved'); + } + + $this->state = HttpPromise::REJECTED; + $this->exception = $exception; + $this->doReject($exception); + } + + private function doReject(Exception $exception) + { + $onRejected = $this->onRejected; - return $this; + if (null !== $onRejected) { + $onRejected($exception); + } + } + + /** + * {@inheritdoc} + */ + public function getState() + { + return $this->state; } /** @@ -120,9 +173,6 @@ public function setLoop(LoopInterface $loop) */ public function wait($unwrap = true) { - if (null === $this->loop) { - throw new \LogicException('You must set the loop before wait!'); - } while (HttpPromise::PENDING === $this->getState()) { $this->loop->tick(); } diff --git a/tests/PromiseTest.php b/tests/PromiseTest.php new file mode 100644 index 0000000..8a4bd8d --- /dev/null +++ b/tests/PromiseTest.php @@ -0,0 +1,34 @@ +loop = ReactFactory::buildEventLoop(); + } + + public function testChain() + { + $promise = new Promise($this->loop); + $response = new Response(200); + + $lastPromise = $promise->then(function (Response $response) { + return $response->withStatus(300); + }); + + $promise->resolve($response); + $updatedResponse = $lastPromise->wait(); + + self::assertEquals(200, $response->getStatusCode()); + self::assertEquals(300, $updatedResponse->getStatusCode()); + } +}