async_client: Add the event callbacks back.

master
Damir Jelić 2019-11-26 19:06:29 +01:00
parent 07979c6d32
commit e9dfabf847
2 changed files with 27 additions and 54 deletions

View File

@ -1,9 +1,4 @@
#![feature(async_closure)] use std::sync::{Arc, RwLock};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::{env, process::exit}; use std::{env, process::exit};
use url::Url; use url::Url;
@ -12,13 +7,12 @@ use matrix_nio::{
events::{ events::{
collections::all::RoomEvent, collections::all::RoomEvent,
room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, room::message::{MessageEvent, MessageEventContent, TextMessageEventContent},
EventType,
}, },
AsyncClient, AsyncClientConfig, Room, SyncSettings, AsyncClient, AsyncClientConfig, Room, SyncSettings,
}; };
async fn async_helper(room: Arc<Mutex<Room>>, event: Arc<RoomEvent>) { async fn async_cb(room: Arc<RwLock<Room>>, event: Arc<RoomEvent>) {
let room = room.lock().unwrap(); let room = room.read().unwrap();
if let RoomEvent::RoomMessage(MessageEvent { if let RoomEvent::RoomMessage(MessageEvent {
content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }),
sender, sender,
@ -34,13 +28,6 @@ async fn async_helper(room: Arc<Mutex<Room>>, event: Arc<RoomEvent>) {
} }
} }
fn async_callback(
room: Arc<Mutex<Room>>,
event: Arc<RoomEvent>,
) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
Box::pin(async_helper(room, event))
}
async fn login( async fn login(
homeserver_url: String, homeserver_url: String,
username: String, username: String,
@ -52,10 +39,10 @@ async fn login(
let homeserver_url = Url::parse(&homeserver_url)?; let homeserver_url = Url::parse(&homeserver_url)?;
let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap();
// client.add_event_future(EventType::RoomMessage, Box::new(async_callback)); client.add_event_future(async_cb);
client.login(username, password, None).await?; client.login(username, password, None).await?;
let response = client.sync(SyncSettings::new()).await?; let _response = client.sync(SyncSettings::new()).await?;
Ok(()) Ok(())
} }

View File

@ -1,7 +1,5 @@
use futures::future::{BoxFuture, Future, FutureExt};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -13,9 +11,7 @@ use url::Url;
use ruma_api::Endpoint; use ruma_api::Endpoint;
use ruma_events::collections::all::RoomEvent; use ruma_events::collections::all::RoomEvent;
use ruma_events::room::message::MessageEvent;
use ruma_events::room::message::MessageEventContent; use ruma_events::room::message::MessageEventContent;
use ruma_events::Event;
pub use ruma_events::EventType; pub use ruma_events::EventType;
use ruma_identifiers::RoomId; use ruma_identifiers::RoomId;
@ -26,12 +22,8 @@ use crate::error::{Error, InnerError};
use crate::session::Session; use crate::session::Session;
use crate::VERSION; use crate::VERSION;
type RoomEventCallback = Box<dyn FnMut(&Room, &RoomEvent)>; type RoomEventCallbackF =
type RoomEventCallbackF = Box< Box<dyn FnMut(Arc<RwLock<Room>>, Arc<RoomEvent>) -> BoxFuture<'static, ()>>;
dyn FnMut(Arc<RwLock<Room>>, Arc<RoomEvent>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>
+ Send
+ Sync,
>;
#[derive(Clone)] #[derive(Clone)]
pub struct AsyncClient { pub struct AsyncClient {
@ -43,10 +35,8 @@ pub struct AsyncClient {
base_client: Arc<RwLock<BaseClient>>, base_client: Arc<RwLock<BaseClient>>,
/// The transaction id. /// The transaction id.
transaction_id: Arc<AtomicU64>, transaction_id: Arc<AtomicU64>,
// /// Event callbacks /// Event futures
// event_callbacks: Vec<RoomEventCallback>, event_futures: Arc<RwLock<Vec<RoomEventCallbackF>>>,
// /// Event futures
// event_futures: Vec<RoomEventCallbackF>,
} }
#[derive(Default, Debug)] #[derive(Default, Debug)]
@ -163,7 +153,7 @@ impl AsyncClient {
base_client: Arc::new(RwLock::new(BaseClient::new(session))), base_client: Arc::new(RwLock::new(BaseClient::new(session))),
transaction_id: Arc::new(AtomicU64::new(0)), transaction_id: Arc::new(AtomicU64::new(0)),
// event_callbacks: Vec::new(), // event_callbacks: Vec::new(),
// event_futures: Vec::new(), event_futures: Arc::new(RwLock::new(Vec::new())),
}) })
} }
@ -171,21 +161,18 @@ impl AsyncClient {
self.base_client.read().unwrap().logged_in() self.base_client.read().unwrap().logged_in()
} }
// pub fn add_event_callback( pub fn add_event_future<C: 'static>(
// &mut self, &mut self,
// event_type: EventType, mut callback: impl FnMut(Arc<RwLock<Room>>, Arc<RoomEvent>) -> C + 'static,
// callback: RoomEventCallback, ) where
// ) { C: Future<Output = ()> + Send,
// self.event_callbacks.push(callback); {
// } let mut futures = self.event_futures.write().unwrap();
// pub fn add_event_future( let future = move |room, event| callback(room, event).boxed();
// &mut self,
// event_type: EventType, futures.push(Box::new(future));
// callback: RoomEventCallbackF, }
// ) {
// self.event_futures.push(callback);
// }
pub async fn login<S: Into<String>>( pub async fn login<S: Into<String>>(
&mut self, &mut self,
@ -246,14 +233,13 @@ impl AsyncClient {
client.receive_joined_timeline_event(&room_id, &event); client.receive_joined_timeline_event(&room_id, &event);
let room = client.joined_rooms.get(&room_id).unwrap(); let room = client.joined_rooms.get(&room_id).unwrap();
let mut cb_futures = self.event_futures.write().unwrap();
// for mut cb in &mut self.event_callbacks { let event = Arc::new(event.clone());
// cb(&room.lock().unwrap(), &event);
// }
// for mut cb in &mut self.event_futures { for cb in &mut cb_futures.iter_mut() {
// cb(room.clone(), Arc::new(event.clone())).await; cb(room.clone(), event.clone()).await;
// } }
} }
client.receive_sync_response(&response); client.receive_sync_response(&response);