Skip to content

server::conn::http1::Connection always holds a proto #3018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ impl Sender {
/// This is mostly useful for when trying to send from some other thread
/// that doesn't have an async context. If in an async context, prefer
/// `send_data()` instead.
#[cfg(feature = "http1")]
pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.data_tx
.try_send(Ok(chunk))
Expand Down Expand Up @@ -447,7 +448,7 @@ mod tests {
assert!(err.is_body_write_aborted(), "{:?}", err);
}

#[cfg(not(miri))]
#[cfg(all(not(miri), feature = "http1"))]
#[tokio::test]
async fn channel_abort_when_buffer_is_full() {
let (mut tx, mut rx) = Recv::channel();
Expand All @@ -463,6 +464,7 @@ mod tests {
assert!(err.is_body_write_aborted(), "{:?}", err);
}

#[cfg(feature = "http1")]
#[test]
fn channel_buffers_one() {
let (mut tx, _rx) = Recv::channel();
Expand Down
1 change: 1 addition & 0 deletions src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use http_body::SizeHint;

pub use self::aggregate::aggregate;
pub use self::body::Recv;
#[cfg(feature = "http1")]
pub(crate) use self::body::Sender;
pub(crate) use self::length::DecodedLength;
pub use self::to_bytes::to_bytes;
Expand Down
13 changes: 0 additions & 13 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ pub(super) enum User {
#[cfg(feature = "http1")]
ManualUpgrade,

/// User called `server::Connection::without_shutdown()` on an HTTP/2 conn.
#[cfg(feature = "server")]
WithoutShutdownNonHttp1,

/// User aborted in an FFI callback.
#[cfg(feature = "ffi")]
AbortedByCallback,
Expand Down Expand Up @@ -308,11 +304,6 @@ impl Error {
Error::new_user(User::Body).with(cause)
}

#[cfg(feature = "server")]
pub(super) fn new_without_shutdown_not_h1() -> Error {
Error::new(Kind::User(User::WithoutShutdownNonHttp1))
}

#[cfg(feature = "http1")]
pub(super) fn new_shutdown(cause: std::io::Error) -> Error {
Error::new(Kind::Shutdown).with(cause)
Expand Down Expand Up @@ -399,10 +390,6 @@ impl Error {
Kind::User(User::NoUpgrade) => "no upgrade available",
#[cfg(feature = "http1")]
Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use",
#[cfg(feature = "server")]
Kind::User(User::WithoutShutdownNonHttp1) => {
"without_shutdown() called on a non-HTTP/1 connection"
}
#[cfg(feature = "ffi")]
Kind::User(User::AbortedByCallback) => "operation aborted by an application callback",
}
Expand Down
61 changes: 20 additions & 41 deletions src/server/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pin_project_lite::pin_project! {
where
S: HttpService<Recv>,
{
conn: Option<Http1Dispatcher<T, S::ResBody, S>>,
conn: Http1Dispatcher<T, S::ResBody, S>,
}
}

Expand Down Expand Up @@ -98,12 +98,7 @@ where
/// pending. If called after `Connection::poll` has resolved, this does
/// nothing.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
match self.conn {
Some(ref mut h1) => {
h1.disable_keep_alive();
}
None => (),
}
self.conn.disable_keep_alive();
}

/// Return the inner IO object, and additional information.
Expand All @@ -116,25 +111,13 @@ where
/// # Panics
/// This method will panic if this connection is using an h2 protocol.
pub fn into_parts(self) -> Parts<I, S> {
self.try_into_parts()
.unwrap_or_else(|| panic!("h2 cannot into_inner"))
}

/// Return the inner IO object, and additional information, if available.
///
///
/// TODO:(mike) does this need to return none for h1 or is it expected to always be present? previously used an "unwrap"
/// This method will return a `None` if this connection is using an h2 protocol.
pub fn try_into_parts(self) -> Option<Parts<I, S>> {
self.conn.map(|h1| {
let (io, read_buf, dispatch) = h1.into_inner();
Parts {
io,
read_buf,
service: dispatch.into_service(),
_inner: (),
}
})
let (io, read_buf, dispatch) = self.conn.into_inner();
Parts {
io,
read_buf,
service: dispatch.into_service(),
_inner: (),
}
}

/// Poll the connection for completion, but without calling `shutdown`
Expand All @@ -150,7 +133,7 @@ where
S::Future: Unpin,
B: Unpin,
{
self.conn.as_mut().unwrap().poll_without_shutdown(cx)
self.conn.poll_without_shutdown(cx)
}

/// Prevent shutdown of the underlying IO object at the end of service the request,
Expand All @@ -165,15 +148,11 @@ where
S::Future: Unpin,
B: Unpin,
{
// TODO(mike): "new_without_shutdown_not_h1" is not possible here
let mut conn = Some(self);
let mut zelf = Some(self);
futures_util::future::poll_fn(move |cx| {
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
Poll::Ready(
conn.take()
.unwrap()
.try_into_parts()
.ok_or_else(crate::Error::new_without_shutdown_not_h1),
Ok(zelf.take().unwrap().into_parts())
)
})
}
Expand All @@ -185,7 +164,7 @@ where
where
I: Send,
{
upgrades::UpgradeableConnection { inner: self }
upgrades::UpgradeableConnection { inner: Some(self) }
}
}

Expand All @@ -201,7 +180,7 @@ where
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
match ready!(Pin::new(&mut self.conn).poll(cx)) {
Ok(done) => {
match done {
proto::Dispatched::Shutdown => {}
Expand Down Expand Up @@ -417,7 +396,7 @@ impl Builder {
let sd = proto::h1::dispatch::Server::new(service);
let proto = proto::h1::Dispatcher::new(sd, conn);
Connection {
conn: Some(proto),
conn: proto,
}
}
}
Expand All @@ -436,7 +415,7 @@ mod upgrades {
where
S: HttpService<Recv>,
{
pub(super) inner: Connection<T, S>,
pub(super) inner: Option<Connection<T, S>>,
}

impl<I, B, S> UpgradeableConnection<I, S>
Expand All @@ -452,7 +431,7 @@ mod upgrades {
/// This `Connection` should continue to be polled until shutdown
/// can finish.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
Pin::new(&mut self.inner).graceful_shutdown()
Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
}
}

Expand All @@ -467,10 +446,10 @@ mod upgrades {
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let (io, buf, _) = self.inner.conn.take().unwrap().into_inner();
let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
}
Expand Down