-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Make client able to use non-Send executor #3184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
d88ec30
44587d6
55753a9
ad57f1c
4132aa6
4138669
ce4619e
4ac835a
27eb1e2
ed95666
beb3ee4
c4a51b3
7cbfc72
d3821a9
2394949
8e0f907
6d44c29
c7d59fe
0269396
3a78a57
be5c654
b39d5d4
15be46f
8391604
a276efc
3dd0579
98b0599
25ff95c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| #![deny(warnings)] | ||
| #![warn(rust_2018_idioms)] | ||
| use std::env; | ||
| use std::marker::PhantomData; | ||
| use std::pin::Pin; | ||
| use std::task::{Context, Poll}; | ||
|
|
||
| use bytes::Bytes; | ||
| use http_body_util::BodyExt; | ||
| use hyper::body::{Body as HttpBody, Frame}; | ||
| use hyper::Error; | ||
| use hyper::Request; | ||
| use tokio::io::{self, AsyncWriteExt as _}; | ||
| use tokio::net::TcpStream; | ||
|
|
||
| struct Body { | ||
| // Our Body type is !Send and !Sync: | ||
| _marker: PhantomData<*const ()>, | ||
| data: Option<Bytes>, | ||
| } | ||
|
|
||
| impl From<String> for Body { | ||
| fn from(a: String) -> Self { | ||
| Body { | ||
| _marker: PhantomData, | ||
| data: Some(a.into()), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl HttpBody for Body { | ||
| type Data = Bytes; | ||
| type Error = Error; | ||
|
|
||
| fn poll_frame( | ||
| self: Pin<&mut Self>, | ||
| _: &mut Context<'_>, | ||
| ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { | ||
| Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d)))) | ||
| } | ||
| } | ||
|
|
||
| fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
| pretty_env_logger::init(); | ||
|
|
||
| // Configure a runtime that runs everything on the current thread | ||
| let rt = tokio::runtime::Builder::new_current_thread() | ||
| .enable_all() | ||
| .build() | ||
| .expect("build runtime"); | ||
|
|
||
| // Combine it with a `LocalSet, which means it can spawn !Send futures... | ||
| let local = tokio::task::LocalSet::new(); | ||
| local.block_on(&rt, init()) | ||
| } | ||
|
|
||
| async fn init() -> Result<(), Box<dyn std::error::Error>> { | ||
| // Some simple CLI args requirements... | ||
| let url = match env::args().nth(1) { | ||
| Some(url) => url, | ||
| None => { | ||
| println!("Usage: client <url>"); | ||
| return Ok(()); | ||
| } | ||
| }; | ||
|
|
||
| // HTTPS requires picking a TLS implementation, so give a better | ||
| // warning if the user tries to request an 'https' URL. | ||
| let url = url.parse::<hyper::Uri>().unwrap(); | ||
| if url.scheme_str() != Some("http") { | ||
| println!("This example only works with 'http' URLs."); | ||
| return Ok(()); | ||
| } | ||
|
|
||
| fetch_url(url).await | ||
| } | ||
|
|
||
| async fn fetch_url(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> { | ||
| let host = url.host().expect("uri has no host"); | ||
| let port = url.port_u16().unwrap_or(80); | ||
| let addr = format!("{}:{}", host, port); | ||
| let stream = TcpStream::connect(addr).await?; | ||
|
|
||
| let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?; | ||
| tokio::task::spawn_local(async move { | ||
| if let Err(err) = conn.await { | ||
| println!("Connection failed: {:?}", err); | ||
| } | ||
| }); | ||
|
|
||
| let authority = url.authority().unwrap().clone(); | ||
|
|
||
| let req = Request::builder() | ||
| .uri(url) | ||
| .header(hyper::header::HOST, authority.as_str()) | ||
| .body(Body::from("test".to_string()))?; | ||
|
|
||
| let mut res = sender.send_request(req).await?; | ||
|
|
||
| println!("Response: {}", res.status()); | ||
| println!("Headers: {:#?}\n", res.headers()); | ||
|
|
||
| // Stream the body, writing each chunk to stdout as we get it | ||
| // (instead of buffering and printing at the end). | ||
| while let Some(next) = res.frame().await { | ||
| let frame = next?; | ||
| if let Some(chunk) = frame.data_ref() { | ||
| io::stdout().write_all(&chunk).await?; | ||
| } | ||
| } | ||
|
|
||
| println!("\n\nDone!"); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| // NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor. | ||
| // | ||
| // Since the Server needs to spawn some background tasks, we needed | ||
| // to configure an Executor that can spawn !Send futures... | ||
| #[derive(Clone, Copy, Debug)] | ||
| struct LocalExec; | ||
|
|
||
| impl<F> hyper::rt::Executor<F> for LocalExec | ||
| where | ||
| F: std::future::Future + 'static, // not requiring `Send` | ||
| { | ||
| fn execute(&self, fut: F) { | ||
| // This will spawn into the currently running `LocalSet`. | ||
| tokio::task::spawn_local(fut); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; | |
|
|
||
| use super::super::dispatch; | ||
| use crate::body::{Body, Incoming as IncomingBody}; | ||
| use crate::common::exec::ExecutorClient; | ||
| use crate::common::time::Time; | ||
| use crate::common::{ | ||
| exec::{BoxSendFuture, Exec}, | ||
|
|
@@ -26,7 +27,9 @@ pub struct SendRequest<B> { | |
|
|
||
| impl<B> Clone for SendRequest<B> { | ||
| fn clone(&self) -> SendRequest<B> { | ||
| SendRequest { dispatch: self.dispatch.clone() } | ||
| SendRequest { | ||
| dispatch: self.dispatch.clone(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -35,20 +38,25 @@ impl<B> Clone for SendRequest<B> { | |
| /// In most cases, this should just be spawned into an executor, so that it | ||
| /// can process incoming and outgoing messages, notice hangups, and the like. | ||
| #[must_use = "futures do nothing unless polled"] | ||
| pub struct Connection<T, B> | ||
| pub struct Connection<T, B, E> | ||
| where | ||
| T: AsyncRead + AsyncWrite + Send + 'static, | ||
| T: AsyncRead + AsyncWrite + Send + 'static + Unpin, | ||
| B: Body + 'static, | ||
| E: ExecutorClient<B, T> + Unpin, | ||
| <B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static, | ||
| { | ||
| inner: (PhantomData<T>, proto::h2::ClientTask<B>), | ||
| inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>), | ||
| } | ||
|
|
||
| /// A builder to configure an HTTP connection. | ||
| /// | ||
| /// After setting options, the builder is used to create a handshake future. | ||
| #[derive(Clone, Debug)] | ||
| pub struct Builder { | ||
| pub(super) exec: Exec, | ||
| pub struct Builder<Ex> | ||
| where | ||
| Ex: Executor<BoxSendFuture> + Send + Sync + 'static + Clone, | ||
seanmonstar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| pub(super) exec: Ex, | ||
| pub(super) timer: Time, | ||
| h2_builder: proto::h2::client::Config, | ||
| } | ||
|
|
@@ -60,13 +68,15 @@ pub struct Builder { | |
| pub async fn handshake<E, T, B>( | ||
| exec: E, | ||
| io: T, | ||
| ) -> crate::Result<(SendRequest<B>, Connection<T, B>)> | ||
| ) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)> | ||
| where | ||
| E: Executor<BoxSendFuture> + Send + Sync + 'static, | ||
| T: AsyncRead + AsyncWrite + Unpin + Send + 'static, | ||
| B: Body + 'static, | ||
| B::Data: Send, | ||
| B::Error: Into<Box<dyn StdError + Send + Sync>>, | ||
| E: ExecutorClient<B, T> + Unpin + Clone, | ||
| <B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static, | ||
|
||
| { | ||
| Builder::new(exec).handshake(io).await | ||
| } | ||
|
|
@@ -189,12 +199,14 @@ impl<B> fmt::Debug for SendRequest<B> { | |
|
|
||
| // ===== impl Connection | ||
|
|
||
| impl<T, B> Connection<T, B> | ||
| impl<T, B, E> Connection<T, B, E> | ||
| where | ||
| T: AsyncRead + AsyncWrite + Unpin + Send + 'static, | ||
| B: Body + Unpin + Send + 'static, | ||
| B::Data: Send, | ||
| B::Error: Into<Box<dyn StdError + Send + Sync>>, | ||
| E: ExecutorClient<B, T> + Unpin, | ||
| <B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static, | ||
| { | ||
| /// Returns whether the [extended CONNECT protocol][1] is enabled or not. | ||
| /// | ||
|
|
@@ -210,22 +222,27 @@ where | |
| } | ||
| } | ||
|
|
||
| impl<T, B> fmt::Debug for Connection<T, B> | ||
| impl<T, B, E> fmt::Debug for Connection<T, B, E> | ||
| where | ||
| T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, | ||
| T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static+ Unpin, | ||
| B: Body + 'static, | ||
| E: ExecutorClient<B, T> + Unpin, | ||
| <B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static, | ||
seanmonstar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| f.debug_struct("Connection").finish() | ||
| } | ||
| } | ||
|
|
||
| impl<T, B> Future for Connection<T, B> | ||
| impl<T, B, E> Future for Connection<T, B, E> | ||
| where | ||
| T: AsyncRead + AsyncWrite + Unpin + Send + 'static, | ||
| B: Body + Send + 'static, | ||
| B: Body + 'static + Unpin, | ||
| B::Data: Send, | ||
| E: Unpin, | ||
| B::Error: Into<Box<dyn StdError + Send + Sync>>, | ||
| E: ExecutorClient<B, T> + 'static + Send + Sync + Unpin, | ||
| <B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static, | ||
| { | ||
| type Output = crate::Result<()>; | ||
|
|
||
|
|
@@ -240,22 +257,22 @@ where | |
|
|
||
| // ===== impl Builder | ||
|
|
||
| impl Builder { | ||
| impl<Ex> Builder<Ex> | ||
| where | ||
| Ex: Executor<BoxSendFuture> + Send + Sync + 'static + Clone, | ||
seanmonstar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| /// Creates a new connection builder. | ||
| #[inline] | ||
| pub fn new<E>(exec: E) -> Builder | ||
| where | ||
| E: Executor<BoxSendFuture> + Send + Sync + 'static, | ||
| { | ||
| pub fn new(exec: Ex) -> Builder<Ex> { | ||
| Builder { | ||
| exec: Exec::new(exec), | ||
| exec, | ||
| timer: Time::Empty, | ||
| h2_builder: Default::default(), | ||
| } | ||
| } | ||
|
|
||
| /// Provide a timer to execute background HTTP2 tasks. | ||
| pub fn timer<M>(&mut self, timer: M) -> &mut Builder | ||
| pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex> | ||
| where | ||
| M: Timer + Send + Sync + 'static, | ||
| { | ||
|
|
@@ -284,10 +301,7 @@ impl Builder { | |
| /// Passing `None` will do nothing. | ||
| /// | ||
| /// If not set, hyper will use a default. | ||
| pub fn initial_connection_window_size( | ||
| &mut self, | ||
| sz: impl Into<Option<u32>>, | ||
| ) -> &mut Self { | ||
| pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { | ||
| if let Some(sz) = sz.into() { | ||
| self.h2_builder.adaptive_window = false; | ||
| self.h2_builder.initial_conn_window_size = sz; | ||
|
|
@@ -329,10 +343,7 @@ impl Builder { | |
| /// Pass `None` to disable HTTP2 keep-alive. | ||
| /// | ||
| /// Default is currently disabled. | ||
| pub fn keep_alive_interval( | ||
| &mut self, | ||
| interval: impl Into<Option<Duration>>, | ||
| ) -> &mut Self { | ||
| pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self { | ||
| self.h2_builder.keep_alive_interval = interval.into(); | ||
| self | ||
| } | ||
|
|
@@ -395,12 +406,14 @@ impl Builder { | |
| pub fn handshake<T, B>( | ||
| &self, | ||
| io: T, | ||
| ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> | ||
| ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>> + '_ | ||
seanmonstar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| where | ||
| T: AsyncRead + AsyncWrite + Unpin + Send + 'static, | ||
| B: Body + 'static, | ||
| B::Data: Send, | ||
| B::Error: Into<Box<dyn StdError + Send + Sync>>, | ||
| Ex: ExecutorClient<B, T> + Unpin, | ||
| <B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static, | ||
| { | ||
| let opts = self.clone(); | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.