Browse Source

Initial commit

pull/1/head
Charlotte Som 10 months ago
commit
468843c430
  1. 12
      .editorconfig
  2. 1
      .gitignore
  3. 1151
      Cargo.lock
  4. 15
      Cargo.toml
  5. 36
      frontend/index.html
  6. 175
      frontend/main.js
  7. 101
      frontend/styles.css
  8. 69
      src/events.rs
  9. 140
      src/main.rs
  10. 66
      src/watch_session.rs

12
.editorconfig

@ -0,0 +1,12 @@
root = true
[*]
indent_style = space
indent_size = 2
end_of_line = crlf
charset = utf-8
trim_trailing_whitespace = false
insert_final_newline = true
[*.rs]
indent_size = 4

1
.gitignore vendored

@ -0,0 +1 @@
/target

1151
Cargo.lock generated

File diff suppressed because it is too large Load Diff

15
Cargo.toml

@ -0,0 +1,15 @@
[package]
name = "watch-party"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.17"
futures-util = "0.3.17"
once_cell = "1.8.0"
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
tokio = { version = "1.12.0", features = ["full"] }
tokio-stream = "0.1.7"
uuid = { version = "0.8.2", features = ["v4"] }
warp = "0.3.1"

36
frontend/index.html

@ -0,0 +1,36 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>watch party :D</title>
<link rel="stylesheet" href="/styles.css" />
</head>
<body>
<noscript>
This site will <em>not</em> work without JavaScript, and there's not
really any way around that :(
</noscript>
<div id="pre-join-controls">
<form id="join-session-form">
<h2>Join a session</h2>
<label for="session-id">Session ID:</label>
<input
type="text"
id="join-session-id"
placeholder="123e4567-e89b-12d3-a456-426614174000"
required
/>
<button>Join</button>
</form>
<p>
No session to join? <a href="/create.html">Create a session</a> instead.
</p>
</div>
<script src="/main.js"></script>
</body>
</html>

175
frontend/main.js

@ -0,0 +1,175 @@
/**
* @param {string} videoUrl
* @param {{name: string, url: string}[]} subtitles
*/
const createVideoElement = (videoUrl, subtitles) => {
document.querySelector("#pre-join-controls").style["display"] = "none";
const video = document.createElement("video");
video.controls = true;
video.autoplay = false;
const source = document.createElement("source");
source.src = videoUrl;
video.appendChild(source);
let first = true;
for (const { name, url } of subtitles) {
const track = document.createElement("track");
track.label = name;
track.src = url;
track.kind = "captions";
if (first) {
track.default = true;
first = false;
}
video.appendChild(track);
}
return video;
}
/**
* @param {WebSocket} socket
* @param {HTMLVideoElement} video
*/
const setupSocketEvents = (socket, video) => {
const setVideoTime = time => {
const timeSecs = time / 1000.0;
if (Math.abs(video.currentTime - timeSecs) > 0.5) {
video.currentTime = timeSecs;
}
}
socket.addEventListener("message", async messageEvent => {
try {
const event = JSON.parse(messageEvent.data);
console.log(event);
switch (event.op) {
case "SetPlaying":
if (event.data.playing) {
await video.play();
} else {
video.pause();
}
setVideoTime(event.data.time);
break;
case "SetTime":
setVideoTime(event.data);
break;
}
} catch (_err) {
}
});
}
/**
* @param {string} videoUrl
* @param {{name: string, url: string}[]} subtitles
* @param {number} currentTime
* @param {boolean} playing
* @param {WebSocket} socket
*/
const setupVideo = async (sessionId, videoUrl, subtitles, currentTime, playing, socket) => {
const video = createVideoElement(videoUrl, subtitles);
document.body.appendChild(video);
video.currentTime = (currentTime / 1000.0);
try {
if (playing) {
await video.play()
} else {
video.pause()
}
} catch (err) {
// Auto-play is probably disabled, we should uhhhhhhh do something about it
}
video.addEventListener("pause", async event => {
await fetch(`/sess/${sessionId}/playing`, {
method: "PUT",
body: JSON.stringify(false),
headers: {
"Content-Type": "application/json"
},
});
});
video.addEventListener("play", async event => {
await fetch(`/sess/${sessionId}/playing`, {
method: "PUT",
body: JSON.stringify(true),
headers: {
"Content-Type": "application/json"
},
});
});
let firstSeekComplete = false;
video.addEventListener("seeked", async event => {
if (!firstSeekComplete) {
// The first seeked event is performed by the browser when the video is loading
firstSeekComplete = true;
return;
}
await fetch(`/sess/${sessionId}/current_time`, {
method: "PUT",
body: JSON.stringify((video.currentTime * 1000) | 0),
headers: {
"Content-Type": "application/json"
},
});
});
setupSocketEvents(socket, video);
}
/** @param {string} sessionId */
const joinSession = async (sessionId) => {
try {
window.location.hash = sessionId;
const {
video_url, subtitle_tracks,
current_time_ms, is_playing
} = await fetch(`/sess/${sessionId}`).then(r => r.json());
const wsUrl = new URL(`/sess/${sessionId}/subscribe`, window.location.href);
wsUrl.protocol = { "http:": "ws:", "https:": "wss:" }[wsUrl.protocol];
const socket = new WebSocket(wsUrl.toString());
setupVideo(sessionId, video_url, subtitle_tracks, current_time_ms, is_playing, socket);
} catch (err) {
// TODO: Show an error on the screen
console.error(err);
}
}
const main = () => {
document.querySelector("#join-session-form").addEventListener("submit", event => {
event.preventDefault();
const sessionId = document.querySelector("#join-session-id").value;
joinSession(sessionId);
});
if (window.location.hash.match(/#[0-9a-f\-]+/)) {
document.querySelector("#join-session-id").value = window.location.hash.substring(1);
}
};
if (document.readyState === "complete") {
main();
} else {
document.addEventListener("DOMContentLoaded", main);
}

101
frontend/styles.css

@ -0,0 +1,101 @@
:root {
--bg: rgb(28, 23, 36);
--fg: rgb(234, 234, 248);
--accent: hsl(275, 57%, 68%);
}
html {
background-color: var(--bg);
color: var(--fg);
font-size: 1.125rem;
font-family: sans-serif;
}
html,
body {
margin: 0;
}
video {
width: 100vw;
height: auto;
}
a {
color: var(--accent);
}
label {
display: block;
}
input[type="url"],
input[type="text"] {
box-sizing: border-box;
background: #fff;
background-clip: padding-box;
border: 1px solid rgba(0, 0, 0, 0.12);
border-radius: 6px;
color: rgba(0, 0, 0, 0.8);
display: block;
margin: 0.5em 0;
padding: 0.5em 1em;
line-height: 1.5;
font-family: sans-serif;
font-size: 1em;
width: 500px;
resize: none;
overflow-x: wrap;
overflow-y: scroll;
}
button {
background-color: var(--accent);
border: var(--accent);
border-radius: 6px;
color: #fff;
padding: 0.5em 1em;
display: inline-block;
font-weight: 400;
text-align: center;
white-space: nowrap;
vertical-align: middle;
font-family: sans-serif;
font-size: 1em;
width: 500px;
user-select: none;
border: 1px solid rgba(0, 0, 0, 0);
line-height: 1.5;
}
button.small-button {
font-size: 0.75em;
padding-top: 0;
padding-bottom: 0;
}
.subtitle-track-group {
display: flex;
}
.subtitle-track-group > * {
margin-top: 0 !important;
margin-bottom: 0 !important;
margin-right: 1ch !important;
}
#pre-join-controls {
width: 60%;
margin: 0 auto;
margin-top: 4em;
}
#join-session-form {
margin-bottom: 4em;
}

