Route messages from a source channel to a destination
Hyperfixation commit lol
This commit is contained in:
		
							parent
							
								
									e806786a1c
								
							
						
					
					
						commit
						b9daf95a04
					
				
					 16 changed files with 266 additions and 35 deletions
				
			
		|  | @ -13,11 +13,11 @@ pub use content::*; | |||
| 
 | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct ChatMessage { | ||||
|     pub origin: ChatReference, | ||||
|     pub origin: ChatMessageReference, | ||||
|     pub author: ChatAuthor, | ||||
|     pub content: ChatMessageContent, | ||||
|     pub attachments: Vec<()>, | ||||
|     pub replying: Option<ChatReference>, | ||||
|     pub replying: Option<ChatMessageReference>, | ||||
| } | ||||
| 
 | ||||
| pub mod event; | ||||
|  |  | |||
|  | @ -3,3 +3,18 @@ pub struct ChatReference { | |||
|     pub service: &'static str, | ||||
|     pub id: String, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||||
| pub struct ChatMessageReference { | ||||
|     pub channel: ChatReference, | ||||
|     pub message_id: String, | ||||
| } | ||||
| 
 | ||||
| impl ChatMessageReference { | ||||
|     pub fn new(channel: ChatReference, message_id: impl ToString) -> Self { | ||||
|         Self { | ||||
|             channel, | ||||
|             message_id: message_id.to_string(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -2,7 +2,55 @@ use color_eyre::Result; | |||
| use tracing::info; | ||||
| use tracing_subscriber::EnvFilter; | ||||
| 
 | ||||
| use phoebe::service::Service; | ||||
| use phoebe::{ | ||||
|     get_linked_channels, link_messages, | ||||
|     prelude::{ChatEvent, SqlitePool}, | ||||
|     service::Service, | ||||
| }; | ||||
| 
 | ||||
| fn dyn_service(service: &str) -> &'static str { | ||||
|     match service { | ||||
|         "discord" => "discord", | ||||
|         "matrix" => "matrix", | ||||
|         _ => panic!("Unsupported service: {}", service), | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| async fn handle_events( | ||||
|     db: SqlitePool, | ||||
|     mut service: Box<dyn Service + Send + Sync>, | ||||
|     mut rx: tokio::sync::broadcast::Receiver<ChatEvent>, | ||||
| ) { | ||||
|     info!("Handling events for {}…", service.tag()); | ||||
| 
 | ||||
|     let mut conn = db | ||||
|         .acquire() | ||||
|         .await | ||||
|         .expect("Failed to acquire core DB connection"); | ||||
| 
 | ||||
|     while let Ok(event) = rx.recv().await { | ||||
|         match event { | ||||
|             phoebe::prelude::ChatEvent::NewMessage(message) => { | ||||
|                 let linked_channels = | ||||
|                     get_linked_channels(&mut conn, &message.origin.channel, dyn_service).await; | ||||
| 
 | ||||
|                 let mut resulting_messages = vec![]; | ||||
|                 for destination_channel in linked_channels { | ||||
|                     resulting_messages.extend( | ||||
|                         service | ||||
|                             .send_chat_message(&message, destination_channel) | ||||
|                             .await, | ||||
|                     ) | ||||
|                 } | ||||
| 
 | ||||
|                 if let Err(e) = link_messages(&mut conn, &message.origin, &resulting_messages).await | ||||
|                 { | ||||
|                     tracing::error!("Failed to link messages: {e}"); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() -> Result<()> { | ||||
|  | @ -20,15 +68,9 @@ async fn main() -> Result<()> { | |||
|         phoebe_discord::setup(db.clone(), tx.clone()).await?, | ||||
|     )]; | ||||
| 
 | ||||
|     let handles = services.into_iter().map(|mut srv| { | ||||
|         let mut rx = tx.subscribe(); | ||||
|         tokio::spawn(async move { | ||||
|             info!("Handling events for {}…", srv.get_service_tag()); | ||||
|             while let Ok(event) = rx.recv().await { | ||||
|                 srv.handle_chat_event(&event).await; | ||||
|             } | ||||
|         }) | ||||
|     }); | ||||
|     let handles = services | ||||
|         .into_iter() | ||||
|         .map(|srv| tokio::spawn(handle_events(db.clone(), srv, tx.subscribe()))); | ||||
| 
 | ||||
|     let _ = futures::future::join_all(handles).await; | ||||
| 
 | ||||
|  |  | |||
|  | @ -1 +1 @@ | |||
| DATABASE_URL="sqlite://PHOEBE_DB_ROOT/main.db" | ||||
| DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/main.db" | ||||
|  |  | |||
|  | @ -11,3 +11,5 @@ sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "sqlite"] } | |||
| tracing = "0.1" | ||||
| async-trait = "0.1.53" | ||||
| eyre = "0.6.8" | ||||
| tokio-stream = "0.1.8" | ||||
| futures = "0.3.21" | ||||
|  |  | |||
							
								
								
									
										3
									
								
								phoebe/build.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								phoebe/build.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,3 @@ | |||
| fn main() { | ||||
|     println!("cargo:rerun-if-changed=migrations"); | ||||
| } | ||||
|  | @ -0,0 +1,10 @@ | |||
| CREATE TABLE message_links ( | ||||
|   id INTEGER PRIMARY KEY AUTOINCREMENT | ||||
| ) STRICT; | ||||
| 
 | ||||
| CREATE TABLE messages ( | ||||
|   link_id INTEGER NOT NULL REFERENCES message_links(id),  | ||||
|   service TEXT NOT NULL, | ||||
|   channel TEXT NOT NULL, | ||||
|   message TEXT NOT NULL | ||||
| ) | ||||
|  | @ -0,0 +1,8 @@ | |||
| CREATE TABLE channel_links ( | ||||
|   from_service TEXT NOT NULL, | ||||
|   from_channel TEXT NOT NULL, | ||||
|   to_service TEXT NOT NULL, | ||||
|   to_channel TEXT NOT NULL, | ||||
| 
 | ||||
|   PRIMARY KEY (from_service, from_channel, to_service, to_channel) | ||||
| ) STRICT; | ||||
|  | @ -1,14 +1,111 @@ | |||
| use futures::stream::BoxStream; | ||||
| pub use mid_chat; | ||||
| use mid_chat::{ChatMessageReference, ChatReference}; | ||||
| 
 | ||||
| use sqlx::{Row, SqliteConnection, SqlitePool}; | ||||
| use tokio::sync::broadcast::*; | ||||
| use tokio_stream::StreamExt; | ||||
| 
 | ||||
| pub mod db; | ||||
| pub mod prelude; | ||||
| pub mod service; | ||||
| 
 | ||||
| pub type ChatEventSender = tokio::sync::broadcast::Sender<mid_chat::event::ChatEvent>; | ||||
| pub type ChatEventReceiver = tokio::sync::broadcast::Receiver<mid_chat::event::ChatEvent>; | ||||
| pub type ChatEventSender = Sender<mid_chat::event::ChatEvent>; | ||||
| pub type ChatEventReceiver = Receiver<mid_chat::event::ChatEvent>; | ||||
| 
 | ||||
| pub async fn open_core_db() -> sqlx::Result<sqlx::SqlitePool> { | ||||
| pub type DynServiceLookup = fn(&str) -> &'static str; | ||||
| 
 | ||||
| pub async fn open_core_db() -> sqlx::Result<SqlitePool> { | ||||
|     let db = db::open("main").await?; | ||||
|     sqlx::migrate!().run(&db).await?; | ||||
|     Ok(db) | ||||
| } | ||||
| 
 | ||||
| pub async fn get_linked_channels( | ||||
|     conn: &mut SqliteConnection, | ||||
|     channel: &ChatReference, | ||||
|     dyn_service: DynServiceLookup, | ||||
| ) -> Vec<ChatReference> { | ||||
|     let from_service = channel.service; | ||||
|     let from_channel = &channel.id; | ||||
|     let query = sqlx::query!( | ||||
|         "SELECT * FROM channel_links WHERE from_service = ? AND from_channel = ?", | ||||
|         from_service, | ||||
|         from_channel | ||||
|     ); | ||||
| 
 | ||||
|     query | ||||
|         .fetch(&mut *conn) | ||||
|         .filter_map(Result::ok) | ||||
|         .map(|r| ChatReference { | ||||
|             service: dyn_service(&r.to_service), | ||||
|             id: r.to_channel, | ||||
|         }) | ||||
|         .collect() | ||||
|         .await | ||||
| } | ||||
| 
 | ||||
| pub async fn link_messages( | ||||
|     conn: &mut SqliteConnection, | ||||
|     origin: &ChatMessageReference, | ||||
|     messages: &[ChatMessageReference], | ||||
| ) -> sqlx::Result<()> { | ||||
|     let message_link = sqlx::query!("INSERT INTO message_links DEFAULT VALUES") | ||||
|         .execute(&mut *conn) | ||||
|         .await? | ||||
|         .last_insert_rowid(); | ||||
| 
 | ||||
|     for resultant in std::iter::once(origin).chain(messages) { | ||||
|         let service = &resultant.channel.service; | ||||
|         let channel = &resultant.channel.id; | ||||
|         let message = &resultant.message_id; | ||||
| 
 | ||||
|         let query = sqlx::query!( | ||||
|             "INSERT INTO messages VALUES (?, ?, ?, ?)", | ||||
|             message_link, | ||||
|             service, | ||||
|             channel, | ||||
|             message | ||||
|         ); | ||||
| 
 | ||||
|         let _ = query.execute(&mut *conn).await?; | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| pub async fn get_linked_messages<'a>( | ||||
|     conn: &'a mut SqliteConnection, | ||||
|     message: &ChatMessageReference, | ||||
|     dyn_service: DynServiceLookup, | ||||
| ) -> sqlx::Result<BoxStream<'a, ChatMessageReference>> { | ||||
|     let link_id = { | ||||
|         let service = &message.channel.service; | ||||
|         let channel = &message.channel.id; | ||||
|         let message_id = &message.message_id; | ||||
|         let query = sqlx::query!( | ||||
|             "SELECT link_id FROM messages WHERE service = ? AND channel = ? AND message = ?", | ||||
|             service, | ||||
|             channel, | ||||
|             message_id | ||||
|         ); | ||||
|         query.fetch_one(&mut *conn).await | ||||
|     }? | ||||
|     .link_id; | ||||
| 
 | ||||
|     let stream = sqlx::query("SELECT * FROM messages WHERE link_id = ?") | ||||
|         .bind(link_id) | ||||
|         .fetch(&mut *conn) | ||||
|         .filter_map(Result::ok) | ||||
|         .map(move |r| { | ||||
|             ChatMessageReference::new( | ||||
|                 ChatReference { | ||||
|                     service: dyn_service(&r.get::<String, _>("service")), | ||||
|                     id: r.get("channel"), | ||||
|                 }, | ||||
|                 r.get::<String, _>("message"), | ||||
|             ) | ||||
|         }); | ||||
| 
 | ||||
|     Ok(Box::pin(stream)) | ||||
| } | ||||
|  |  | |||
|  | @ -3,4 +3,4 @@ pub use crate::{service::Service, ChatEventReceiver, ChatEventSender}; | |||
| pub use async_trait::async_trait; | ||||
| pub use eyre::Result; | ||||
| pub use mid_chat::event::ChatEvent; | ||||
| pub use sqlx::SqlitePool; | ||||
| pub use sqlx::{SqliteConnection, SqlitePool}; | ||||
|  |  | |||
|  | @ -1,7 +1,11 @@ | |||
| use mid_chat::event::ChatEvent; | ||||
| use mid_chat::{ChatMessage, ChatMessageReference, ChatReference}; | ||||
| 
 | ||||
| #[async_trait::async_trait] | ||||
| pub trait Service { | ||||
|     fn get_service_tag(&self) -> &'static str; | ||||
|     async fn handle_chat_event(&mut self, event: &ChatEvent); | ||||
|     fn tag(&self) -> &'static str; | ||||
|     async fn send_chat_message( | ||||
|         &mut self, | ||||
|         source: &ChatMessage, | ||||
|         destination_channel: ChatReference, | ||||
|     ) -> Vec<ChatMessageReference>; | ||||
| } | ||||
|  |  | |||
|  | @ -1 +1 @@ | |||
| DATABASE_URL="sqlite://PHOEBE_DB_ROOT/discord_media.db" | ||||
| DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/discord_media.db" | ||||
|  |  | |||
|  | @ -69,7 +69,7 @@ fn mid_to_discord(component: &ChatContentComponent) -> String { | |||
|                 lang.as_ref() | ||||
|                     .map(|s| s.to_string()) | ||||
|                     .unwrap_or_else(|| "".to_string()), | ||||
|                 source.to_string() | ||||
|                 source | ||||
|             ) | ||||
|         } | ||||
| 
 | ||||
|  |  | |||
|  | @ -44,20 +44,17 @@ impl EventHandler for DiscordHandler { | |||
|     } | ||||
| 
 | ||||
|     async fn message(&self, ctx: Context, message: Message) { | ||||
|         let origin = ChatReference { | ||||
|             service: "discord", | ||||
|             id: message.id.to_string(), | ||||
|         }; | ||||
|         let origin = ChatMessageReference::new(discord_reference(message.channel_id), message.id); | ||||
| 
 | ||||
|         let author = self.get_author(&ctx, &message).await; | ||||
| 
 | ||||
|         let content = discord_message_format::parse(&message.content); | ||||
|         let content = super::chat_conv::convert(&content); | ||||
| 
 | ||||
|         let replies_to = message.referenced_message.as_ref().map(|m| ChatReference { | ||||
|             service: "discord", | ||||
|             id: m.id.to_string(), | ||||
|         }); | ||||
|         let replies_to = message | ||||
|             .referenced_message | ||||
|             .as_ref() | ||||
|             .map(|m| ChatMessageReference::new(discord_reference(m.channel_id), m.id)); | ||||
| 
 | ||||
|         let chat_message = ChatMessage { | ||||
|             origin, | ||||
|  |  | |||
|  | @ -1,9 +1,13 @@ | |||
| use phoebe::{mid_chat, prelude::*}; | ||||
| use phoebe::{ | ||||
|     mid_chat::{self, ChatMessage, ChatMessageReference, ChatReference}, | ||||
|     prelude::*, | ||||
| }; | ||||
| use serenity::{client::Context, Client}; | ||||
| use tracing::{debug, info}; | ||||
| 
 | ||||
| mod chat_conv; | ||||
| mod handler; | ||||
| mod sender; | ||||
| 
 | ||||
| pub fn discord_reference(id: impl ToString) -> mid_chat::ChatReference { | ||||
|     mid_chat::ChatReference { | ||||
|  | @ -31,7 +35,6 @@ pub async fn setup(core_db: SqlitePool, tx: ChatEventSender) -> Result<DiscordSe | |||
|         ctx_tx, | ||||
|     }; | ||||
| 
 | ||||
|     // TODO: Create a discord client
 | ||||
|     debug!("Logging in…"); | ||||
|     let discord_token = std::env::var("PHOEBE_DISCORD_TOKEN") | ||||
|         .expect("PHOEBE_DISCORD_TOKEN environment variable was not set!"); | ||||
|  | @ -55,11 +58,20 @@ pub async fn setup(core_db: SqlitePool, tx: ChatEventSender) -> Result<DiscordSe | |||
| 
 | ||||
| #[async_trait] | ||||
| impl Service for DiscordService { | ||||
|     async fn handle_chat_event(&mut self, event: &ChatEvent) { | ||||
|         dbg!(event); | ||||
|     async fn send_chat_message( | ||||
|         &mut self, | ||||
|         source: &ChatMessage, | ||||
|         destination_channel: ChatReference, | ||||
|     ) -> Vec<ChatMessageReference> { | ||||
|         assert_eq!(destination_channel.service, "discord"); | ||||
|         sender::send_discord_message(&mut self.discord_ctx, source, destination_channel) | ||||
|             .await | ||||
|             .ok() | ||||
|             .into_iter() | ||||
|             .collect() | ||||
|     } | ||||
| 
 | ||||
|     fn get_service_tag(&self) -> &'static str { | ||||
|     fn tag(&self) -> &'static str { | ||||
|         "discord" | ||||
|     } | ||||
| } | ||||
|  |  | |||
							
								
								
									
										41
									
								
								services/phoebe-discord/src/sender.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										41
									
								
								services/phoebe-discord/src/sender.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,41 @@ | |||
| use phoebe::{ | ||||
|     mid_chat::{ChatMessage, ChatMessageReference, ChatReference}, | ||||
|     prelude::Result, | ||||
| }; | ||||
| 
 | ||||
| use serenity::{model::prelude::*, prelude::*}; | ||||
| 
 | ||||
| use crate::{chat_conv, discord_reference}; | ||||
| 
 | ||||
| pub async fn send_discord_message( | ||||
|     context: &mut Context, | ||||
|     source: &ChatMessage, | ||||
|     destination_channel: ChatReference, | ||||
| ) -> Result<ChatMessageReference> { | ||||
|     // TODO: If we have a webhook we should use the webhook
 | ||||
| 
 | ||||
|     let channel_id = destination_channel.id.parse::<ChannelId>()?; | ||||
|     let formatted_message = chat_conv::format(&source.content); | ||||
|     let content = format!( | ||||
|         "{} ({}): {}", | ||||
|         source.author.display_name, source.author.reference.service, formatted_message | ||||
|     ); | ||||
| 
 | ||||
|     let sent_message = channel_id | ||||
|         .send_message(&context, move |m| { | ||||
|             let m = m.content(content); | ||||
|             if let Some(reply) = &source.replying { | ||||
|                 let channel_id: ChannelId = reply.channel.id.parse().unwrap(); | ||||
|                 let message_id: MessageId = reply.message_id.parse::<u64>().unwrap().into(); | ||||
|                 m.reference_message((channel_id, message_id)) | ||||
|             } else { | ||||
|                 m | ||||
|             } | ||||
|         }) | ||||
|         .await?; | ||||
| 
 | ||||
|     Ok(ChatMessageReference::new( | ||||
|         discord_reference(sent_message.channel_id), | ||||
|         sent_message.id, | ||||
|     )) | ||||
| } | ||||
		Loading…
	
		Reference in a new issue