feat: forward stream

This commit is contained in:
2020-08-09 11:04:49 +08:00
parent 05538818ff
commit 66b6ddda9c
3 changed files with 933 additions and 431 deletions

1261
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -8,9 +8,13 @@ edition = "2018"
[dependencies] [dependencies]
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
reqwest = "0.10" reqwest = { version = "0.10", features = ["stream"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
warp = "0.2"
clap = "2.0" clap = "2.0"
rust_util = "0.6"
actix-http = "1.0"
actix-web = "2"
actix-rt = "1"
bytes = "0.5"
futures-core = "0.3"

View File

@@ -1,3 +1,92 @@
fn main() { #[macro_use]
println!("Hello, world!"); extern crate rust_util;
use std::pin::Pin;
use std::fmt::{ self, Display, Formatter };
use std::error::Error;
use std::task::{ Context, Poll };
use bytes::Bytes;
use clap::{ App, Arg };
use actix_web::{ web, HttpServer, HttpRequest, HttpResponse };
const DEFAULT_PORT: u16 = 8888;
#[derive(Debug)]
enum GetStreamError {
ReqwestError(reqwest::Error),
}
impl Display for GetStreamError {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "Unknown error!")
}
}
impl Error for GetStreamError {
fn cause(&self) -> Option<&dyn Error> {
None
}
}
impl From<reqwest::Error> for GetStreamError {
fn from(reqwest_error: reqwest::Error) -> Self {
GetStreamError::ReqwestError(reqwest_error)
}
}
impl From<GetStreamError> for actix_http::error::Error {
fn from(_: GetStreamError) -> Self { todo!() }
}
struct ProxyStream {
inner: Box<dyn futures_core::Stream<Item = reqwest::Result<Bytes>> + Unpin>,
}
impl futures_core::Stream for ProxyStream {
type Item = Result<Bytes, GetStreamError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match futures_core::Stream::poll_next(Pin::new(&mut (*self.inner)), cx) {
Poll::Ready(t) => Poll::Ready(t.map(|e| e.map_err(|e| e.into()))),
Poll::Pending => Poll::Pending,
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
futures_core::Stream::size_hint(&*self.inner)
}
}
async fn get_file(req: HttpRequest) -> HttpResponse {
println!("{:?}", req);
let resp = reqwest::get("http://example.com/").await.unwrap();
HttpResponse::Ok()
.content_type("text/plain")
.streaming(ProxyStream{ inner: Box::new(resp.bytes_stream()) })
}
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
let app = App::new(env!("CARGO_PKG_NAME"))
.version(env!("CARGO_PKG_VERSION"))
.about(env!("CARGO_PKG_DESCRIPTION"))
.arg(Arg::with_name("listen_wide").long("wide").short("w").help("Listen wide 0.0.0.0:port"))
.arg(Arg::with_name("listen_port").long("port").short("p").takes_value(true).help("Listen port"));
let matches = app.get_matches();
let is_listen_wide = matches.is_present("listen_wide");
let listen_port = matches.value_of("listen_port")
.map(|p| p.parse::<u16>().unwrap_or_else(|e| panic!(format!("Parse port: {}, error: {}", p, e))))
.unwrap_or_else(|| DEFAULT_PORT);
let listen_addr = iff!(is_listen_wide, "0.0.0.0", "127.0.0.1");
information!("Listen at: {}:{}", listen_addr, listen_port);
HttpServer::new(||
actix_web::App::new()
.service(web::resource("/get_file").to(get_file))
).bind(format!("{}:{}", listen_addr, listen_port))?
.run()
.await
} }