69
src/events.rs

@ -0,0 +1,69 @@
use once_cell::sync::Lazy;
use std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering},
};
use futures::{SinkExt, StreamExt, TryFutureExt};
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use warp::ws::{Message, WebSocket};
static CONNECTED_VIEWERS: Lazy<RwLock<HashMap<usize, ConnectedViewer>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1);
#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "op", content = "data")]
pub enum WatchEvent {
SetPlaying { playing: bool, time: u64 },
SetTime(u64),
}
pub struct ConnectedViewer {
pub session_uuid: Uuid,
pub tx: UnboundedSender<WatchEvent>,
}
pub async fn ws_subscribe(session_uuid: Uuid, ws: WebSocket) {
let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed);
let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split();
let (tx, rx) = mpsc::unbounded_channel::<WatchEvent>();
let mut rx = UnboundedReceiverStream::new(rx);
tokio::task::spawn(async move {
while let Some(event) = rx.next().await {
viewer_ws_tx
.send(Message::text(
serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"),
))
.unwrap_or_else(|e| eprintln!("ws send error: {}", e))
.await;
}
});
CONNECTED_VIEWERS
.write()
.await
.insert(viewer_id, ConnectedViewer { session_uuid, tx });
while let Some(Ok(_)) = viewer_ws_rx.next().await {}
CONNECTED_VIEWERS.write().await.remove(&viewer_id);
}
pub async fn ws_publish(session_uuid: Uuid, event: WatchEvent) {
for viewer in CONNECTED_VIEWERS.read().await.values() {
if viewer.session_uuid != session_uuid {
continue;
}
let _ = viewer.tx.send(event.clone());
}
}

