Compare commits

..

No commits in common. "48c3eb1830638b7cb6acd0e58d692b2bd822bc8f" and "e806786a1c9db3c2754312febcfe622de0d0fca0" have entirely different histories.

17 changed files with 42 additions and 346 deletions

View File

@ -13,11 +13,11 @@ pub use content::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ChatMessage { pub struct ChatMessage {
pub origin: ChatMessageReference, pub origin: ChatReference,
pub author: ChatAuthor, pub author: ChatAuthor,
pub content: ChatMessageContent, pub content: ChatMessageContent,
pub attachments: Vec<()>, pub attachments: Vec<()>,
pub replying: Option<ChatMessageReference>, pub replying: Option<ChatReference>,
} }
pub mod event; pub mod event;

View File

@ -3,18 +3,3 @@ pub struct ChatReference {
pub service: &'static str, pub service: &'static str,
pub id: String, pub id: String,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChatMessageReference {
pub channel: ChatReference,
pub message_id: String,
}
impl ChatMessageReference {
pub fn new(channel: ChatReference, message_id: impl ToString) -> Self {
Self {
channel,
message_id: message_id.to_string(),
}
}
}

View File

@ -2,49 +2,7 @@ use color_eyre::Result;
use tracing::info; use tracing::info;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use phoebe::{ use phoebe::service::Service;
get_linked_channels, link_messages,
prelude::{ChatEvent, SqlitePool},
service::Service,
DynServiceLookup,
};
async fn handle_events(
dyn_service: DynServiceLookup,
db: SqlitePool,
mut service: Box<dyn Service + Send + Sync>,
mut rx: tokio::sync::broadcast::Receiver<ChatEvent>,
) {
info!("Handling events for {}…", service.tag());
let mut conn = db
.acquire()
.await
.expect("Failed to acquire core DB connection");
while let Ok(event) = rx.recv().await {
match event {
phoebe::prelude::ChatEvent::NewMessage(message) => {
let linked_channels =
get_linked_channels(&mut conn, dyn_service, &message.origin.channel).await;
let mut resulting_messages = vec![];
for destination_channel in linked_channels {
resulting_messages.extend(
service
.send_chat_message(&message, destination_channel)
.await,
)
}
if let Err(e) = link_messages(&mut conn, &message.origin, &resulting_messages).await
{
tracing::error!("Failed to link messages: {e}");
}
}
}
}
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -58,22 +16,19 @@ async fn main() -> Result<()> {
let (tx, _) = tokio::sync::broadcast::channel(512); let (tx, _) = tokio::sync::broadcast::channel(512);
let db = phoebe::open_core_db().await?; let db = phoebe::open_core_db().await?;
fn dyn_service(service: &str) -> &'static str {
match service {
"discord" => "discord",
"matrix" => "matrix",
_ => panic!("Unsupported service: {}", service),
}
}
let services: Vec<Box<dyn Service + Send + Sync>> = vec![Box::new( let services: Vec<Box<dyn Service + Send + Sync>> = vec![Box::new(
phoebe_discord::setup(db.clone(), tx.clone(), dyn_service).await?, phoebe_discord::setup(db.clone(), tx.clone()).await?,
)]; )];
let handles = services let handles = services.into_iter().map(|mut srv| {
.into_iter() let mut rx = tx.subscribe();
.map(|srv| tokio::spawn(handle_events(dyn_service, db.clone(), srv, tx.subscribe()))); tokio::spawn(async move {
info!("Handling events for {}…", srv.get_service_tag());
while let Ok(event) = rx.recv().await {
srv.handle_chat_event(&event).await;
}
})
});
let _ = futures::future::join_all(handles).await; let _ = futures::future::join_all(handles).await;

View File

@ -1 +1 @@
DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/main.db" DATABASE_URL="sqlite://PHOEBE_DB_ROOT/main.db"

View File

@ -11,4 +11,3 @@ sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "sqlite"] }
tracing = "0.1" tracing = "0.1"
async-trait = "0.1.53" async-trait = "0.1.53"
eyre = "0.6.8" eyre = "0.6.8"
futures = "0.3.21"

View File

@ -1,3 +0,0 @@
fn main() {
println!("cargo:rerun-if-changed=migrations");
}

View File

