diff --git a/.editorconfig b/.editorconfig index 73390a1..bcb5235 100644 --- a/.editorconfig +++ b/.editorconfig @@ -2,8 +2,11 @@ root = true [*] indent_style = space -indent_size = 4 +indent_size = 2 end_of_line = lf charset = utf-8 trim_trailing_whitespace = false insert_final_newline = true + +[*.rs] +indent_size = 4 diff --git a/Cargo.lock b/Cargo.lock index 287c46f..c2ffca4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "digest" version = "0.10.6" @@ -1442,11 +1455,13 @@ version = "0.1.0" dependencies = [ "axum", "confique", + "dashmap", "futures", "futures-util", "miette", "once_cell", "serde", + "serde_json", "tokio", "tower-http", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 9176d6c..ff23b14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,5 @@ futures = "0.3" futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } uuid = { version = "1.3.1", features = ["v4", "serde"] } once_cell = "1.17.1" +serde_json = "1.0.95" +dashmap = "5.4.0" diff --git a/public/assets/main.css b/public/assets/main.css new file mode 100644 index 0000000..0f075a9 --- /dev/null +++ b/public/assets/main.css @@ -0,0 +1,38 @@ +:root { + --bg-rgb: 28, 23, 36; + --fg-rgb: 234, 234, 248; + --accent-rgb: 181, 127, 220; + --fg: rgb(var(--fg-rgb)); + --bg: rgb(var(--bg-rgb)); + --default-user-color: rgb(238, 126, 255); + --accent: rgb(var(--accent-rgb)); + --fg-transparent: rgba(var(--fg-rgb), 0.25); + --bg-transparent: rgba(var(--bg-rgb), 0.25); + --autocomplete-bg: linear-gradient( + var(--fg-transparent), + var(--fg-transparent) + ), + linear-gradient(var(--bg), var(--bg)); + --chip-bg: linear-gradient( + var(--accent-transparent), + var(--accent-transparent) + ), + linear-gradient(var(--bg), var(--bg)); + --accent-transparent: rgba(var(--accent-rgb), 0.25); +} + +html { + background-color: var(--bg); + color: var(--fg); + font-family: sans-serif; + font-size: 1.125rem; +} + +html, body { + margin: 0; + padding: 0; +} + +.hidden { + display: none; +} diff --git a/public/index.html b/public/index.html new file mode 100644 index 0000000..f423202 --- /dev/null +++ b/public/index.html @@ -0,0 +1,14 @@ + + + + watch party + + + + +
+

watch party

