From f3ee2ecc8366dad68050a26445053395ce51dc51 Mon Sep 17 00:00:00 2001 From: easrng Date: Wed, 1 Feb 2023 20:29:47 -0500 Subject: [PATCH] fuck it, converting the rest to unix line endings --- src/events.rs | 104 ++++++------- src/main.rs | 264 ++++++++++++++++----------------- src/viewer_connection.rs | 312 +++++++++++++++++++-------------------- src/watch_session.rs | 192 ++++++++++++------------ 4 files changed, 436 insertions(+), 436 deletions(-) diff --git a/src/events.rs b/src/events.rs index 8dbc5f6..8509711 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,52 +1,52 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Serialize, Deserialize)] -pub struct Viewer { - #[serde(default, skip_serializing_if = "Option::is_none")] - pub nickname: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub colour: Option, -} - -#[derive(Clone, Serialize, Deserialize)] -#[serde(tag = "op", content = "data")] -pub enum WatchEventData { - SetPlaying { - playing: bool, - time: u64, - }, - SetTime { - #[serde(default, skip_serializing_if = "Option::is_none")] - from: Option, - to: u64, - }, - - UserJoin, - UserLeave, - ChatMessage(String), - Ping(String), - UpdateViewerList(Vec), -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct WatchEvent { - #[serde(default, skip_serializing_if = "Option::is_none")] - pub user: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub colour: Option, - #[serde(flatten)] - pub data: WatchEventData, - #[serde(default)] - pub reflected: bool, -} - -impl WatchEvent { - pub fn new(user: String, colour: String, data: WatchEventData) -> Self { - WatchEvent { - user: Some(user), - colour: Some(colour), - data, - reflected: false, - } - } -} +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize)] +pub struct Viewer { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub nickname: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub colour: Option, +} + +#[derive(Clone, Serialize, Deserialize)] +#[serde(tag = "op", content = "data")] +pub enum WatchEventData { + SetPlaying { + playing: bool, + time: u64, + }, + SetTime { + #[serde(default, skip_serializing_if = "Option::is_none")] + from: Option, + to: u64, + }, + + UserJoin, + UserLeave, + ChatMessage(String), + Ping(String), + UpdateViewerList(Vec), +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct WatchEvent { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub user: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub colour: Option, + #[serde(flatten)] + pub data: WatchEventData, + #[serde(default)] + pub reflected: bool, +} + +impl WatchEvent { + pub fn new(user: String, colour: String, data: WatchEventData) -> Self { + WatchEvent { + user: Some(user), + colour: Some(colour), + data, + reflected: false, + } + } +} diff --git a/src/main.rs b/src/main.rs index 9d29e35..1a914f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,132 +1,132 @@ -use serde_json::json; -use std::net::IpAddr; -use uuid::Uuid; - -use warb::{hyper::StatusCode, Filter, Reply}; -use warp as warb; // i think it's funny - -mod events; -mod utils; -mod viewer_connection; -mod watch_session; - -use serde::Deserialize; - -use crate::{ - viewer_connection::ws_subscribe, - watch_session::{get_session, SubtitleTrack, WatchSession, SESSIONS}, -}; - -#[derive(Deserialize)] -struct StartSessionBody { - video_url: String, - #[serde(default = "Vec::new")] - subtitle_tracks: Vec, -} - -#[derive(Deserialize)] -struct SubscribeQuery { - nickname: String, - colour: String, -} - -async fn get_emoji_list() -> Result { - use tokio_stream::{wrappers::ReadDirStream, StreamExt}; - - let dir = tokio::fs::read_dir("frontend/emojis") - .await - .expect("Couldn't read emojis directory!"); - - let files = ReadDirStream::new(dir) - .filter_map(|r| r.ok()) - .map(|e| e.file_name().to_string_lossy().to_string()) - .collect::>() - .await; - - Ok(warb::reply::json(&files)) -} - -#[tokio::main] -async fn main() { - let start_session_route = warb::path!("start_session") - .and(warb::path::end()) - .and(warb::post()) - .and(warb::body::json()) - .map(|body: StartSessionBody| { - let mut sessions = SESSIONS.lock().unwrap(); - let session_uuid = Uuid::new_v4(); - let session = WatchSession::new(body.video_url, body.subtitle_tracks); - let session_view = session.view(); - sessions.insert(session_uuid, session); - - warb::reply::json(&json!({ "id": session_uuid.to_string(), "session": session_view })) - }); - - let get_emoji_route = warb::path!("emojos").and_then(get_emoji_list); - - enum RequestedSession { - Session(Uuid, WatchSession), - Error(warb::reply::WithStatus), - } - - let get_running_session = warb::path::path("sess") - .and(warb::path::param::()) - .map(|session_id: String| { - if let Ok(uuid) = Uuid::parse_str(&session_id) { - get_session(uuid) - .map(|sess| RequestedSession::Session(uuid, sess)) - .unwrap_or_else(|| { - RequestedSession::Error(warb::reply::with_status( - warb::reply::json(&json!({ "error": "session does not exist" })), - StatusCode::NOT_FOUND, - )) - }) - } else { - RequestedSession::Error(warb::reply::with_status( - warb::reply::json(&json!({ "error": "invalid session UUID" })), - StatusCode::BAD_REQUEST, - )) - } - }); - - let get_status_route = get_running_session - .and(warb::path::end()) - .map(|requested_session| match requested_session { - RequestedSession::Session(_, sess) => { - warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK) - } - RequestedSession::Error(e) => e, - }); - - let ws_subscribe_route = get_running_session - .and(warb::path!("subscribe")) - .and(warb::query()) - .and(warb::ws()) - .map( - |requested_session, query: SubscribeQuery, ws: warb::ws::Ws| match requested_session { - RequestedSession::Session(uuid, _) => ws - .on_upgrade(move |ws| ws_subscribe(uuid, query.nickname, query.colour, ws)) - .into_response(), - RequestedSession::Error(error_response) => error_response.into_response(), - }, - ); - - let routes = start_session_route - .or(get_status_route) - .or(ws_subscribe_route) - .or(get_emoji_route) - .or(warb::path::end().and(warb::fs::file("frontend/index.html"))) - .or(warb::fs::dir("frontend")); - - let ip = std::env::var("IP") - .ok() - .and_then(|s| s.parse::().ok()) - .unwrap_or_else(|| [127, 0, 0, 1].into()); - let port = std::env::var("PORT") - .ok() - .and_then(|s| s.parse::().ok()) - .unwrap_or(3000); - - println!("Listening at http://{}:{} ...", &ip, &port); - warb::serve(routes).run((ip, port)).await; -} +use serde_json::json; +use std::net::IpAddr; +use uuid::Uuid; + +use warb::{hyper::StatusCode, Filter, Reply}; +use warp as warb; // i think it's funny + +mod events; +mod utils; +mod viewer_connection; +mod watch_session; + +use serde::Deserialize; + +use crate::{ + viewer_connection::ws_subscribe, + watch_session::{get_session, SubtitleTrack, WatchSession, SESSIONS}, +}; + +#[derive(Deserialize)] +struct StartSessionBody { + video_url: String, + #[serde(default = "Vec::new")] + subtitle_tracks: Vec, +} + +#[derive(Deserialize)] +struct SubscribeQuery { + nickname: String, + colour: String, +} + +async fn get_emoji_list() -> Result { + use tokio_stream::{wrappers::ReadDirStream, StreamExt}; + + let dir = tokio::fs::read_dir("frontend/emojis") + .await + .expect("Couldn't read emojis directory!"); + + let files = ReadDirStream::new(dir) + .filter_map(|r| r.ok()) + .map(|e| e.file_name().to_string_lossy().to_string()) + .collect::>() + .await; + + Ok(warb::reply::json(&files)) +} + +#[tokio::main] +async fn main() { + let start_session_route = warb::path!("start_session") + .and(warb::path::end()) + .and(warb::post()) + .and(warb::body::json()) + .map(|body: StartSessionBody| { + let mut sessions = SESSIONS.lock().unwrap(); + let session_uuid = Uuid::new_v4(); + let session = WatchSession::new(body.video_url, body.subtitle_tracks); + let session_view = session.view(); + sessions.insert(session_uuid, session); + + warb::reply::json(&json!({ "id": session_uuid.to_string(), "session": session_view })) + }); + + let get_emoji_route = warb::path!("emojos").and_then(get_emoji_list); + + enum RequestedSession { + Session(Uuid, WatchSession), + Error(warb::reply::WithStatus), + } + + let get_running_session = warb::path::path("sess") + .and(warb::path::param::()) + .map(|session_id: String| { + if let Ok(uuid) = Uuid::parse_str(&session_id) { + get_session(uuid) + .map(|sess| RequestedSession::Session(uuid, sess)) + .unwrap_or_else(|| { + RequestedSession::Error(warb::reply::with_status( + warb::reply::json(&json!({ "error": "session does not exist" })), + StatusCode::NOT_FOUND, + )) + }) + } else { + RequestedSession::Error(warb::reply::with_status( + warb::reply::json(&json!({ "error": "invalid session UUID" })), + StatusCode::BAD_REQUEST, + )) + } + }); + + let get_status_route = get_running_session + .and(warb::path::end()) + .map(|requested_session| match requested_session { + RequestedSession::Session(_, sess) => { + warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK) + } + RequestedSession::Error(e) => e, + }); + + let ws_subscribe_route = get_running_session + .and(warb::path!("subscribe")) + .and(warb::query()) + .and(warb::ws()) + .map( + |requested_session, query: SubscribeQuery, ws: warb::ws::Ws| match requested_session { + RequestedSession::Session(uuid, _) => ws + .on_upgrade(move |ws| ws_subscribe(uuid, query.nickname, query.colour, ws)) + .into_response(), + RequestedSession::Error(error_response) => error_response.into_response(), + }, + ); + + let routes = start_session_route + .or(get_status_route) + .or(ws_subscribe_route) + .or(get_emoji_route) + .or(warb::path::end().and(warb::fs::file("frontend/index.html"))) + .or(warb::fs::dir("frontend")); + + let ip = std::env::var("IP") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(|| [127, 0, 0, 1].into()); + let port = std::env::var("PORT") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(3000); + + println!("Listening at http://{}:{} ...", &ip, &port); + warb::serve(routes).run((ip, port)).await; +} diff --git a/src/viewer_connection.rs b/src/viewer_connection.rs index ea38161..45f6852 100644 --- a/src/viewer_connection.rs +++ b/src/viewer_connection.rs @@ -1,156 +1,156 @@ -use once_cell::sync::Lazy; -use std::{ - collections::HashMap, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use futures::{SinkExt, StreamExt, TryFutureExt}; -use tokio::sync::{ - mpsc::{self, UnboundedSender}, - RwLock, -}; -use tokio_stream::wrappers::UnboundedReceiverStream; - -use uuid::Uuid; -use warp::ws::{Message, WebSocket}; - -use crate::{ - events::{Viewer, WatchEvent, WatchEventData}, - utils::truncate_str, - watch_session::{get_session, handle_watch_event_data}, -}; - -static CONNECTED_VIEWERS: Lazy>> = - Lazy::new(|| RwLock::new(HashMap::new())); -static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1); - -pub struct ConnectedViewer { - pub session: Uuid, - pub viewer_id: usize, - pub tx: UnboundedSender, - pub nickname: Option, - pub colour: Option, -} - -pub async fn ws_subscribe(session_uuid: Uuid, nickname: String, colour: String, ws: WebSocket) { - let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed); - let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split(); - - let (tx, rx) = mpsc::unbounded_channel::(); - let mut rx = UnboundedReceiverStream::new(rx); - - tokio::task::spawn(async move { - while let Some(event) = rx.next().await { - viewer_ws_tx - .send(Message::text( - serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"), - )) - .unwrap_or_else(|e| eprintln!("ws send error: {}", e)) - .await; - } - }); - - let mut colour = colour; - if colour.len() != 6 || !colour.chars().all(|x| x.is_ascii_hexdigit()) { - colour = String::from("7ed0ff"); - } - let nickname = truncate_str(&nickname, 50).to_string(); - - CONNECTED_VIEWERS.write().await.insert( - viewer_id, - ConnectedViewer { - viewer_id, - session: session_uuid, - tx, - nickname: Some(nickname.clone()), - colour: Some(colour.clone()), - }, - ); - - ws_publish( - session_uuid, - None, - WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserJoin), - ) - .await; - - update_viewer_list(session_uuid).await; - - while let Some(Ok(message)) = viewer_ws_rx.next().await { - let event: WatchEventData = match message - .to_str() - .ok() - .and_then(|s| serde_json::from_str(s).ok()) - { - Some(e) => e, - None => continue, - }; - - let session = &mut get_session(session_uuid).unwrap(); - - // server side event modification where neccessary - let event: WatchEventData = match event { - WatchEventData::SetTime { from: _, to } => WatchEventData::SetTime { - from: Some(session.get_time_ms()), - to, - }, - _ => event, - }; - - handle_watch_event_data(session_uuid, session, event.clone()); - - ws_publish( - session_uuid, - Some(viewer_id), - WatchEvent::new(nickname.clone(), colour.clone(), event), - ) - .await; - } - - ws_publish( - session_uuid, - None, - WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserLeave), - ) - .await; - - CONNECTED_VIEWERS.write().await.remove(&viewer_id); - update_viewer_list(session_uuid).await; -} - -pub async fn ws_publish(session_uuid: Uuid, skip_viewer_id: Option, event: WatchEvent) { - for viewer in CONNECTED_VIEWERS.read().await.values() { - if viewer.session != session_uuid { - continue; - } - - let _ = viewer.tx.send(WatchEvent { - reflected: skip_viewer_id == Some(viewer.viewer_id), - ..event.clone() - }); - } -} - -async fn update_viewer_list(session_uuid: Uuid) { - let mut viewers = Vec::new(); - - for viewer in CONNECTED_VIEWERS.read().await.values() { - if viewer.session == session_uuid { - viewers.push(Viewer { - nickname: viewer.nickname.clone(), - colour: viewer.colour.clone(), - }) - } - } - - ws_publish( - session_uuid, - None, - WatchEvent::new( - String::from("server"), - String::from(""), - WatchEventData::UpdateViewerList(viewers), - ), - ) - .await; -} +use once_cell::sync::Lazy; +use std::{ + collections::HashMap, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use futures::{SinkExt, StreamExt, TryFutureExt}; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + RwLock, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use uuid::Uuid; +use warp::ws::{Message, WebSocket}; + +use crate::{ + events::{Viewer, WatchEvent, WatchEventData}, + utils::truncate_str, + watch_session::{get_session, handle_watch_event_data}, +}; + +static CONNECTED_VIEWERS: Lazy>> = + Lazy::new(|| RwLock::new(HashMap::new())); +static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1); + +pub struct ConnectedViewer { + pub session: Uuid, + pub viewer_id: usize, + pub tx: UnboundedSender, + pub nickname: Option, + pub colour: Option, +} + +pub async fn ws_subscribe(session_uuid: Uuid, nickname: String, colour: String, ws: WebSocket) { + let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed); + let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split(); + + let (tx, rx) = mpsc::unbounded_channel::(); + let mut rx = UnboundedReceiverStream::new(rx); + + tokio::task::spawn(async move { + while let Some(event) = rx.next().await { + viewer_ws_tx + .send(Message::text( + serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"), + )) + .unwrap_or_else(|e| eprintln!("ws send error: {}", e)) + .await; + } + }); + + let mut colour = colour; + if colour.len() != 6 || !colour.chars().all(|x| x.is_ascii_hexdigit()) { + colour = String::from("7ed0ff"); + } + let nickname = truncate_str(&nickname, 50).to_string(); + + CONNECTED_VIEWERS.write().await.insert( + viewer_id, + ConnectedViewer { + viewer_id, + session: session_uuid, + tx, + nickname: Some(nickname.clone()), + colour: Some(colour.clone()), + }, + ); + + ws_publish( + session_uuid, + None, + WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserJoin), + ) + .await; + + update_viewer_list(session_uuid).await; + + while let Some(Ok(message)) = viewer_ws_rx.next().await { + let event: WatchEventData = match message + .to_str() + .ok() + .and_then(|s| serde_json::from_str(s).ok()) + { + Some(e) => e, + None => continue, + }; + + let session = &mut get_session(session_uuid).unwrap(); + + // server side event modification where neccessary + let event: WatchEventData = match event { + WatchEventData::SetTime { from: _, to } => WatchEventData::SetTime { + from: Some(session.get_time_ms()), + to, + }, + _ => event, + }; + + handle_watch_event_data(session_uuid, session, event.clone()); + + ws_publish( + session_uuid, + Some(viewer_id), + WatchEvent::new(nickname.clone(), colour.clone(), event), + ) + .await; + } + + ws_publish( + session_uuid, + None, + WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserLeave), + ) + .await; + + CONNECTED_VIEWERS.write().await.remove(&viewer_id); + update_viewer_list(session_uuid).await; +} + +pub async fn ws_publish(session_uuid: Uuid, skip_viewer_id: Option, event: WatchEvent) { + for viewer in CONNECTED_VIEWERS.read().await.values() { + if viewer.session != session_uuid { + continue; + } + + let _ = viewer.tx.send(WatchEvent { + reflected: skip_viewer_id == Some(viewer.viewer_id), + ..event.clone() + }); + } +} + +async fn update_viewer_list(session_uuid: Uuid) { + let mut viewers = Vec::new(); + + for viewer in CONNECTED_VIEWERS.read().await.values() { + if viewer.session == session_uuid { + viewers.push(Viewer { + nickname: viewer.nickname.clone(), + colour: viewer.colour.clone(), + }) + } + } + + ws_publish( + session_uuid, + None, + WatchEvent::new( + String::from("server"), + String::from(""), + WatchEventData::UpdateViewerList(viewers), + ), + ) + .await; +} diff --git a/src/watch_session.rs b/src/watch_session.rs index cda42c5..6279b72 100644 --- a/src/watch_session.rs +++ b/src/watch_session.rs @@ -1,96 +1,96 @@ -use once_cell::sync::Lazy; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, sync::Mutex, time::Instant}; -use uuid::Uuid; - -use crate::events::WatchEventData; - -#[derive(Serialize, Deserialize, Clone)] -pub struct SubtitleTrack { - pub url: String, - pub name: String, -} - -#[derive(Clone)] -pub struct WatchSession { - pub video_url: String, - pub subtitle_tracks: Vec, - - is_playing: bool, - playing_from_timestamp: u64, - playing_from_instant: Instant, -} - -#[derive(Serialize)] -pub struct WatchSessionView { - pub video_url: String, - pub subtitle_tracks: Vec, - pub current_time_ms: u64, - pub is_playing: bool, -} - -impl WatchSession { - pub fn new(video_url: String, subtitle_tracks: Vec) -> Self { - WatchSession { - video_url, - subtitle_tracks, - is_playing: false, - playing_from_timestamp: 0, - playing_from_instant: Instant::now(), - } - } - - pub fn view(&self) -> WatchSessionView { - WatchSessionView { - video_url: self.video_url.clone(), - subtitle_tracks: self.subtitle_tracks.clone(), - current_time_ms: self.get_time_ms() as u64, - is_playing: self.is_playing, - } - } - - pub fn get_time_ms(&self) -> u64 { - if !self.is_playing { - return self.playing_from_timestamp; - } - - self.playing_from_timestamp + self.playing_from_instant.elapsed().as_millis() as u64 - } - - pub fn set_time_ms(&mut self, time_ms: u64) { - self.playing_from_timestamp = time_ms; - self.playing_from_instant = Instant::now(); - } - - pub fn set_playing(&mut self, playing: bool, time_ms: u64) { - self.set_time_ms(time_ms); - self.is_playing = playing; - } -} - -pub static SESSIONS: Lazy>> = - Lazy::new(|| Mutex::new(HashMap::new())); - -pub fn get_session(uuid: Uuid) -> Option { - SESSIONS.lock().unwrap().get(&uuid).cloned() -} - -pub fn handle_watch_event_data( - uuid: Uuid, - watch_session: &mut WatchSession, - event: WatchEventData, -) { - match event { - WatchEventData::SetPlaying { playing, time } => { - watch_session.set_playing(playing, time); - } - - WatchEventData::SetTime { from: _, to } => { - watch_session.set_time_ms(to); - } - - _ => {} - }; - - let _ = SESSIONS.lock().unwrap().insert(uuid, watch_session.clone()); -} +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, sync::Mutex, time::Instant}; +use uuid::Uuid; + +use crate::events::WatchEventData; + +#[derive(Serialize, Deserialize, Clone)] +pub struct SubtitleTrack { + pub url: String, + pub name: String, +} + +#[derive(Clone)] +pub struct WatchSession { + pub video_url: String, + pub subtitle_tracks: Vec, + + is_playing: bool, + playing_from_timestamp: u64, + playing_from_instant: Instant, +} + +#[derive(Serialize)] +pub struct WatchSessionView { + pub video_url: String, + pub subtitle_tracks: Vec, + pub current_time_ms: u64, + pub is_playing: bool, +} + +impl WatchSession { + pub fn new(video_url: String, subtitle_tracks: Vec) -> Self { + WatchSession { + video_url, + subtitle_tracks, + is_playing: false, + playing_from_timestamp: 0, + playing_from_instant: Instant::now(), + } + } + + pub fn view(&self) -> WatchSessionView { + WatchSessionView { + video_url: self.video_url.clone(), + subtitle_tracks: self.subtitle_tracks.clone(), + current_time_ms: self.get_time_ms() as u64, + is_playing: self.is_playing, + } + } + + pub fn get_time_ms(&self) -> u64 { + if !self.is_playing { + return self.playing_from_timestamp; + } + + self.playing_from_timestamp + self.playing_from_instant.elapsed().as_millis() as u64 + } + + pub fn set_time_ms(&mut self, time_ms: u64) { + self.playing_from_timestamp = time_ms; + self.playing_from_instant = Instant::now(); + } + + pub fn set_playing(&mut self, playing: bool, time_ms: u64) { + self.set_time_ms(time_ms); + self.is_playing = playing; + } +} + +pub static SESSIONS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +pub fn get_session(uuid: Uuid) -> Option { + SESSIONS.lock().unwrap().get(&uuid).cloned() +} + +pub fn handle_watch_event_data( + uuid: Uuid, + watch_session: &mut WatchSession, + event: WatchEventData, +) { + match event { + WatchEventData::SetPlaying { playing, time } => { + watch_session.set_playing(playing, time); + } + + WatchEventData::SetTime { from: _, to } => { + watch_session.set_time_ms(to); + } + + _ => {} + }; + + let _ = SESSIONS.lock().unwrap().insert(uuid, watch_session.clone()); +}