From b9daf95a04f67604d22873746d89f668929dea10 Mon Sep 17 00:00:00 2001 From: videogame hacker Date: Sat, 16 Apr 2022 09:06:48 +0100 Subject: [PATCH] Route messages from a source channel to a destination Hyperfixation commit lol --- mid-chat/src/lib.rs | 4 +- mid-chat/src/reference.rs | 15 +++ phoebe-main/src/main.rs | 62 +++++++++-- phoebe/.env | 2 +- phoebe/Cargo.toml | 2 + phoebe/build.rs | 3 + ...20416061935_create_message_links_table.sql | 10 ++ ...20416065641_create_channel_links_table.sql | 8 ++ phoebe/src/lib.rs | 103 +++++++++++++++++- phoebe/src/prelude.rs | 2 +- phoebe/src/service.rs | 10 +- services/phoebe-discord/.env | 2 +- services/phoebe-discord/src/chat_conv.rs | 2 +- services/phoebe-discord/src/handler.rs | 13 +-- services/phoebe-discord/src/lib.rs | 22 +++- services/phoebe-discord/src/sender.rs | 41 +++++++ 16 files changed, 266 insertions(+), 35 deletions(-) create mode 100644 phoebe/build.rs create mode 100644 phoebe/migrations/20220416061935_create_message_links_table.sql create mode 100644 phoebe/migrations/20220416065641_create_channel_links_table.sql create mode 100644 services/phoebe-discord/src/sender.rs diff --git a/mid-chat/src/lib.rs b/mid-chat/src/lib.rs index 5c96a07..8fa01d7 100644 --- a/mid-chat/src/lib.rs +++ b/mid-chat/src/lib.rs @@ -13,11 +13,11 @@ pub use content::*; #[derive(Debug, Clone)] pub struct ChatMessage { - pub origin: ChatReference, + pub origin: ChatMessageReference, pub author: ChatAuthor, pub content: ChatMessageContent, pub attachments: Vec<()>, - pub replying: Option, + pub replying: Option, } pub mod event; diff --git a/mid-chat/src/reference.rs b/mid-chat/src/reference.rs index bdc70c3..617a734 100644 --- a/mid-chat/src/reference.rs +++ b/mid-chat/src/reference.rs @@ -3,3 +3,18 @@ pub struct ChatReference { pub service: &'static str, pub id: String, } + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChatMessageReference { + pub channel: ChatReference, + pub message_id: String, +} + +impl ChatMessageReference { + pub fn new(channel: ChatReference, message_id: impl ToString) -> Self { + Self { + channel, + message_id: message_id.to_string(), + } + } +} diff --git a/phoebe-main/src/main.rs b/phoebe-main/src/main.rs index c1a5934..10f59fc 100644 --- a/phoebe-main/src/main.rs +++ b/phoebe-main/src/main.rs @@ -2,7 +2,55 @@ use color_eyre::Result; use tracing::info; use tracing_subscriber::EnvFilter; -use phoebe::service::Service; +use phoebe::{ + get_linked_channels, link_messages, + prelude::{ChatEvent, SqlitePool}, + service::Service, +}; + +fn dyn_service(service: &str) -> &'static str { + match service { + "discord" => "discord", + "matrix" => "matrix", + _ => panic!("Unsupported service: {}", service), + } +} + +async fn handle_events( + db: SqlitePool, + mut service: Box, + mut rx: tokio::sync::broadcast::Receiver, +) { + info!("Handling events for {}…", service.tag()); + + let mut conn = db + .acquire() + .await + .expect("Failed to acquire core DB connection"); + + while let Ok(event) = rx.recv().await { + match event { + phoebe::prelude::ChatEvent::NewMessage(message) => { + let linked_channels = + get_linked_channels(&mut conn, &message.origin.channel, dyn_service).await; + + let mut resulting_messages = vec![]; + for destination_channel in linked_channels { + resulting_messages.extend( + service + .send_chat_message(&message, destination_channel) + .await, + ) + } + + if let Err(e) = link_messages(&mut conn, &message.origin, &resulting_messages).await + { + tracing::error!("Failed to link messages: {e}"); + } + } + } + } +} #[tokio::main] async fn main() -> Result<()> { @@ -20,15 +68,9 @@ async fn main() -> Result<()> { phoebe_discord::setup(db.clone(), tx.clone()).await?, )]; - let handles = services.into_iter().map(|mut srv| { - let mut rx = tx.subscribe(); - tokio::spawn(async move { - info!("Handling events for {}…", srv.get_service_tag()); - while let Ok(event) = rx.recv().await { - srv.handle_chat_event(&event).await; - } - }) - }); + let handles = services + .into_iter() + .map(|srv| tokio::spawn(handle_events(db.clone(), srv, tx.subscribe()))); let _ = futures::future::join_all(handles).await; diff --git a/phoebe/.env b/phoebe/.env index c7b4089..25ef1db 100644 --- a/phoebe/.env +++ b/phoebe/.env @@ -1 +1 @@ -DATABASE_URL="sqlite://PHOEBE_DB_ROOT/main.db" +DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/main.db" diff --git a/phoebe/Cargo.toml b/phoebe/Cargo.toml index 760bcdc..87b6658 100644 --- a/phoebe/Cargo.toml +++ b/phoebe/Cargo.toml @@ -11,3 +11,5 @@ sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "sqlite"] } tracing = "0.1" async-trait = "0.1.53" eyre = "0.6.8" +tokio-stream = "0.1.8" +futures = "0.3.21" diff --git a/phoebe/build.rs b/phoebe/build.rs new file mode 100644 index 0000000..3a8149e --- /dev/null +++ b/phoebe/build.rs @@ -0,0 +1,3 @@ +fn main() { + println!("cargo:rerun-if-changed=migrations"); +} diff --git a/phoebe/migrations/20220416061935_create_message_links_table.sql b/phoebe/migrations/20220416061935_create_message_links_table.sql new file mode 100644 index 0000000..037ff3e --- /dev/null +++ b/phoebe/migrations/20220416061935_create_message_links_table.sql @@ -0,0 +1,10 @@ +CREATE TABLE message_links ( + id INTEGER PRIMARY KEY AUTOINCREMENT +) STRICT; + +CREATE TABLE messages ( + link_id INTEGER NOT NULL REFERENCES message_links(id), + service TEXT NOT NULL, + channel TEXT NOT NULL, + message TEXT NOT NULL +) diff --git a/phoebe/migrations/20220416065641_create_channel_links_table.sql b/phoebe/migrations/20220416065641_create_channel_links_table.sql new file mode 100644 index 0000000..e2bfcae --- /dev/null +++ b/phoebe/migrations/20220416065641_create_channel_links_table.sql @@ -0,0 +1,8 @@ +CREATE TABLE channel_links ( + from_service TEXT NOT NULL, + from_channel TEXT NOT NULL, + to_service TEXT NOT NULL, + to_channel TEXT NOT NULL, + + PRIMARY KEY (from_service, from_channel, to_service, to_channel) +) STRICT; diff --git a/phoebe/src/lib.rs b/phoebe/src/lib.rs index 390b2ec..7741940 100644 --- a/phoebe/src/lib.rs +++ b/phoebe/src/lib.rs @@ -1,14 +1,111 @@ +use futures::stream::BoxStream; pub use mid_chat; +use mid_chat::{ChatMessageReference, ChatReference}; + +use sqlx::{Row, SqliteConnection, SqlitePool}; +use tokio::sync::broadcast::*; +use tokio_stream::StreamExt; pub mod db; pub mod prelude; pub mod service; -pub type ChatEventSender = tokio::sync::broadcast::Sender; -pub type ChatEventReceiver = tokio::sync::broadcast::Receiver; +pub type ChatEventSender = Sender; +pub type ChatEventReceiver = Receiver; -pub async fn open_core_db() -> sqlx::Result { +pub type DynServiceLookup = fn(&str) -> &'static str; + +pub async fn open_core_db() -> sqlx::Result { let db = db::open("main").await?; sqlx::migrate!().run(&db).await?; Ok(db) } + +pub async fn get_linked_channels( + conn: &mut SqliteConnection, + channel: &ChatReference, + dyn_service: DynServiceLookup, +) -> Vec { + let from_service = channel.service; + let from_channel = &channel.id; + let query = sqlx::query!( + "SELECT * FROM channel_links WHERE from_service = ? AND from_channel = ?", + from_service, + from_channel + ); + + query + .fetch(&mut *conn) + .filter_map(Result::ok) + .map(|r| ChatReference { + service: dyn_service(&r.to_service), + id: r.to_channel, + }) + .collect() + .await +} + +pub async fn link_messages( + conn: &mut SqliteConnection, + origin: &ChatMessageReference, + messages: &[ChatMessageReference], +) -> sqlx::Result<()> { + let message_link = sqlx::query!("INSERT INTO message_links DEFAULT VALUES") + .execute(&mut *conn) + .await? + .last_insert_rowid(); + + for resultant in std::iter::once(origin).chain(messages) { + let service = &resultant.channel.service; + let channel = &resultant.channel.id; + let message = &resultant.message_id; + + let query = sqlx::query!( + "INSERT INTO messages VALUES (?, ?, ?, ?)", + message_link, + service, + channel, + message + ); + + let _ = query.execute(&mut *conn).await?; + } + + Ok(()) +} + +pub async fn get_linked_messages<'a>( + conn: &'a mut SqliteConnection, + message: &ChatMessageReference, + dyn_service: DynServiceLookup, +) -> sqlx::Result> { + let link_id = { + let service = &message.channel.service; + let channel = &message.channel.id; + let message_id = &message.message_id; + let query = sqlx::query!( + "SELECT link_id FROM messages WHERE service = ? AND channel = ? AND message = ?", + service, + channel, + message_id + ); + query.fetch_one(&mut *conn).await + }? + .link_id; + + let stream = sqlx::query("SELECT * FROM messages WHERE link_id = ?") + .bind(link_id) + .fetch(&mut *conn) + .filter_map(Result::ok) + .map(move |r| { + ChatMessageReference::new( + ChatReference { + service: dyn_service(&r.get::("service")), + id: r.get("channel"), + }, + r.get::("message"), + ) + }); + + Ok(Box::pin(stream)) +} diff --git a/phoebe/src/prelude.rs b/phoebe/src/prelude.rs index 5dbfd89..2d2b381 100644 --- a/phoebe/src/prelude.rs +++ b/phoebe/src/prelude.rs @@ -3,4 +3,4 @@ pub use crate::{service::Service, ChatEventReceiver, ChatEventSender}; pub use async_trait::async_trait; pub use eyre::Result; pub use mid_chat::event::ChatEvent; -pub use sqlx::SqlitePool; +pub use sqlx::{SqliteConnection, SqlitePool}; diff --git a/phoebe/src/service.rs b/phoebe/src/service.rs index f06d351..1a02dea 100644 --- a/phoebe/src/service.rs +++ b/phoebe/src/service.rs @@ -1,7 +1,11 @@ -use mid_chat::event::ChatEvent; +use mid_chat::{ChatMessage, ChatMessageReference, ChatReference}; #[async_trait::async_trait] pub trait Service { - fn get_service_tag(&self) -> &'static str; - async fn handle_chat_event(&mut self, event: &ChatEvent); + fn tag(&self) -> &'static str; + async fn send_chat_message( + &mut self, + source: &ChatMessage, + destination_channel: ChatReference, + ) -> Vec; } diff --git a/services/phoebe-discord/.env b/services/phoebe-discord/.env index 9ee280a..0c51a39 100644 --- a/services/phoebe-discord/.env +++ b/services/phoebe-discord/.env @@ -1 +1 @@ -DATABASE_URL="sqlite://PHOEBE_DB_ROOT/discord_media.db" +DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/discord_media.db" diff --git a/services/phoebe-discord/src/chat_conv.rs b/services/phoebe-discord/src/chat_conv.rs index 647afa8..39223f8 100644 --- a/services/phoebe-discord/src/chat_conv.rs +++ b/services/phoebe-discord/src/chat_conv.rs @@ -69,7 +69,7 @@ fn mid_to_discord(component: &ChatContentComponent) -> String { lang.as_ref() .map(|s| s.to_string()) .unwrap_or_else(|| "".to_string()), - source.to_string() + source ) } diff --git a/services/phoebe-discord/src/handler.rs b/services/phoebe-discord/src/handler.rs index 6f00054..946f981 100644 --- a/services/phoebe-discord/src/handler.rs +++ b/services/phoebe-discord/src/handler.rs @@ -44,20 +44,17 @@ impl EventHandler for DiscordHandler { } async fn message(&self, ctx: Context, message: Message) { - let origin = ChatReference { - service: "discord", - id: message.id.to_string(), - }; + let origin = ChatMessageReference::new(discord_reference(message.channel_id), message.id); let author = self.get_author(&ctx, &message).await; let content = discord_message_format::parse(&message.content); let content = super::chat_conv::convert(&content); - let replies_to = message.referenced_message.as_ref().map(|m| ChatReference { - service: "discord", - id: m.id.to_string(), - }); + let replies_to = message + .referenced_message + .as_ref() + .map(|m| ChatMessageReference::new(discord_reference(m.channel_id), m.id)); let chat_message = ChatMessage { origin, diff --git a/services/phoebe-discord/src/lib.rs b/services/phoebe-discord/src/lib.rs index f5d37fc..82086ad 100644 --- a/services/phoebe-discord/src/lib.rs +++ b/services/phoebe-discord/src/lib.rs @@ -1,9 +1,13 @@ -use phoebe::{mid_chat, prelude::*}; +use phoebe::{ + mid_chat::{self, ChatMessage, ChatMessageReference, ChatReference}, + prelude::*, +}; use serenity::{client::Context, Client}; use tracing::{debug, info}; mod chat_conv; mod handler; +mod sender; pub fn discord_reference(id: impl ToString) -> mid_chat::ChatReference { mid_chat::ChatReference { @@ -31,7 +35,6 @@ pub async fn setup(core_db: SqlitePool, tx: ChatEventSender) -> Result Result Vec { + assert_eq!(destination_channel.service, "discord"); + sender::send_discord_message(&mut self.discord_ctx, source, destination_channel) + .await + .ok() + .into_iter() + .collect() } - fn get_service_tag(&self) -> &'static str { + fn tag(&self) -> &'static str { "discord" } } diff --git a/services/phoebe-discord/src/sender.rs b/services/phoebe-discord/src/sender.rs new file mode 100644 index 0000000..aa6c30f --- /dev/null +++ b/services/phoebe-discord/src/sender.rs @@ -0,0 +1,41 @@ +use phoebe::{ + mid_chat::{ChatMessage, ChatMessageReference, ChatReference}, + prelude::Result, +}; + +use serenity::{model::prelude::*, prelude::*}; + +use crate::{chat_conv, discord_reference}; + +pub async fn send_discord_message( + context: &mut Context, + source: &ChatMessage, + destination_channel: ChatReference, +) -> Result { + // TODO: If we have a webhook we should use the webhook + + let channel_id = destination_channel.id.parse::()?; + let formatted_message = chat_conv::format(&source.content); + let content = format!( + "{} ({}): {}", + source.author.display_name, source.author.reference.service, formatted_message + ); + + let sent_message = channel_id + .send_message(&context, move |m| { + let m = m.content(content); + if let Some(reply) = &source.replying { + let channel_id: ChannelId = reply.channel.id.parse().unwrap(); + let message_id: MessageId = reply.message_id.parse::().unwrap().into(); + m.reference_message((channel_id, message_id)) + } else { + m + } + }) + .await?; + + Ok(ChatMessageReference::new( + discord_reference(sent_message.channel_id), + sent_message.id, + )) +}