diff --git a/Cargo.toml b/Cargo.toml index d96d8149..605a8d6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,18 +15,19 @@ js_int = "0.1.2" futures = "0.3.1" reqwest = "0.10.0-alpha.2" http = "0.1.19" -ruma-api = "0.11.0" -ruma-client-api = "0.4.0" +async-std = "1.2.0" +ruma-api = "0.12.0-alpha.1" +ruma-client-api = "0.5.0-alpha.1" ruma-events = "0.15.1" log = "0.4.8" ruma-identifiers = "0.14.0" -serde_json = "1.0.41" +serde_json = "1.0.42" serde_urlencoded = "0.6.1" url = "2.1.0" [dependencies.serde] -version = "1.0.102" +version = "1.0.103" features = ["derive"] [dev-dependencies] diff --git a/examples/login.rs b/examples/login.rs index 3c9b0012..47aa3265 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -45,7 +45,7 @@ async fn login( let homeserver_url = Url::parse(&homeserver_url)?; let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); - client.add_event_future(async_cb); + client.add_event_callback(async_cb); client.login(username, password, None).await?; let _response = client.sync(SyncSettings::new()).await?; diff --git a/src/async_client.rs b/src/async_client.rs index 3f61b833..3db767a8 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -9,7 +9,7 @@ use js_int::UInt; use reqwest::header::{HeaderValue, InvalidHeaderValue}; use url::Url; -use ruma_api::Endpoint; +use ruma_api::{Endpoint, Outgoing}; use ruma_events::collections::all::RoomEvent; use ruma_events::room::message::MessageEventContent; use ruma_events::EventResult; @@ -23,9 +23,12 @@ use crate::error::{Error, InnerError}; use crate::session::Session; use crate::VERSION; -type RoomEventCallbackF = +type RoomEventCallback = Box>, Arc>) -> BoxFuture<'static, ()> + Send>; +type ResponseCallback = + Box) -> BoxFuture<'static, ()> + Send>; + #[derive(Clone)] pub struct AsyncClient { /// The URL of the homeserver to connect to. @@ -36,8 +39,10 @@ pub struct AsyncClient { base_client: Arc>, /// The transaction id. transaction_id: Arc, - /// Event futures - event_futures: Arc>>, + /// Event callbacks + event_callbacks: Arc>>, + // /// Response callbacks. + response_callbacks: Arc>>, } #[derive(Default, Debug)] @@ -68,7 +73,7 @@ impl AsyncClientConfig { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct SyncSettings { pub(crate) timeout: Option, pub(crate) token: Option, @@ -153,7 +158,8 @@ impl AsyncClient { http_client, base_client: Arc::new(RwLock::new(BaseClient::new(session))), transaction_id: Arc::new(AtomicU64::new(0)), - event_futures: Arc::new(Mutex::new(Vec::new())), + event_callbacks: Arc::new(Mutex::new(Vec::new())), + response_callbacks: Arc::new(Mutex::new(Vec::new())), }) } @@ -161,19 +167,32 @@ impl AsyncClient { self.base_client.read().unwrap().logged_in() } - pub fn add_event_future( + pub fn add_event_callback( &mut self, mut callback: impl FnMut(Arc>, Arc>) -> C + 'static + Send, ) where C: Future + Send, { - let mut futures = self.event_futures.lock().unwrap(); + let mut futures = self.event_callbacks.lock().unwrap(); let future = move |room, event| callback(room, event).boxed(); futures.push(Box::new(future)); } + pub fn add_response_callback( + &mut self, + mut callback: impl FnMut(Arc) -> C + 'static + Send, + ) where + C: Future + Send, + { + let mut futures = self.response_callbacks.lock().unwrap(); + + let future = move |response| callback(response).boxed(); + + futures.push(Box::new(future)); + } + pub async fn login>( &mut self, user: S, @@ -199,7 +218,7 @@ impl AsyncClient { pub async fn sync( &mut self, sync_settings: SyncSettings, - ) -> Result { + ) -> Result { let request = sync_events::Request { filter: None, since: sync_settings.token, @@ -234,7 +253,7 @@ impl AsyncClient { let event = Arc::new(event.clone()); let callbacks = { - let mut cb_futures = self.event_futures.lock().unwrap(); + let mut cb_futures = self.event_callbacks.lock().unwrap(); let mut callbacks = Vec::new(); for cb in &mut cb_futures.iter_mut() { @@ -258,7 +277,15 @@ impl AsyncClient { async fn sync_forever() {} - async fn send(&self, request: Request) -> Result { + async fn send( + &self, + request: Request, + ) -> Result<::Incoming, Error> + where + Request::Incoming: TryFrom>, Error = ruma_api::Error>, + ::Incoming: + TryFrom>, Error = ruma_api::Error>, + { let request: http::Request> = request.try_into()?; let url = request.uri(); let url = self @@ -297,7 +324,7 @@ impl AsyncClient { let status = response.status(); let body = response.bytes().await?.as_ref().to_owned(); let response = HttpResponse::builder().status(status).body(body).unwrap(); - let response = Request::Response::try_from(response)?; + let response = ::Incoming::try_from(response)?; Ok(response) } diff --git a/src/base_client.rs b/src/base_client.rs index af461377..0be83910 100644 --- a/src/base_client.rs +++ b/src/base_client.rs @@ -243,7 +243,7 @@ impl Client { room.receive_state_event(event) } - pub fn receive_sync_response(&mut self, response: &api::sync::sync_events::Response) { + pub fn receive_sync_response(&mut self, response: &api::sync::sync_events::IncomingResponse) { self.sync_token = Some(response.next_batch.clone()); } }