@ -1,10 +0,0 @@
CREATE TABLE message_links (
id INTEGER PRIMARY KEY AUTOINCREMENT
) STRICT;
CREATE TABLE messages (
link_id INTEGER NOT NULL REFERENCES message_links(id),
service TEXT NOT NULL,
channel TEXT NOT NULL,
message TEXT NOT NULL
)

View File

@ -1,8 +0,0 @@
CREATE TABLE channel_links (
from_service TEXT NOT NULL,
from_channel TEXT NOT NULL,
to_service TEXT NOT NULL,
to_channel TEXT NOT NULL,
PRIMARY KEY (from_service, from_channel, to_service, to_channel)
) STRICT;

View File

@ -1,135 +1,14 @@
use futures::{stream::BoxStream, Future};
pub use mid_chat; pub use mid_chat;
use mid_chat::{ChatMessageReference, ChatReference};
use futures::StreamExt;
use sqlx::{Row, SqliteConnection, SqlitePool};
use tokio::sync::broadcast::*;
pub mod db; pub mod db;
pub mod prelude; pub mod prelude;
pub mod service; pub mod service;
pub type ChatEventSender = Sender<mid_chat::event::ChatEvent>; pub type ChatEventSender = tokio::sync::broadcast::Sender<mid_chat::event::ChatEvent>;
pub type ChatEventReceiver = Receiver<mid_chat::event::ChatEvent>; pub type ChatEventReceiver = tokio::sync::broadcast::Receiver<mid_chat::event::ChatEvent>;
pub type DynServiceLookup = fn(&str) -> &'static str; pub async fn open_core_db() -> sqlx::Result<sqlx::SqlitePool> {
pub async fn open_core_db() -> sqlx::Result<SqlitePool> {
let db = db::open("main").await?; let db = db::open("main").await?;
sqlx::migrate!().run(&db).await?; sqlx::migrate!().run(&db).await?;
Ok(db) Ok(db)
} }
pub async fn get_linked_channels(
conn: &mut SqliteConnection,
dyn_service: DynServiceLookup,
channel: &ChatReference,
) -> Vec<ChatReference> {
let from_service = channel.service;
let from_channel = &channel.id;
let query = sqlx::query!(
"SELECT * FROM channel_links WHERE from_service = ? AND from_channel = ?",
from_service,
from_channel
);
query
.fetch(&mut *conn)
.filter_map(|r| async { r.ok() })
.map(|r| ChatReference {
service: dyn_service(&r.to_service),
id: r.to_channel,
})
.collect()
.await
}
pub async fn link_messages(
conn: &mut SqliteConnection,
origin: &ChatMessageReference,
messages: &[ChatMessageReference],
) -> sqlx::Result<()> {
let message_link = sqlx::query!("INSERT INTO message_links DEFAULT VALUES")
.execute(&mut *conn)
.await?
.last_insert_rowid();
for resultant in std::iter::once(origin).chain(messages) {
let service = &resultant.channel.service;
let channel = &resultant.channel.id;
let message = &resultant.message_id;
let query = sqlx::query!(
"INSERT INTO messages VALUES (?, ?, ?, ?)",
message_link,
service,
channel,
message
);
let _ = query.execute(&mut *conn).await?;
}
Ok(())
}
pub async fn get_linked_messages<'a>(
conn: &'a mut SqliteConnection,
dyn_service: DynServiceLookup,
message: &ChatMessageReference,
) -> sqlx::Result<BoxStream<'a, ChatMessageReference>> {
let link_id = {
let service = &message.channel.service;
let channel = &message.channel.id;
let message_id = &message.message_id;
let query = sqlx::query!(
"SELECT link_id FROM messages WHERE service = ? AND channel = ? AND message = ?",
service,
channel,
message_id
);
query.fetch_one(&mut *conn).await
}?
.link_id;
let stream = sqlx::query("SELECT * FROM messages WHERE link_id = ?")
.bind(link_id)
.fetch(&mut *conn)
.filter_map(|r| futures::future::ready(r.ok()))
.map(move |r| {
ChatMessageReference::new(
ChatReference {
service: dyn_service(&r.get::<String, _>("service")),
id: r.get("channel"),
},
r.get::<String, _>("message"),
)
});
Ok(Box::pin(stream))
}
pub async fn lookup_message<F, Fut>(
conn: &mut SqliteConnection,
dyn_service: DynServiceLookup,
linked_message: &ChatMessageReference,
filter: F,
) -> Option<ChatMessageReference>
where
F: FnMut(&ChatMessageReference) -> Fut,
Fut: Future<Output = bool>,
{
let references = get_linked_messages(&mut *conn, dyn_service, linked_message)
.await
.ok()?
.filter(filter)
.collect::<Vec<_>>()
.await;
if let [reference] = references.as_slice() {
Some(reference.clone())
} else {
None
}
}