+
+
+ + diff --git a/src/main.rs b/src/main.rs index 7e5bd0b..9269a4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,28 @@ -use std::net::{IpAddr, SocketAddr}; +use std::{ + net::{IpAddr, SocketAddr}, + sync::{Arc, Mutex}, +}; use axum::{ - extract::{ws::WebSocket, Query, WebSocketUpgrade}, + extract::{ + ws::{Message, WebSocket}, + Query, WebSocketUpgrade, + }, + http::StatusCode, response::{IntoResponse, Response}, - routing::get, - Router, Server, + routing::{get, post}, + Json, Router, Server, }; use confique::Config; +use futures::{SinkExt, StreamExt}; use miette::{Context, IntoDiagnostic, Result}; +use rooms::create_room; use serde::Deserialize; +use sources::MediaSource; use tower_http::services::ServeDir; +use uuid::Uuid; + +use crate::rooms::{find_room, Member, RoomRef}; mod rooms; mod sources; @@ -25,12 +38,61 @@ struct WatchPartyConfig { #[derive(Deserialize)] struct WsQuery { room: String, + rejoin_id: Option, } -async fn handle_socket(socket: WebSocket, room: String) {} - async fn ws_handler(ws: WebSocketUpgrade, Query(q): Query) -> Response { - ws.on_upgrade(move |s| handle_socket(s, q.room)) + let Ok(room_id) = Uuid::parse_str(&q.room) else { + return (StatusCode::BAD_REQUEST, "Malformed Room ID").into_response() + }; + let Some(room) = find_room(room_id).await else { + return (StatusCode::NOT_FOUND, "Room does not exist").into_response() + }; + let rejoin_id = q.rejoin_id.and_then(|s| Uuid::parse_str(&s).ok()); + + ws.on_upgrade(move |s| handle_socket(s, room, rejoin_id)) +} + +async fn handle_socket(mut socket: WebSocket, room: RoomRef, rejoin_id: Option) { + let (mut tx, mut rx) = socket.split(); + + let mut member: Member = match rejoin_id { + Some(_rejoin_id) => todo!("find existing member"), + None => Member { + id: Uuid::new_v4(), + chat_identity: None, + connections: vec![], + }, + }; + member.connections.push(tx); + let member_id = member.id; + room.add_member(member).await; + + while let Some(Ok(message)) = rx.next().await { + match message { + Message::Text(text) => { + // TODO: parse a proto message + } + Message::Close(_) => break, + _ => continue, + } + } +} + +#[derive(Deserialize)] +struct CreateRoomPayload { + playlist: Vec, +} + +async fn create_room_handler(Json(body): Json) -> Response { + if body.playlist.is_empty() { + return (StatusCode::BAD_REQUEST, "").into_response(); + } + + let room = create_room(body.playlist).await; + let id = room.get().await.id; + + id.to_string().into_response() } #[tokio::main] @@ -44,9 +106,12 @@ async fn main() -> Result<()> { let app = Router::new() .fallback_service(ServeDir::new("public")) - .route("/api/ws", get(ws_handler)); + .route("/api/ws", get(ws_handler)) + .route("/api/create", post(create_room_handler)); let bind_address: SocketAddr = (conf.address, conf.port).into(); + + println!("Listening on http://{} ...", &bind_address); Server::bind(&bind_address) .serve(app.into_make_service()) .await diff --git a/src/rooms.rs b/src/rooms.rs index 606b523..c3aa6f8 100644 --- a/src/rooms.rs +++ b/src/rooms.rs @@ -1,29 +1,95 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, sync::Arc}; -use axum::extract::ws::WebSocket; -use miette::Result; +use axum::extract::ws::{Message, WebSocket}; +use dashmap::DashMap; +use futures::{stream::SplitSink, SinkExt}; use once_cell::sync::Lazy; -use serde::Serialize; +use tokio::sync::{Mutex, MutexGuard}; use uuid::Uuid; use crate::sources::MediaSource; -// TODO: How do we store the active rooms? +#[derive(Clone)] +pub struct RoomRef { + room: Arc>, +} + +impl RoomRef { + fn new(room: Room) -> Self { + RoomRef { + room: Arc::new(Mutex::new(room)), + } + } + + pub async fn get(&self) -> MutexGuard { + self.room.lock().await + } + + pub async fn add_member(&self, member: Member) { + let mut room = self.room.lock().await; + room.members.insert(member.id, member); + } + + pub async fn broadcast(&self, message: Message) { + let mut room = self.room.lock().await; + for member in room.members.values_mut() { + member.send(message.clone()).await + } + } + + pub async fn list_chat_members(&self) -> Vec { + let room = self.room.lock().await; + room.members + .values() + .flat_map(|m| &m.chat_identity) + .cloned() + .collect() + } +} + +static ROOMS: Lazy> = Lazy::new(DashMap::new); + +pub async fn create_room(playlist: Vec) -> RoomRef { + let id = Uuid::new_v4(); + let room = Room { + id, + playlist, + currently_playing: 0, + members: HashMap::new(), + }; + + let room_ref = RoomRef::new(room); + ROOMS.insert(id, room_ref.clone()); + room_ref +} + +pub async fn find_room(id: Uuid) -> Option { + ROOMS.get(&id).as_deref().cloned() +} -#[derive(Serialize)] pub struct Room { pub id: Uuid, pub playlist: Vec, - // TODO: Currently playing information - #[serde(skip)] - pub members: Vec, + pub currently_playing: usize, + pub members: HashMap, +} + +#[derive(Clone)] +pub struct ChatIdentity { + pub nickname: String, + pub color: String, } pub struct Member { pub id: Uuid, - pub nickname: Option, - pub socket: WebSocket, + pub chat_identity: Option, + pub connections: Vec>, +} + +impl Member { + pub async fn send(&mut self, message: Message) { + for conn in self.connections.iter_mut() { + let _ = conn.send(message.clone()).await; + } + } } diff --git a/src/sources.rs b/src/sources.rs index 856f2b0..4548742 100644 --- a/src/sources.rs +++ b/src/sources.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use uuid::Uuid; #[derive(Serialize, Deserialize)] pub struct DirectMediaSource {