use color_eyre::Result; use futures::{future, StreamExt}; use tracing::info; use tracing_subscriber::EnvFilter; use phoebe::{ get_linked_channels, get_linked_messages, link_messages, prelude::{ChatEvent, SqlitePool}, service::Service, unlink_message, DynServiceLookup, }; async fn handle_events( dyn_service: DynServiceLookup, db: SqlitePool, mut service: Box, mut rx: tokio::sync::broadcast::Receiver, ) { info!("Handling events for {}…", service.tag()); let mut conn = db .acquire() .await .expect("Failed to acquire core DB connection"); while let Ok(event) = rx.recv().await { match event { ChatEvent::NewMessage(message) => { let linked_channels = get_linked_channels(&mut conn, dyn_service, &message.origin.channel).await; let mut resulting_messages = vec![]; for destination_channel in linked_channels { resulting_messages.extend( service .send_chat_message(&message, destination_channel) .await, ) } if !resulting_messages.is_empty() { if let Err(e) = link_messages(&mut conn, &message.origin, &resulting_messages).await { tracing::error!("Failed to link messages: {e}"); } } } ChatEvent::DeleteMessage(origin) => { let messages = if let Ok(message_stream) = get_linked_messages(&mut conn, dyn_service, &origin).await { message_stream .filter(|r| future::ready(r.channel.service == service.tag())) .collect::>() .await } else { vec![] }; if !messages.is_empty() { if let Err(e) = unlink_message(&mut conn, &origin).await { tracing::error!("Failed to unlink origin message: {e}"); } for message in messages { if service.delete_message(&message).await { if let Err(e) = unlink_message(&mut conn, &message).await { tracing::error!("Failed to unlink related message: {e}"); } } } } } ChatEvent::EditMessage(prev_origin, message_edit) => { let messages = if let Ok(message_stream) = get_linked_messages(&mut conn, dyn_service, &prev_origin).await { message_stream .filter(|r| future::ready(r.channel.service == service.tag())) .collect::>() .await } else { vec![] }; let mut resulting_messages = vec![]; for message in messages { resulting_messages.extend(service.edit_message(&message, &message_edit).await) } if !resulting_messages.is_empty() { if let Err(e) = link_messages(&mut conn, &prev_origin, &resulting_messages).await { tracing::error!("Failed to link messages: {e}"); } } } } } } #[tokio::main] async fn main() -> Result<()> { color_eyre::install()?; tracing_subscriber::fmt() .with_target(true) .with_env_filter(EnvFilter::from_default_env()) .init(); let (tx, _) = tokio::sync::broadcast::channel(512); let db = phoebe::open_core_db().await?; fn dyn_service(service: &str) -> &'static str { match service { "discord" => "discord", "matrix" => "matrix", _ => panic!("Unsupported service: {}", service), } } let services: Vec> = vec![Box::new( phoebe_discord::setup(db.clone(), tx.clone(), dyn_service).await?, )]; let handles = services .into_iter() .map(|srv| tokio::spawn(handle_events(dyn_service, db.clone(), srv, tx.subscribe()))); let _ = future::join_all(handles).await; Ok(()) }