//! Database helpers for relating channels and messages across chat services.

// NOTE(review): several generic argument lists appear to have been lost before
// this file reached review: the bare `Sender` / `Receiver` aliases, the bare
// `Vec` and `sqlx::Result` return types, `r.get::(..)`, `collect::>()`, and the
// `F` / `Fut` parameters of `lookup_message`. They are reproduced exactly as
// found; restore them from version control rather than by guesswork.

use futures::{stream::BoxStream, Future};
pub use mid_chat;
use mid_chat::{ChatMessageReference, ChatReference};

use futures::StreamExt;
use sqlx::{Connection, Row, SqliteConnection, SqlitePool};
use tokio::sync::broadcast::*;

pub mod db;
pub mod prelude;
pub mod service;

/// Sending half of the broadcast channel used for chat events.
pub type ChatEventSender = Sender;
/// Receiving half of the broadcast channel used for chat events.
pub type ChatEventReceiver = Receiver;
/// Maps a service name read from the database to its `&'static str` form.
pub type DynServiceLookup = fn(&str) -> &'static str;

/// Opens the database named `"main"` through [`db::open`] and applies the
/// migrations embedded by `sqlx::migrate!()`.
///
/// # Errors
///
/// Propagates any error from opening the database or running the migrations.
pub async fn open_core_db() -> sqlx::Result {
    let db = db::open("main").await?;
    sqlx::migrate!().run(&db).await?;
    Ok(db)
}

/// Collects every channel that `channel` is linked to in `channel_links`.
///
/// The stored `to_service` string of each row is passed through `dyn_service`
/// to obtain the service of the returned [`ChatReference`].
///
/// Rows that come back as errors are skipped rather than reported, so a failing
/// query yields a shorter (possibly empty) list.
pub async fn get_linked_channels(
    conn: &mut SqliteConnection,
    dyn_service: DynServiceLookup,
    channel: &ChatReference,
) -> Vec {
    let from_service = channel.service;
    let from_channel = &channel.id;
    let query = sqlx::query!(
        "SELECT * FROM channel_links WHERE from_service = ? AND from_channel = ?",
        from_service,
        from_channel
    );
    query
        .fetch(&mut *conn)
        // Best effort: drop rows that failed; same form as `get_linked_messages`.
        .filter_map(|r| futures::future::ready(r.ok()))
        .map(|r| ChatReference {
            service: dyn_service(&r.to_service),
            id: r.to_channel,
        })
        .collect()
        .await
}

/// Records that `origin` and every entry of `messages` are the same logical
/// message.
///
/// A fresh row is inserted into `message_links`; its rowid becomes the link id
/// under which one `messages` row is written for `origin` and for each entry of
/// `messages`.
///
/// All inserts run inside one transaction, so a failure part-way through leaves
/// neither an orphan `message_links` row nor a partial set of `messages` rows.
/// NOTE(review): when `conn` is already inside a transaction, sqlx is expected
/// to nest via a savepoint — confirm against the sqlx version in use.
///
/// # Errors
///
/// Returns the first database error encountered; nothing is committed then.
pub async fn link_messages(
    conn: &mut SqliteConnection,
    origin: &ChatMessageReference,
    messages: &[ChatMessageReference],
) -> sqlx::Result<()> {
    // Dropping `tx` without `commit` (any early `?` return) rolls it back.
    let mut tx = conn.begin().await?;

    let message_link = sqlx::query!("INSERT INTO message_links DEFAULT VALUES")
        .execute(&mut *tx)
        .await?
        .last_insert_rowid();

    for resultant in std::iter::once(origin).chain(messages) {
        let service = &resultant.channel.service;
        let channel = &resultant.channel.id;
        let message = &resultant.message_id;
        let query = sqlx::query!(
            "INSERT INTO messages VALUES (?, ?, ?, ?)",
            message_link,
            service,
            channel,
            message
        );
        query.execute(&mut *tx).await?;
    }

    tx.commit().await
}

/// Looks up the link id stored in `messages` for `message`.
///
/// # Errors
///
/// Returns the error from `fetch_one`, which includes the case where no row
/// matches the service / channel / message triple.
pub async fn get_message_link_id(
    conn: &mut SqliteConnection,
    message: &ChatMessageReference,
) -> sqlx::Result {
    let service = &message.channel.service;
    let channel = &message.channel.id;
    let message_id = &message.message_id;
    // The query text (including its embedded line break) is kept exactly as it
    // was; do not reflow this literal.
    let query = sqlx::query!(
        "SELECT link_id FROM messages WHERE service = ? AND channel = ? 
AND message = ?",
        service,
        channel,
        message_id
    );
    let r = query.fetch_one(&mut *conn).await?;
    Ok(r.link_id)
}

/// Streams every message that shares a link id with `message`.
///
/// The link id is resolved with [`get_message_link_id`], then all `messages`
/// rows carrying that id are selected, which includes the row of `message`
/// itself. Each row's `service` column is mapped through `dyn_service`.
///
/// The returned stream borrows `conn` for `'a`. Rows that come back as errors
/// are skipped silently.
///
/// # Errors
///
/// Fails only if the link id lookup fails; errors on individual rows do not
/// surface here.
pub async fn get_linked_messages<'a>(
    conn: &'a mut SqliteConnection,
    dyn_service: DynServiceLookup,
    message: &ChatMessageReference,
) -> sqlx::Result> {
    let link_id = get_message_link_id(&mut *conn, message).await?;
    let stream = sqlx::query("SELECT * FROM messages WHERE link_id = ?")
        .bind(link_id)
        .fetch(&mut *conn)
        .filter_map(|r| futures::future::ready(r.ok()))
        .map(move |r| {
            ChatMessageReference::new(
                ChatReference {
                    service: dyn_service(&r.get::("service")),
                    id: r.get("channel"),
                },
                r.get::("message"),
            )
        });
    Ok(Box::pin(stream))
}

/// Finds the single message linked to `linked_message` that `filter` accepts.
///
/// Returns `Some` only when exactly one linked message passes `filter`.
/// Returns `None` when the link lookup fails, when nothing passes, or when more
/// than one message passes.
pub async fn lookup_message(
    conn: &mut SqliteConnection,
    dyn_service: DynServiceLookup,
    linked_message: &ChatMessageReference,
    filter: F,
) -> Option
where
    F: FnMut(&ChatMessageReference) -> Fut,
    Fut: Future,
{
    let references = get_linked_messages(&mut *conn, dyn_service, linked_message)
        .await
        .ok()?
        .filter(filter)
        .collect::>()
        .await;

    // Anything other than exactly one match is treated as "no answer".
    if let [reference] = references.as_slice() {
        Some(reference.clone())
    } else {
        None
    }
}