fuck it, converting the rest to unix line endings

main
easrng 2023-02-01 20:29:47 -05:00
parent 1bd7071cec
commit f3ee2ecc83
4 changed files with 436 additions and 436 deletions

View File

@ -1,52 +1,52 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct Viewer { pub struct Viewer {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub nickname: Option<String>, pub nickname: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub colour: Option<String>, pub colour: Option<String>,
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "op", content = "data")] #[serde(tag = "op", content = "data")]
pub enum WatchEventData { pub enum WatchEventData {
SetPlaying { SetPlaying {
playing: bool, playing: bool,
time: u64, time: u64,
}, },
SetTime { SetTime {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
from: Option<u64>, from: Option<u64>,
to: u64, to: u64,
}, },
UserJoin, UserJoin,
UserLeave, UserLeave,
ChatMessage(String), ChatMessage(String),
Ping(String), Ping(String),
UpdateViewerList(Vec<Viewer>), UpdateViewerList(Vec<Viewer>),
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct WatchEvent { pub struct WatchEvent {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub user: Option<String>, pub user: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub colour: Option<String>, pub colour: Option<String>,
#[serde(flatten)] #[serde(flatten)]
pub data: WatchEventData, pub data: WatchEventData,
#[serde(default)] #[serde(default)]
pub reflected: bool, pub reflected: bool,
} }
impl WatchEvent { impl WatchEvent {
pub fn new(user: String, colour: String, data: WatchEventData) -> Self { pub fn new(user: String, colour: String, data: WatchEventData) -> Self {
WatchEvent { WatchEvent {
user: Some(user), user: Some(user),
colour: Some(colour), colour: Some(colour),
data, data,
reflected: false, reflected: false,
} }
} }
} }

View File

@ -1,132 +1,132 @@
use serde_json::json; use serde_json::json;
use std::net::IpAddr; use std::net::IpAddr;
use uuid::Uuid; use uuid::Uuid;
use warb::{hyper::StatusCode, Filter, Reply}; use warb::{hyper::StatusCode, Filter, Reply};
use warp as warb; // i think it's funny use warp as warb; // i think it's funny
mod events; mod events;
mod utils; mod utils;
mod viewer_connection; mod viewer_connection;
mod watch_session; mod watch_session;
use serde::Deserialize; use serde::Deserialize;
use crate::{ use crate::{
viewer_connection::ws_subscribe, viewer_connection::ws_subscribe,
watch_session::{get_session, SubtitleTrack, WatchSession, SESSIONS}, watch_session::{get_session, SubtitleTrack, WatchSession, SESSIONS},
}; };
#[derive(Deserialize)] #[derive(Deserialize)]
struct StartSessionBody { struct StartSessionBody {
video_url: String, video_url: String,
#[serde(default = "Vec::new")] #[serde(default = "Vec::new")]
subtitle_tracks: Vec<SubtitleTrack>, subtitle_tracks: Vec<SubtitleTrack>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
struct SubscribeQuery { struct SubscribeQuery {
nickname: String, nickname: String,
colour: String, colour: String,
} }
async fn get_emoji_list() -> Result<impl warb::Reply, warb::Rejection> { async fn get_emoji_list() -> Result<impl warb::Reply, warb::Rejection> {
use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use tokio_stream::{wrappers::ReadDirStream, StreamExt};
let dir = tokio::fs::read_dir("frontend/emojis") let dir = tokio::fs::read_dir("frontend/emojis")
.await .await
.expect("Couldn't read emojis directory!"); .expect("Couldn't read emojis directory!");
let files = ReadDirStream::new(dir) let files = ReadDirStream::new(dir)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.map(|e| e.file_name().to_string_lossy().to_string()) .map(|e| e.file_name().to_string_lossy().to_string())
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await; .await;
Ok(warb::reply::json(&files)) Ok(warb::reply::json(&files))
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let start_session_route = warb::path!("start_session") let start_session_route = warb::path!("start_session")
.and(warb::path::end()) .and(warb::path::end())
.and(warb::post()) .and(warb::post())
.and(warb::body::json()) .and(warb::body::json())
.map(|body: StartSessionBody| { .map(|body: StartSessionBody| {
let mut sessions = SESSIONS.lock().unwrap(); let mut sessions = SESSIONS.lock().unwrap();
let session_uuid = Uuid::new_v4(); let session_uuid = Uuid::new_v4();
let session = WatchSession::new(body.video_url, body.subtitle_tracks); let session = WatchSession::new(body.video_url, body.subtitle_tracks);
let session_view = session.view(); let session_view = session.view();
sessions.insert(session_uuid, session); sessions.insert(session_uuid, session);
warb::reply::json(&json!({ "id": session_uuid.to_string(), "session": session_view })) warb::reply::json(&json!({ "id": session_uuid.to_string(), "session": session_view }))
}); });
let get_emoji_route = warb::path!("emojos").and_then(get_emoji_list); let get_emoji_route = warb::path!("emojos").and_then(get_emoji_list);
enum RequestedSession { enum RequestedSession {
Session(Uuid, WatchSession), Session(Uuid, WatchSession),
Error(warb::reply::WithStatus<warb::reply::Json>), Error(warb::reply::WithStatus<warb::reply::Json>),
} }
let get_running_session = warb::path::path("sess") let get_running_session = warb::path::path("sess")
.and(warb::path::param::<String>()) .and(warb::path::param::<String>())
.map(|session_id: String| { .map(|session_id: String| {
if let Ok(uuid) = Uuid::parse_str(&session_id) { if let Ok(uuid) = Uuid::parse_str(&session_id) {
get_session(uuid) get_session(uuid)
.map(|sess| RequestedSession::Session(uuid, sess)) .map(|sess| RequestedSession::Session(uuid, sess))
.unwrap_or_else(|| { .unwrap_or_else(|| {
RequestedSession::Error(warb::reply::with_status( RequestedSession::Error(warb::reply::with_status(
warb::reply::json(&json!({ "error": "session does not exist" })), warb::reply::json(&json!({ "error": "session does not exist" })),
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
)) ))
}) })
} else { } else {
RequestedSession::Error(warb::reply::with_status( RequestedSession::Error(warb::reply::with_status(
warb::reply::json(&json!({ "error": "invalid session UUID" })), warb::reply::json(&json!({ "error": "invalid session UUID" })),
StatusCode::BAD_REQUEST, StatusCode::BAD_REQUEST,
)) ))
} }
}); });
let get_status_route = get_running_session let get_status_route = get_running_session
.and(warb::path::end()) .and(warb::path::end())
.map(|requested_session| match requested_session { .map(|requested_session| match requested_session {
RequestedSession::Session(_, sess) => { RequestedSession::Session(_, sess) => {
warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK) warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK)
} }
RequestedSession::Error(e) => e, RequestedSession::Error(e) => e,
}); });
let ws_subscribe_route = get_running_session let ws_subscribe_route = get_running_session
.and(warb::path!("subscribe")) .and(warb::path!("subscribe"))
.and(warb::query()) .and(warb::query())
.and(warb::ws()) .and(warb::ws())
.map( .map(
|requested_session, query: SubscribeQuery, ws: warb::ws::Ws| match requested_session { |requested_session, query: SubscribeQuery, ws: warb::ws::Ws| match requested_session {
RequestedSession::Session(uuid, _) => ws RequestedSession::Session(uuid, _) => ws
.on_upgrade(move |ws| ws_subscribe(uuid, query.nickname, query.colour, ws)) .on_upgrade(move |ws| ws_subscribe(uuid, query.nickname, query.colour, ws))
.into_response(), .into_response(),
RequestedSession::Error(error_response) => error_response.into_response(), RequestedSession::Error(error_response) => error_response.into_response(),
}, },
); );
let routes = start_session_route let routes = start_session_route
.or(get_status_route) .or(get_status_route)
.or(ws_subscribe_route) .or(ws_subscribe_route)
.or(get_emoji_route) .or(get_emoji_route)
.or(warb::path::end().and(warb::fs::file("frontend/index.html"))) .or(warb::path::end().and(warb::fs::file("frontend/index.html")))
.or(warb::fs::dir("frontend")); .or(warb::fs::dir("frontend"));
let ip = std::env::var("IP") let ip = std::env::var("IP")
.ok() .ok()
.and_then(|s| s.parse::<IpAddr>().ok()) .and_then(|s| s.parse::<IpAddr>().ok())
.unwrap_or_else(|| [127, 0, 0, 1].into()); .unwrap_or_else(|| [127, 0, 0, 1].into());
let port = std::env::var("PORT") let port = std::env::var("PORT")
.ok() .ok()
.and_then(|s| s.parse::<u16>().ok()) .and_then(|s| s.parse::<u16>().ok())
.unwrap_or(3000); .unwrap_or(3000);
println!("Listening at http://{}:{} ...", &ip, &port); println!("Listening at http://{}:{} ...", &ip, &port);
warb::serve(routes).run((ip, port)).await; warb::serve(routes).run((ip, port)).await;
} }

View File

@ -1,156 +1,156 @@
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
}; };
use futures::{SinkExt, StreamExt, TryFutureExt}; use futures::{SinkExt, StreamExt, TryFutureExt};
use tokio::sync::{ use tokio::sync::{
mpsc::{self, UnboundedSender}, mpsc::{self, UnboundedSender},
RwLock, RwLock,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid; use uuid::Uuid;
use warp::ws::{Message, WebSocket}; use warp::ws::{Message, WebSocket};
use crate::{ use crate::{
events::{Viewer, WatchEvent, WatchEventData}, events::{Viewer, WatchEvent, WatchEventData},
utils::truncate_str, utils::truncate_str,
watch_session::{get_session, handle_watch_event_data}, watch_session::{get_session, handle_watch_event_data},
}; };
static CONNECTED_VIEWERS: Lazy<RwLock<HashMap<usize, ConnectedViewer>>> = static CONNECTED_VIEWERS: Lazy<RwLock<HashMap<usize, ConnectedViewer>>> =
Lazy::new(|| RwLock::new(HashMap::new())); Lazy::new(|| RwLock::new(HashMap::new()));
static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1); static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1);
pub struct ConnectedViewer { pub struct ConnectedViewer {
pub session: Uuid, pub session: Uuid,
pub viewer_id: usize, pub viewer_id: usize,
pub tx: UnboundedSender<WatchEvent>, pub tx: UnboundedSender<WatchEvent>,
pub nickname: Option<String>, pub nickname: Option<String>,
pub colour: Option<String>, pub colour: Option<String>,
} }
pub async fn ws_subscribe(session_uuid: Uuid, nickname: String, colour: String, ws: WebSocket) { pub async fn ws_subscribe(session_uuid: Uuid, nickname: String, colour: String, ws: WebSocket) {
let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed); let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed);
let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split(); let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split();
let (tx, rx) = mpsc::unbounded_channel::<WatchEvent>(); let (tx, rx) = mpsc::unbounded_channel::<WatchEvent>();
let mut rx = UnboundedReceiverStream::new(rx); let mut rx = UnboundedReceiverStream::new(rx);
tokio::task::spawn(async move { tokio::task::spawn(async move {
while let Some(event) = rx.next().await { while let Some(event) = rx.next().await {
viewer_ws_tx viewer_ws_tx
.send(Message::text( .send(Message::text(
serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"), serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"),
)) ))
.unwrap_or_else(|e| eprintln!("ws send error: {}", e)) .unwrap_or_else(|e| eprintln!("ws send error: {}", e))
.await; .await;
} }
}); });
let mut colour = colour; let mut colour = colour;
if colour.len() != 6 || !colour.chars().all(|x| x.is_ascii_hexdigit()) { if colour.len() != 6 || !colour.chars().all(|x| x.is_ascii_hexdigit()) {
colour = String::from("7ed0ff"); colour = String::from("7ed0ff");
} }
let nickname = truncate_str(&nickname, 50).to_string(); let nickname = truncate_str(&nickname, 50).to_string();
CONNECTED_VIEWERS.write().await.insert( CONNECTED_VIEWERS.write().await.insert(
viewer_id, viewer_id,
ConnectedViewer { ConnectedViewer {
viewer_id, viewer_id,
session: session_uuid, session: session_uuid,
tx, tx,
nickname: Some(nickname.clone()), nickname: Some(nickname.clone()),
colour: Some(colour.clone()), colour: Some(colour.clone()),
}, },
); );
ws_publish( ws_publish(
session_uuid, session_uuid,
None, None,
WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserJoin), WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserJoin),
) )
.await; .await;
update_viewer_list(session_uuid).await; update_viewer_list(session_uuid).await;
while let Some(Ok(message)) = viewer_ws_rx.next().await { while let Some(Ok(message)) = viewer_ws_rx.next().await {
let event: WatchEventData = match message let event: WatchEventData = match message
.to_str() .to_str()
.ok() .ok()
.and_then(|s| serde_json::from_str(s).ok()) .and_then(|s| serde_json::from_str(s).ok())
{ {
Some(e) => e, Some(e) => e,
None => continue, None => continue,
}; };
let session = &mut get_session(session_uuid).unwrap(); let session = &mut get_session(session_uuid).unwrap();
// server side event modification where neccessary // server side event modification where neccessary
let event: WatchEventData = match event { let event: WatchEventData = match event {
WatchEventData::SetTime { from: _, to } => WatchEventData::SetTime { WatchEventData::SetTime { from: _, to } => WatchEventData::SetTime {
from: Some(session.get_time_ms()), from: Some(session.get_time_ms()),
to, to,
}, },
_ => event, _ => event,
}; };
handle_watch_event_data(session_uuid, session, event.clone()); handle_watch_event_data(session_uuid, session, event.clone());
ws_publish( ws_publish(
session_uuid, session_uuid,
Some(viewer_id), Some(viewer_id),
WatchEvent::new(nickname.clone(), colour.clone(), event), WatchEvent::new(nickname.clone(), colour.clone(), event),
) )
.await; .await;
} }
ws_publish( ws_publish(
session_uuid, session_uuid,
None, None,
WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserLeave), WatchEvent::new(nickname.clone(), colour.clone(), WatchEventData::UserLeave),
) )
.await; .await;
CONNECTED_VIEWERS.write().await.remove(&viewer_id); CONNECTED_VIEWERS.write().await.remove(&viewer_id);
update_viewer_list(session_uuid).await; update_viewer_list(session_uuid).await;
} }
pub async fn ws_publish(session_uuid: Uuid, skip_viewer_id: Option<usize>, event: WatchEvent) { pub async fn ws_publish(session_uuid: Uuid, skip_viewer_id: Option<usize>, event: WatchEvent) {
for viewer in CONNECTED_VIEWERS.read().await.values() { for viewer in CONNECTED_VIEWERS.read().await.values() {
if viewer.session != session_uuid { if viewer.session != session_uuid {
continue; continue;
} }
let _ = viewer.tx.send(WatchEvent { let _ = viewer.tx.send(WatchEvent {
reflected: skip_viewer_id == Some(viewer.viewer_id), reflected: skip_viewer_id == Some(viewer.viewer_id),
..event.clone() ..event.clone()
}); });
} }
} }
async fn update_viewer_list(session_uuid: Uuid) { async fn update_viewer_list(session_uuid: Uuid) {
let mut viewers = Vec::new(); let mut viewers = Vec::new();
for viewer in CONNECTED_VIEWERS.read().await.values() { for viewer in CONNECTED_VIEWERS.read().await.values() {
if viewer.session == session_uuid { if viewer.session == session_uuid {
viewers.push(Viewer { viewers.push(Viewer {
nickname: viewer.nickname.clone(), nickname: viewer.nickname.clone(),
colour: viewer.colour.clone(), colour: viewer.colour.clone(),
}) })
} }
} }
ws_publish( ws_publish(
session_uuid, session_uuid,
None, None,
WatchEvent::new( WatchEvent::new(
String::from("server"), String::from("server"),
String::from(""), String::from(""),
WatchEventData::UpdateViewerList(viewers), WatchEventData::UpdateViewerList(viewers),
), ),
) )
.await; .await;
} }