View File

@ -1,7 +1,6 @@
pub use crate::{service::Service, ChatEventReceiver, ChatEventSender}; pub use crate::{service::Service, ChatEventReceiver, ChatEventSender};
pub use async_trait::async_trait; pub use async_trait::async_trait;
pub use eyre::{self, Result}; pub use eyre::Result;
pub use futures::{self, prelude::*};
pub use mid_chat::event::ChatEvent; pub use mid_chat::event::ChatEvent;
pub use sqlx::{self, SqliteConnection, SqlitePool}; pub use sqlx::SqlitePool;

View File

@ -1,11 +1,7 @@
use mid_chat::{ChatMessage, ChatMessageReference, ChatReference}; use mid_chat::event::ChatEvent;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Service { pub trait Service {
fn tag(&self) -> &'static str; fn get_service_tag(&self) -> &'static str;
async fn send_chat_message( async fn handle_chat_event(&mut self, event: &ChatEvent);
&mut self,
source: &ChatMessage,
destination_channel: ChatReference,
) -> Vec<ChatMessageReference>;
} }

View File

@ -1 +1 @@
DATABASE_URL="sqlite://${PHOEBE_DB_ROOT}/discord_media.db" DATABASE_URL="sqlite://PHOEBE_DB_ROOT/discord_media.db"

View File

@ -69,7 +69,7 @@ fn mid_to_discord(component: &ChatContentComponent) -> String {
lang.as_ref() lang.as_ref()
.map(|s| s.to_string()) .map(|s| s.to_string())
.unwrap_or_else(|| "".to_string()), .unwrap_or_else(|| "".to_string()),
source source.to_string()
) )
} }

View File

