2022-04-16 09:16:51 +00:00
|
|
|
use futures::{stream::BoxStream, Future};
|
2022-04-11 11:52:29 +00:00
|
|
|
pub use mid_chat;
|
2022-04-16 08:06:48 +00:00
|
|
|
use mid_chat::{ChatMessageReference, ChatReference};
|
|
|
|
|
2022-04-16 09:16:51 +00:00
|
|
|
use futures::StreamExt;
|
2022-04-16 08:06:48 +00:00
|
|
|
use sqlx::{Row, SqliteConnection, SqlitePool};
|
|
|
|
use tokio::sync::broadcast::*;
|
2022-04-08 16:52:59 +00:00
|
|
|
|
2022-04-17 10:53:52 +00:00
|
|
|
pub mod attachments;
|
2022-04-08 16:52:59 +00:00
|
|
|
pub mod db;
|
|
|
|
pub mod prelude;
|
|
|
|
pub mod service;
|
|
|
|
|
2022-04-16 08:06:48 +00:00
|
|
|
/// Sending half of a Tokio broadcast channel whose items are
/// [`mid_chat::event::ChatEvent`] values.
pub type ChatEventSender = Sender<mid_chat::event::ChatEvent>;
|
|
|
|
/// Receiving half of a Tokio broadcast channel whose items are
/// [`mid_chat::event::ChatEvent`] values.
pub type ChatEventReceiver = Receiver<mid_chat::event::ChatEvent>;
|
|
|
|
|
|
|
|
/// Function pointer that maps a service name read at runtime (for example a
/// value loaded from the database) to the `&'static str` stored in
/// `ChatReference::service`.
pub type DynServiceLookup = fn(&str) -> &'static str;
|
2022-04-08 16:52:59 +00:00
|
|
|
|
2022-04-16 08:06:48 +00:00
|
|
|
pub async fn open_core_db() -> sqlx::Result<SqlitePool> {
|
2022-04-08 16:52:59 +00:00
|
|
|
let db = db::open("main").await?;
|
|
|
|
sqlx::migrate!().run(&db).await?;
|
|
|
|
Ok(db)
|
|
|
|
}
|
2022-04-16 08:06:48 +00:00
|
|
|
|
|
|
|
pub async fn get_linked_channels(
|
|
|
|
conn: &mut SqliteConnection,
|
|
|
|
dyn_service: DynServiceLookup,
|
2022-04-16 09:16:51 +00:00
|
|
|
channel: &ChatReference,
|
2022-04-16 08:06:48 +00:00
|
|
|
) -> Vec<ChatReference> {
|
|
|
|
let from_service = channel.service;
|
|
|
|
let from_channel = &channel.id;
|
|
|
|
let query = sqlx::query!(
|
|
|
|
"SELECT * FROM channel_links WHERE from_service = ? AND from_channel = ?",
|
|
|
|
from_service,
|
|
|
|
from_channel
|
|
|
|
);
|
|
|
|
|
|
|
|
query
|
|
|
|
.fetch(&mut *conn)
|
2022-04-16 09:16:51 +00:00
|
|
|
.filter_map(|r| async { r.ok() })
|
2022-04-16 08:06:48 +00:00
|
|
|
.map(|r| ChatReference {
|
|
|
|
service: dyn_service(&r.to_service),
|
|
|
|
id: r.to_channel,
|
|
|
|
})
|
|
|
|
.collect()
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn link_messages(
|
|
|
|
conn: &mut SqliteConnection,
|
|
|
|
origin: &ChatMessageReference,
|
|
|
|
messages: &[ChatMessageReference],
|
|
|
|
) -> sqlx::Result<()> {
|
|
|
|
let message_link = sqlx::query!("INSERT INTO message_links DEFAULT VALUES")
|
|
|
|
.execute(&mut *conn)
|
|
|
|
.await?
|
|
|
|
.last_insert_rowid();
|
|
|
|
|
2022-04-16 10:31:06 +00:00
|
|
|
let original_id = {
|
|
|
|
let service = &origin.channel.service;
|
|
|
|
let channel = &origin.channel.id;
|
|
|
|
let message = &origin.message_id;
|
|
|
|
|
|
|
|
let query = sqlx::query!(
|
|
|
|
"INSERT INTO messages (link_id, service, channel, message, original) VALUES (?, ?, ?, ?, NULL)",
|
|
|
|
message_link,
|
|
|
|
service,
|
|
|
|
channel,
|
|
|
|
message
|
|
|
|
);
|
|
|
|
|
|
|
|
query.execute(&mut *conn).await?.last_insert_rowid()
|
|
|
|
};
|
|
|
|
|
|
|
|
for resultant in messages {
|
2022-04-16 08:06:48 +00:00
|
|
|
let service = &resultant.channel.service;
|
|
|
|
let channel = &resultant.channel.id;
|
|
|
|
let message = &resultant.message_id;
|
|
|
|
|
|
|
|
let query = sqlx::query!(
|
2022-04-16 10:31:06 +00:00
|
|
|
"INSERT INTO messages (link_id, service, channel, message, original) VALUES (?, ?, ?, ?, ?)",
|
2022-04-16 08:06:48 +00:00
|
|
|
message_link,
|
|
|
|
service,
|
|
|
|
channel,
|
2022-04-16 10:31:06 +00:00
|
|
|
message,
|
|
|
|
original_id
|
2022-04-16 08:06:48 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
let _ = query.execute(&mut *conn).await?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-04-17 12:03:21 +00:00
|
|
|
pub async fn unlink_message(
|
|
|
|
conn: &mut SqliteConnection,
|
|
|
|
message: &ChatMessageReference,
|
|
|
|
) -> sqlx::Result<()> {
|
|
|
|
let service = &message.channel.service;
|
|
|
|
let channel = &message.channel.id;
|
|
|
|
let message_id = &message.message_id;
|
|
|
|
let query = sqlx::query!(
|
|
|
|
"DELETE FROM messages WHERE service = ? AND channel = ? AND message = ?",
|
|
|
|
service,
|
|
|
|
channel,
|
|
|
|
message_id
|
|
|
|
);
|
|
|
|
|
|
|
|
if query.execute(&mut *conn).await?.rows_affected() == 0 {
|
|
|
|
return Err(sqlx::Error::RowNotFound);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-04-16 10:04:38 +00:00
|
|
|
pub async fn get_message_link_id(
|
|
|
|
conn: &mut SqliteConnection,
|
|
|
|
message: &ChatMessageReference,
|
|
|
|
) -> sqlx::Result<i64> {
|
|
|
|
let service = &message.channel.service;
|
|
|
|
let channel = &message.channel.id;
|
|
|
|
let message_id = &message.message_id;
|
|
|
|
let query = sqlx::query!(
|
|
|
|
"SELECT link_id FROM messages WHERE service = ? AND channel = ? AND message = ?",
|
|
|
|
service,
|
|
|
|
channel,
|
|
|
|
message_id
|
|
|
|
);
|
|
|
|
let r = query.fetch_one(&mut *conn).await?;
|
|
|
|
Ok(r.link_id)
|
|
|
|
}
|
|
|
|
|
2022-04-16 08:06:48 +00:00
|
|
|
pub async fn get_linked_messages<'a>(
|
|
|
|
conn: &'a mut SqliteConnection,
|
|
|
|
dyn_service: DynServiceLookup,
|
2022-04-16 09:16:51 +00:00
|
|
|
message: &ChatMessageReference,
|
2022-04-16 08:06:48 +00:00
|
|
|
) -> sqlx::Result<BoxStream<'a, ChatMessageReference>> {
|
2022-04-16 10:04:38 +00:00
|
|
|
let link_id = get_message_link_id(&mut *conn, message).await?;
|
2022-04-16 08:06:48 +00:00
|
|
|
|
|
|
|
let stream = sqlx::query("SELECT * FROM messages WHERE link_id = ?")
|
|
|
|
.bind(link_id)
|
|
|
|
.fetch(&mut *conn)
|
2022-04-16 09:16:51 +00:00
|
|
|
.filter_map(|r| futures::future::ready(r.ok()))
|
2022-04-16 08:06:48 +00:00
|
|
|
.map(move |r| {
|
|
|
|
ChatMessageReference::new(
|
|
|
|
ChatReference {
|
|
|
|
service: dyn_service(&r.get::<String, _>("service")),
|
|
|
|
id: r.get("channel"),
|
|
|
|
},
|
|
|
|
r.get::<String, _>("message"),
|
|
|
|
)
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok(Box::pin(stream))
|
|
|
|
}
|
2022-04-16 09:16:51 +00:00
|
|
|
|
|
|
|
pub async fn lookup_message<F, Fut>(
|
|
|
|
conn: &mut SqliteConnection,
|
|
|
|
dyn_service: DynServiceLookup,
|
|
|
|
linked_message: &ChatMessageReference,
|
|
|
|
filter: F,
|
|
|
|
) -> Option<ChatMessageReference>
|
|
|
|
where
|
|
|
|
F: FnMut(&ChatMessageReference) -> Fut,
|
|
|
|
Fut: Future<Output = bool>,
|
|
|
|
{
|
|
|
|
let references = get_linked_messages(&mut *conn, dyn_service, linked_message)
|
|
|
|
.await
|
|
|
|
.ok()?
|
|
|
|
.filter(filter)
|
|
|
|
.collect::<Vec<_>>()
|
|
|
|
.await;
|
|
|
|
|
|
|
|
if let [reference] = references.as_slice() {
|
|
|
|
Some(reference.clone())
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
}
|