From 7796d8e5f03de077344a06e735379962a92df262 Mon Sep 17 00:00:00 2001 From: videogame hacker Date: Mon, 25 Oct 2021 01:51:26 +0100 Subject: [PATCH] Use WebSockets to communicate with the server instead of HTTP verbs This means that we can filter out our own events and have an outgoing debounce set up when we receive an incoming event. Hopefully this means that we stop seeing repeat events when there is more latency involved. --- frontend/main.js | 107 +++++++++++++++++++++++++-------------- src/events.rs | 61 ---------------------- src/main.rs | 49 ++++++++---------- src/viewer_connection.rs | 93 ++++++++++++++++++++++++++++++++++ src/watch_session.rs | 32 ++++++++++-- 5 files changed, 213 insertions(+), 129 deletions(-) create mode 100644 src/viewer_connection.rs diff --git a/frontend/main.js b/frontend/main.js index 6e3acfe..91f2964 100644 --- a/frontend/main.js +++ b/frontend/main.js @@ -32,6 +32,9 @@ const createVideoElement = (videoUrl, subtitles) => { return video; } +let outgoingDebounce = false; +let outgoingDebounceCallbackId = null; + /** * @param {WebSocket} socket * @param {HTMLVideoElement} video @@ -50,6 +53,8 @@ const setupSocketEvents = (socket, video) => { const event = JSON.parse(messageEvent.data); console.log(event); + outgoingDebounce = true; + switch (event.op) { case "SetPlaying": if (event.data.playing) { @@ -68,6 +73,70 @@ const setupSocketEvents = (socket, video) => { } } catch (_err) { } + + if (outgoingDebounceCallbackId) { + cancelIdleCallback(outgoingDebounceCallbackId); + outgoingDebounceCallbackId = null; + } + + outgoingDebounceCallbackId = setTimeout(() => { + outgoingDebounce = false; + }, 500); + }); +} + +/** + * @param {string} sessionId + * @param {HTMLVideoElement} video + * @param {WebSocket} socket + */ +const setupVideoEvents = (sessionId, video, socket) => { + const currentVideoTime = () => (video.currentTime * 1000) | 0; + + video.addEventListener("pause", async event => { + if (outgoingDebounce) { + return; + } + + socket.send(JSON.stringify({ + "op": "SetPlaying", + "data": { + "playing": false, + "time": currentVideoTime(), + } + })); + }); + + video.addEventListener("play", event => { + if (outgoingDebounce) { + return; + } + + socket.send(JSON.stringify({ + "op": "SetPlaying", + "data": { + "playing": true, + "time": currentVideoTime(), + } + })); + }); + + let firstSeekComplete = false; + video.addEventListener("seeked", async event => { + if (!firstSeekComplete) { + // The first seeked event is performed by the browser when the video is loading + firstSeekComplete = true; + return; + } + + if (outgoingDebounce) { + return; + } + + socket.send(JSON.stringify({ + "op": "SetTime", + "data": currentVideoTime(), + })); }); } @@ -94,44 +163,8 @@ const setupVideo = async (sessionId, videoUrl, subtitles, currentTime, playing, // Auto-play is probably disabled, we should uhhhhhhh do something about it } - video.addEventListener("pause", async event => { - await fetch(`/sess/${sessionId}/playing`, { - method: "PUT", - body: JSON.stringify(false), - headers: { - "Content-Type": "application/json" - }, - }); - }); - - video.addEventListener("play", async event => { - await fetch(`/sess/${sessionId}/playing`, { - method: "PUT", - body: JSON.stringify(true), - headers: { - "Content-Type": "application/json" - }, - }); - }); - - let firstSeekComplete = false; - video.addEventListener("seeked", async event => { - if (!firstSeekComplete) { - // The first seeked event is performed by the browser when the video is loading - firstSeekComplete = true; - return; - } - - await fetch(`/sess/${sessionId}/current_time`, { - method: "PUT", - body: JSON.stringify((video.currentTime * 1000) | 0), - headers: { - "Content-Type": "application/json" - }, - }); - }); - setupSocketEvents(socket, video); + setupVideoEvents(sessionId, video, socket); } /** @param {string} sessionId */ diff --git a/src/events.rs b/src/events.rs index ad3de53..13eabe4 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,69 +1,8 @@ -use once_cell::sync::Lazy; -use std::{ - collections::HashMap, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use futures::{SinkExt, StreamExt, TryFutureExt}; -use tokio::sync::{ - mpsc::{self, UnboundedSender}, - RwLock, -}; -use tokio_stream::wrappers::UnboundedReceiverStream; - use serde::{Deserialize, Serialize}; -use uuid::Uuid; -use warp::ws::{Message, WebSocket}; - -static CONNECTED_VIEWERS: Lazy>> = - Lazy::new(|| RwLock::new(HashMap::new())); -static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1); - #[derive(Clone, Serialize, Deserialize)] #[serde(tag = "op", content = "data")] pub enum WatchEvent { SetPlaying { playing: bool, time: u64 }, SetTime(u64), } - -pub struct ConnectedViewer { - pub session_uuid: Uuid, - pub tx: UnboundedSender, -} - -pub async fn ws_subscribe(session_uuid: Uuid, ws: WebSocket) { - let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed); - let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split(); - - let (tx, rx) = mpsc::unbounded_channel::(); - let mut rx = UnboundedReceiverStream::new(rx); - - tokio::task::spawn(async move { - while let Some(event) = rx.next().await { - viewer_ws_tx - .send(Message::text( - serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"), - )) - .unwrap_or_else(|e| eprintln!("ws send error: {}", e)) - .await; - } - }); - - CONNECTED_VIEWERS - .write() - .await - .insert(viewer_id, ConnectedViewer { session_uuid, tx }); - while let Some(Ok(_)) = viewer_ws_rx.next().await {} - CONNECTED_VIEWERS.write().await.remove(&viewer_id); -} - -pub async fn ws_publish(session_uuid: Uuid, event: WatchEvent) { - for viewer in CONNECTED_VIEWERS.read().await.values() { - if viewer.session_uuid != session_uuid { - continue; - } - - let _ = viewer.tx.send(event.clone()); - } -} diff --git a/src/main.rs b/src/main.rs index c887edd..b130574 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,22 @@ -use std::{collections::HashMap, net::IpAddr, sync::Mutex}; - -use once_cell::sync::Lazy; use serde_json::json; +use std::net::IpAddr; use uuid::Uuid; use warb::{hyper::StatusCode, Filter, Reply}; use warp as warb; // i think it's funny mod events; +mod viewer_connection; mod watch_session; use serde::Deserialize; use crate::{ - events::{ws_publish, ws_subscribe, WatchEvent}, - watch_session::{SubtitleTrack, WatchSession}, + events::WatchEvent, + viewer_connection::{ws_publish, ws_subscribe}, + watch_session::{get_session, handle_watch_event, SubtitleTrack, WatchSession, SESSIONS}, }; -static SESSIONS: Lazy>> = - Lazy::new(|| Mutex::new(HashMap::new())); - #[derive(Deserialize)] struct StartSessionBody { pub video_url: String, @@ -52,14 +49,14 @@ async fn main() { .and(warb::path::param::()) .map(|session_id: String| { if let Ok(uuid) = Uuid::parse_str(&session_id) { - if let Some(session) = SESSIONS.lock().unwrap().get(&uuid) { - RequestedSession::Session(uuid, session.clone()) - } else { - RequestedSession::Error(warb::reply::with_status( - warb::reply::json(&json!({ "error": "session does not exist" })), - StatusCode::NOT_FOUND, - )) - } + get_session(uuid) + .map(|sess| RequestedSession::Session(uuid, sess)) + .unwrap_or_else(|| { + RequestedSession::Error(warb::reply::with_status( + warb::reply::json(&json!({ "error": "session does not exist" })), + StatusCode::NOT_FOUND, + )) + }) } else { RequestedSession::Error(warb::reply::with_status( warb::reply::json(&json!({ "error": "invalid session UUID" })), @@ -83,13 +80,13 @@ async fn main() { .and(warb::body::json()) .map(|requested_session, playing: bool| match requested_session { RequestedSession::Session(uuid, mut sess) => { - sess.set_playing(playing); - let time = sess.get_time_ms(); - SESSIONS.lock().unwrap().insert(uuid, sess.clone()); + let event = WatchEvent::SetPlaying { + playing, + time: sess.get_time_ms(), + }; - tokio::spawn(async move { - ws_publish(uuid, WatchEvent::SetPlaying { playing, time }).await - }); + handle_watch_event(uuid, &mut sess, event.clone()); + tokio::spawn(ws_publish(uuid, None, event)); warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK) } @@ -103,12 +100,10 @@ async fn main() { .map( |requested_session, current_time_ms: u64| match requested_session { RequestedSession::Session(uuid, mut sess) => { - sess.set_time_ms(current_time_ms); - SESSIONS.lock().unwrap().insert(uuid, sess.clone()); + let event = WatchEvent::SetTime(current_time_ms); - tokio::spawn(async move { - ws_publish(uuid, WatchEvent::SetTime(current_time_ms)).await - }); + handle_watch_event(uuid, &mut sess, event.clone()); + tokio::spawn(ws_publish(uuid, None, event)); warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK) } diff --git a/src/viewer_connection.rs b/src/viewer_connection.rs new file mode 100644 index 0000000..385eeb5 --- /dev/null +++ b/src/viewer_connection.rs @@ -0,0 +1,93 @@ +use once_cell::sync::Lazy; +use std::{ + collections::HashMap, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use futures::{SinkExt, StreamExt, TryFutureExt}; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + RwLock, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use uuid::Uuid; +use warp::ws::{Message, WebSocket}; + +use crate::{ + events::WatchEvent, + watch_session::{get_session, handle_watch_event}, +}; + +static CONNECTED_VIEWERS: Lazy>> = + Lazy::new(|| RwLock::new(HashMap::new())); +static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1); + +pub struct ConnectedViewer { + pub session: Uuid, + pub viewer_id: usize, + pub tx: UnboundedSender, +} + +pub async fn ws_subscribe(session_uuid: Uuid, ws: WebSocket) { + let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed); + let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split(); + + let (tx, rx) = mpsc::unbounded_channel::(); + let mut rx = UnboundedReceiverStream::new(rx); + + tokio::task::spawn(async move { + while let Some(event) = rx.next().await { + viewer_ws_tx + .send(Message::text( + serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"), + )) + .unwrap_or_else(|e| eprintln!("ws send error: {}", e)) + .await; + } + }); + + CONNECTED_VIEWERS.write().await.insert( + viewer_id, + ConnectedViewer { + viewer_id, + session: session_uuid, + tx, + }, + ); + + while let Some(Ok(message)) = viewer_ws_rx.next().await { + let event: WatchEvent = match message + .to_str() + .ok() + .and_then(|s| serde_json::from_str(s).ok()) + { + Some(e) => e, + None => continue, + }; + + handle_watch_event( + session_uuid, + &mut get_session(session_uuid).unwrap(), + event.clone(), + ); + + ws_publish(session_uuid, Some(viewer_id), event).await; + } + + CONNECTED_VIEWERS.write().await.remove(&viewer_id); +} + +pub async fn ws_publish(session_uuid: Uuid, viewer_id: Option, event: WatchEvent) { + for viewer in CONNECTED_VIEWERS.read().await.values() { + if viewer_id == Some(viewer.viewer_id) { + continue; + } + + if viewer.session != session_uuid { + continue; + } + + let _ = viewer.tx.send(event.clone()); + } +} diff --git a/src/watch_session.rs b/src/watch_session.rs index 7c228ba..5579f6a 100644 --- a/src/watch_session.rs +++ b/src/watch_session.rs @@ -1,5 +1,9 @@ +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use std::time::Instant; +use std::{collections::HashMap, sync::Mutex, time::Instant}; +use uuid::Uuid; + +use crate::events::WatchEvent; #[derive(Serialize, Deserialize, Clone)] pub struct SubtitleTrack { @@ -15,7 +19,6 @@ pub struct WatchSession { is_playing: bool, playing_from_timestamp: u64, playing_from_instant: Instant, - // TODO: How do we keep track of the current playing time ? } #[derive(Serialize)] @@ -59,8 +62,29 @@ impl WatchSession { self.playing_from_instant = Instant::now(); } - pub fn set_playing(&mut self, playing: bool) { - self.set_time_ms(self.get_time_ms()); + pub fn set_playing(&mut self, playing: bool, time_ms: u64) { + self.set_time_ms(time_ms); self.is_playing = playing; } } + +pub static SESSIONS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +pub fn get_session(uuid: Uuid) -> Option { + SESSIONS.lock().unwrap().get(&uuid).cloned() +} + +pub fn handle_watch_event(uuid: Uuid, watch_session: &mut WatchSession, event: WatchEvent) { + match event { + WatchEvent::SetPlaying { playing, time } => { + watch_session.set_playing(playing, time); + } + + WatchEvent::SetTime(time) => { + watch_session.set_time_ms(time); + } + }; + + let _ = SESSIONS.lock().unwrap().insert(uuid, watch_session.clone()); +}