Use WebSockets to communicate with the server instead of HTTP verbs

This means that we can filter out our own events and have an outgoing
debounce set up when we receive an incoming event. Hopefully this means
that we stop seeing repeat events when there is more latency involved.
This commit is contained in:
Charlotte Som 2021-10-25 01:51:26 +01:00
parent 3e00d296f8
commit 7796d8e5f0
5 changed files with 213 additions and 129 deletions

View file

@ -32,6 +32,9 @@ const createVideoElement = (videoUrl, subtitles) => {
return video;
}
let outgoingDebounce = false;
let outgoingDebounceCallbackId = null;
/**
* @param {WebSocket} socket
* @param {HTMLVideoElement} video
@ -50,6 +53,8 @@ const setupSocketEvents = (socket, video) => {
const event = JSON.parse(messageEvent.data);
console.log(event);
outgoingDebounce = true;
switch (event.op) {
case "SetPlaying":
if (event.data.playing) {
@ -68,6 +73,70 @@ const setupSocketEvents = (socket, video) => {
}
} catch (_err) {
}
if (outgoingDebounceCallbackId) {
cancelIdleCallback(outgoingDebounceCallbackId);
outgoingDebounceCallbackId = null;
}
outgoingDebounceCallbackId = setTimeout(() => {
outgoingDebounce = false;
}, 500);
});
}
/**
* @param {string} sessionId
* @param {HTMLVideoElement} video
* @param {WebSocket} socket
*/
const setupVideoEvents = (sessionId, video, socket) => {
const currentVideoTime = () => (video.currentTime * 1000) | 0;
video.addEventListener("pause", async event => {
if (outgoingDebounce) {
return;
}
socket.send(JSON.stringify({
"op": "SetPlaying",
"data": {
"playing": false,
"time": currentVideoTime(),
}
}));
});
video.addEventListener("play", event => {
if (outgoingDebounce) {
return;
}
socket.send(JSON.stringify({
"op": "SetPlaying",
"data": {
"playing": true,
"time": currentVideoTime(),
}
}));
});
let firstSeekComplete = false;
video.addEventListener("seeked", async event => {
if (!firstSeekComplete) {
// The first seeked event is performed by the browser when the video is loading
firstSeekComplete = true;
return;
}
if (outgoingDebounce) {
return;
}
socket.send(JSON.stringify({
"op": "SetTime",
"data": currentVideoTime(),
}));
});
}
@ -94,44 +163,8 @@ const setupVideo = async (sessionId, videoUrl, subtitles, currentTime, playing,
// Auto-play is probably disabled, we should uhhhhhhh do something about it
}
video.addEventListener("pause", async event => {
await fetch(`/sess/${sessionId}/playing`, {
method: "PUT",
body: JSON.stringify(false),
headers: {
"Content-Type": "application/json"
},
});
});
video.addEventListener("play", async event => {
await fetch(`/sess/${sessionId}/playing`, {
method: "PUT",
body: JSON.stringify(true),
headers: {
"Content-Type": "application/json"
},
});
});
let firstSeekComplete = false;
video.addEventListener("seeked", async event => {
if (!firstSeekComplete) {
// The first seeked event is performed by the browser when the video is loading
firstSeekComplete = true;
return;
}
await fetch(`/sess/${sessionId}/current_time`, {
method: "PUT",
body: JSON.stringify((video.currentTime * 1000) | 0),
headers: {
"Content-Type": "application/json"
},
});
});
setupSocketEvents(socket, video);
setupVideoEvents(sessionId, video, socket);
}
/** @param {string} sessionId */

View file

