Skip to content

Split BufRead into multiple files #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from Sep 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/io/buf_read/lines.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::mem;
use std::pin::Pin;
use std::str;

use futures_io::AsyncBufRead;

use super::read_until_internal;
use crate::io;
use crate::task::{Context, Poll};

/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
pub(crate) reader: R,
pub(crate) buf: String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
}

impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
type Item = io::Result<String>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
bytes,
read,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
}
}

pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
138 changes: 8 additions & 130 deletions src/io/buf_read.rs → src/io/buf_read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
mod lines;
mod read_line;
mod read_until;

pub use lines::Lines;
use read_line::ReadLineFuture;
use read_until::ReadUntilFuture;

use std::mem;
use std::pin::Pin;
use std::str;

use cfg_if::cfg_if;
use futures_io::AsyncBufRead;

use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};

Expand Down Expand Up @@ -191,134 +197,6 @@ pub trait BufRead {

impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T {}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
byte: u8,
buf: &'a mut Vec<u8>,
read: usize,
}

impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
byte,
buf,
read,
} = &mut *self;
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut String,
bytes: Vec<u8>,
read: usize,
}

impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
read,
} = &mut *self;
let reader = Pin::new(reader);

let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
}

/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
reader: R,
buf: String,
bytes: Vec<u8>,
read: usize,
}

impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
type Item = io::Result<String>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
bytes,
read,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
}
}

pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}

pub fn read_until_internal<R: AsyncBufRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
Expand Down
49 changes: 49 additions & 0 deletions src/io/buf_read/read_line.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::mem;
use std::pin::Pin;
use std::str;

use futures_io::AsyncBufRead;

use super::read_until_internal;
use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
pub(crate) reader: &'a mut T,
pub(crate) buf: &'a mut String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
}

impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
read,
} = &mut *self;
let reader = Pin::new(reader);

let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
}
31 changes: 31 additions & 0 deletions src/io/buf_read/read_until.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::pin::Pin;

use futures_io::AsyncBufRead;

use super::read_until_internal;
use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
pub(crate) reader: &'a mut T,
pub(crate) byte: u8,
pub(crate) buf: &'a mut Vec<u8>,
pub(crate) read: usize,
}

impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
byte,
buf,
read,
} = &mut *self;
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}