feat: room
This commit is contained in:
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -100,6 +100,15 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-channel"
|
||||||
|
version = "0.3.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-core"
|
name = "futures-core"
|
||||||
version = "0.3.15"
|
version = "0.3.15"
|
||||||
@@ -488,11 +497,13 @@ dependencies = [
|
|||||||
name = "room-rs"
|
name = "room-rs"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"futures-channel",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"rust_util",
|
"rust_util",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-tungstenite",
|
"tokio-tungstenite",
|
||||||
|
"tungstenite",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@@ -8,7 +8,9 @@ edition = "2018"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
rust_util = "0.6"
|
rust_util = "0.6"
|
||||||
|
tungstenite = {version = "0.13", default-features = false}
|
||||||
tokio-tungstenite = "0.14"
|
tokio-tungstenite = "0.14"
|
||||||
tokio = { version = "1.0.0", features = ["full"]}
|
tokio = { version = "1.0.0", features = ["full"]}
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
lazy_static = "1.4"
|
lazy_static = "1.4"
|
||||||
|
futures-channel = "0.3"
|
||||||
145
src/main.rs
145
src/main.rs
@@ -1,48 +1,123 @@
|
|||||||
#[macro_use] extern crate lazy_static;
|
|
||||||
#[macro_use] extern crate rust_util;
|
#[macro_use] extern crate rust_util;
|
||||||
|
|
||||||
use std::{env, io::Error};
|
use std::{
|
||||||
use std::process;
|
collections::HashMap,
|
||||||
use futures_util::StreamExt;
|
env,
|
||||||
|
io::Error as IoError,
|
||||||
|
net::SocketAddr,
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
};
|
||||||
|
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
||||||
|
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tungstenite::protocol::Message;
|
||||||
use rust_util::XResult;
|
use rust_util::XResult;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
type Tx = UnboundedSender<Message>;
|
||||||
|
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
|
||||||
|
type RoomMap = Arc<Mutex<BTreeMap<String, BTreeMap<String, SocketAddr>>>>;
|
||||||
|
|
||||||
|
async fn handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) {
|
||||||
|
if let Err(e) = inner_handle_connection(peer_map, room_map, raw_stream, addr).await {
|
||||||
|
failure!("Handler connection error: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn inner_handle_connection(peer_map: PeerMap, room_map: RoomMap, raw_stream: TcpStream, addr: SocketAddr) -> XResult<()> {
|
||||||
|
println!("Incoming TCP connection from: {}", addr);
|
||||||
|
|
||||||
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
||||||
|
println!("WebSocket connection established: {}", addr);
|
||||||
|
|
||||||
|
// Insert the write part of this peer to the peer map.
|
||||||
|
let (tx, rx) = unbounded();
|
||||||
|
peer_map.lock().unwrap().insert(addr, tx.clone());
|
||||||
|
|
||||||
|
let (outgoing, incoming) = ws_stream.split();
|
||||||
|
|
||||||
|
let mut room_id: Option<String> = None;
|
||||||
|
let mut client_id: Option<String> = None;
|
||||||
|
let broadcast_incoming = incoming.try_for_each(|msg| {
|
||||||
|
match msg {
|
||||||
|
Message::Close(_) => {
|
||||||
|
information!("Close connection: {:?} - {:?} from: {}", room_id, client_id, addr);
|
||||||
|
},
|
||||||
|
Message::Ping(msg) => {
|
||||||
|
tx.unbounded_send(Message::Pong(msg)).unwrap();
|
||||||
|
},
|
||||||
|
Message::Pong(_) => {},
|
||||||
|
Message::Binary(_) => {
|
||||||
|
warning!("Ignore binary message from: {:?} - {:?}", room_id, client_id)
|
||||||
|
},
|
||||||
|
Message::Text(msg) => {
|
||||||
|
// TODO ...
|
||||||
|
// create/enter room
|
||||||
|
// exit room
|
||||||
|
// destroy room
|
||||||
|
// list peers
|
||||||
|
// enter peer
|
||||||
|
// exit peer
|
||||||
|
// send message
|
||||||
|
// - broadcast
|
||||||
|
// - peer
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// println!("Received a message from {}: {}", addr, msg.to_text().unwrap());
|
||||||
|
// let peers = peer_map.lock().unwrap();
|
||||||
|
//
|
||||||
|
// // We want to broadcast the message to everyone except ourselves.
|
||||||
|
// let broadcast_recipients =
|
||||||
|
// peers.iter().filter(|(peer_addr, _)| peer_addr != &&addr).map(|(_, ws_sink)| ws_sink);
|
||||||
|
//
|
||||||
|
// for recp in broadcast_recipients {
|
||||||
|
// recp.unbounded_send(msg.clone()).unwrap();
|
||||||
|
// }
|
||||||
|
|
||||||
|
future::ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
let receive_from_others = rx.map(Ok).forward(outgoing);
|
||||||
|
|
||||||
|
pin_mut!(broadcast_incoming, receive_from_others);
|
||||||
|
future::select(broadcast_incoming, receive_from_others).await;
|
||||||
|
|
||||||
|
information!("{} disconnected", &addr);
|
||||||
|
client_exit(&room_map, &room_id, &client_id);
|
||||||
|
peer_map.lock().unwrap().remove(&addr);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn client_exit(room_map: &RoomMap, room_id: &Option<String>, client_id: &Option<String>) {
|
||||||
|
if let (Some(room_id), Some(client_id)) = (room_id, client_id) {
|
||||||
|
let mut room_map = room_map.lock().unwrap();
|
||||||
|
if let Some(client_map) = room_map.get_mut(room_id) {
|
||||||
|
client_map.remove(client_id);
|
||||||
|
if client_map.is_empty() {
|
||||||
|
room_map.remove(room_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Error> {
|
async fn main() -> Result<(), IoError> {
|
||||||
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
||||||
|
|
||||||
let listener = match TcpListener::bind(&addr).await {
|
let state = PeerMap::new(Mutex::new(HashMap::new()));
|
||||||
Ok(socket) => socket,
|
let room = RoomMap::new(Mutex::new(BTreeMap::new()));
|
||||||
Err(e) => {
|
|
||||||
failure!("Try listen on: {}, faield: {}", addr, e);
|
|
||||||
process::exit(-1);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
success!("Listening on: {}", addr);
|
|
||||||
|
|
||||||
while let Ok((stream, _)) = listener.accept().await {
|
// Create the event loop and TCP listener we'll accept connections on.
|
||||||
tokio::spawn(accept_connection(stream));
|
let try_socket = TcpListener::bind(&addr).await;
|
||||||
|
let listener = try_socket.expect("Failed to bind");
|
||||||
|
println!("Listening on: {}", addr);
|
||||||
|
|
||||||
|
// Let's spawn the handling of each connection in a separate task.
|
||||||
|
while let Ok((stream, addr)) = listener.accept().await {
|
||||||
|
tokio::spawn(handle_connection(state.clone(), room.clone(), stream, addr));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn accept_connection(stream: TcpStream) {
|
|
||||||
if let Err(e) = inner_accept_connection(stream).await {
|
|
||||||
failure!("Error occurred in accepting connection: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn inner_accept_connection(stream: TcpStream) -> XResult<()> {
|
|
||||||
let addr = stream.peer_addr()?;
|
|
||||||
information!("Peer address: {}", addr);
|
|
||||||
|
|
||||||
let ws_stream = tokio_tungstenite::accept_async(stream).await?;
|
|
||||||
|
|
||||||
information!("New WebSocket connection: {}", addr);
|
|
||||||
|
|
||||||
let (write, read) = ws_stream.split();
|
|
||||||
read.forward(write).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|||||||
75
src/main2.rs
Normal file
75
src/main2.rs
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
#[macro_use] extern crate lazy_static;
|
||||||
|
#[macro_use] extern crate rust_util;
|
||||||
|
|
||||||
|
use std::{env, io::Error};
|
||||||
|
use std::process;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use futures_util::stream::SplitSink;
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use rust_util::XResult;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use tokio_tungstenite::WebSocketStream;
|
||||||
|
|
||||||
|
lazy_static!{
|
||||||
|
static ref ALL_ROOMS: Mutex<BTreeMap<String, BTreeMap<String, SplitSink<WebSocketStream<TcpStream>, Message>>>> = Mutex::new(BTreeMap::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
// create/enter room
|
||||||
|
// exit room
|
||||||
|
// destroy room
|
||||||
|
// list peers
|
||||||
|
// enter peer
|
||||||
|
// exit peer
|
||||||
|
// send message
|
||||||
|
// - broadcast
|
||||||
|
// - peer
|
||||||
|
|
||||||
|
// const MESSAGE_TYPE_CREATE: &str = "create";
|
||||||
|
// const MESSAGE_TYPE_MESSAGE: &str = "message";
|
||||||
|
|
||||||
|
struct Message {
|
||||||
|
ty: String,
|
||||||
|
room: Option<String>,
|
||||||
|
data: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Error> {
|
||||||
|
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
||||||
|
|
||||||
|
let listener = match TcpListener::bind(&addr).await {
|
||||||
|
Ok(socket) => socket,
|
||||||
|
Err(e) => {
|
||||||
|
failure!("Try listen on: {}, faield: {}", addr, e);
|
||||||
|
process::exit(-1);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
success!("Listening on: {}", addr);
|
||||||
|
|
||||||
|
while let Ok((stream, _)) = listener.accept().await {
|
||||||
|
tokio::spawn(accept_connection(stream));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn accept_connection(stream: TcpStream) {
|
||||||
|
if let Err(e) = inner_accept_connection(stream).await {
|
||||||
|
failure!("Error occurred in accepting connection: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn inner_accept_connection(stream: TcpStream) -> XResult<()> {
|
||||||
|
let addr = stream.peer_addr()?;
|
||||||
|
information!("Peer address: {}", addr);
|
||||||
|
let ws_stream = tokio_tungstenite::accept_async(stream).await?;
|
||||||
|
information!("New WebSocket connection: {}", addr);
|
||||||
|
|
||||||
|
let (write, mut read) = ws_stream.split();
|
||||||
|
//read.forward(write).await?;
|
||||||
|
let a = read.next().await;
|
||||||
|
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user