140
src/main.rs

@ -0,0 +1,140 @@
use std::{collections::HashMap, sync::Mutex};
use once_cell::sync::Lazy;
use serde_json::json;
use uuid::Uuid;
use warb::{hyper::StatusCode, Filter, Reply};
use warp as warb; // i think it's funny
mod events;
mod watch_session;
use serde::Deserialize;
use crate::{
events::{ws_publish, ws_subscribe, WatchEvent},
watch_session::{SubtitleTrack, WatchSession},
};
static SESSIONS: Lazy<Mutex<HashMap<Uuid, WatchSession>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
#[derive(Deserialize)]
struct StartSessionBody {
pub video_url: String,
#[serde(default = "Vec::new")]
pub subtitle_tracks: Vec<SubtitleTrack>,
}
#[tokio::main]
async fn main() {
let start_session_route = warb::path!("start_session")
.and(warb::path::end())
.and(warb::post())
.and(warb::body::json())
.map(|body: StartSessionBody| {
let mut sessions = SESSIONS.lock().unwrap();
let session_uuid = Uuid::new_v4();
let session = WatchSession::new(body.video_url, body.subtitle_tracks);
let session_view = session.view();
sessions.insert(session_uuid, session);
warb::reply::json(&json!({ "id": session_uuid.to_string(), "session": session_view }))
});
enum RequestedSession {
Session(Uuid, WatchSession),
Error(warb::reply::WithStatus<warb::reply::Json>),
}
let get_running_session = warb::path::path("sess")
.and(warb::path::param::<String>())
.map(|session_id: String| {
if let Ok(uuid) = Uuid::parse_str(&session_id) {
if let Some(session) = SESSIONS.lock().unwrap().get(&uuid) {
RequestedSession::Session(uuid, session.clone())
} else {
RequestedSession::Error(warb::reply::with_status(
warb::reply::json(&json!({ "error": "session does not exist" })),
StatusCode::NOT_FOUND,
))
}
} else {
RequestedSession::Error(warb::reply::with_status(
warb::reply::json(&json!({ "error": "invalid session UUID" })),
StatusCode::BAD_REQUEST,
))
}
});
let get_status_route = get_running_session
.and(warb::path::end())
.map(|requested_session| match requested_session {
RequestedSession::Session(_, sess) => {
warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK)
}
RequestedSession::Error(e) => e,
});
let set_playing_route = get_running_session
.and(warb::path!("playing"))
.and(warb::put())
.and(warb::body::json())
.map(|requested_session, playing: bool| match requested_session {
RequestedSession::Session(uuid, mut sess) => {
sess.set_playing(playing);
let time = sess.get_time_ms();
SESSIONS.lock().unwrap().insert(uuid, sess.clone());
tokio::spawn(async move {
ws_publish(uuid, WatchEvent::SetPlaying { playing, time }).await
});
warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK)
}
RequestedSession::Error(e) => e,
});
let set_timestamp_route = get_running_session
.and(warb::path!("current_time"))
.and(warb::put())
.and(warb::body::json())
.map(
|requested_session, current_time_ms: u64| match requested_session {
RequestedSession::Session(uuid, mut sess) => {
sess.set_time_ms(current_time_ms);
SESSIONS.lock().unwrap().insert(uuid, sess.clone());
tokio::spawn(async move {
ws_publish(uuid, WatchEvent::SetTime(current_time_ms)).await
});
warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK)
}
RequestedSession::Error(e) => e,
},
);
let ws_subscribe_route = get_running_session
.and(warp::path!("subscribe"))
.and(warp::ws())
.map(
|requested_session, ws: warb::ws::Ws| match requested_session {
RequestedSession::Session(uuid, _) => ws
.on_upgrade(move |ws| ws_subscribe(uuid, ws))
.into_response(),
RequestedSession::Error(error_response) => error_response.into_response(),
},
);
let routes = start_session_route
.or(get_status_route)
.or(set_playing_route)
.or(set_timestamp_route)
.or(ws_subscribe_route)
.or(warb::path::end().and(warb::fs::file("frontend/index.html")))
.or(warb::fs::dir("frontend"));
warb::serve(routes).run(([127, 0, 0, 1], 3000)).await;
}