View File

@ -1,96 +1,96 @@
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Mutex, time::Instant}; use std::{collections::HashMap, sync::Mutex, time::Instant};
use uuid::Uuid; use uuid::Uuid;
use crate::events::WatchEventData; use crate::events::WatchEventData;
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
pub struct SubtitleTrack { pub struct SubtitleTrack {
pub url: String, pub url: String,
pub name: String, pub name: String,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct WatchSession { pub struct WatchSession {
pub video_url: String, pub video_url: String,
pub subtitle_tracks: Vec<SubtitleTrack>, pub subtitle_tracks: Vec<SubtitleTrack>,
is_playing: bool, is_playing: bool,
playing_from_timestamp: u64, playing_from_timestamp: u64,
playing_from_instant: Instant, playing_from_instant: Instant,
} }
#[derive(Serialize)] #[derive(Serialize)]
pub struct WatchSessionView { pub struct WatchSessionView {
pub video_url: String, pub video_url: String,
pub subtitle_tracks: Vec<SubtitleTrack>, pub subtitle_tracks: Vec<SubtitleTrack>,
pub current_time_ms: u64, pub current_time_ms: u64,
pub is_playing: bool, pub is_playing: bool,
} }
impl WatchSession { impl WatchSession {
pub fn new(video_url: String, subtitle_tracks: Vec<SubtitleTrack>) -> Self { pub fn new(video_url: String, subtitle_tracks: Vec<SubtitleTrack>) -> Self {
WatchSession { WatchSession {
video_url, video_url,
subtitle_tracks, subtitle_tracks,
is_playing: false, is_playing: false,
playing_from_timestamp: 0, playing_from_timestamp: 0,
playing_from_instant: Instant::now(), playing_from_instant: Instant::now(),
} }
} }
pub fn view(&self) -> WatchSessionView { pub fn view(&self) -> WatchSessionView {
WatchSessionView { WatchSessionView {
video_url: self.video_url.clone(), video_url: self.video_url.clone(),
subtitle_tracks: self.subtitle_tracks.clone(), subtitle_tracks: self.subtitle_tracks.clone(),
current_time_ms: self.get_time_ms() as u64, current_time_ms: self.get_time_ms() as u64,
is_playing: self.is_playing, is_playing: self.is_playing,
} }
} }
pub fn get_time_ms(&self) -> u64 { pub fn get_time_ms(&self) -> u64 {
if !self.is_playing { if !self.is_playing {
return self.playing_from_timestamp; return self.playing_from_timestamp;
} }
self.playing_from_timestamp + self.playing_from_instant.elapsed().as_millis() as u64 self.playing_from_timestamp + self.playing_from_instant.elapsed().as_millis() as u64
} }
pub fn set_time_ms(&mut self, time_ms: u64) { pub fn set_time_ms(&mut self, time_ms: u64) {
self.playing_from_timestamp = time_ms; self.playing_from_timestamp = time_ms;
self.playing_from_instant = Instant::now(); self.playing_from_instant = Instant::now();
} }
pub fn set_playing(&mut self, playing: bool, time_ms: u64) { pub fn set_playing(&mut self, playing: bool, time_ms: u64) {
self.set_time_ms(time_ms); self.set_time_ms(time_ms);
self.is_playing = playing; self.is_playing = playing;
} }
} }
pub static SESSIONS: Lazy<Mutex<HashMap<Uuid, WatchSession>>> = pub static SESSIONS: Lazy<Mutex<HashMap<Uuid, WatchSession>>> =
Lazy::new(|| Mutex::new(HashMap::new())); Lazy::new(|| Mutex::new(HashMap::new()));
pub fn get_session(uuid: Uuid) -> Option<WatchSession> { pub fn get_session(uuid: Uuid) -> Option<WatchSession> {
SESSIONS.lock().unwrap().get(&uuid).cloned() SESSIONS.lock().unwrap().get(&uuid).cloned()
} }
pub fn handle_watch_event_data( pub fn handle_watch_event_data(
uuid: Uuid, uuid: Uuid,
watch_session: &mut WatchSession, watch_session: &mut WatchSession,
event: WatchEventData, event: WatchEventData,
) { ) {
match event { match event {
WatchEventData::SetPlaying { playing, time } => { WatchEventData::SetPlaying { playing, time } => {
watch_session.set_playing(playing, time); watch_session.set_playing(playing, time);
} }
WatchEventData::SetTime { from: _, to } => { WatchEventData::SetTime { from: _, to } => {
watch_session.set_time_ms(to); watch_session.set_time_ms(to);
} }
_ => {} _ => {}
}; };
let _ = SESSIONS.lock().unwrap().insert(uuid, watch_session.clone()); let _ = SESSIONS.lock().unwrap().insert(uuid, watch_session.clone());
} }