diff --git a/frontend/main.js b/frontend/main.js index 6e3acfe..91f2964 100644 --- a/frontend/main.js +++ b/frontend/main.js @@ -32,6 +32,9 @@ const createVideoElement = (videoUrl, subtitles) => { return video; } +let outgoingDebounce = false; +let outgoingDebounceCallbackId = null; + /** * @param {WebSocket} socket * @param {HTMLVideoElement} video @@ -50,6 +53,8 @@ const setupSocketEvents = (socket, video) => { const event = JSON.parse(messageEvent.data); console.log(event); + outgoingDebounce = true; + switch (event.op) { case "SetPlaying": if (event.data.playing) { @@ -68,6 +73,70 @@ const setupSocketEvents = (socket, video) => { } } catch (_err) { } + + if (outgoingDebounceCallbackId) { + cancelIdleCallback(outgoingDebounceCallbackId); + outgoingDebounceCallbackId = null; + } + + outgoingDebounceCallbackId = setTimeout(() => { + outgoingDebounce = false; + }, 500); + }); +} + +/** + * @param {string} sessionId + * @param {HTMLVideoElement} video + * @param {WebSocket} socket + */ +const setupVideoEvents = (sessionId, video, socket) => { + const currentVideoTime = () => (video.currentTime * 1000) | 0; + + video.addEventListener("pause", async event => { + if (outgoingDebounce) { + return; + } + + socket.send(JSON.stringify({ + "op": "SetPlaying", + "data": { + "playing": false, + "time": currentVideoTime(), + } + })); + }); + + video.addEventListener("play", event => { + if (outgoingDebounce) { + return; + } + + socket.send(JSON.stringify({ + "op": "SetPlaying", + "data": { + "playing": true, + "time": currentVideoTime(), + } + })); + }); + + let firstSeekComplete = false; + video.addEventListener("seeked", async event => { + if (!firstSeekComplete) { + // The first seeked event is performed by the browser when the video is loading + firstSeekComplete = true; + return; + } + + if (outgoingDebounce) { + return; + } + + socket.send(JSON.stringify({ + "op": "SetTime", + "data": currentVideoTime(), + })); }); } @@ -94,44 +163,8 @@ const setupVideo = async (sessionId, videoUrl, subtitles, currentTime, playing, // Auto-play is probably disabled, we should uhhhhhhh do something about it } - video.addEventListener("pause", async event => { - await fetch(`/sess/${sessionId}/playing`, { - method: "PUT", - body: JSON.stringify(false), - headers: { - "Content-Type": "application/json" - }, - }); - }); - - video.addEventListener("play", async event => { - await fetch(`/sess/${sessionId}/playing`, { - method: "PUT", - body: JSON.stringify(true), - headers: { - "Content-Type": "application/json" - }, - }); - }); - - let firstSeekComplete = false; - video.addEventListener("seeked", async event => { - if (!firstSeekComplete) { - // The first seeked event is performed by the browser when the video is loading - firstSeekComplete = true; - return; - } - - await fetch(`/sess/${sessionId}/current_time`, { - method: "PUT", - body: JSON.stringify((video.currentTime * 1000) | 0), - headers: { - "Content-Type": "application/json" - }, - }); - }); - setupSocketEvents(socket, video); + setupVideoEvents(sessionId, video, socket); } /** @param {string} sessionId */ diff --git a/src/events.rs b/src/events.rs index ad3de53..13eabe4 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,69 +1,8 @@ -use once_cell::sync::Lazy; -use std::{ - collections::HashMap, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use futures::{SinkExt, StreamExt, TryFutureExt}; -use tokio::sync::{ - mpsc::{self, UnboundedSender}, - RwLock, -}; -use tokio_stream::wrappers::UnboundedReceiverStream; - use serde::{Deserialize, Serialize}; -use uuid::Uuid; -use warp::ws::{Message, WebSocket}; - -static CONNECTED_VIEWERS: Lazy>> = - Lazy::new(|| RwLock::new(HashMap::new())); -static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1); - #[derive(Clone, Serialize, Deserialize)] #[serde(tag = "op", content = "data")] pub enum WatchEvent { SetPlaying { playing: bool, time: u64 }, SetTime(u64), } - -pub struct ConnectedViewer { - pub session_uuid: Uuid, - pub tx: UnboundedSender, -} - -pub async fn ws_subscribe(session_uuid: Uuid, ws: WebSocket) { - let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed); - let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split(); - - let (tx, rx) = mpsc::unbounded_channel::(); - let mut rx = UnboundedReceiverStream::new(rx); - - tokio::task::spawn(async move { - while let Some(event) = rx.next().await { - viewer_ws_tx - .send(Message::text( - serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"), - )) - .unwrap_or_else(|e| eprintln!("ws send error: {}", e)) - .await; - } - }); - - CONNECTED_VIEWERS - .write() - .await - .insert(viewer_id, ConnectedViewer { session_uuid, tx }); - while let Some(Ok(_)) = viewer_ws_rx.next().await {} - CONNECTED_VIEWERS.write().await.remove(&viewer_id); -} - -pub async fn ws_publish(session_uuid: Uuid, event: WatchEvent) { - for viewer in CONNECTED_VIEWERS.read().await.values() { - if viewer.session_uuid != session_uuid { - continue; - } - - let _ = viewer.tx.send(event.clone()); - } -} diff --git a/src/main.rs b/src/main.rs index c887edd..b130574 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,22 @@ -use std::{collections::HashMap, net::IpAddr, sync::Mutex}; - -use once_cell::sync::Lazy; use serde_json::json; +use std::net::IpAddr; use uuid::Uuid; use warb::{hyper::StatusCode, Filter, Reply}; use warp as warb; // i think it's funny mod events; +mod viewer_connection; mod watch_session; use serde::Deserialize; use crate::{ - events::{ws_publish, ws_subscribe, WatchEvent}, - watch_session::{SubtitleTrack, WatchSession}, + events::WatchEvent, + viewer_connection::{ws_publish, ws_subscribe}, + watch_session::{get_session, handle_watch_event, SubtitleTrack, WatchSession, SESSIONS}, }; -static SESSIONS: Lazy>> = - Lazy::new(|| Mutex::new(HashMap::new())); - #[derive(Deserialize)] struct StartSessionBody { pub video_url: String, @@ -52,14 +49,14 @@ async fn main() { .and(warb::path::param::()) .map(|session_id: String| { if let Ok(uuid) = Uuid::parse_str(&session_id) { - if let Some(session) = SESSIONS.lock().unwrap().get(&uuid) { - RequestedSession::Session(uuid, session.clone()) - } else { - RequestedSession::Error(warb::reply::with_status( - warb::reply::json(&json!({ "error": "session does not exist" })), - StatusCode::NOT_FOUND, - )) - } + get_session(uuid) + .map(|sess| RequestedSession::Session(uuid, sess)) + .unwrap_or_else(|| { + RequestedSession::Error(warb::reply::with_status( + warb::reply::json(&json!({ "error": "session does not exist" })), + StatusCode::NOT_FOUND, + )) + }) } else { RequestedSession::Error(warb::reply::with_status( warb::reply::json(&json!({ "error": "invalid session UUID" })), @@ -83,13 +80,13 @@ async fn main() { .and(warb::body::json()) .map(|requested_session, playing: bool| match requested_session { RequestedSession::Session(uuid, mut sess) => { - sess.set_playing(playing); - let time = sess.get_time_ms(); - SESSIONS.lock().unwrap().insert(uuid, sess.clone()); + let event = WatchEvent::SetPlaying { + playing, + time: sess.get_time_ms(), + }; - tokio::spawn(async move { - ws_publish(uuid, WatchEvent::SetPlaying { playing, time }).await - }); + handle_watch_event(uuid, &mut sess, event.clone()); + tokio::spawn(ws_publish(uuid, None, event)); warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK) } @@ -103,12 +100,10 @@ async fn main() { .map( |requested_session, current_time_ms: u64| match requested_session { RequestedSession::Session(uuid, mut sess) => { - sess.set_time_ms(current_time_ms); - SESSIONS.lock().unwrap().insert(uuid, sess.clone()); + let event = WatchEvent::SetTime(current_time_ms); - tokio::spawn(async move { - ws_publish(uuid, WatchEvent::SetTime(current_time_ms)).await - }); + handle_watch_event(uuid, &mut sess, event.clone()); + tokio::spawn(ws_publish(uuid, None, event)); warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK) } diff --git a/src/viewer_connection.rs b/src/viewer_connection.rs new file mode 100644 index 0000000..385eeb5 --- /dev/null +++ b/src/viewer_connection.rs @@ -0,0 +1,93 @@ +use once_cell::sync::Lazy; +use std::{ + collections::HashMap, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use futures::{SinkExt, StreamExt, TryFutureExt}; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + RwLock, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use uuid::Uuid; +use warp::ws::{Message, WebSocket}; + +use crate::{ + events::WatchEvent, + watch_session::{get_session, handle_watch_event}, +}; + +static CONNECTED_VIEWERS: Lazy>> = + Lazy::new(|| RwLock::new(HashMap::new())); +static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1); + +pub struct ConnectedViewer { + pub session: Uuid, + pub viewer_id: usize, + pub tx: UnboundedSender, +} + +pub async fn ws_subscribe(session_uuid: Uuid, ws: WebSocket) { + let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed); + let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split(); + + let (tx, rx) = mpsc::unbounded_channel::(); + let mut rx = UnboundedReceiverStream::new(rx); + + tokio::task::spawn(async move { + while let Some(event) = rx.next().await { + viewer_ws_tx + .send(Message::text( + serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"), + )) + .unwrap_or_else(|e| eprintln!("ws send error: {}", e)) + .await; + } + }); + + CONNECTED_VIEWERS.write().await.insert( + viewer_id, + ConnectedViewer { + viewer_id, + session: session_uuid, + tx, + }, + ); + + while let Some(Ok(message)) = viewer_ws_rx.next().await { + let event: WatchEvent = match message + .to_str() + .ok() + .and_then(|s| serde_json::from_str(s).ok()) + { + Some(e) => e, + None => continue, + }; + + handle_watch_event( + session_uuid, + &mut get_session(session_uuid).unwrap(), + event.clone(), + ); + + ws_publish(session_uuid, Some(viewer_id), event).await; + } + + CONNECTED_VIEWERS.write().await.remove(&viewer_id); +} + +pub async fn ws_publish(session_uuid: Uuid, viewer_id: Option, event: WatchEvent) { + for viewer in CONNECTED_VIEWERS.read().await.values() { + if viewer_id == Some(viewer.viewer_id) { + continue; + } + + if viewer.session != session_uuid { + continue; + } + + let _ = viewer.tx.send(event.clone()); + } +} diff --git a/src/watch_session.rs b/src/watch_session.rs index 7c228ba..5579f6a 100644 --- a/src/watch_session.rs +++ b/src/watch_session.rs @@ -1,5 +1,9 @@ +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use std::time::Instant; +use std::{collections::HashMap, sync::Mutex, time::Instant}; +use uuid::Uuid; + +use crate::events::WatchEvent; #[derive(Serialize, Deserialize, Clone)] pub struct SubtitleTrack { @@ -15,7 +19,6 @@ pub struct WatchSession { is_playing: bool, playing_from_timestamp: u64, playing_from_instant: Instant, - // TODO: How do we keep track of the current playing time ? } #[derive(Serialize)] @@ -59,8 +62,29 @@ impl WatchSession { self.playing_from_instant = Instant::now(); } - pub fn set_playing(&mut self, playing: bool) { - self.set_time_ms(self.get_time_ms()); + pub fn set_playing(&mut self, playing: bool, time_ms: u64) { + self.set_time_ms(time_ms); self.is_playing = playing; } } + +pub static SESSIONS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +pub fn get_session(uuid: Uuid) -> Option { + SESSIONS.lock().unwrap().get(&uuid).cloned() +} + +pub fn handle_watch_event(uuid: Uuid, watch_session: &mut WatchSession, event: WatchEvent) { + match event { + WatchEvent::SetPlaying { playing, time } => { + watch_session.set_playing(playing, time); + } + + WatchEvent::SetTime(time) => { + watch_session.set_time_ms(time); + } + }; + + let _ = SESSIONS.lock().unwrap().insert(uuid, watch_session.clone()); +}