From e9dfabf8471beee4705f8f7909d0ae2f58702946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 26 Nov 2019 19:06:29 +0100 Subject: [PATCH] async_client: Add the event callbacks back. --- examples/login.rs | 23 ++++-------------- src/async_client.rs | 58 +++++++++++++++++---------------------------- 2 files changed, 27 insertions(+), 54 deletions(-) diff --git a/examples/login.rs b/examples/login.rs index b7e8e384..4948b3ef 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -1,9 +1,4 @@ -#![feature(async_closure)] - -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::{env, process::exit}; use url::Url; @@ -12,13 +7,12 @@ use matrix_nio::{ events::{ collections::all::RoomEvent, room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, - EventType, }, AsyncClient, AsyncClientConfig, Room, SyncSettings, }; -async fn async_helper(room: Arc>, event: Arc) { - let room = room.lock().unwrap(); +async fn async_cb(room: Arc>, event: Arc) { + let room = room.read().unwrap(); if let RoomEvent::RoomMessage(MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), sender, @@ -34,13 +28,6 @@ async fn async_helper(room: Arc>, event: Arc) { } } -fn async_callback( - room: Arc>, - event: Arc, -) -> Pin + Send + Sync>> { - Box::pin(async_helper(room, event)) -} - async fn login( homeserver_url: String, username: String, @@ -52,10 +39,10 @@ async fn login( let homeserver_url = Url::parse(&homeserver_url)?; let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); - // client.add_event_future(EventType::RoomMessage, Box::new(async_callback)); + client.add_event_future(async_cb); client.login(username, password, None).await?; - let response = client.sync(SyncSettings::new()).await?; + let _response = client.sync(SyncSettings::new()).await?; Ok(()) } diff --git a/src/async_client.rs b/src/async_client.rs index fc00f11c..b6534f3e 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -1,7 +1,5 @@ +use futures::future::{BoxFuture, Future, FutureExt}; use std::convert::{TryFrom, TryInto}; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, RwLock}; @@ -13,9 +11,7 @@ use url::Url; use ruma_api::Endpoint; use ruma_events::collections::all::RoomEvent; -use ruma_events::room::message::MessageEvent; use ruma_events::room::message::MessageEventContent; -use ruma_events::Event; pub use ruma_events::EventType; use ruma_identifiers::RoomId; @@ -26,12 +22,8 @@ use crate::error::{Error, InnerError}; use crate::session::Session; use crate::VERSION; -type RoomEventCallback = Box; -type RoomEventCallbackF = Box< - dyn FnMut(Arc>, Arc) -> Pin + Send + Sync>> - + Send - + Sync, ->; +type RoomEventCallbackF = + Box>, Arc) -> BoxFuture<'static, ()>>; #[derive(Clone)] pub struct AsyncClient { @@ -43,10 +35,8 @@ pub struct AsyncClient { base_client: Arc>, /// The transaction id. transaction_id: Arc, - // /// Event callbacks - // event_callbacks: Vec, - // /// Event futures - // event_futures: Vec, + /// Event futures + event_futures: Arc>>, } #[derive(Default, Debug)] @@ -163,7 +153,7 @@ impl AsyncClient { base_client: Arc::new(RwLock::new(BaseClient::new(session))), transaction_id: Arc::new(AtomicU64::new(0)), // event_callbacks: Vec::new(), - // event_futures: Vec::new(), + event_futures: Arc::new(RwLock::new(Vec::new())), }) } @@ -171,21 +161,18 @@ impl AsyncClient { self.base_client.read().unwrap().logged_in() } - // pub fn add_event_callback( - // &mut self, - // event_type: EventType, - // callback: RoomEventCallback, - // ) { - // self.event_callbacks.push(callback); - // } + pub fn add_event_future( + &mut self, + mut callback: impl FnMut(Arc>, Arc) -> C + 'static, + ) where + C: Future + Send, + { + let mut futures = self.event_futures.write().unwrap(); - // pub fn add_event_future( - // &mut self, - // event_type: EventType, - // callback: RoomEventCallbackF, - // ) { - // self.event_futures.push(callback); - // } + let future = move |room, event| callback(room, event).boxed(); + + futures.push(Box::new(future)); + } pub async fn login>( &mut self, @@ -246,14 +233,13 @@ impl AsyncClient { client.receive_joined_timeline_event(&room_id, &event); let room = client.joined_rooms.get(&room_id).unwrap(); + let mut cb_futures = self.event_futures.write().unwrap(); - // for mut cb in &mut self.event_callbacks { - // cb(&room.lock().unwrap(), &event); - // } + let event = Arc::new(event.clone()); - // for mut cb in &mut self.event_futures { - // cb(room.clone(), Arc::new(event.clone())).await; - // } + for cb in &mut cb_futures.iter_mut() { + cb(room.clone(), event.clone()).await; + } } client.receive_sync_response(&response);