Compare commits
	
		
			No commits in common. "48c3eb1830638b7cb6acd0e58d692b2bd822bc8f" and "e806786a1c9db3c2754312febcfe622de0d0fca0" have entirely different histories.
		
	
	
		
			48c3eb1830
			...
			e806786a1c
		
	
		
					 17 changed files with 42 additions and 346 deletions
				
			
		|  | @ -13,11 +13,11 @@ pub use content::*; | |||
| 
 | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct ChatMessage { | ||||
|     pub origin: ChatMessageReference, | ||||
|     pub origin: ChatReference, | ||||
|     pub author: ChatAuthor, | ||||
|     pub content: ChatMessageContent, | ||||
|     pub attachments: Vec<()>, | ||||
|     pub replying: Option<ChatMessageReference>, | ||||
|     pub replying: Option<ChatReference>, | ||||
| } | ||||
| 
 | ||||
| pub mod event; | ||||
|  |  | |||
|  | @ -3,18 +3,3 @@ pub struct ChatReference { | |||
|     pub service: &'static str, | ||||
|     pub id: String, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||||
| pub struct ChatMessageReference { | ||||
|     pub channel: ChatReference, | ||||
|     pub message_id: String, | ||||
| } | ||||
| 
 | ||||
| impl ChatMessageReference { | ||||
|     pub fn new(channel: ChatReference, message_id: impl ToString) -> Self { | ||||
|         Self { | ||||
|             channel, | ||||
|             message_id: message_id.to_string(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -2,49 +2,7 @@ use color_eyre::Result; | |||
| use tracing::info; | ||||
| use tracing_subscriber::EnvFilter; | ||||
| 
 | ||||
| use phoebe::{ | ||||
|     get_linked_channels, link_messages, | ||||
|     prelude::{ChatEvent, SqlitePool}, | ||||
|     service::Service, | ||||
|     DynServiceLookup, | ||||
| }; | ||||
| 
 | ||||
| async fn handle_events( | ||||
|     dyn_service: DynServiceLookup, | ||||
|     db: SqlitePool, | ||||
|     mut service: Box<dyn Service + Send + Sync>, | ||||
|     mut rx: tokio::sync::broadcast::Receiver<ChatEvent>, | ||||
| ) { | ||||
|     info!("Handling events for {}…", service.tag()); | ||||
| 
 | ||||
|     let mut conn = db | ||||
|         .acquire() | ||||
|         .await | ||||
|         .expect("Failed to acquire core DB connection"); | ||||
| 
 | ||||
|     while let Ok(event) = rx.recv().await { | ||||
|         match event { | ||||
|             phoebe::prelude::ChatEvent::NewMessage(message) => { | ||||
|                 let linked_channels = | ||||
|                     get_linked_channels(&mut conn, dyn_service, &message.origin.channel).await; | ||||
| 
 | ||||
|                 let mut resulting_messages = vec![]; | ||||
|                 for destination_channel in linked_channels { | ||||
|                     resulting_messages.extend( | ||||
|                         service | ||||
|                             .send_chat_message(&message, destination_channel) | ||||
|                             .await, | ||||
|                     ) | ||||
|                 } | ||||
| 
 | ||||
|                 if let Err(e) = link_messages(&mut conn, &message.origin, &resulting_messages).await | ||||
|                 { | ||||
|                     tracing::error!("Failed to link messages: {e}"); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| use phoebe::service::Service; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() -> Result<()> { | ||||
|  | @ -58,22 +16,19 @@ async fn main() -> Result<()> { | |||
|     let (tx, _) = tokio::sync::broadcast::channel(512); | ||||
| 
 | ||||
|     let db = phoebe::open_core_db().await?; | ||||
| 
 | ||||
|     fn dyn_service(service: &str) -> &'static str { | ||||
|         match service { | ||||
|             "discord" => "discord", | ||||
|             "matrix" => "matrix", | ||||
|             _ => panic!("Unsupported service: {}", service), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let services: Vec<Box<dyn Service + Send + Sync>> = vec![Box::new( | ||||
|         phoebe_discord::setup(db.clone(), tx.clone(), dyn_service).await?, | ||||
|         phoebe_discord::setup(db.clone(), tx.clone()).await?, | ||||
|     )]; | ||||
| 
 | ||||
|     let handles = services | ||||
|         .into_iter() | ||||
|         .map(|srv| tokio::spawn(handle_events(dyn_service, db.clone(), srv, tx.subscribe()))); | ||||
|     let handles = services.into_iter().map(|mut srv| { | ||||
|         let mut rx = tx.subscribe(); | ||||
|         tokio::spawn(async move { | ||||
|             info!("Handling events for {}…", srv.get_service_tag()); | ||||
|             while let Ok(event) = rx.recv().await { | ||||
|                 srv.handle_chat_event(&event).await; | ||||
|             } | ||||
|         }) | ||||
|     }); | ||||
| 
 | ||||
|     let _ = futures::future::join_all(handles).await; | ||||
| 
 | ||||
|  |  | |||
|  | @ -1 +1 @@ | |||
| DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/main.db" | ||||
| DATABASE_URL="sqlite://PHOEBE_DB_ROOT/main.db" | ||||
|  |  | |||
|  | @ -11,4 +11,3 @@ sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "sqlite"] } | |||
| tracing = "0.1" | ||||
| async-trait = "0.1.53" | ||||
| eyre = "0.6.8" | ||||
| futures = "0.3.21" | ||||
|  |  | |||
|  | @ -1,3 +0,0 @@ | |||
| fn main() { | ||||
|     println!("cargo:rerun-if-changed=migrations"); | ||||
| } | ||||
|  | @ -1,10 +0,0 @@ | |||
| CREATE TABLE message_links ( | ||||
|   id INTEGER PRIMARY KEY AUTOINCREMENT | ||||
| ) STRICT; | ||||
| 
 | ||||
| CREATE TABLE messages ( | ||||
|   link_id INTEGER NOT NULL REFERENCES message_links(id),  | ||||
|   service TEXT NOT NULL, | ||||
|   channel TEXT NOT NULL, | ||||
|   message TEXT NOT NULL | ||||
| ) | ||||
|  | @ -1,8 +0,0 @@ | |||
| CREATE TABLE channel_links ( | ||||
|   from_service TEXT NOT NULL, | ||||
|   from_channel TEXT NOT NULL, | ||||
|   to_service TEXT NOT NULL, | ||||
|   to_channel TEXT NOT NULL, | ||||
| 
 | ||||
|   PRIMARY KEY (from_service, from_channel, to_service, to_channel) | ||||
| ) STRICT; | ||||
|  | @ -1,135 +1,14 @@ | |||
| use futures::{stream::BoxStream, Future}; | ||||
| pub use mid_chat; | ||||
| use mid_chat::{ChatMessageReference, ChatReference}; | ||||
| 
 | ||||
| use futures::StreamExt; | ||||
| use sqlx::{Row, SqliteConnection, SqlitePool}; | ||||
| use tokio::sync::broadcast::*; | ||||
| 
 | ||||
| pub mod db; | ||||
| pub mod prelude; | ||||
| pub mod service; | ||||
| 
 | ||||
| pub type ChatEventSender = Sender<mid_chat::event::ChatEvent>; | ||||
| pub type ChatEventReceiver = Receiver<mid_chat::event::ChatEvent>; | ||||
| pub type ChatEventSender = tokio::sync::broadcast::Sender<mid_chat::event::ChatEvent>; | ||||
| pub type ChatEventReceiver = tokio::sync::broadcast::Receiver<mid_chat::event::ChatEvent>; | ||||
| 
 | ||||
| pub type DynServiceLookup = fn(&str) -> &'static str; | ||||
| 
 | ||||
| pub async fn open_core_db() -> sqlx::Result<SqlitePool> { | ||||
| pub async fn open_core_db() -> sqlx::Result<sqlx::SqlitePool> { | ||||
|     let db = db::open("main").await?; | ||||
|     sqlx::migrate!().run(&db).await?; | ||||
|     Ok(db) | ||||
| } | ||||
| 
 | ||||
| pub async fn get_linked_channels( | ||||
|     conn: &mut SqliteConnection, | ||||
|     dyn_service: DynServiceLookup, | ||||
|     channel: &ChatReference, | ||||
| ) -> Vec<ChatReference> { | ||||
|     let from_service = channel.service; | ||||
|     let from_channel = &channel.id; | ||||
|     let query = sqlx::query!( | ||||
|         "SELECT * FROM channel_links WHERE from_service = ? AND from_channel = ?", | ||||
|         from_service, | ||||
|         from_channel | ||||
|     ); | ||||
| 
 | ||||
|     query | ||||
|         .fetch(&mut *conn) | ||||
|         .filter_map(|r| async { r.ok() }) | ||||
|         .map(|r| ChatReference { | ||||
|             service: dyn_service(&r.to_service), | ||||
|             id: r.to_channel, | ||||
|         }) | ||||
|         .collect() | ||||
|         .await | ||||
| } | ||||
| 
 | ||||
| pub async fn link_messages( | ||||
|     conn: &mut SqliteConnection, | ||||
|     origin: &ChatMessageReference, | ||||
|     messages: &[ChatMessageReference], | ||||
| ) -> sqlx::Result<()> { | ||||
|     let message_link = sqlx::query!("INSERT INTO message_links DEFAULT VALUES") | ||||
|         .execute(&mut *conn) | ||||
|         .await? | ||||
|         .last_insert_rowid(); | ||||
| 
 | ||||
|     for resultant in std::iter::once(origin).chain(messages) { | ||||
|         let service = &resultant.channel.service; | ||||
|         let channel = &resultant.channel.id; | ||||
|         let message = &resultant.message_id; | ||||
| 
 | ||||
|         let query = sqlx::query!( | ||||
|             "INSERT INTO messages VALUES (?, ?, ?, ?)", | ||||
|             message_link, | ||||
|             service, | ||||
|             channel, | ||||
|             message | ||||
|         ); | ||||
| 
 | ||||
|         let _ = query.execute(&mut *conn).await?; | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| pub async fn get_linked_messages<'a>( | ||||
|     conn: &'a mut SqliteConnection, | ||||
|     dyn_service: DynServiceLookup, | ||||
|     message: &ChatMessageReference, | ||||
| ) -> sqlx::Result<BoxStream<'a, ChatMessageReference>> { | ||||
|     let link_id = { | ||||
|         let service = &message.channel.service; | ||||
|         let channel = &message.channel.id; | ||||
|         let message_id = &message.message_id; | ||||
|         let query = sqlx::query!( | ||||
|             "SELECT link_id FROM messages WHERE service = ? AND channel = ? AND message = ?", | ||||
|             service, | ||||
|             channel, | ||||
|             message_id | ||||
|         ); | ||||
|         query.fetch_one(&mut *conn).await | ||||
|     }? | ||||
|     .link_id; | ||||
| 
 | ||||
|     let stream = sqlx::query("SELECT * FROM messages WHERE link_id = ?") | ||||
|         .bind(link_id) | ||||
|         .fetch(&mut *conn) | ||||
|         .filter_map(|r| futures::future::ready(r.ok())) | ||||
|         .map(move |r| { | ||||
|             ChatMessageReference::new( | ||||
|                 ChatReference { | ||||
|                     service: dyn_service(&r.get::<String, _>("service")), | ||||
|                     id: r.get("channel"), | ||||
|                 }, | ||||
|                 r.get::<String, _>("message"), | ||||
|             ) | ||||
|         }); | ||||
| 
 | ||||
|     Ok(Box::pin(stream)) | ||||
| } | ||||
| 
 | ||||
| pub async fn lookup_message<F, Fut>( | ||||
|     conn: &mut SqliteConnection, | ||||
|     dyn_service: DynServiceLookup, | ||||
|     linked_message: &ChatMessageReference, | ||||
|     filter: F, | ||||
| ) -> Option<ChatMessageReference> | ||||
| where | ||||
|     F: FnMut(&ChatMessageReference) -> Fut, | ||||
|     Fut: Future<Output = bool>, | ||||
| { | ||||
|     let references = get_linked_messages(&mut *conn, dyn_service, linked_message) | ||||
|         .await | ||||
|         .ok()? | ||||
|         .filter(filter) | ||||
|         .collect::<Vec<_>>() | ||||
|         .await; | ||||
| 
 | ||||
|     if let [reference] = references.as_slice() { | ||||
|         Some(reference.clone()) | ||||
|     } else { | ||||
|         None | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -1,7 +1,6 @@ | |||
| pub use crate::{service::Service, ChatEventReceiver, ChatEventSender}; | ||||
| 
 | ||||
| pub use async_trait::async_trait; | ||||
| pub use eyre::{self, Result}; | ||||
| pub use futures::{self, prelude::*}; | ||||
| pub use eyre::Result; | ||||
| pub use mid_chat::event::ChatEvent; | ||||
| pub use sqlx::{self, SqliteConnection, SqlitePool}; | ||||
| pub use sqlx::SqlitePool; | ||||
|  |  | |||
|  | @ -1,11 +1,7 @@ | |||
| use mid_chat::{ChatMessage, ChatMessageReference, ChatReference}; | ||||
| use mid_chat::event::ChatEvent; | ||||
| 
 | ||||
| #[async_trait::async_trait] | ||||
| pub trait Service { | ||||
|     fn tag(&self) -> &'static str; | ||||
|     async fn send_chat_message( | ||||
|         &mut self, | ||||
|         source: &ChatMessage, | ||||
|         destination_channel: ChatReference, | ||||
|     ) -> Vec<ChatMessageReference>; | ||||
|     fn get_service_tag(&self) -> &'static str; | ||||
|     async fn handle_chat_event(&mut self, event: &ChatEvent); | ||||
| } | ||||
|  |  | |||
|  | @ -1 +1 @@ | |||
| DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/discord_media.db" | ||||
| DATABASE_URL="sqlite://PHOEBE_DB_ROOT/discord_media.db" | ||||
|  |  | |||
|  | @ -69,7 +69,7 @@ fn mid_to_discord(component: &ChatContentComponent) -> String { | |||
|                 lang.as_ref() | ||||
|                     .map(|s| s.to_string()) | ||||
|                     .unwrap_or_else(|| "".to_string()), | ||||
|                 source | ||||
|                 source.to_string() | ||||
|             ) | ||||
|         } | ||||
| 
 | ||||
|  |  | |||
|  | @ -44,17 +44,20 @@ impl EventHandler for DiscordHandler { | |||
|     } | ||||
| 
 | ||||
|     async fn message(&self, ctx: Context, message: Message) { | ||||
|         let origin = ChatMessageReference::new(discord_reference(message.channel_id), message.id); | ||||
|         let origin = ChatReference { | ||||
|             service: "discord", | ||||
|             id: message.id.to_string(), | ||||
|         }; | ||||
| 
 | ||||
|         let author = self.get_author(&ctx, &message).await; | ||||
| 
 | ||||
|         let content = discord_message_format::parse(&message.content); | ||||
|         let content = super::chat_conv::convert(&content); | ||||
| 
 | ||||
|         let replies_to = message | ||||
|             .referenced_message | ||||
|             .as_ref() | ||||
|             .map(|m| ChatMessageReference::new(discord_reference(m.channel_id), m.id)); | ||||
|         let replies_to = message.referenced_message.as_ref().map(|m| ChatReference { | ||||
|             service: "discord", | ||||
|             id: m.id.to_string(), | ||||
|         }); | ||||
| 
 | ||||
|         let chat_message = ChatMessage { | ||||
|             origin, | ||||
|  |  | |||
|  | @ -1,15 +1,9 @@ | |||
| use phoebe::{ | ||||
|     mid_chat::{self, ChatMessage, ChatMessageReference, ChatReference}, | ||||
|     prelude::*, | ||||
|     DynServiceLookup, | ||||
| }; | ||||
| use phoebe::{mid_chat, prelude::*}; | ||||
| use serenity::{client::Context, Client}; | ||||
| use tracing::{debug, info}; | ||||
| 
 | ||||
| mod chat_conv; | ||||
| mod handler; | ||||
| mod lookup; | ||||
| mod sender; | ||||
| 
 | ||||
| pub fn discord_reference(id: impl ToString) -> mid_chat::ChatReference { | ||||
|     mid_chat::ChatReference { | ||||
|  | @ -19,17 +13,10 @@ pub fn discord_reference(id: impl ToString) -> mid_chat::ChatReference { | |||
| } | ||||
| 
 | ||||
| pub struct DiscordService { | ||||
|     pub core_db: SqlitePool, | ||||
|     pub discord_media_db: SqlitePool, | ||||
|     pub ctx: Context, | ||||
|     pub dyn_service: DynServiceLookup, | ||||
|     pub discord_ctx: Context, | ||||
| } | ||||
| 
 | ||||
| pub async fn setup( | ||||
|     core_db: SqlitePool, | ||||
|     tx: ChatEventSender, | ||||
|     dyn_service: DynServiceLookup, | ||||
| ) -> Result<DiscordService> { | ||||
| pub async fn setup(core_db: SqlitePool, tx: ChatEventSender) -> Result<DiscordService> { | ||||
|     info!("Setting up Discord service…"); | ||||
| 
 | ||||
|     let discord_media_db = phoebe::db::open("discord_media").await?; | ||||
|  | @ -38,12 +25,13 @@ pub async fn setup( | |||
|     let (ctx_tx, mut ctx_rx) = tokio::sync::mpsc::unbounded_channel::<Context>(); | ||||
| 
 | ||||
|     let discord_handler = handler::DiscordHandler { | ||||
|         core_db: core_db.clone(), | ||||
|         discord_media_db: discord_media_db.clone(), | ||||
|         core_db, | ||||
|         discord_media_db, | ||||
|         chat_event_tx: tx, | ||||
|         ctx_tx, | ||||
|     }; | ||||
| 
 | ||||
|     // TODO: Create a discord client
 | ||||
|     debug!("Logging in…"); | ||||
|     let discord_token = std::env::var("PHOEBE_DISCORD_TOKEN") | ||||
|         .expect("PHOEBE_DISCORD_TOKEN environment variable was not set!"); | ||||
|  | @ -62,30 +50,16 @@ pub async fn setup( | |||
|     let discord_ctx = ctx_rx.recv().await.expect("Couldn't get Discord context"); | ||||
|     debug!("Logged in!"); | ||||
| 
 | ||||
|     Ok(DiscordService { | ||||
|         core_db, | ||||
|         discord_media_db, | ||||
|         ctx: discord_ctx, | ||||
|         dyn_service, | ||||
|     }) | ||||
|     Ok(DiscordService { discord_ctx }) | ||||
| } | ||||
| 
 | ||||
| #[async_trait] | ||||
| impl Service for DiscordService { | ||||
|     async fn send_chat_message( | ||||
|         &mut self, | ||||
|         source: &ChatMessage, | ||||
|         destination_channel: ChatReference, | ||||
|     ) -> Vec<ChatMessageReference> { | ||||
|         assert_eq!(destination_channel.service, "discord"); | ||||
|         sender::send_discord_message(self, source, destination_channel) | ||||
|             .await | ||||
|             .ok() | ||||
|             .into_iter() | ||||
|             .collect() | ||||
|     async fn handle_chat_event(&mut self, event: &ChatEvent) { | ||||
|         dbg!(event); | ||||
|     } | ||||
| 
 | ||||
|     fn tag(&self) -> &'static str { | ||||
|     fn get_service_tag(&self) -> &'static str { | ||||
|         "discord" | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -1,18 +0,0 @@ | |||
| use phoebe::{lookup_message, mid_chat::ChatMessageReference, prelude::*}; | ||||
| 
 | ||||
| use crate::DiscordService; | ||||
| 
 | ||||
| impl DiscordService { | ||||
|     pub async fn lookup_message<F, Fut>( | ||||
|         &self, | ||||
|         linked_message: &ChatMessageReference, | ||||
|         filter: F, | ||||
|     ) -> Option<ChatMessageReference> | ||||
|     where | ||||
|         F: FnMut(&ChatMessageReference) -> Fut, | ||||
|         Fut: Future<Output = bool>, | ||||
|     { | ||||
|         let mut conn = self.core_db.acquire().await.ok()?; | ||||
|         lookup_message(&mut conn, self.dyn_service, linked_message, filter).await | ||||
|     } | ||||
| } | ||||
|  | @ -1,55 +0,0 @@ | |||
| use phoebe::{ | ||||
|     mid_chat::{ChatMessage, ChatMessageReference, ChatReference}, | ||||
|     prelude::{future, Result}, | ||||
| }; | ||||
| 
 | ||||
| use serenity::{model::prelude::*, prelude::*}; | ||||
| 
 | ||||
| use crate::{chat_conv, discord_reference, DiscordService}; | ||||
| 
 | ||||
| pub async fn send_discord_message( | ||||
|     discord: &mut DiscordService, | ||||
|     source: &ChatMessage, | ||||
|     destination_channel: ChatReference, | ||||
| ) -> Result<ChatMessageReference> { | ||||
|     // TODO: If we have a webhook we should use the webhook
 | ||||
| 
 | ||||
|     let channel_id = destination_channel.id.parse::<ChannelId>()?; | ||||
|     let formatted_message = chat_conv::format(&source.content); | ||||
|     let content = format!( | ||||
|         "{} ({}): {}", | ||||
|         source.author.display_name, source.author.reference.service, formatted_message | ||||
|     ); | ||||
| 
 | ||||
|     let discord_reply = if let Some(reply) = &source.replying { | ||||
|         if let Some(reply_ref) = discord | ||||
|             .lookup_message(reply, |r| future::ready(r.channel == destination_channel)) | ||||
|             .await | ||||
|         { | ||||
|             assert_eq!(reply_ref.channel.service, "discord"); | ||||
|             let channel_id: ChannelId = reply_ref.channel.id.parse().unwrap(); | ||||
|             let message_id: MessageId = reply_ref.message_id.parse::<u64>().unwrap().into(); | ||||
|             Some((channel_id, message_id)) | ||||
|         } else { | ||||
|             None | ||||
|         } | ||||
|     } else { | ||||
|         None | ||||
|     }; | ||||
| 
 | ||||
|     let sent_message = channel_id | ||||
|         .send_message(&discord.ctx, move |m| { | ||||
|             let m = m.content(content); | ||||
|             if let Some(reply) = discord_reply { | ||||
|                 m.reference_message(reply) | ||||
|             } else { | ||||
|                 m | ||||
|             } | ||||
|         }) | ||||
|         .await?; | ||||
| 
 | ||||
|     Ok(ChatMessageReference::new( | ||||
|         discord_reference(sent_message.channel_id), | ||||
|         sent_message.id, | ||||
|     )) | ||||
| } | ||||
		Loading…
	
		Reference in a new issue