66
src/watch_session.rs

@ -0,0 +1,66 @@
use serde::{Deserialize, Serialize};
use std::time::Instant;
#[derive(Serialize, Deserialize, Clone)]
pub struct SubtitleTrack {
pub url: String,
pub name: String,
}
#[derive(Clone)]
pub struct WatchSession {
pub video_url: String,
pub subtitle_tracks: Vec<SubtitleTrack>,
is_playing: bool,
playing_from_timestamp: u64,
playing_from_instant: Instant,
// TODO: How do we keep track of the current playing time ?
}
#[derive(Serialize)]
pub struct WatchSessionView {
pub video_url: String,
pub subtitle_tracks: Vec<SubtitleTrack>,
pub current_time_ms: u64,
pub is_playing: bool,
}
impl WatchSession {
pub fn new(video_url: String, subtitle_tracks: Vec<SubtitleTrack>) -> Self {
WatchSession {
video_url,
subtitle_tracks,
is_playing: false,
playing_from_timestamp: 0,
playing_from_instant: Instant::now(),
}
}
pub fn view(&self) -> WatchSessionView {
WatchSessionView {
video_url: self.video_url.clone(),
subtitle_tracks: self.subtitle_tracks.clone(),
current_time_ms: self.get_time_ms() as u64,
is_playing: self.is_playing,
}
}
pub fn get_time_ms(&self) -> u64 {
if !self.is_playing {
return self.playing_from_timestamp;
}
self.playing_from_timestamp + self.playing_from_instant.elapsed().as_millis() as u64
}
pub fn set_time_ms(&mut self, time_ms: u64) {
self.playing_from_timestamp = time_ms;
self.playing_from_instant = Instant::now();
}
pub fn set_playing(&mut self, playing: bool) {
self.set_time_ms(self.get_time_ms());
self.is_playing = playing;
}
}
Loading…
Cancel
Save