@ -1,69 +1,8 @@
use once_cell::sync::Lazy;
use std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering},
};
use futures::{SinkExt, StreamExt, TryFutureExt};
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use warp::ws::{Message, WebSocket};
static CONNECTED_VIEWERS: Lazy<RwLock<HashMap<usize, ConnectedViewer>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1);
#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "op", content = "data")]
pub enum WatchEvent {
SetPlaying { playing: bool, time: u64 },
SetTime(u64),
}
pub struct ConnectedViewer {
pub session_uuid: Uuid,
pub tx: UnboundedSender<WatchEvent>,
}
pub async fn ws_subscribe(session_uuid: Uuid, ws: WebSocket) {
let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed);
let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split();
let (tx, rx) = mpsc::unbounded_channel::<WatchEvent>();
let mut rx = UnboundedReceiverStream::new(rx);
tokio::task::spawn(async move {
while let Some(event) = rx.next().await {
viewer_ws_tx
.send(Message::text(
serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"),
))
.unwrap_or_else(|e| eprintln!("ws send error: {}", e))
.await;
}
});
CONNECTED_VIEWERS
.write()
.await
.insert(viewer_id, ConnectedViewer { session_uuid, tx });
while let Some(Ok(_)) = viewer_ws_rx.next().await {}
CONNECTED_VIEWERS.write().await.remove(&viewer_id);
}
pub async fn ws_publish(session_uuid: Uuid, event: WatchEvent) {
for viewer in CONNECTED_VIEWERS.read().await.values() {
if viewer.session_uuid != session_uuid {
continue;
}
let _ = viewer.tx.send(event.clone());
}
}

View file

