diff --git a/Cargo.lock b/Cargo.lock index 58ee71a..58aa6c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,6 +100,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-channel" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" +dependencies = [ + "futures-core", +] + [[package]] name = "futures-core" version = "0.3.15" @@ -488,11 +497,13 @@ dependencies = [ name = "room-rs" version = "0.1.0" dependencies = [ + "futures-channel", "futures-util", "lazy_static", "rust_util", "tokio", "tokio-tungstenite", + "tungstenite", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0b09ace..2837aac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,9 @@ edition = "2018" [dependencies] rust_util = "0.6" +tungstenite = {version = "0.13", default-features = false} tokio-tungstenite = "0.14" tokio = { version = "1.0.0", features = ["full"]} futures-util = "0.3" lazy_static = "1.4" +futures-channel = "0.3" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index b318134..4428189 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,48 +1,123 @@ -#[macro_use] extern crate lazy_static; #[macro_use] extern crate rust_util; -use std::{env, io::Error}; -use std::process; -use futures_util::StreamExt; +use std::{ + collections::HashMap, + env, + io::Error as IoError, + net::SocketAddr, + sync::{Arc, Mutex}, +}; +use futures_channel::mpsc::{unbounded, UnboundedSender}; +use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt}; use tokio::net::{TcpListener, TcpStream}; +use tungstenite::protocol::Message; use rust_util::XResult; +use std::collections::BTreeMap; + +type Tx = UnboundedSender; +type PeerMap = Arc>>; +type RoomMap = Arc>>>; + +async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) { + if let Err(e) = inner_handle_connection(peer_map, room_map, raw_stream, addr).await { + failure!("Handler connection error: {}", e); + } +} + +async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) -> XResult<()> { + println!("Incoming TCP connection from: {}", addr); + + let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; + println!("WebSocket connection established: {}", addr); + + // Insert the write part of this peer to the peer map. + let (tx, rx) = unbounded(); + peer_map.lock().unwrap().insert(addr, tx.clone()); + + let (outgoing, incoming) = ws_stream.split(); + + let mut room_id: Option = None; + let mut client_id: Option = None; + let broadcast_incoming = incoming.try_for_each(|msg| { + match msg { + Message::Close(_) => { + information!("Close connection: {:?} - {:?} from: {}", room_id, client_id, addr); + }, + Message::Ping(msg) => { + tx.unbounded_send(Message::Pong(msg)).unwrap(); + }, + Message::Pong(_) => {}, + Message::Binary(_) => { + warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id) + }, + Message::Text(msg) => { + // TODO ... + // create/enter room + // exit room + // destroy room + // list peers + // enter peer + // exit peer + // send message + // - broadcast + // - peer + }, + } + + // println!("Received a message from {}: {}", addr, msg.to_text().unwrap()); + // let peers = peer_map.lock().unwrap(); + // + // // We want to broadcast the message to everyone except ourselves. + // let broadcast_recipients = + // peers.iter().filter(|(peer_addr, _)| peer_addr != &&addr).map(|(_, ws_sink)| ws_sink); + // + // for recp in broadcast_recipients { + // recp.unbounded_send(msg.clone()).unwrap(); + // } + + future::ok(()) + }); + + let receive_from_others = rx.map(Ok).forward(outgoing); + + pin_mut!(broadcast_incoming, receive_from_others); + future::select(broadcast_incoming, receive_from_others).await; + + information!("{} disconnected", &addr); + client_exit(&room_map, &room_id, &client_id); + peer_map.lock().unwrap().remove(&addr); + + Ok(()) +} + +fn client_exit(room_map: &RoomMap, room_id: &Option, client_id: &Option) { + if let (Some(room_id), Some(client_id)) = (room_id, client_id) { + let mut room_map = room_map.lock().unwrap(); + if let Some(client_map) = room_map.get_mut(room_id) { + client_map.remove(client_id); + if client_map.is_empty() { + room_map.remove(room_id); + } + } + } +} #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), IoError> { let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()); - let listener = match TcpListener::bind(&addr).await { - Ok(socket) => socket, - Err(e) => { - failure!("Try listen on: {}, faield: {}", addr, e); - process::exit(-1); - }, - }; - success!("Listening on: {}", addr); + let state = PeerMap::new(Mutex::new(HashMap::new())); + let room = RoomMap::new(Mutex::new(BTreeMap::new())); - while let Ok((stream, _)) = listener.accept().await { - tokio::spawn(accept_connection(stream)); + // Create the event loop and TCP listener we'll accept connections on. + let try_socket = TcpListener::bind(&addr).await; + let listener = try_socket.expect("Failed to bind"); + println!("Listening on: {}", addr); + + // Let's spawn the handling of each connection in a separate task. + while let Ok((stream, addr)) = listener.accept().await { + tokio::spawn(handle_connection(state.clone(), room.clone(), stream, addr)); } Ok(()) -} - -async fn accept_connection(stream: TcpStream) { - if let Err(e) = inner_accept_connection(stream).await { - failure!("Error occurred in accepting connection: {}", e); - } -} - -async fn inner_accept_connection(stream: TcpStream) -> XResult<()> { - let addr = stream.peer_addr()?; - information!("Peer address: {}", addr); - - let ws_stream = tokio_tungstenite::accept_async(stream).await?; - - information!("New WebSocket connection: {}", addr); - - let (write, read) = ws_stream.split(); - read.forward(write).await?; - - Ok(()) -} +} \ No newline at end of file diff --git a/src/main2.rs b/src/main2.rs new file mode 100644 index 0000000..3e5d360 --- /dev/null +++ b/src/main2.rs @@ -0,0 +1,75 @@ +#[macro_use] extern crate lazy_static; +#[macro_use] extern crate rust_util; + +use std::{env, io::Error}; +use std::process; +use futures_util::StreamExt; +use futures_util::stream::SplitSink; +use tokio::net::{TcpListener, TcpStream}; +use rust_util::XResult; +use tokio::sync::Mutex; +use std::collections::BTreeMap; +use tokio_tungstenite::WebSocketStream; + +lazy_static!{ + static ref ALL_ROOMS: Mutex, Message>>>> = Mutex::new(BTreeMap::new()); +} + +// create/enter room +// exit room +// destroy room +// list peers +// enter peer +// exit peer +// send message +// - broadcast +// - peer + +// const MESSAGE_TYPE_CREATE: &str = "create"; +// const MESSAGE_TYPE_MESSAGE: &str = "message"; + +struct Message { + ty: String, + room: Option, + data: Option, +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()); + + let listener = match TcpListener::bind(&addr).await { + Ok(socket) => socket, + Err(e) => { + failure!("Try listen on: {}, faield: {}", addr, e); + process::exit(-1); + }, + }; + success!("Listening on: {}", addr); + + while let Ok((stream, _)) = listener.accept().await { + tokio::spawn(accept_connection(stream)); + } + + Ok(()) +} + +async fn accept_connection(stream: TcpStream) { + if let Err(e) = inner_accept_connection(stream).await { + failure!("Error occurred in accepting connection: {}", e); + } +} + +async fn inner_accept_connection(stream: TcpStream) -> XResult<()> { + let addr = stream.peer_addr()?; + information!("Peer address: {}", addr); + let ws_stream = tokio_tungstenite::accept_async(stream).await?; + information!("New WebSocket connection: {}", addr); + + let (write, mut read) = ws_stream.split(); + //read.forward(write).await?; + let a = read.next().await; + + + Ok(()) +}