@ -44,17 +44,20 @@ impl EventHandler for DiscordHandler {
} }
async fn message(&self, ctx: Context, message: Message) { async fn message(&self, ctx: Context, message: Message) {
let origin = ChatMessageReference::new(discord_reference(message.channel_id), message.id); let origin = ChatReference {
service: "discord",
id: message.id.to_string(),
};
let author = self.get_author(&ctx, &message).await; let author = self.get_author(&ctx, &message).await;
let content = discord_message_format::parse(&message.content); let content = discord_message_format::parse(&message.content);
let content = super::chat_conv::convert(&content); let content = super::chat_conv::convert(&content);
let replies_to = message let replies_to = message.referenced_message.as_ref().map(|m| ChatReference {
.referenced_message service: "discord",
.as_ref() id: m.id.to_string(),
.map(|m| ChatMessageReference::new(discord_reference(m.channel_id), m.id)); });
let chat_message = ChatMessage { let chat_message = ChatMessage {
origin, origin,

View File

@ -1,15 +1,9 @@
use phoebe::{ use phoebe::{mid_chat, prelude::*};
mid_chat::{self, ChatMessage, ChatMessageReference, ChatReference},
prelude::*,
DynServiceLookup,
};
use serenity::{client::Context, Client}; use serenity::{client::Context, Client};
use tracing::{debug, info}; use tracing::{debug, info};
mod chat_conv; mod chat_conv;
mod handler; mod handler;
mod lookup;
mod sender;
pub fn discord_reference(id: impl ToString) -> mid_chat::ChatReference { pub fn discord_reference(id: impl ToString) -> mid_chat::ChatReference {
mid_chat::ChatReference { mid_chat::ChatReference {
@ -19,17 +13,10 @@ pub fn discord_reference(id: impl ToString) -> mid_chat::ChatReference {
} }
pub struct DiscordService { pub struct DiscordService {
pub core_db: SqlitePool, pub discord_ctx: Context,
pub discord_media_db: SqlitePool,
pub ctx: Context,
pub dyn_service: DynServiceLookup,
} }
pub async fn setup( pub async fn setup(core_db: SqlitePool, tx: ChatEventSender) -> Result<DiscordService> {
core_db: SqlitePool,
tx: ChatEventSender,
dyn_service: DynServiceLookup,
) -> Result<DiscordService> {
info!("Setting up Discord service…"); info!("Setting up Discord service…");
let discord_media_db = phoebe::db::open("discord_media").await?; let discord_media_db = phoebe::db::open("discord_media").await?;
@ -38,12 +25,13 @@ pub async fn setup(
let (ctx_tx, mut ctx_rx) = tokio::sync::mpsc::unbounded_channel::<Context>(); let (ctx_tx, mut ctx_rx) = tokio::sync::mpsc::unbounded_channel::<Context>();
let discord_handler = handler::DiscordHandler { let discord_handler = handler::DiscordHandler {
core_db: core_db.clone(), core_db,
discord_media_db: discord_media_db.clone(), discord_media_db,
chat_event_tx: tx, chat_event_tx: tx,
ctx_tx, ctx_tx,
}; };
// TODO: Create a discord client
debug!("Logging in…"); debug!("Logging in…");
let discord_token = std::env::var("PHOEBE_DISCORD_TOKEN") let discord_token = std::env::var("PHOEBE_DISCORD_TOKEN")
.expect("PHOEBE_DISCORD_TOKEN environment variable was not set!"); .expect("PHOEBE_DISCORD_TOKEN environment variable was not set!");
@ -62,30 +50,16 @@ pub async fn setup(
let discord_ctx = ctx_rx.recv().await.expect("Couldn't get Discord context"); let discord_ctx = ctx_rx.recv().await.expect("Couldn't get Discord context");
debug!("Logged in!"); debug!("Logged in!");
Ok(DiscordService { Ok(DiscordService { discord_ctx })
core_db,
discord_media_db,
ctx: discord_ctx,
dyn_service,
})
} }
#[async_trait] #[async_trait]
impl Service for DiscordService { impl Service for DiscordService {
async fn send_chat_message( async fn handle_chat_event(&mut self, event: &ChatEvent) {
&mut self, dbg!(event);
source: &ChatMessage,
destination_channel: ChatReference,
) -> Vec<ChatMessageReference> {
assert_eq!(destination_channel.service, "discord");
sender::send_discord_message(self, source, destination_channel)
.await
.ok()
.into_iter()
.collect()
} }
fn tag(&self) -> &'static str { fn get_service_tag(&self) -> &'static str {
"discord" "discord"
} }
} }

View File

@ -1,18 +0,0 @@
use phoebe::{lookup_message, mid_chat::ChatMessageReference, prelude::*};
use crate::DiscordService;
impl DiscordService {
pub async fn lookup_message<F, Fut>(
&self,
linked_message: &ChatMessageReference,
filter: F,
) -> Option<ChatMessageReference>
where
F: FnMut(&ChatMessageReference) -> Fut,
Fut: Future<Output = bool>,
{
let mut conn = self.core_db.acquire().await.ok()?;
lookup_message(&mut conn, self.dyn_service, linked_message, filter).await
}
}

View File

@ -1,55 +0,0 @@
use phoebe::{
mid_chat::{ChatMessage, ChatMessageReference, ChatReference},
prelude::{future, Result},
};
use serenity::{model::prelude::*, prelude::*};
use crate::{chat_conv, discord_reference, DiscordService};
pub async fn send_discord_message(
discord: &mut DiscordService,
source: &ChatMessage,
destination_channel: ChatReference,
) -> Result<ChatMessageReference> {
// TODO: If we have a webhook we should use the webhook
let channel_id = destination_channel.id.parse::<ChannelId>()?;
let formatted_message = chat_conv::format(&source.content);
let content = format!(
"{} ({}): {}",
source.author.display_name, source.author.reference.service, formatted_message
);
let discord_reply = if let Some(reply) = &source.replying {
if let Some(reply_ref) = discord
.lookup_message(reply, |r| future::ready(r.channel == destination_channel))
.await
{
assert_eq!(reply_ref.channel.service, "discord");
let channel_id: ChannelId = reply_ref.channel.id.parse().unwrap();
let message_id: MessageId = reply_ref.message_id.parse::<u64>().unwrap().into();
Some((channel_id, message_id))
} else {
None
}
} else {
None
};
let sent_message = channel_id
.send_message(&discord.ctx, move |m| {
let m = m.content(content);
if let Some(reply) = discord_reply {
m.reference_message(reply)
} else {
m
}
})
.await?;
Ok(ChatMessageReference::new(
discord_reference(sent_message.channel_id),
sent_message.id,
))
}