make EventEmitter shared refs, in AsyncClient::sync lock only in inner most scope

master
Devin R 2020-04-14 18:10:10 -04:00
parent a5ab7d97da
commit cf029b2e4f
6 changed files with 165 additions and 339 deletions

View File

@ -8,41 +8,55 @@ use matrix_sdk::{
events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent},
AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings,
}; };
use tokio::runtime::{Handle, Runtime};
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex; use tokio::sync::Mutex;
struct CommandBot { struct CommandBot {
client: AsyncClient, client: Mutex<AsyncClient>,
// sender: Sender<(RoomId, MessageEventContent)>
} }
impl CommandBot { impl CommandBot {
pub fn new(client: AsyncClient) -> Self { pub fn new(client: AsyncClient) -> Self {
Self { client } Self {
client: Mutex::new(client),
}
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl EventEmitter for CommandBot { impl EventEmitter for CommandBot {
async fn on_room_message(&mut self, room: Arc<Mutex<Room>>, event: Arc<Mutex<MessageEvent>>) { async fn on_room_message(&self, room: &Room, event: &MessageEvent) {
if let MessageEvent { let msg_body = if let MessageEvent {
content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }),
.. ..
} = event.lock().await.deref() } = event
{ {
let room = room.lock().await; msg_body.clone()
if msg_body.contains("!party") { } else {
println!("!party found"); String::new()
let content = MessageEventContent::Text(TextMessageEventContent { };
body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(),
format: None, if msg_body.contains("!party") {
formatted_body: None, let content = MessageEventContent::Text(TextMessageEventContent {
relates_to: None, body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(),
}); format: None,
self.client formatted_body: None,
.room_send(&room.room_id, content, None) relates_to: None,
.await });
.unwrap(); let room_id = &room.room_id;
println!("message sent");
} println!("sending");
self.client
.lock()
.await
.room_send(&room_id, content, None)
.await
.unwrap();
println!("message sent");
} }
} }
} }
@ -52,6 +66,7 @@ async fn login_and_sync(
homeserver_url: String, homeserver_url: String,
username: String, username: String,
password: String, password: String,
exec: Handle,
) -> Result<(), matrix_sdk::Error> { ) -> Result<(), matrix_sdk::Error> {
let client_config = AsyncClientConfig::new(); let client_config = AsyncClientConfig::new();
// .proxy("http://localhost:8080")? // .proxy("http://localhost:8080")?
@ -60,9 +75,7 @@ async fn login_and_sync(
let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap();
client client
.add_event_emitter(Arc::new(Mutex::new(Box::new(CommandBot::new( .add_event_emitter(Box::new(CommandBot::new(client.clone())))
client.clone(),
)))))
.await; .await;
client client
@ -76,13 +89,16 @@ async fn login_and_sync(
println!("logged in as {}", username); println!("logged in as {}", username);
client.sync_forever(SyncSettings::new(), |_| async {}).await; exec.spawn(async move {
client.sync_forever(SyncSettings::new(), |_| async {}).await;
})
.await
.unwrap();
Ok(()) Ok(())
} }
#[tokio::main] fn main() -> Result<(), matrix_sdk::Error> {
async fn main() -> Result<(), matrix_sdk::Error> {
let (homeserver_url, username, password) = let (homeserver_url, username, password) =
match (env::args().nth(1), env::args().nth(2), env::args().nth(3)) { match (env::args().nth(1), env::args().nth(2), env::args().nth(3)) {
(Some(a), Some(b), Some(c)) => (a, b, c), (Some(a), Some(b), Some(c)) => (a, b, c),
@ -94,6 +110,16 @@ async fn main() -> Result<(), matrix_sdk::Error> {
exit(1) exit(1)
} }
}; };
login_and_sync(homeserver_url, username, password).await?;
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
let exec = runtime.handle().clone();
runtime.block_on(async { login_and_sync(homeserver_url, username, password, exec).await })?;
Ok(()) Ok(())
} }

View File

@ -14,15 +14,14 @@ struct EventCallback;
#[async_trait::async_trait] #[async_trait::async_trait]
impl EventEmitter for EventCallback { impl EventEmitter for EventCallback {
async fn on_room_message(&mut self, room: Arc<Mutex<Room>>, event: Arc<Mutex<MessageEvent>>) { async fn on_room_message(&self, room: &Room, event: &MessageEvent) {
if let MessageEvent { if let MessageEvent {
content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }),
sender, sender,
.. ..
} = event.lock().await.deref() } = event
{ {
let rooms = room.lock().await; let member = room.members.get(&sender).unwrap();
let member = rooms.members.get(&sender).unwrap();
println!( println!(
"{}: {}", "{}: {}",
member.display_name.as_ref().unwrap_or(&sender.to_string()), member.display_name.as_ref().unwrap_or(&sender.to_string()),
@ -43,9 +42,7 @@ async fn login(
let homeserver_url = Url::parse(&homeserver_url)?; let homeserver_url = Url::parse(&homeserver_url)?;
let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap();
client client.add_event_emitter(Box::new(EventCallback)).await;
.add_event_emitter(Arc::new(Mutex::new(Box::new(EventCallback))))
.await;
client client
.login(username, password, None, Some("rust-sdk".to_string())) .login(username, password, None, Some("rust-sdk".to_string()))

View File

@ -273,10 +273,7 @@ impl AsyncClient {
/// Add `EventEmitter` to `AsyncClient`. /// Add `EventEmitter` to `AsyncClient`.
/// ///
/// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur.
pub async fn add_event_emitter( pub async fn add_event_emitter(&mut self, emitter: Box<dyn EventEmitter>) {
&mut self,
emitter: Arc<tokio::sync::Mutex<Box<dyn EventEmitter>>>,
) {
self.base_client.write().await.event_emitter = Some(emitter); self.base_client.write().await.event_emitter = Some(emitter);
} }
@ -301,7 +298,7 @@ impl AsyncClient {
/// Returns the rooms this client knows about. /// Returns the rooms this client knows about.
/// ///
/// A `HashMap` of room id to `matrix::models::Room` /// A `HashMap` of room id to `matrix::models::Room`
pub async fn get_rooms(&self) -> HashMap<RoomId, Arc<tokio::sync::Mutex<Room>>> { pub async fn get_rooms(&self) -> HashMap<RoomId, Arc<tokio::sync::RwLock<Room>>> {
self.base_client.read().await.joined_rooms.clone() self.base_client.read().await.joined_rooms.clone()
} }
@ -565,9 +562,8 @@ impl AsyncClient {
let mut response = self.send(request).await?; let mut response = self.send(request).await?;
for (room_id, room) in &mut response.rooms.join { for (room_id, room) in &mut response.rooms.join {
let mut client = self.base_client.write().await;
let _matrix_room = { let _matrix_room = {
let mut client = self.base_client.write().await;
for event in &room.state.events { for event in &room.state.events {
if let EventResult::Ok(e) = event { if let EventResult::Ok(e) = event {
client.receive_joined_state_event(&room_id, &e).await; client.receive_joined_state_event(&room_id, &e).await;
@ -580,12 +576,14 @@ impl AsyncClient {
// re looping is not ideal here // re looping is not ideal here
for event in &mut room.state.events { for event in &mut room.state.events {
if let EventResult::Ok(e) = event { if let EventResult::Ok(e) = event {
let client = self.base_client.read().await;
client.emit_state_event(room_id, e).await; client.emit_state_event(room_id, e).await;
} }
} }
for mut event in &mut room.timeline.events { for mut event in &mut room.timeline.events {
let decrypted_event = { let decrypted_event = {
let mut client = self.base_client.write().await;
client client
.receive_joined_timeline_event(room_id, &mut event) .receive_joined_timeline_event(room_id, &mut event)
.await .await
@ -596,6 +594,7 @@ impl AsyncClient {
} }
if let EventResult::Ok(e) = event { if let EventResult::Ok(e) = event {
let client = self.base_client.read().await;
client.emit_timeline_event(room_id, e).await; client.emit_timeline_event(room_id, e).await;
} }
} }
@ -604,8 +603,8 @@ impl AsyncClient {
for account_data in &mut room.account_data.events { for account_data in &mut room.account_data.events {
{ {
if let EventResult::Ok(e) = account_data { if let EventResult::Ok(e) = account_data {
let mut client = self.base_client.write().await;
client.receive_account_data_event(&room_id, e).await; client.receive_account_data_event(&room_id, e).await;
client.emit_account_data_event(room_id, e).await; client.emit_account_data_event(room_id, e).await;
} }
} }
@ -617,6 +616,7 @@ impl AsyncClient {
for presence in &mut response.presence.events { for presence in &mut response.presence.events {
{ {
if let EventResult::Ok(e) = presence { if let EventResult::Ok(e) = presence {
let mut client = self.base_client.write().await;
client.receive_presence_event(&room_id, e).await; client.receive_presence_event(&room_id, e).await;
client.emit_presence_event(room_id, e).await; client.emit_presence_event(room_id, e).await;
@ -627,6 +627,7 @@ impl AsyncClient {
for ephemeral in &mut room.ephemeral.events { for ephemeral in &mut room.ephemeral.events {
{ {
if let EventResult::Ok(e) = ephemeral { if let EventResult::Ok(e) = ephemeral {
let mut client = self.base_client.write().await;
client.receive_ephemeral_event(&room_id, e).await; client.receive_ephemeral_event(&room_id, e).await;
client.emit_ephemeral_event(room_id, e).await; client.emit_ephemeral_event(room_id, e).await;
@ -810,7 +811,6 @@ impl AsyncClient {
} else { } else {
request_builder request_builder
}; };
let mut response = request_builder.send().await?; let mut response = request_builder.send().await?;
trace!("Got response: {:?}", response); trace!("Got response: {:?}", response);
@ -892,7 +892,7 @@ impl AsyncClient {
let room = client.joined_rooms.get(room_id); let room = client.joined_rooms.get(room_id);
match room { match room {
Some(r) => r.lock().await.is_encrypted(), Some(r) => r.write().await.is_encrypted(),
None => false, None => false,
} }
}; };
@ -901,7 +901,7 @@ impl AsyncClient {
let missing_sessions = { let missing_sessions = {
let client = self.base_client.read().await; let client = self.base_client.read().await;
let room = client.joined_rooms.get(room_id); let room = client.joined_rooms.get(room_id);
let room = room.as_ref().unwrap().lock().await; let room = room.as_ref().unwrap().write().await;
let users = room.members.keys(); let users = room.members.keys();
self.base_client self.base_client
.read() .read()

View File

@ -17,6 +17,7 @@ use std::collections::HashMap;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt; use std::fmt;
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
@ -36,7 +37,9 @@ use crate::models::Room;
use crate::session::Session; use crate::session::Session;
use crate::EventEmitter; use crate::EventEmitter;
#[cfg(feature = "encryption")]
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::sync::RwLock;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use crate::crypto::{OlmMachine, OneTimeKeys}; use crate::crypto::{OlmMachine, OneTimeKeys};
@ -65,14 +68,14 @@ pub struct Client {
/// The current sync token that should be used for the next sync call. /// The current sync token that should be used for the next sync call.
pub sync_token: Option<Token>, pub sync_token: Option<Token>,
/// A map of the rooms our user is joined in. /// A map of the rooms our user is joined in.
pub joined_rooms: HashMap<RoomId, Arc<Mutex<Room>>>, pub joined_rooms: HashMap<RoomId, Arc<RwLock<Room>>>,
/// A list of ignored users. /// A list of ignored users.
pub ignored_users: Vec<UserId>, pub ignored_users: Vec<UserId>,
/// The push ruleset for the logged in user. /// The push ruleset for the logged in user.
pub push_ruleset: Option<Ruleset>, pub push_ruleset: Option<Ruleset>,
/// Any implementor of EventEmitter will act as the callbacks for various /// Any implementor of EventEmitter will act as the callbacks for various
/// events. /// events.
pub event_emitter: Option<Arc<Mutex<Box<dyn EventEmitter>>>>, pub event_emitter: Option<Box<dyn EventEmitter>>,
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
olm: Arc<Mutex<Option<OlmMachine>>>, olm: Arc<Mutex<Option<OlmMachine>>>,
@ -125,10 +128,7 @@ impl Client {
/// Add `EventEmitter` to `Client`. /// Add `EventEmitter` to `Client`.
/// ///
/// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur.
pub async fn add_event_emitter( pub async fn add_event_emitter(&mut self, emitter: Box<dyn EventEmitter>) {
&mut self,
emitter: Arc<tokio::sync::Mutex<Box<dyn EventEmitter>>>,
) {
self.event_emitter = Some(emitter); self.event_emitter = Some(emitter);
} }
@ -160,7 +160,7 @@ impl Client {
pub(crate) async fn calculate_room_name(&self, room_id: &RoomId) -> Option<String> { pub(crate) async fn calculate_room_name(&self, room_id: &RoomId) -> Option<String> {
if let Some(room) = self.joined_rooms.get(room_id) { if let Some(room) = self.joined_rooms.get(room_id) {
let room = room.lock().await; let room = room.read().await;
Some(room.room_name.calculate_name(room_id, &room.members)) Some(room.room_name.calculate_name(room_id, &room.members))
} else { } else {
None None
@ -170,17 +170,17 @@ impl Client {
pub(crate) async fn calculate_room_names(&self) -> Vec<String> { pub(crate) async fn calculate_room_names(&self) -> Vec<String> {
let mut res = Vec::new(); let mut res = Vec::new();
for (id, room) in &self.joined_rooms { for (id, room) in &self.joined_rooms {
let room = room.lock().await; let room = room.read().await;
res.push(room.room_name.calculate_name(id, &room.members)) res.push(room.room_name.calculate_name(id, &room.members))
} }
res res
} }
pub(crate) fn get_or_create_room(&mut self, room_id: &RoomId) -> &mut Arc<Mutex<Room>> { pub(crate) fn get_or_create_room(&mut self, room_id: &RoomId) -> &mut Arc<RwLock<Room>> {
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
self.joined_rooms self.joined_rooms
.entry(room_id.clone()) .entry(room_id.clone())
.or_insert(Arc::new(Mutex::new(Room::new( .or_insert(Arc::new(RwLock::new(Room::new(
room_id, room_id,
&self &self
.session .session
@ -190,7 +190,7 @@ impl Client {
)))) ))))
} }
pub(crate) fn get_room(&self, room_id: &RoomId) -> Option<&Arc<Mutex<Room>>> { pub(crate) fn get_room(&self, room_id: &RoomId) -> Option<&Arc<RwLock<Room>>> {
self.joined_rooms.get(room_id) self.joined_rooms.get(room_id)
} }
@ -259,7 +259,7 @@ impl Client {
} }
} }
let mut room = self.get_or_create_room(&room_id).lock().await; let mut room = self.get_or_create_room(&room_id).write().await;
room.receive_timeline_event(e); room.receive_timeline_event(e);
decrypted_event decrypted_event
} }
@ -282,7 +282,7 @@ impl Client {
room_id: &RoomId, room_id: &RoomId,
event: &StateEvent, event: &StateEvent,
) -> bool { ) -> bool {
let mut room = self.get_or_create_room(room_id).lock().await; let mut room = self.get_or_create_room(room_id).write().await;
room.receive_state_event(event) room.receive_state_event(event)
} }
@ -303,7 +303,7 @@ impl Client {
) -> bool { ) -> bool {
// this should be the room that was just created in the `Client::sync` loop. // this should be the room that was just created in the `Client::sync` loop.
if let Some(room) = self.get_room(room_id) { if let Some(room) = self.get_room(room_id) {
let mut room = room.lock().await; let mut room = room.write().await;
room.receive_presence_event(event) room.receive_presence_event(event)
} else { } else {
false false
@ -376,7 +376,7 @@ impl Client {
// part where we already iterate through the rooms to avoid yet // part where we already iterate through the rooms to avoid yet
// another room loop. // another room loop.
for room in self.joined_rooms.values() { for room in self.joined_rooms.values() {
let room = room.lock().await; let room = room.write().await;
if !room.is_encrypted() { if !room.is_encrypted() {
continue; continue;
} }
@ -459,7 +459,7 @@ impl Client {
match &mut *olm { match &mut *olm {
Some(o) => { Some(o) => {
let room = room.lock().await; let room = room.write().await;
let members = room.members.keys(); let members = room.members.keys();
Ok(o.share_group_session(room_id, members).await?) Ok(o.share_group_session(room_id, members).await?)
} }
@ -573,37 +573,26 @@ impl Client {
Ok(()) Ok(())
} }
pub(crate) async fn emit_timeline_event(&mut self, room_id: &RoomId, event: &mut RoomEvent) { pub(crate) async fn emit_timeline_event(&self, room_id: &RoomId, event: &RoomEvent) {
match event { match event {
RoomEvent::RoomMember(mem) => { RoomEvent::RoomMember(mem) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_member(room.read().await.deref(), &mem).await;
.await
.on_room_member(Arc::clone(&room), Arc::new(Mutex::new(mem.clone())))
.await;
} }
} }
} }
RoomEvent::RoomName(name) => { RoomEvent::RoomName(name) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_name(room.read().await.deref(), &name).await;
.await
.on_room_name(Arc::clone(&room), Arc::new(Mutex::new(name.clone())))
.await;
} }
} }
} }
RoomEvent::RoomCanonicalAlias(canonical) => { RoomEvent::RoomCanonicalAlias(canonical) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_canonical_alias(room.read().await.deref(), &canonical)
.await
.on_room_canonical_alias(
Arc::clone(&room),
Arc::new(Mutex::new(canonical.clone())),
)
.await; .await;
} }
} }
@ -611,12 +600,7 @@ impl Client {
RoomEvent::RoomAliases(aliases) => { RoomEvent::RoomAliases(aliases) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_aliases(room.read().await.deref(), &aliases)
.await
.on_room_aliases(
Arc::clone(&room),
Arc::new(Mutex::new(aliases.clone())),
)
.await; .await;
} }
} }
@ -624,32 +608,21 @@ impl Client {
RoomEvent::RoomAvatar(avatar) => { RoomEvent::RoomAvatar(avatar) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_avatar(room.read().await.deref(), &avatar).await;
.await
.on_room_avatar(Arc::clone(&room), Arc::new(Mutex::new(avatar.clone())))
.await;
} }
} }
} }
RoomEvent::RoomMessage(msg) => { RoomEvent::RoomMessage(msg) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_message(room.read().await.deref(), &msg).await;
.await
.on_room_message(Arc::clone(&room), Arc::new(Mutex::new(msg.clone())))
.await;
} }
} }
} }
RoomEvent::RoomMessageFeedback(msg_feedback) => { RoomEvent::RoomMessageFeedback(msg_feedback) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_message_feedback(room.read().await.deref(), &msg_feedback)
.await
.on_room_message_feedback(
Arc::clone(&room),
Arc::new(Mutex::new(msg_feedback.clone())),
)
.await; .await;
} }
} }
@ -657,12 +630,7 @@ impl Client {
RoomEvent::RoomRedaction(redaction) => { RoomEvent::RoomRedaction(redaction) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_redaction(room.read().await.deref(), &redaction)
.await
.on_room_redaction(
Arc::clone(&room),
Arc::new(Mutex::new(redaction.clone())),
)
.await; .await;
} }
} }
@ -670,12 +638,7 @@ impl Client {
RoomEvent::RoomPowerLevels(power) => { RoomEvent::RoomPowerLevels(power) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_room_power_levels(room.read().await.deref(), &power)
.await
.on_room_power_levels(
Arc::clone(&room),
Arc::new(Mutex::new(power.clone())),
)
.await; .await;
} }
} }
@ -684,40 +647,26 @@ impl Client {
} }
} }
pub(crate) async fn emit_state_event(&mut self, room_id: &RoomId, event: &mut StateEvent) { pub(crate) async fn emit_state_event(&self, room_id: &RoomId, event: &StateEvent) {
match event { match event {
StateEvent::RoomMember(member) => { StateEvent::RoomMember(member) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_state_member(room.read().await.deref(), &member).await;
.await
.on_state_member(
Arc::clone(&room),
Arc::new(Mutex::new(member.clone())),
)
.await;
} }
} }
} }
StateEvent::RoomName(name) => { StateEvent::RoomName(name) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_state_name(room.read().await.deref(), &name).await;
.await
.on_state_name(Arc::clone(&room), Arc::new(Mutex::new(name.clone())))
.await;
} }
} }
} }
StateEvent::RoomCanonicalAlias(canonical) => { StateEvent::RoomCanonicalAlias(canonical) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_state_canonical_alias(room.read().await.deref(), &canonical)
.await
.on_state_canonical_alias(
Arc::clone(&room),
Arc::new(Mutex::new(canonical.clone())),
)
.await; .await;
} }
} }
@ -725,12 +674,7 @@ impl Client {
StateEvent::RoomAliases(aliases) => { StateEvent::RoomAliases(aliases) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_state_aliases(room.read().await.deref(), &aliases)
.await
.on_state_aliases(
Arc::clone(&room),
Arc::new(Mutex::new(aliases.clone())),
)
.await; .await;
} }
} }
@ -738,25 +682,14 @@ impl Client {
StateEvent::RoomAvatar(avatar) => { StateEvent::RoomAvatar(avatar) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_state_avatar(room.read().await.deref(), &avatar).await;
.await
.on_state_avatar(
Arc::clone(&room),
Arc::new(Mutex::new(avatar.clone())),
)
.await;
} }
} }
} }
StateEvent::RoomPowerLevels(power) => { StateEvent::RoomPowerLevels(power) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_state_power_levels(room.read().await.deref(), &power)
.await
.on_state_power_levels(
Arc::clone(&room),
Arc::new(Mutex::new(power.clone())),
)
.await; .await;
} }
} }
@ -764,12 +697,7 @@ impl Client {
StateEvent::RoomJoinRules(rules) => { StateEvent::RoomJoinRules(rules) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_state_join_rules(room.read().await.deref(), &rules)
.await
.on_state_join_rules(
Arc::clone(&room),
Arc::new(Mutex::new(rules.clone())),
)
.await; .await;
} }
} }
@ -778,21 +706,12 @@ impl Client {
} }
} }
pub(crate) async fn emit_account_data_event( pub(crate) async fn emit_account_data_event(&self, room_id: &RoomId, event: &NonRoomEvent) {
&mut self,
room_id: &RoomId,
event: &mut NonRoomEvent,
) {
match event { match event {
NonRoomEvent::Presence(presence) => { NonRoomEvent::Presence(presence) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_presence(room.read().await.deref(), &presence)
.await
.on_account_presence(
Arc::clone(&room),
Arc::new(Mutex::new(presence.clone())),
)
.await; .await;
} }
} }
@ -800,12 +719,7 @@ impl Client {
NonRoomEvent::IgnoredUserList(ignored) => { NonRoomEvent::IgnoredUserList(ignored) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_ignored_users(room.read().await.deref(), &ignored)
.await
.on_account_ignored_users(
Arc::clone(&room),
Arc::new(Mutex::new(ignored.clone())),
)
.await; .await;
} }
} }
@ -813,12 +727,7 @@ impl Client {
NonRoomEvent::PushRules(rules) => { NonRoomEvent::PushRules(rules) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_push_rules(room.read().await.deref(), &rules)
.await
.on_account_push_rules(
Arc::clone(&room),
Arc::new(Mutex::new(rules.clone())),
)
.await; .await;
} }
} }
@ -826,12 +735,7 @@ impl Client {
NonRoomEvent::FullyRead(full_read) => { NonRoomEvent::FullyRead(full_read) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_data_fully_read(room.read().await.deref(), &full_read)
.await
.on_account_data_fully_read(
Arc::clone(&room),
Arc::new(Mutex::new(full_read.clone())),
)
.await; .await;
} }
} }
@ -840,21 +744,12 @@ impl Client {
} }
} }
pub(crate) async fn emit_ephemeral_event( pub(crate) async fn emit_ephemeral_event(&self, room_id: &RoomId, event: &NonRoomEvent) {
&mut self,
room_id: &RoomId,
event: &mut NonRoomEvent,
) {
match event { match event {
NonRoomEvent::Presence(presence) => { NonRoomEvent::Presence(presence) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_presence(room.read().await.deref(), &presence)
.await
.on_account_presence(
Arc::clone(&room),
Arc::new(Mutex::new(presence.clone())),
)
.await; .await;
} }
} }
@ -862,12 +757,7 @@ impl Client {
NonRoomEvent::IgnoredUserList(ignored) => { NonRoomEvent::IgnoredUserList(ignored) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_ignored_users(room.read().await.deref(), &ignored)
.await
.on_account_ignored_users(
Arc::clone(&room),
Arc::new(Mutex::new(ignored.clone())),
)
.await; .await;
} }
} }
@ -875,12 +765,7 @@ impl Client {
NonRoomEvent::PushRules(rules) => { NonRoomEvent::PushRules(rules) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_push_rules(room.read().await.deref(), &rules)
.await
.on_account_push_rules(
Arc::clone(&room),
Arc::new(Mutex::new(rules.clone())),
)
.await; .await;
} }
} }
@ -888,12 +773,7 @@ impl Client {
NonRoomEvent::FullyRead(full_read) => { NonRoomEvent::FullyRead(full_read) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_account_data_fully_read(room.read().await.deref(), &full_read)
.await
.on_account_data_fully_read(
Arc::clone(&room),
Arc::new(Mutex::new(full_read.clone())),
)
.await; .await;
} }
} }
@ -902,16 +782,10 @@ impl Client {
} }
} }
pub(crate) async fn emit_presence_event( pub(crate) async fn emit_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) {
&mut self,
room_id: &RoomId,
event: &mut PresenceEvent,
) {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id) {
ee.lock() ee.on_presence_event(room.read().await.deref(), &event)
.await
.on_presence_event(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await; .await;
} }
} }

