Skip to content

Commit

Permalink
Clean up internals
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Oct 21, 2023
1 parent f1185f7 commit b1128cd
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 68 deletions.
11 changes: 7 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ where
&'buf mut self,
request: Request<'conn, B>,
rx_buf: &'buf mut [u8],
) -> Result<Response<'buf, 'conn, T>, Error> {
) -> Result<Response<'buf, HttpConnection<'conn, T>>, Error> {
request.write(self).await?;
Response::read(self, request.method, rx_buf).await
}
Expand Down Expand Up @@ -299,7 +299,10 @@ where
/// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers.
///
/// The response is returned.
pub async fn send<'buf>(&'buf mut self, rx_buf: &'buf mut [u8]) -> Result<Response<'buf, 'conn, C>, Error> {
pub async fn send<'buf>(
&'buf mut self,
rx_buf: &'buf mut [u8],
) -> Result<Response<'buf, HttpConnection<'conn, C>>, Error> {
let request = self.request.take().ok_or(Error::AlreadySent)?.build();
request.write(&mut self.conn).await?;
Response::read(&mut self.conn, request.method, rx_buf).await
Expand Down Expand Up @@ -428,7 +431,7 @@ where
&'req mut self,
mut request: Request<'req, B>,
rx_buf: &'req mut [u8],
) -> Result<Response<'req, 'res, C>, Error> {
) -> Result<Response<'req, HttpConnection<'res, C>>, Error> {
request.base_path = Some(self.base_path);
request.write(&mut self.conn).await?;
Response::read(&mut self.conn, request.method, rx_buf).await
Expand Down Expand Up @@ -456,7 +459,7 @@ where
/// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers.
///
/// The response is returned.
pub async fn send<'buf>(self, rx_buf: &'buf mut [u8]) -> Result<Response<'buf, 'conn, C>, Error>
pub async fn send<'buf>(self, rx_buf: &'buf mut [u8]) -> Result<Response<'buf, HttpConnection<'conn, C>>, Error>
where
'conn: 'req + 'buf,
'req: 'buf,
Expand Down
29 changes: 13 additions & 16 deletions src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use embedded_io::{ErrorKind, ErrorType};
use embedded_io::{Error, ErrorKind, ErrorType};
use embedded_io_async::{BufRead, Read, Write};

#[cfg(feature = "embedded-tls")]
use embedded_io::Error;

use crate::client::HttpConnection;

struct ReadBuffer<'buf> {
Expand Down Expand Up @@ -45,48 +42,48 @@ impl ReadBuffer<'_> {
}
}

pub struct BufferingReader<'buf, 'conn, B>
pub struct BufferingReader<'buf, B>
where
B: Read + Write,
B: Read,
{
buffer: ReadBuffer<'buf>,
stream: &'buf mut HttpConnection<'conn, B>,
stream: &'buf mut B,
}

impl<'buf, 'conn, B> BufferingReader<'buf, 'conn, B>
impl<'buf, 'conn, B> BufferingReader<'buf, B>
where
B: Read + Write,
B: Read,
{
pub fn new(buffer: &'buf mut [u8], loaded: usize, stream: &'buf mut HttpConnection<'conn, B>) -> Self {
pub fn new(buffer: &'buf mut [u8], loaded: usize, stream: &'buf mut B) -> Self {
Self {
buffer: ReadBuffer::new(buffer, loaded),
stream,
}
}
}

impl<C> ErrorType for BufferingReader<'_, '_, C>
impl<C> ErrorType for BufferingReader<'_, C>
where
C: Read + Write,
C: Read,
{
type Error = ErrorKind;
}

impl<C> Read for BufferingReader<'_, '_, C>
impl<C> Read for BufferingReader<'_, C>
where
C: Read + Write,
C: Read,
{
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
if !self.buffer.is_empty() {
let amt = self.buffer.read(buf)?;
return Ok(amt);
}

self.stream.read(buf).await
self.stream.read(buf).await.map_err(|e| e.kind())
}
}

impl<C> BufRead for BufferingReader<'_, '_, C>
impl<C> BufRead for BufferingReader<'_, HttpConnection<'_, C>>
where
C: Read + Write,
{
Expand Down
88 changes: 40 additions & 48 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ use embedded_io::{Error as _, ErrorType};
use embedded_io_async::{BufRead, Read, Write};
use heapless::Vec;

use crate::client::HttpConnection;
use crate::headers::{ContentType, KeepAlive, TransferEncoding};
use crate::reader::BufferingReader;
use crate::request::Method;
use crate::Error;

/// Type representing a parsed HTTP response.
pub struct Response<'buf, 'conn, C>
pub struct Response<'buf, C>
where
C: Read + Write,
C: Read,
{
conn: &'buf mut HttpConnection<'conn, C>,
conn: &'buf mut C,
/// The method used to create the response.
method: Method,
/// The HTTP response status code.
Expand All @@ -32,7 +31,7 @@ where
}

#[cfg(feature = "defmt")]
impl<C> defmt::Format for Response<'_, '_, C>
impl<C> defmt::Format for Response<'_, C>
where
C: Read + Write,
{
Expand All @@ -53,7 +52,7 @@ where
}
}

impl<C> core::fmt::Debug for Response<'_, '_, C>
impl<C> core::fmt::Debug for Response<'_, C>
where
C: Read + Write,
{
Expand All @@ -72,16 +71,12 @@ where
}
}