@ -1,25 +1,22 @@
use std::{collections::HashMap, net::IpAddr, sync::Mutex};
use once_cell::sync::Lazy;
use serde_json::json;
use std::net::IpAddr;
use uuid::Uuid;
use warb::{hyper::StatusCode, Filter, Reply};
use warp as warb; // i think it's funny
mod events;
mod viewer_connection;
mod watch_session;
use serde::Deserialize;
use crate::{
events::{ws_publish, ws_subscribe, WatchEvent},
watch_session::{SubtitleTrack, WatchSession},
events::WatchEvent,
viewer_connection::{ws_publish, ws_subscribe},
watch_session::{get_session, handle_watch_event, SubtitleTrack, WatchSession, SESSIONS},
};
static SESSIONS: Lazy<Mutex<HashMap<Uuid, WatchSession>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
#[derive(Deserialize)]
struct StartSessionBody {
pub video_url: String,
@ -52,14 +49,14 @@ async fn main() {
.and(warb::path::param::<String>())
.map(|session_id: String| {
if let Ok(uuid) = Uuid::parse_str(&session_id) {
if let Some(session) = SESSIONS.lock().unwrap().get(&uuid) {
RequestedSession::Session(uuid, session.clone())
} else {
RequestedSession::Error(warb::reply::with_status(
warb::reply::json(&json!({ "error": "session does not exist" })),
StatusCode::NOT_FOUND,
))
}
get_session(uuid)
.map(|sess| RequestedSession::Session(uuid, sess))
.unwrap_or_else(|| {
RequestedSession::Error(warb::reply::with_status(
warb::reply::json(&json!({ "error": "session does not exist" })),
StatusCode::NOT_FOUND,
))
})
} else {
RequestedSession::Error(warb::reply::with_status(
warb::reply::json(&json!({ "error": "invalid session UUID" })),
@ -83,13 +80,13 @@ async fn main() {
.and(warb::body::json())
.map(|requested_session, playing: bool| match requested_session {
RequestedSession::Session(uuid, mut sess) => {
sess.set_playing(playing);
let time = sess.get_time_ms();
SESSIONS.lock().unwrap().insert(uuid, sess.clone());
let event = WatchEvent::SetPlaying {
playing,
time: sess.get_time_ms(),
};
tokio::spawn(async move {
ws_publish(uuid, WatchEvent::SetPlaying { playing, time }).await
});
handle_watch_event(uuid, &mut sess, event.clone());
tokio::spawn(ws_publish(uuid, None, event));
warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK)
}
@ -103,12 +100,10 @@ async fn main() {
.map(
|requested_session, current_time_ms: u64| match requested_session {
RequestedSession::Session(uuid, mut sess) => {
sess.set_time_ms(current_time_ms);
SESSIONS.lock().unwrap().insert(uuid, sess.clone());
let event = WatchEvent::SetTime(current_time_ms);
tokio::spawn(async move {
ws_publish(uuid, WatchEvent::SetTime(current_time_ms)).await
});
handle_watch_event(uuid, &mut sess, event.clone());
tokio::spawn(ws_publish(uuid, None, event));
warb::reply::with_status(warb::reply::json(&sess.view()), StatusCode::OK)
}

93
src/viewer_connection.rs Normal file
View file

@ -0,0 +1,93 @@
use once_cell::sync::Lazy;
use std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering},
};
use futures::{SinkExt, StreamExt, TryFutureExt};
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};
use crate::{
events::WatchEvent,
watch_session::{get_session, handle_watch_event},
};
static CONNECTED_VIEWERS: Lazy<RwLock<HashMap<usize, ConnectedViewer>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1);
pub struct ConnectedViewer {
pub session: Uuid,
pub viewer_id: usize,
pub tx: UnboundedSender<WatchEvent>,
}
pub async fn ws_subscribe(session_uuid: Uuid, ws: WebSocket) {
let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed);
let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split();
let (tx, rx) = mpsc::unbounded_channel::<WatchEvent>();
let mut rx = UnboundedReceiverStream::new(rx);
tokio::task::spawn(async move {
while let Some(event) = rx.next().await {
viewer_ws_tx
.send(Message::text(
serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"),
))
.unwrap_or_else(|e| eprintln!("ws send error: {}", e))
.await;
}
});
CONNECTED_VIEWERS.write().await.insert(
viewer_id,
ConnectedViewer {
viewer_id,
session: session_uuid,
tx,
},
);
while let Some(Ok(message)) = viewer_ws_rx.next().await {
let event: WatchEvent = match message
.to_str()
.ok()
.and_then(|s| serde_json::from_str(s).ok())
{
Some(e) => e,
None => continue,
};
handle_watch_event(
session_uuid,
&mut get_session(session_uuid).unwrap(),
event.clone(),
);
ws_publish(session_uuid, Some(viewer_id), event).await;
}
CONNECTED_VIEWERS.write().await.remove(&viewer_id);
}
pub async fn ws_publish(session_uuid: Uuid, viewer_id: Option<usize>, event: WatchEvent) {
for viewer in CONNECTED_VIEWERS.read().await.values() {
if viewer_id == Some(viewer.viewer_id) {
continue;
}
if viewer.session != session_uuid {
continue;
}
let _ = viewer.tx.send(event.clone());
}
}

View file

@ -1,5 +1,9 @@
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::time::Instant;
use std::{collections::HashMap, sync::Mutex, time::Instant};
use uuid::Uuid;
use crate::events::WatchEvent;
#[derive(Serialize, Deserialize, Clone)]
pub struct SubtitleTrack {
@ -15,7 +19,6 @@ pub struct WatchSession {
is_playing: bool,
playing_from_timestamp: u64,
playing_from_instant: Instant,
// TODO: How do we keep track of the current playing time ?
}
#[derive(Serialize)]
@ -59,8 +62,29 @@ impl WatchSession {
self.playing_from_instant = Instant::now();
}
pub fn set_playing(&mut self, playing: bool) {
self.set_time_ms(self.get_time_ms());
pub fn set_playing(&mut self, playing: bool, time_ms: u64) {
self.set_time_ms(time_ms);
self.is_playing = playing;
}
}
pub static SESSIONS: Lazy<Mutex<HashMap<Uuid, WatchSession>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
pub fn get_session(uuid: Uuid) -> Option<WatchSession> {
SESSIONS.lock().unwrap().get(&uuid).cloned()
}
pub fn handle_watch_event(uuid: Uuid, watch_session: &mut WatchSession, event: WatchEvent) {
match event {
WatchEvent::SetPlaying { playing, time } => {
watch_session.set_playing(playing, time);
}
WatchEvent::SetTime(time) => {
watch_session.set_time_ms(time);
}
};
let _ = SESSIONS.lock().unwrap().insert(uuid, watch_session.clone());
}