View File

@ -13,8 +13,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
use crate::events::{ use crate::events::{
fully_read::FullyReadEvent, fully_read::FullyReadEvent,
ignored_user_list::IgnoredUserListEvent, ignored_user_list::IgnoredUserListEvent,
@ -34,7 +32,6 @@ use crate::events::{
}; };
use crate::models::Room; use crate::models::Room;
use tokio::sync::Mutex;
/// This trait allows any type implementing `EventEmitter` to specify event callbacks for each event. /// This trait allows any type implementing `EventEmitter` to specify event callbacks for each event.
/// The `AsyncClient` calls each method when the corresponding event is received. /// The `AsyncClient` calls each method when the corresponding event is received.
/// ///
@ -57,15 +54,14 @@ use tokio::sync::Mutex;
/// ///
/// #[async_trait::async_trait] /// #[async_trait::async_trait]
/// impl EventEmitter for EventCallback { /// impl EventEmitter for EventCallback {
/// async fn on_room_message(&mut self, room: Arc<Mutex<Room>>, event: Arc<Mutex<MessageEvent>>) { /// async fn on_room_message(&self, room: &Room, event: &MessageEvent) {
/// if let MessageEvent { /// if let MessageEvent {
/// content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), /// content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }),
/// sender, /// sender,
/// .. /// ..
/// } = event.lock().await.deref() /// } = event
/// { /// {
/// let rooms = room.lock().await; /// let member = room.members.get(&sender).unwrap();
/// let member = rooms.members.get(&sender).unwrap();
/// println!( /// println!(
/// "{}: {}", /// "{}: {}",
/// member /// member
@ -82,193 +78,128 @@ use tokio::sync::Mutex;
pub trait EventEmitter: Send + Sync { pub trait EventEmitter: Send + Sync {
// ROOM EVENTS from `IncomingTimeline` // ROOM EVENTS from `IncomingTimeline`
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event.
async fn on_room_member(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<MemberEvent>>) {} async fn on_room_member(&self, _: &Room, _: &MemberEvent) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event.
async fn on_room_name(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NameEvent>>) {} async fn on_room_name(&self, _: &Room, _: &NameEvent) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event.
async fn on_room_canonical_alias( async fn on_room_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {}
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<CanonicalAliasEvent>>,
) {
}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event.
async fn on_room_aliases(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AliasesEvent>>) {} async fn on_room_aliases(&self, _: &Room, _: &AliasesEvent) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event.
async fn on_room_avatar(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AvatarEvent>>) {} async fn on_room_avatar(&self, _: &Room, _: &AvatarEvent) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event.
async fn on_room_message(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<MessageEvent>>) {} async fn on_room_message(&self, _: &Room, _: &MessageEvent) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event.
async fn on_room_message_feedback( async fn on_room_message_feedback(&self, _: &Room, _: &FeedbackEvent) {}
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<FeedbackEvent>>,
) {
}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event.
async fn on_room_redaction(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RedactionEvent>>) {} async fn on_room_redaction(&self, _: &Room, _: &RedactionEvent) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event.
async fn on_room_power_levels(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<PowerLevelsEvent>>) { async fn on_room_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {}
}
// `RoomEvent`s from `IncomingState` // `RoomEvent`s from `IncomingState`
/// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event.
async fn on_state_member(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<MemberEvent>>) {} async fn on_state_member(&self, _: &Room, _: &MemberEvent) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomName` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomName` event.
async fn on_state_name(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NameEvent>>) {} async fn on_state_name(&self, _: &Room, _: &NameEvent) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event.
async fn on_state_canonical_alias( async fn on_state_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {}
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<CanonicalAliasEvent>>,
) {
}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event.
async fn on_state_aliases(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AliasesEvent>>) {} async fn on_state_aliases(&self, _: &Room, _: &AliasesEvent) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event.
async fn on_state_avatar(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AvatarEvent>>) {} async fn on_state_avatar(&self, _: &Room, _: &AvatarEvent) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event.
async fn on_state_power_levels( async fn on_state_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {}
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<PowerLevelsEvent>>,
) {
}
/// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event.
async fn on_state_join_rules(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<JoinRulesEvent>>) {} async fn on_state_join_rules(&self, _: &Room, _: &JoinRulesEvent) {}
// `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData` // `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData`
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event.
async fn on_account_presence(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<PresenceEvent>>) {} async fn on_account_presence(&self, _: &Room, _: &PresenceEvent) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event.
async fn on_account_ignored_users( async fn on_account_ignored_users(&self, _: &Room, _: &IgnoredUserListEvent) {}
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<IgnoredUserListEvent>>,
) {
}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event.
async fn on_account_push_rules(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<PushRulesEvent>>) {} async fn on_account_push_rules(&self, _: &Room, _: &PushRulesEvent) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event.
async fn on_account_data_fully_read( async fn on_account_data_fully_read(&self, _: &Room, _: &FullyReadEvent) {}
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<FullyReadEvent>>,
) {
}
// `PresenceEvent` is a struct so there is only the one method // `PresenceEvent` is a struct so there is only the one method
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event.
async fn on_presence_event(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<PresenceEvent>>) {} async fn on_presence_event(&self, _: &Room, _: &PresenceEvent) {}
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct EvEmitterTest(Arc<Mutex<Vec<String>>>); pub struct EvEmitterTest(Arc<Mutex<Vec<String>>>);
#[async_trait::async_trait] #[async_trait::async_trait]
impl EventEmitter for EvEmitterTest { impl EventEmitter for EvEmitterTest {
async fn on_room_member(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<MemberEvent>>) { async fn on_room_member(&self, _: &Room, _: &MemberEvent) {
self.0.lock().await.push("member".to_string()) self.0.lock().await.push("member".to_string())
} }
async fn on_room_name(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NameEvent>>) { async fn on_room_name(&self, _: &Room, _: &NameEvent) {
self.0.lock().await.push("name".to_string()) self.0.lock().await.push("name".to_string())
} }
async fn on_room_canonical_alias( async fn on_room_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<CanonicalAliasEvent>>,
) {
self.0.lock().await.push("canonical".to_string()) self.0.lock().await.push("canonical".to_string())
} }
async fn on_room_aliases(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AliasesEvent>>) { async fn on_room_aliases(&self, _: &Room, _: &AliasesEvent) {
self.0.lock().await.push("aliases".to_string()) self.0.lock().await.push("aliases".to_string())
} }
async fn on_room_avatar(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AvatarEvent>>) { async fn on_room_avatar(&self, _: &Room, _: &AvatarEvent) {
self.0.lock().await.push("avatar".to_string()) self.0.lock().await.push("avatar".to_string())
} }
async fn on_room_message(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<MessageEvent>>) { async fn on_room_message(&self, _: &Room, _: &MessageEvent) {
self.0.lock().await.push("message".to_string()) self.0.lock().await.push("message".to_string())
} }
async fn on_room_message_feedback( async fn on_room_message_feedback(&self, _: &Room, _: &FeedbackEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<FeedbackEvent>>,
) {
self.0.lock().await.push("feedback".to_string()) self.0.lock().await.push("feedback".to_string())
} }
async fn on_room_redaction(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RedactionEvent>>) { async fn on_room_redaction(&self, _: &Room, _: &RedactionEvent) {
self.0.lock().await.push("redaction".to_string()) self.0.lock().await.push("redaction".to_string())
} }
async fn on_room_power_levels( async fn on_room_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<PowerLevelsEvent>>,
) {
self.0.lock().await.push("power".to_string()) self.0.lock().await.push("power".to_string())
} }
async fn on_state_member(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<MemberEvent>>) { async fn on_state_member(&self, _: &Room, _: &MemberEvent) {
self.0.lock().await.push("state member".to_string()) self.0.lock().await.push("state member".to_string())
} }
async fn on_state_name(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NameEvent>>) { async fn on_state_name(&self, _: &Room, _: &NameEvent) {
self.0.lock().await.push("state name".to_string()) self.0.lock().await.push("state name".to_string())
} }
async fn on_state_canonical_alias( async fn on_state_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<CanonicalAliasEvent>>,
) {
self.0.lock().await.push("state canonical".to_string()) self.0.lock().await.push("state canonical".to_string())
} }
async fn on_state_aliases(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AliasesEvent>>) { async fn on_state_aliases(&self, _: &Room, _: &AliasesEvent) {
self.0.lock().await.push("state aliases".to_string()) self.0.lock().await.push("state aliases".to_string())
} }
async fn on_state_avatar(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<AvatarEvent>>) { async fn on_state_avatar(&self, _: &Room, _: &AvatarEvent) {
self.0.lock().await.push("state avatar".to_string()) self.0.lock().await.push("state avatar".to_string())
} }
async fn on_state_power_levels( async fn on_state_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<PowerLevelsEvent>>,
) {
self.0.lock().await.push("state power".to_string()) self.0.lock().await.push("state power".to_string())
} }
async fn on_state_join_rules( async fn on_state_join_rules(&self, _: &Room, _: &JoinRulesEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<JoinRulesEvent>>,
) {
self.0.lock().await.push("state rules".to_string()) self.0.lock().await.push("state rules".to_string())
} }
async fn on_account_presence(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<PresenceEvent>>) { async fn on_account_presence(&self, _: &Room, _: &PresenceEvent) {
self.0.lock().await.push("account presence".to_string()) self.0.lock().await.push("account presence".to_string())
} }
async fn on_account_ignored_users( async fn on_account_ignored_users(&self, _: &Room, _: &IgnoredUserListEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<IgnoredUserListEvent>>,
) {
self.0.lock().await.push("account ignore".to_string()) self.0.lock().await.push("account ignore".to_string())
} }
async fn on_account_push_rules( async fn on_account_push_rules(&self, _: &Room, _: &PushRulesEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<PushRulesEvent>>,
) {
self.0.lock().await.push("".to_string()) self.0.lock().await.push("".to_string())
} }
async fn on_account_data_fully_read( async fn on_account_data_fully_read(&self, _: &Room, _: &FullyReadEvent) {
&mut self,
_: Arc<Mutex<Room>>,
_: Arc<Mutex<FullyReadEvent>>,
) {
self.0.lock().await.push("account read".to_string()) self.0.lock().await.push("account read".to_string())
} }
async fn on_presence_event(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<PresenceEvent>>) { async fn on_presence_event(&self, _: &Room, _: &PresenceEvent) {
self.0.lock().await.push("presence event".to_string()) self.0.lock().await.push("presence event".to_string())
} }
} }
@ -303,11 +234,9 @@ mod test {
let vec = Arc::new(Mutex::new(Vec::new())); let vec = Arc::new(Mutex::new(Vec::new()));
let test_vec = Arc::clone(&vec); let test_vec = Arc::clone(&vec);
let emitter = Arc::new(Mutex::new( let emitter = Box::new(EvEmitterTest(vec)) as Box<(dyn EventEmitter)>;
Box::new(EvEmitterTest(vec)) as Box<(dyn EventEmitter)>
));
let mut client = AsyncClient::new(homeserver, Some(session)).unwrap(); let mut client = AsyncClient::new(homeserver, Some(session)).unwrap();
client.add_event_emitter(Arc::clone(&emitter)).await; client.add_event_emitter(emitter).await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync(sync_settings).await.unwrap(); let _response = client.sync(sync_settings).await.unwrap();

View File

@ -427,7 +427,7 @@ mod test {
let room = &rooms let room = &rooms
.get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap()) .get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap())
.unwrap() .unwrap()
.lock() .read()
.await; .await;
assert_eq!(2, room.members.len()); assert_eq!(2, room.members.len());