2021-10-25 00:51:26 +00:00
|
|
|
use once_cell::sync::Lazy;
|
|
|
|
use std::{
|
|
|
|
collections::HashMap,
|
|
|
|
sync::atomic::{AtomicUsize, Ordering},
|
|
|
|
};
|
|
|
|
|
|
|
|
use futures::{SinkExt, StreamExt, TryFutureExt};
|
|
|
|
use tokio::sync::{
|
|
|
|
mpsc::{self, UnboundedSender},
|
|
|
|
RwLock,
|
|
|
|
};
|
|
|
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
|
|
|
|
|
|
|
use uuid::Uuid;
|
|
|
|
use warp::ws::{Message, WebSocket};
|
|
|
|
|
|
|
|
use crate::{
|
2021-11-10 14:29:52 +00:00
|
|
|
events::{WatchEvent, WatchEventData},
|
|
|
|
watch_session::{get_session, handle_watch_event_data},
|
2021-10-25 00:51:26 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
static CONNECTED_VIEWERS: Lazy<RwLock<HashMap<usize, ConnectedViewer>>> =
|
|
|
|
Lazy::new(|| RwLock::new(HashMap::new()));
|
|
|
|
static NEXT_VIEWER_ID: AtomicUsize = AtomicUsize::new(1);
|
|
|
|
|
|
|
|
pub struct ConnectedViewer {
|
|
|
|
pub session: Uuid,
|
|
|
|
pub viewer_id: usize,
|
|
|
|
pub tx: UnboundedSender<WatchEvent>,
|
2021-11-10 14:29:52 +00:00
|
|
|
pub nickname: Option<String>,
|
2021-10-25 00:51:26 +00:00
|
|
|
}
|
|
|
|
|
2021-10-25 01:59:52 +00:00
|
|
|
pub async fn ws_subscribe(session_uuid: Uuid, nickname: String, ws: WebSocket) {
|
2021-10-25 00:51:26 +00:00
|
|
|
let viewer_id = NEXT_VIEWER_ID.fetch_add(1, Ordering::Relaxed);
|
|
|
|
let (mut viewer_ws_tx, mut viewer_ws_rx) = ws.split();
|
|
|
|
|
|
|
|
let (tx, rx) = mpsc::unbounded_channel::<WatchEvent>();
|
|
|
|
let mut rx = UnboundedReceiverStream::new(rx);
|
|
|
|
|
|
|
|
tokio::task::spawn(async move {
|
|
|
|
while let Some(event) = rx.next().await {
|
|
|
|
viewer_ws_tx
|
|
|
|
.send(Message::text(
|
|
|
|
serde_json::to_string(&event).expect("couldn't convert WatchEvent into JSON"),
|
|
|
|
))
|
|
|
|
.unwrap_or_else(|e| eprintln!("ws send error: {}", e))
|
|
|
|
.await;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
CONNECTED_VIEWERS.write().await.insert(
|
|
|
|
viewer_id,
|
|
|
|
ConnectedViewer {
|
|
|
|
viewer_id,
|
|
|
|
session: session_uuid,
|
|
|
|
tx,
|
2021-11-10 14:29:52 +00:00
|
|
|
nickname: Some(nickname.clone()),
|
2021-10-25 00:51:26 +00:00
|
|
|
},
|
|
|
|
);
|
|
|
|
|
2021-11-10 14:29:52 +00:00
|
|
|
ws_publish(
|
|
|
|
session_uuid,
|
|
|
|
None,
|
|
|
|
WatchEvent::new(nickname.clone(), WatchEventData::UserJoin),
|
|
|
|
)
|
|
|
|
.await;
|
2021-10-25 01:59:52 +00:00
|
|
|
|
2021-10-25 00:51:26 +00:00
|
|
|
while let Some(Ok(message)) = viewer_ws_rx.next().await {
|
2021-11-10 14:29:52 +00:00
|
|
|
let event: WatchEventData = match message
|
2021-10-25 00:51:26 +00:00
|
|
|
.to_str()
|
|
|
|
.ok()
|
|
|
|
.and_then(|s| serde_json::from_str(s).ok())
|
|
|
|
{
|
|
|
|
Some(e) => e,
|
|
|
|
None => continue,
|
|
|
|
};
|
|
|
|
|
2021-11-10 14:29:52 +00:00
|
|
|
handle_watch_event_data(
|
2021-10-25 00:51:26 +00:00
|
|
|
session_uuid,
|
|
|
|
&mut get_session(session_uuid).unwrap(),
|
|
|
|
event.clone(),
|
|
|
|
);
|
|
|
|
|
2021-11-10 14:29:52 +00:00
|
|
|
ws_publish(
|
|
|
|
session_uuid,
|
|
|
|
Some(viewer_id),
|
|
|
|
WatchEvent::new(nickname.clone(), event),
|
|
|
|
)
|
|
|
|
.await;
|
2021-10-25 00:51:26 +00:00
|
|
|
}
|
|
|
|
|
2021-11-10 14:29:52 +00:00
|
|
|
ws_publish(
|
|
|
|
session_uuid,
|
|
|
|
None,
|
|
|
|
WatchEvent::new(nickname.clone(), WatchEventData::UserLeave),
|
|
|
|
)
|
|
|
|
.await;
|
2021-10-25 01:59:52 +00:00
|
|
|
|
2021-10-25 00:51:26 +00:00
|
|
|
CONNECTED_VIEWERS.write().await.remove(&viewer_id);
|
|
|
|
}
|
|
|
|
|
2021-11-10 14:29:52 +00:00
|
|
|
pub async fn ws_publish(session_uuid: Uuid, skip_viewer_id: Option<usize>, event: WatchEvent) {
|
2021-10-25 00:51:26 +00:00
|
|
|
for viewer in CONNECTED_VIEWERS.read().await.values() {
|
|
|
|
if viewer.session != session_uuid {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2021-11-10 14:29:52 +00:00
|
|
|
let _ = viewer.tx.send(WatchEvent {
|
|
|
|
reflected: skip_viewer_id == Some(viewer.viewer_id),
|
|
|
|
..event.clone()
|
|
|
|
});
|
2021-10-25 00:51:26 +00:00
|
|
|
}
|
|
|
|
}
|