impl<'buf, 'conn, C> Response<'buf, 'conn, C>
impl<'buf, C> Response<'buf, C>
where
C: Read + Write,
C: Read,
{
// Read at least the headers from the connection.
pub async fn read(
conn: &'buf mut HttpConnection<'conn, C>,
method: Method,
header_buf: &'buf mut [u8],
) -> Result<Response<'buf, 'conn, C>, Error> {
pub async fn read(conn: &'buf mut C, method: Method, header_buf: &'buf mut [u8]) -> Result<Self, Error> {
let mut header_len = 0;
let mut pos = 0;
while pos < header_buf.len() {
Expand Down Expand Up @@ -179,7 +174,7 @@ where
}

/// Get the response body
pub fn body(self) -> ResponseBody<'buf, 'conn, C> {
pub fn body(self) -> ResponseBody<'buf, C> {
let reader_hint = if self.method == Method::HEAD {
// Head requests does not have a body so we return an empty reader
ReaderHint::Empty
Expand Down Expand Up @@ -223,11 +218,11 @@ impl<'a> Iterator for HeaderIterator<'a> {
/// This type contains the original header buffer provided to `read_headers`,
/// now renamed to `body_buf`, the number of read body bytes that are available
/// in `body_buf`, and a reader to be used for reading the remaining body.
pub struct ResponseBody<'buf, 'conn, C>
pub struct ResponseBody<'buf, C>
where
C: Read + Write,
C: Read,
{
conn: &'buf mut HttpConnection<'conn, C>,
conn: &'buf mut C,
reader_hint: ReaderHint,
/// The number of raw bytes read from the body and available in the beginning of `body_buf`.
raw_body_read: usize,
Expand All @@ -242,11 +237,11 @@ enum ReaderHint {
ToEnd, // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.3 pt. 7: Until end of connection
}

impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C>
impl<'buf, C> ResponseBody<'buf, C>
where
C: Read + Write,
C: Read,
{
pub fn reader(self) -> BodyReader<BufferingReader<'buf, 'conn, C>> {
pub fn reader(self) -> BodyReader<BufferingReader<'buf, C>> {
let raw_body = BufferingReader::new(self.body_buf, self.raw_body_read, self.conn);

match self.reader_hint {
Expand All @@ -264,9 +259,9 @@ where
}
}

impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C>
impl<'buf, C> ResponseBody<'buf, C>
where
C: Read + Write,
C: Read,
{
/// Read the entire body into the buffer originally provided [`Response::read()`].
/// This requires that this original buffer is large enough to contain the entire body.
Expand Down Expand Up @@ -326,7 +321,7 @@ pub enum BodyReader<B> {

impl<B> BodyReader<B>
where
B: BufRead + Read,
B: Read,
{
/// Read the entire body
pub async fn read_to_end(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
Expand Down Expand Up @@ -360,14 +355,13 @@ where

async fn discard(&mut self) -> Result<usize, Error> {
let mut body_len = 0;
let mut buf = [0; 128];
loop {
let buf = self.fill_buf().await?;
if buf.is_empty() {
let buf = self.read(&mut buf).await?;
if buf == 0 {
break;
}
let buf_len = buf.len();
body_len += buf_len;
self.consume(buf_len);
body_len += buf;
}

Ok(body_len)
Expand All @@ -380,7 +374,7 @@ impl<B> ErrorType for BodyReader<B> {

impl<B> Read for BodyReader<B>
where
B: BufRead + Read,
B: Read,
{
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
match self {
Expand Down Expand Up @@ -427,7 +421,7 @@ impl<C> ErrorType for FixedLengthBodyReader<C> {

impl<C> Read for FixedLengthBodyReader<C>
where
C: BufRead + Read,
C: Read,
{
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
if self.remaining == 0 {
Expand Down Expand Up @@ -506,31 +500,29 @@ pub struct ChunkedBodyReader<B> {

impl<C> ChunkedBodyReader<C>
where
C: BufRead + Read,
C: Read,
{
async fn read_next_chunk_length(&mut self) -> Result<(), Error> {
let mut header_buf = [0; 8 + 2]; // 32 bit hex + \r + \n
let mut total_read = 0;

'read_size: loop {
let buf = self.raw_body.fill_buf().await.map_err(|e| e.kind())?;
for (i, byte) in buf.iter().enumerate() {
if *byte != b'\n' {
header_buf[total_read] = *byte;
total_read += 1;

if total_read == header_buf.len() {
self.raw_body.consume(i + 1);
return Err(Error::Codec);
}
} else {
self.raw_body.consume(i + 1);
break 'read_size;
let mut byte = 0;
self.raw_body
.read_exact(core::slice::from_mut(&mut byte))
.await
.map_err(|e| Error::from(e).kind())?;

if byte != b'\n' {
header_buf[total_read] = byte;
total_read += 1;

if total_read == header_buf.len() {
return Err(Error::Codec);
}
} else {
break 'read_size;
}

let consumed = buf.len();
self.raw_body.consume(consumed);
}

if header_buf[total_read - 1] != b'\r' {
Expand Down Expand Up @@ -595,7 +587,7 @@ impl<C> ErrorType for ChunkedBodyReader<C> {

impl<C> Read for ChunkedBodyReader<C>
where
C: BufRead + Read,
C: Read,
{
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let remaining = self.handle_chunk_boundary().await?;
Expand Down

0 comments on commit b1128cd

Please sign in to comment.