Skip to content

Commit a1edba4

Browse files
authored
Merge pull request #17 from clue-labs/unwrapped-write
Writing to closed unwrapped stream should return false (backpressure)
2 parents b5cf8b2 + 3d3eed1 commit a1edba4

File tree

2 files changed

+64
-1
lines changed

2 files changed

+64
-1
lines changed

src/UnwrapWritableStream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ function ($e) use ($out, &$closed) {
105105
public function write($data)
106106
{
107107
if ($this->ending) {
108-
return;
108+
return false;
109109
}
110110

111111
// forward to inner stream if possible

tests/UnwrapWritableTest.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,55 @@ public function testForwardsNoDataWhenWritingAfterEndOncePromiseResolves()
253253
$this->loop->run();
254254
}
255255

256+
public function testWriteReturnsFalseWhenPromiseIsPending()
257+
{
258+
$promise = new \React\Promise\Promise(function () { });
259+
$stream = Stream\unwrapWritable($promise);
260+
261+
$ret = $stream->write('nope');
262+
263+
$this->assertFalse($ret);
264+
}
265+
266+
public function testWriteReturnsTrueWhenUnwrappedStreamReturnsTrueForWrite()
267+
{
268+
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
269+
$input->expects($this->once())->method('isWritable')->willReturn(true);
270+
$input->expects($this->once())->method('write')->willReturn(true);
271+
272+
$promise = \React\Promise\resolve($input);
273+
$stream = Stream\unwrapWritable($promise);
274+
275+
$ret = $stream->write('hello');
276+
277+
$this->assertTrue($ret);
278+
}
279+
280+
public function testWriteReturnsFalseWhenUnwrappedStreamReturnsFalseForWrite()
281+
{
282+
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
283+
$input->expects($this->once())->method('isWritable')->willReturn(true);
284+
$input->expects($this->once())->method('write')->willReturn(false);
285+
286+
$promise = \React\Promise\resolve($input);
287+
$stream = Stream\unwrapWritable($promise);
288+
289+
$ret = $stream->write('nope');
290+
291+
$this->assertFalse($ret);
292+
}
293+
294+
public function testWriteAfterCloseReturnsFalse()
295+
{
296+
$promise = new \React\Promise\Promise(function () { });
297+
$stream = Stream\unwrapWritable($promise);
298+
299+
$stream->close();
300+
$ret = $stream->write('nope');
301+
302+
$this->assertFalse($ret);
303+
}
304+
256305
public function testEmitsErrorAndClosesWhenInputEmitsError()
257306
{
258307
$input = new ThroughStream();
@@ -267,6 +316,20 @@ public function testEmitsErrorAndClosesWhenInputEmitsError()
267316
$this->assertFalse($stream->isWritable());
268317
}
269318

319+
public function testEmitsDrainWhenPromiseResolvesWithStreamWhenForwardingData()
320+
{
321+
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
322+
$input->expects($this->once())->method('isWritable')->willReturn(true);
323+
$input->expects($this->once())->method('write')->with('hello')->willReturn(true);
324+
325+
$deferred = new Deferred();
326+
$stream = Stream\unwrapWritable($deferred->promise());
327+
$stream->write('hello');
328+
329+
$stream->on('drain', $this->expectCallableOnce());
330+
$deferred->resolve($input);
331+
}
332+
270333
public function testDoesNotEmitDrainWhenStreamBufferExceededAfterForwardingData()
271334
{
272335
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();

0 commit comments

Comments
 (0)