Merge branch 'master' into prev-content

master
Devin R 2020-05-22 17:17:27 -04:00
commit 86a01a0ca6
18 changed files with 469 additions and 170 deletions

View File

@ -20,7 +20,7 @@ sqlite-cryptostore = ["matrix-sdk-base/sqlite-cryptostore"]
http = "0.2.1"
reqwest = "0.10.4"
serde_json = "1.0.53"
thiserror = "1.0.17"
thiserror = "1.0.18"
tracing = "0.1.14"
url = "2.1.1"
futures-timer = "3.0.2"

View File

@ -75,7 +75,7 @@ async fn login_and_sync(
let homeserver_url = Url::parse(&homeserver_url).expect("Couldn't parse the homeserver URL");
// create a new Client with the given homeserver url and config
let mut client = Client::new_with_config(homeserver_url, None, client_config).unwrap();
let mut client = Client::new_with_config(homeserver_url, client_config).unwrap();
client
.login(

View File

@ -45,7 +45,7 @@ async fn login(
.proxy("http://localhost:8080")?
.disable_ssl_verification();
let homeserver_url = Url::parse(&homeserver_url).expect("Couldn't parse the homeserver URL");
let mut client = Client::new_with_config(homeserver_url, None, client_config).unwrap();
let mut client = Client::new_with_config(homeserver_url, client_config).unwrap();
client.add_event_emitter(Box::new(EventCallback)).await;

View File

@ -54,7 +54,7 @@ pub async fn run() -> Result<JsValue, JsValue> {
let client_config = ClientConfig::new();
let homeserver_url = Url::parse(&homeserver_url).unwrap();
let client = Client::new_with_config(homeserver_url, None, client_config).unwrap();
let client = Client::new_with_config(homeserver_url, client_config).unwrap();
client
.login(username, password, None, Some("rust-sdk-wasm"))

View File

@ -74,7 +74,6 @@ impl std::fmt::Debug for Client {
}
}
#[derive(Default)]
/// Configuration for the creation of the `Client`.
///
/// When setting the `StateStore` it is up to the user to open/connect
@ -99,6 +98,7 @@ impl std::fmt::Debug for Client {
/// let client_config = ClientConfig::new()
/// .state_store(Box::new(store));
/// ```
#[derive(Default)]
pub struct ClientConfig {
#[cfg(not(target_arch = "wasm32"))]
proxy: Option<reqwest::Proxy>,
@ -242,11 +242,9 @@ impl Client {
/// # Arguments
///
/// * `homeserver_url` - The homeserver that the client should connect to.
/// * `session` - If a previous login exists, the access token can be
/// reused by giving a session object here.
pub fn new<U: TryInto<Url>>(homeserver_url: U, session: Option<Session>) -> Result<Self> {
pub fn new<U: TryInto<Url>>(homeserver_url: U) -> Result<Self> {
let config = ClientConfig::new();
Client::new_with_config(homeserver_url, session, config)
Client::new_with_config(homeserver_url, config)
}
/// Create a new client with the given configuration.
@ -254,12 +252,10 @@ impl Client {
/// # Arguments
///
/// * `homeserver_url` - The homeserver that the client should connect to.
/// * `session` - If a previous login exists, the access token can be
/// reused by giving a session object here.
///
/// * `config` - Configuration for the client.
pub fn new_with_config<U: TryInto<Url>>(
homeserver_url: U,
session: Option<Session>,
config: ClientConfig,
) -> Result<Self> {
#[allow(clippy::match_wild_err_arm)]
@ -298,9 +294,9 @@ impl Client {
let http_client = http_client.build()?;
let base_client = if let Some(store) = config.state_store {
BaseClient::new_with_state_store(session, store)?
BaseClient::new_with_state_store(store)?
} else {
BaseClient::new(session)?
BaseClient::new()?
};
Ok(Self {
@ -389,7 +385,7 @@ impl Client {
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// let store = JsonStore::open("path/to/store").unwrap();
/// let config = ClientConfig::new().state_store(Box::new(store));
/// let mut client = Client::new(homeserver, None).unwrap();
/// let mut client = Client::new(homeserver).unwrap();
/// # use futures::executor::block_on;
/// # block_on(async {
/// let _ = client.login("name", "password", None, None).await.unwrap();
@ -455,6 +451,16 @@ impl Client {
Ok(response)
}
/// Restore a previously logged in session.
///
/// # Arguments
///
/// * `session` - An session that the user already has from a
/// previous login call.
pub async fn restore_login(&self, session: Session) -> Result<()> {
Ok(self.base_client.restore_login(session).await?)
}
/// Join a room by `RoomId`.
///
/// Returns a `join_room_by_id::Response` consisting of the
@ -566,7 +572,6 @@ impl Client {
/// # Arguments
///
/// * `room_id` - The `RoomId` of the room to leave.
///
pub async fn leave_room(&self, room_id: &RoomId) -> Result<leave_room::Response> {
let request = leave_room::Request {
room_id: room_id.clone(),
@ -641,7 +646,7 @@ impl Client {
/// .name("name")
/// .room_version("v1.0");
///
/// let mut cli = Client::new(homeserver, None).unwrap();
/// let mut cli = Client::new(homeserver).unwrap();
/// # use futures::executor::block_on;
/// # block_on(async {
/// assert!(cli.create_room(builder).await.is_ok());
@ -685,10 +690,10 @@ impl Client {
/// .direction(Direction::Backward)
/// .limit(UInt::new(10).unwrap());
///
/// let mut cli = Client::new(homeserver, None).unwrap();
/// let mut client = Client::new(homeserver).unwrap();
/// # use futures::executor::block_on;
/// # block_on(async {
/// assert!(cli.room_messages(builder).await.is_ok());
/// assert!(client.room_messages(builder).await.is_ok());
/// # });
/// ```
pub async fn room_messages<R: Into<get_message_events::Request>>(
@ -759,21 +764,7 @@ impl Client {
///
/// * `sync_settings` - Settings for the sync call.
#[instrument]
#[allow(clippy::useless_let_if_seq)]
pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result<sync_events::Response> {
{
// if the client has been synced from the state store don't sync again
if !self.base_client.is_state_store_synced() {
// this will bail out returning false if the store has not been set up
if let Ok(synced) = self.sync_with_state_store().await {
if synced {
// once synced, update the sync token to the last known state from `StateStore`.
sync_settings.token = self.sync_token().await;
}
}
}
}
pub async fn sync(&self, sync_settings: SyncSettings) -> Result<sync_events::Response> {
let request = sync_events::Request {
filter: None,
since: sync_settings.token,
@ -857,6 +848,10 @@ impl Client {
let mut sync_settings = sync_settings;
let mut last_sync_time: Option<Instant> = None;
if sync_settings.token.is_none() {
sync_settings.token = self.sync_token().await;
}
loop {
let response = self.sync(sync_settings.clone()).await;
@ -1010,7 +1005,7 @@ impl Client {
/// use matrix_sdk::events::room::message::{MessageEventContent, TextMessageEventContent};
/// # block_on(async {
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
/// # let mut client = Client::new(homeserver, None).unwrap();
/// # let mut client = Client::new(homeserver).unwrap();
/// # let room_id = RoomId::try_from("!test:localhost").unwrap();
/// use matrix_sdk_common::uuid::Uuid;
///
@ -1244,12 +1239,81 @@ mod test {
use matrix_sdk_base::JsonStore;
use matrix_sdk_test::{EventBuilder, EventsFile};
use mockito::{mock, Matcher};
use tempfile::tempdir;
use std::convert::TryFrom;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
#[tokio::test]
async fn test_join_leave_room() {
let homeserver = Url::from_str(&mockito::server_url()).unwrap();
let room_id = RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap();
let session = Session {
access_token: "1234".to_owned(),
user_id: UserId::try_from("@example:localhost").unwrap(),
device_id: "DEVICEID".to_owned(),
};
let _m = mock(
"GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
)
.with_status(200)
.with_body_from_file("../test_data/sync.json")
.create();
let dir = tempdir().unwrap();
let path: &Path = dir.path();
let store = Box::new(JsonStore::open(path).unwrap());
let config = ClientConfig::default().state_store(store);
let client = Client::new_with_config(homeserver.clone(), config).unwrap();
client.restore_login(session.clone()).await.unwrap();
let room = client.get_joined_room(&room_id).await;
assert!(room.is_none());
client.sync(SyncSettings::default()).await.unwrap();
let room = client.get_left_room(&room_id).await;
assert!(room.is_none());
let room = client.get_joined_room(&room_id).await;
assert!(room.is_some());
// test store reloads with correct room state from JsonStore
let store = Box::new(JsonStore::open(path).unwrap());
let config = ClientConfig::default().state_store(store);
let joined_client = Client::new_with_config(homeserver, config).unwrap();
joined_client.restore_login(session).await.unwrap();
// joined room reloaded from state store
joined_client.sync(SyncSettings::default()).await.unwrap();
let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_some());
let _m = mock(
"GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
)
.with_status(200)
.with_body_from_file("../test_data/leave_event_sync.json")
.create();
joined_client.sync(SyncSettings::default()).await.unwrap();
let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_none());
let room = joined_client.get_left_room(&room_id).await;
assert!(room.is_some());
}
#[tokio::test]
async fn account_data() {
let homeserver = Url::from_str(&mockito::server_url()).unwrap();
@ -1268,7 +1332,8 @@ mod test {
.with_body_from_file("../test_data/sync.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
@ -1287,7 +1352,8 @@ mod test {
device_id: "DEVICEID".to_owned(),
};
let homeserver = url::Url::parse(&mockito::server_url()).unwrap();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let mut response = EventBuilder::default()
.add_room_event(EventsFile::Member, RoomEvent::RoomMember)
@ -1319,7 +1385,7 @@ mod test {
.with_body_from_file("../test_data/login_response_error.json")
.create();
let client = Client::new(homeserver, None).unwrap();
let client = Client::new(homeserver).unwrap();
if let Err(err) = client.login("example", "wordpass", None, None).await {
if let crate::Error::RumaResponse(crate::FromHttpResponseError::Http(
@ -1368,7 +1434,8 @@ mod test {
.with_body_from_file("../test_data/room_id.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let room_id = RoomId::try_from("!testroom:example.org").unwrap();
assert_eq!(
@ -1398,7 +1465,8 @@ mod test {
.with_body_from_file("../test_data/room_id.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let room_id = RoomIdOrAliasId::try_from("!testroom:example.org").unwrap();
assert_eq!(
@ -1433,7 +1501,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
if let invite_user::Response = client.invite_user_by_id(&room_id, &user).await.unwrap() {}
}
@ -1459,7 +1528,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
if let invite_user::Response = client
.invite_user_by_3pid(
@ -1496,7 +1566,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let room_id = RoomId::try_from("!testroom:example.org").unwrap();
let response = client.leave_room(&room_id).await.unwrap();
@ -1531,7 +1602,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let response = client.ban_user(&room_id, &user, None).await.unwrap();
if let ban_user::Response = response {
@ -1565,7 +1637,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let response = client.kick_user(&room_id, &user, None).await.unwrap();
if let kick_user::Response = response {
@ -1599,7 +1672,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let response = client.forget_room_by_id(&room_id).await.unwrap();
if let forget_room::Response = response {
@ -1634,7 +1708,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let response = client.read_receipt(&room_id, &event_id).await.unwrap();
if let create_receipt::Response = response {
@ -1668,7 +1743,8 @@ mod test {
.with_body_from_file("../test_data/logout_response.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let response = client
.typing_notice(
@ -1710,7 +1786,8 @@ mod test {
.with_body_from_file("../test_data/event_id.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let content = MessageEventContent::Text(TextMessageEventContent {
body: "Hello world".to_owned(),
@ -1748,7 +1825,8 @@ mod test {
.with_body_from_file("../test_data/sync.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
@ -1788,7 +1866,8 @@ mod test {
.with_body_from_file("../test_data/sync_with_summary.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync(sync_settings).await.unwrap();
@ -1812,7 +1891,8 @@ mod test {
};
let homeserver = url::Url::parse(&mockito::server_url()).unwrap();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let _m = mock(
"GET",
@ -1845,7 +1925,8 @@ mod test {
};
let homeserver = url::Url::parse(&mockito::server_url()).unwrap();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let _m = mock(
"GET",
@ -1890,12 +1971,12 @@ mod test {
.with_body_from_file("../test_data/login_response.json")
.create();
let dir = tempfile::tempdir().unwrap();
let dir = tempdir().unwrap();
// a sync response to populate our JSON store
let config =
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));
let client =
Client::new_with_config(homeserver.clone(), Some(session.clone()), config).unwrap();
let client = Client::new_with_config(homeserver.clone(), config).unwrap();
client.restore_login(session.clone()).await.unwrap();
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000));
// gather state to save to the db, the first time through loading will be skipped
@ -1904,7 +1985,8 @@ mod test {
// now syncing the client will update from the state store
let config =
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));
let client = Client::new_with_config(homeserver, Some(session.clone()), config).unwrap();
let client = Client::new_with_config(homeserver, config).unwrap();
client.restore_login(session.clone()).await.unwrap();
client.sync(sync_settings).await.unwrap();
let base_client = &client.base_client;
@ -1930,7 +2012,7 @@ mod test {
.with_body_from_file("../test_data/login_response.json")
.create();
let client = Client::new(homeserver, None).unwrap();
let client = Client::new(homeserver).unwrap();
client
.login("example", "wordpass", None, None)
@ -1959,7 +2041,8 @@ mod test {
.with_body_from_file("../test_data/sync.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
@ -1988,7 +2071,8 @@ mod test {
.with_body_from_file("../test_data/sync.json")
.create();
let client = Client::new(homeserver, Some(session)).unwrap();
let client = Client::new(homeserver).unwrap();
client.restore_login(session).await.unwrap();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));

View File

@ -30,8 +30,8 @@ use crate::js_int::UInt;
/// .visibility(Visibility::Public)
/// .name("name")
/// .room_version("v1.0");
/// let mut cli = Client::new(homeserver, None).unwrap();
/// cli.create_room(builder).await;
/// let mut client = Client::new(homeserver).unwrap();
/// client.create_room(builder).await;
/// # })
/// ```
#[derive(Clone, Debug, Default)]
@ -188,14 +188,14 @@ impl Into<create_room::Request> for RoomBuilder {
/// # rt.block_on(async {
/// # let room_id = RoomId::try_from("!test:localhost").unwrap();
/// # let last_sync_token = "".to_string();
/// let mut cli = Client::new(homeserver, None).unwrap();
/// let mut client = Client::new(homeserver).unwrap();
///
/// let mut builder = MessagesRequestBuilder::new();
/// builder.room_id(room_id)
/// .from(last_sync_token)
/// .direction(Direction::Forward);
///
/// cli.room_messages(builder).await.is_err();
/// client.room_messages(builder).await.is_err();
/// # })
/// ```
#[derive(Clone, Debug, Default)]
@ -342,7 +342,8 @@ mod test {
.room_alias_name("room_alias")
.topic("room topic")
.visibility(Visibility::Private);
let cli = Client::new(homeserver, Some(session)).unwrap();
let cli = Client::new(homeserver).unwrap();
cli.restore_login(session).await.unwrap();
assert!(cli.create_room(builder).await.is_ok());
}
@ -378,7 +379,8 @@ mod test {
..Default::default()
});
let cli = Client::new(homeserver, Some(session)).unwrap();
let cli = Client::new(homeserver).unwrap();
cli.restore_login(session).await.unwrap();
assert!(cli.room_messages(builder).await.is_ok());
}
}

View File

@ -25,7 +25,7 @@ matrix-sdk-common = { version = "0.1.0", path = "../matrix_sdk_common" }
matrix-sdk-crypto = { version = "0.1.0", path = "../matrix_sdk_crypto", optional = true }
# Misc dependencies
thiserror = "1.0.17"
thiserror = "1.0.18"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "0.2.21"
@ -35,7 +35,6 @@ features = ["sync", "fs"]
[dev-dependencies]
matrix-sdk-test = { version = "0.1.0", path = "../matrix_sdk_test" }
http = "0.2.1"
dirs = "2.0.2"
tracing-subscriber = "0.2.5"
tempfile = "3.1.0"

View File

@ -195,8 +195,8 @@ impl BaseClient {
///
/// * `session` - An optional session if the user already has one from a
/// previous login call.
pub fn new(session: Option<Session>) -> Result<Self> {
BaseClient::new_helper(session, None)
pub fn new() -> Result<Self> {
BaseClient::new_helper(None)
}
/// Create a new client.
@ -208,22 +208,13 @@ impl BaseClient {
///
/// * `store` - An open state store implementation that will be used through
/// the lifetime of the client.
pub fn new_with_state_store(
session: Option<Session>,
store: Box<dyn StateStore>,
) -> Result<Self> {
BaseClient::new_helper(session, Some(store))
pub fn new_with_state_store(store: Box<dyn StateStore>) -> Result<Self> {
BaseClient::new_helper(Some(store))
}
fn new_helper(session: Option<Session>, store: Option<Box<dyn StateStore>>) -> Result<Self> {
#[cfg(feature = "encryption")]
let olm = match &session {
Some(s) => Some(OlmMachine::new(&s.user_id, &s.device_id)),
None => None,
};
fn new_helper(store: Option<Box<dyn StateStore>>) -> Result<Self> {
Ok(BaseClient {
session: Arc::new(RwLock::new(session)),
session: Arc::new(RwLock::new(None)),
sync_token: Arc::new(RwLock::new(None)),
joined_rooms: Arc::new(RwLock::new(HashMap::new())),
invited_rooms: Arc::new(RwLock::new(HashMap::new())),
@ -234,7 +225,7 @@ impl BaseClient {
state_store: Arc::new(RwLock::new(store)),
needs_state_store_sync: Arc::new(AtomicBool::from(true)),
#[cfg(feature = "encryption")]
olm: Arc::new(Mutex::new(olm)),
olm: Arc::new(Mutex::new(None)),
})
}
@ -351,26 +342,49 @@ impl BaseClient {
device_id: response.device_id.clone(),
user_id: response.user_id.clone(),
};
*self.session.write().await = Some(session);
self.restore_login(session).await
}
/// Restore a previously logged in session.
///
/// # Arguments
///
/// * `session` - An session that the user already has from a
/// previous login call.
pub async fn restore_login(&self, session: Session) -> Result<()> {
#[cfg(feature = "encryption")]
{
let mut olm = self.olm.lock().await;
*olm = Some(OlmMachine::new(&response.user_id, &response.device_id));
*olm = Some(OlmMachine::new(&session.user_id, &session.device_id));
}
self.sync_with_state_store().await?;
*self.session.write().await = Some(session);
Ok(())
}
pub(crate) async fn get_or_create_joined_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> {
pub(crate) async fn get_or_create_joined_room(
&self,
room_id: &RoomId,
) -> Result<Arc<RwLock<Room>>> {
// If this used to be an invited or left room remove them from our other
// hashmaps.
self.invited_rooms.write().await.remove(room_id);
self.left_rooms.write().await.remove(room_id);
if self.invited_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Invited(room_id)).await?;
}
}
if self.left_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Left(room_id)).await?;
}
}
let mut rooms = self.joined_rooms.write().await;
#[allow(clippy::or_fun_call)]
rooms
Ok(rooms
.entry(room_id.clone())
.or_insert(Arc::new(RwLock::new(Room::new(
room_id,
@ -382,7 +396,7 @@ impl BaseClient {
.expect("Receiving events while not being logged in")
.user_id,
))))
.clone()
.clone())
}
/// Get a joined room with the given room id.
@ -401,14 +415,21 @@ impl BaseClient {
self.joined_rooms.clone()
}
pub(crate) async fn get_or_create_invited_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> {
pub(crate) async fn get_or_create_invited_room(
&self,
room_id: &RoomId,
) -> Result<Arc<RwLock<Room>>> {
// Remove the left rooms only here, since a join -> invite action per
// spec can't happen.
self.left_rooms.write().await.remove(room_id);
if self.left_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Left(room_id)).await?;
}
}
let mut rooms = self.invited_rooms.write().await;
#[allow(clippy::or_fun_call)]
rooms
Ok(rooms
.entry(room_id.clone())
.or_insert(Arc::new(RwLock::new(Room::new(
room_id,
@ -420,7 +441,7 @@ impl BaseClient {
.expect("Receiving events while not being logged in")
.user_id,
))))
.clone()
.clone())
}
/// Get an invited room with the given room id.
@ -439,15 +460,27 @@ impl BaseClient {
self.invited_rooms.clone()
}
pub(crate) async fn get_or_create_left_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> {
pub(crate) async fn get_or_create_left_room(
&self,
room_id: &RoomId,
) -> Result<Arc<RwLock<Room>>> {
// If this used to be an invited or joined room remove them from our other
// hashmaps.
self.invited_rooms.write().await.remove(room_id);
self.joined_rooms.write().await.remove(room_id);
if self.invited_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Invited(room_id)).await?;
}
}
if self.joined_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Joined(room_id)).await?;
}
}
let mut rooms = self.left_rooms.write().await;
#[allow(clippy::or_fun_call)]
rooms
Ok(rooms
.entry(room_id.clone())
.or_insert(Arc::new(RwLock::new(Room::new(
room_id,
@ -459,7 +492,7 @@ impl BaseClient {
.expect("Receiving events while not being logged in")
.user_id,
))))
.clone()
.clone())
}
/// Get an left room with the given room id.
@ -523,13 +556,14 @@ impl BaseClient {
&self,
room_id: &RoomId,
event: &mut EventJson<RoomEvent>,
) -> (Option<EventJson<RoomEvent>>, bool) {
) -> Result<(Option<EventJson<RoomEvent>>, bool)> {
// if the event is a m.room.member event the server will sometimes
// send the `prev_content` field as part of the unsigned field this extracts and
// places it where everything else expects it.
if let Some(ev) = deserialize_prev_content(event) {
*event = ev;
if let Some(e) = deserialize_prev_content(event) {
*event = e;
}
match event.deserialize() {
#[allow(unused_mut)]
Ok(mut e) => {
@ -550,7 +584,7 @@ impl BaseClient {
}
}
let room_lock = self.get_or_create_joined_room(&room_id).await;
let room_lock = self.get_or_create_joined_room(&room_id).await?;
let mut room = room_lock.write().await;
if let RoomEvent::RoomMember(mem_event) = &mut e {
@ -563,12 +597,12 @@ impl BaseClient {
self.invalidate_group_session(room_id).await;
}
(decrypted_event, changed)
Ok((decrypted_event, changed))
} else {
(decrypted_event, room.receive_timeline_event(&e))
Ok((decrypted_event, room.receive_timeline_event(&e)))
}
}
_ => (None, false),
_ => Ok((None, false)),
}
}
@ -582,8 +616,12 @@ impl BaseClient {
/// * `room_id` - The unique id of the room the event belongs to.
///
/// * `event` - The event that should be handled by the client.
pub async fn receive_joined_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool {
let room_lock = self.get_or_create_joined_room(room_id).await;
pub async fn receive_joined_state_event(
&self,
room_id: &RoomId,
event: &StateEvent,
) -> Result<bool> {
let room_lock = self.get_or_create_joined_room(room_id).await?;
let mut room = room_lock.write().await;
if let StateEvent::RoomMember(e) = event {
@ -596,9 +634,9 @@ impl BaseClient {
self.invalidate_group_session(room_id).await;
}
changed
Ok(changed)
} else {
room.receive_state_event(event)
Ok(room.receive_state_event(event))
}
}
@ -616,10 +654,10 @@ impl BaseClient {
&self,
room_id: &RoomId,
event: &AnyStrippedStateEvent,
) -> bool {
let room_lock = self.get_or_create_invited_room(room_id).await;
) -> Result<bool> {
let room_lock = self.get_or_create_invited_room(room_id).await?;
let mut room = room_lock.write().await;
room.receive_stripped_state_event(event)
Ok(room.receive_stripped_state_event(event))
}
/// Receive a timeline event for a room the user has left and update the client state.
@ -636,14 +674,14 @@ impl BaseClient {
&self,
room_id: &RoomId,
event: &EventJson<RoomEvent>,
) -> bool {
) -> Result<bool> {
match event.deserialize() {
Ok(e) => {
let room_lock = self.get_or_create_left_room(room_id).await;
let room_lock = self.get_or_create_left_room(room_id).await?;
let mut room = room_lock.write().await;
room.receive_timeline_event(&e)
Ok(room.receive_timeline_event(&e))
}
_ => false,
_ => Ok(false),
}
}
@ -657,10 +695,14 @@ impl BaseClient {
/// * `room_id` - The unique id of the room the event belongs to.
///
/// * `event` - The event that should be handled by the client.
pub async fn receive_left_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool {
let room_lock = self.get_or_create_left_room(room_id).await;
pub async fn receive_left_state_event(
&self,
room_id: &RoomId,
event: &StateEvent,
) -> Result<bool> {
let room_lock = self.get_or_create_left_room(room_id).await?;
let mut room = room_lock.write().await;
room.receive_state_event(event)
Ok(room.receive_state_event(event))
}
/// Receive a presence event from a sync response and updates the client state.
@ -758,9 +800,6 @@ impl BaseClient {
}
}
// TODO do we want to move the rooms to the appropriate HashMaps when the corresponding
// event comes in e.g. move a joined room to a left room when leave event comes?
// when events change state, updated_* signals to StateStore to update database
self.iter_joined_rooms(response).await?;
self.iter_invited_rooms(&response).await?;
@ -788,7 +827,7 @@ impl BaseClient {
let matrix_room = {
for event in &joined_room.state.events {
if let Ok(e) = event.deserialize() {
if self.receive_joined_state_event(&room_id, &e).await {
if self.receive_joined_state_event(&room_id, &e).await? {
updated = true;
}
self.emit_state_event(&room_id, &e, RoomStateType::Joined)
@ -796,7 +835,7 @@ impl BaseClient {
}
}
self.get_or_create_joined_room(&room_id).await.clone()
self.get_or_create_joined_room(&room_id).await?.clone()
};
#[cfg(feature = "encryption")]
@ -829,7 +868,7 @@ impl BaseClient {
let decrypted_event = {
let (decrypt_ev, timeline_update) = self
.receive_joined_timeline_event(room_id, &mut event)
.await;
.await?;
if timeline_update {
updated = true;
};
@ -914,13 +953,13 @@ impl BaseClient {
let matrix_room = {
for event in &left_room.state.events {
if let Ok(e) = event.deserialize() {
if self.receive_left_state_event(&room_id, &e).await {
if self.receive_left_state_event(&room_id, &e).await? {
updated = true;
}
}
}
self.get_or_create_left_room(&room_id).await.clone()
self.get_or_create_left_room(&room_id).await?.clone()
};
for event in &mut left_room.state.events {
@ -935,7 +974,7 @@ impl BaseClient {
*event = e;
}
if self.receive_left_timeline_event(room_id, &event).await {
if self.receive_left_timeline_event(room_id, &event).await? {
updated = true;
};
@ -965,13 +1004,13 @@ impl BaseClient {
let matrix_room = {
for event in &invited_room.invite_state.events {
if let Ok(e) = event.deserialize() {
if self.receive_invite_state_event(&room_id, &e).await {
if self.receive_invite_state_event(&room_id, &e).await? {
updated = true;
}
}
}
self.get_or_create_invited_room(&room_id).await.clone()
self.get_or_create_invited_room(&room_id).await?.clone()
};
for event in &invited_room.invite_state.events {
@ -1569,13 +1608,15 @@ mod test {
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::*;
fn get_client() -> BaseClient {
async fn get_client() -> BaseClient {
let session = Session {
access_token: "1234".to_owned(),
user_id: UserId::try_from("@example:localhost").unwrap(),
device_id: "DEVICEID".to_owned(),
};
BaseClient::new(Some(session)).unwrap()
let client = BaseClient::new().unwrap();
client.restore_login(session).await.unwrap();
client
}
fn get_room_id() -> RoomId {
@ -1602,7 +1643,7 @@ mod test {
let mut sync_response = EventBuilder::default()
.add_room_event(EventsFile::Member, RoomEvent::RoomMember)
.build_sync_response();
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();
let room = client.get_joined_room(&room_id).await;
@ -1644,7 +1685,7 @@ mod test {
.add_custom_left_event(&room_id, member_event(), RoomEvent::RoomMember)
.build_sync_response();
let client = get_client();
let client = get_client().await;
let room = client.get_left_room(&room_id).await;
assert!(room.is_none());
@ -1682,7 +1723,7 @@ mod test {
.add_custom_invited_event(&room_id, member_event(), AnyStrippedStateEvent::RoomMember)
.build_sync_response();
let client = get_client();
let client = get_client().await;
let room = client.get_invited_room(&room_id).await;
assert!(room.is_none());
@ -1718,10 +1759,7 @@ mod test {
use super::*;
use crate::{EventEmitter, SyncRoom};
use matrix_sdk_common::events::{
room::member::{MemberEvent, MembershipChange},
EventJson,
};
use matrix_sdk_common::events::room::member::{MemberEvent, MembershipChange};
use matrix_sdk_common::locks::RwLock;
use std::sync::{
atomic::{AtomicBool, Ordering},
@ -1749,7 +1787,7 @@ mod test {
let room_id = get_room_id();
let passed = Arc::new(AtomicBool::default());
let emitter = EE(Arc::clone(&passed));
let mut client = get_client();
let mut client = get_client().await;
client.event_emitter = Arc::new(RwLock::new(Some(Box::new(emitter))));
@ -1816,7 +1854,7 @@ mod test {
#[async_test]
#[cfg(feature = "encryption")]
async fn test_group_session_invalidation() {
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();
let mut sync_response = EventBuilder::default()

View File

@ -32,11 +32,16 @@ pub enum Error {
#[error("the queried endpoint requires authentication but was called before logging in")]
AuthenticationRequired,
/// An error de/serializing type for the `StateStore`
/// A generic error returned when the state store fails not due to
/// IO or (de)serialization.
#[error("state store: {0}")]
StateStore(String),
/// An error when (de)serializing JSON.
#[error(transparent)]
SerdeJson(#[from] JsonError),
/// An error de/serializing type for the `StateStore`
/// An error representing IO errors.
#[error(transparent)]
IoError(#[from] IoError),

View File

@ -306,13 +306,15 @@ mod test {
use std::convert::TryFrom;
fn get_client() -> BaseClient {
async fn get_client() -> BaseClient {
let session = Session {
access_token: "1234".to_owned(),
user_id: UserId::try_from("@example:example.com").unwrap(),
device_id: "DEVICEID".to_owned(),
};
BaseClient::new(Some(session)).unwrap()
let client = BaseClient::new().unwrap();
client.restore_login(session).await.unwrap();
client
}
#[async_test]
@ -321,7 +323,7 @@ mod test {
let test_vec = Arc::clone(&vec);
let emitter = Box::new(EvEmitterTest(vec));
let client = get_client();
let client = get_client().await;
client.add_event_emitter(emitter).await;
let mut response = sync_response(SyncResponseFile::Default);
@ -352,7 +354,7 @@ mod test {
let test_vec = Arc::clone(&vec);
let emitter = Box::new(EvEmitterTest(vec));
let client = get_client();
let client = get_client().await;
client.add_event_emitter(emitter).await;
let mut response = sync_response(SyncResponseFile::Invite);
@ -371,7 +373,7 @@ mod test {
let test_vec = Arc::clone(&vec);
let emitter = Box::new(EvEmitterTest(vec));
let client = get_client();
let client = get_client().await;
client.add_event_emitter(emitter).await;
let mut response = sync_response(SyncResponseFile::Leave);

View File

@ -591,13 +591,15 @@ mod test {
use std::convert::TryFrom;
use std::ops::Deref;
fn get_client() -> BaseClient {
async fn get_client() -> BaseClient {
let session = Session {
access_token: "1234".to_owned(),
user_id: UserId::try_from("@example:localhost").unwrap(),
device_id: "DEVICEID".to_owned(),
};
BaseClient::new(Some(session)).unwrap()
let client = BaseClient::new().unwrap();
client.restore_login(session).await.unwrap();
client
}
fn get_room_id() -> RoomId {
@ -606,7 +608,7 @@ mod test {
#[async_test]
async fn user_presence() {
let client = get_client();
let client = get_client().await;
let mut response = sync_response(SyncResponseFile::Default);
@ -630,7 +632,7 @@ mod test {
#[async_test]
async fn room_events() {
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();
let user_id = UserId::try_from("@example:localhost").unwrap();
@ -659,7 +661,7 @@ mod test {
#[async_test]
async fn calculate_aliases() {
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();
@ -677,7 +679,7 @@ mod test {
#[async_test]
async fn calculate_alias() {
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();
@ -695,7 +697,7 @@ mod test {
#[async_test]
async fn calculate_name() {
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();
@ -720,7 +722,8 @@ mod test {
user_id: UserId::try_from("@example:localhost").unwrap(),
device_id: "DEVICEID".to_owned(),
};
let client = BaseClient::new(Some(session)).unwrap();
let client = BaseClient::new().unwrap();
client.restore_login(session).await.unwrap();
client.receive_sync_response(&mut response).await.unwrap();
let mut room_names = vec![];
@ -742,7 +745,8 @@ mod test {
user_id: user_id.clone(),
device_id: "DEVICEID".to_owned(),
};
let client = BaseClient::new(Some(session)).unwrap();
let client = BaseClient::new().unwrap();
client.restore_login(session).await.unwrap();
client.receive_sync_response(&mut response).await.unwrap();
let event = EncryptionEvent {

View File

@ -213,13 +213,15 @@ mod test {
use std::convert::TryFrom;
fn get_client() -> BaseClient {
async fn get_client() -> BaseClient {
let session = Session {
access_token: "1234".to_owned(),
user_id: UserId::try_from("@example:localhost").unwrap(),
device_id: "DEVICEID".to_owned(),
};
BaseClient::new(Some(session)).unwrap()
let client = BaseClient::new().unwrap();
client.restore_login(session).await.unwrap();
client
}
fn get_room_id() -> RoomId {
@ -228,7 +230,7 @@ mod test {
#[async_test]
async fn room_member_events() {
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();
@ -252,7 +254,7 @@ mod test {
#[async_test]
async fn member_presence_events() {
let client = get_client();
let client = get_client().await;
let room_id = get_room_id();

View File

@ -7,6 +7,7 @@ use std::sync::{
Arc,
};
use matrix_sdk_common::identifiers::RoomId;
use matrix_sdk_common::locks::RwLock;
use tokio::fs as async_fs;
use tokio::io::AsyncWriteExt;
@ -166,6 +167,28 @@ impl StateStore for JsonStore {
.await?;
file.write_all(json.as_bytes()).await.map_err(Error::from)
}
async fn delete_room_state(&self, room: RoomState<&RoomId>) -> Result<()> {
let (room_id, room_state) = match &room {
RoomState::Joined(id) => (id, "joined"),
RoomState::Invited(id) => (id, "invited"),
RoomState::Left(id) => (id, "left"),
};
if !self.user_path_set.load(Ordering::SeqCst) {
return Err(Error::StateStore("path for JsonStore not set".into()));
}
let mut to_del = self.path.read().await.clone();
to_del.push("rooms");
to_del.push(&format!("{}/{}.json", room_state, room_id));
if !to_del.exists() {
return Err(Error::StateStore(format!("file {:?} not found", to_del)));
}
tokio::fs::remove_file(to_del).await.map_err(Error::from)
}
}
#[cfg(test)]
@ -277,6 +300,53 @@ mod test {
assert_eq!(invited.get(&id), Some(&Room::new(&id, &user)));
}
#[tokio::test]
async fn test_store_load_join_leave_room_state() {
let dir = tempdir().unwrap();
let path: &Path = dir.path();
let store = JsonStore::open(path).unwrap();
let id = RoomId::try_from("!roomid:example.com").unwrap();
let user = UserId::try_from("@example:example.com").unwrap();
let room = Room::new(&id, &user);
store
.store_room_state(RoomState::Joined(&room))
.await
.unwrap();
assert!(store
.delete_room_state(RoomState::Joined(&id))
.await
.is_ok());
let AllRooms { joined, .. } = store.load_all_rooms().await.unwrap();
// test that we have removed the correct room
assert!(joined.is_empty());
}
#[tokio::test]
async fn test_store_load_invite_join_room_state() {
let dir = tempdir().unwrap();
let path: &Path = dir.path();
let store = JsonStore::open(path).unwrap();
let id = RoomId::try_from("!roomid:example.com").unwrap();
let user = UserId::try_from("@example:example.com").unwrap();
let room = Room::new(&id, &user);
store
.store_room_state(RoomState::Invited(&room))
.await
.unwrap();
assert!(store
.delete_room_state(RoomState::Invited(&id))
.await
.is_ok());
let AllRooms { invited, .. } = store.load_all_rooms().await.unwrap();
// test that we have removed the correct room
assert!(invited.is_empty());
}
#[tokio::test]
async fn test_client_sync_store() {
let dir = tempdir().unwrap();
@ -290,7 +360,8 @@ mod test {
// a sync response to populate our JSON store
let store = Box::new(JsonStore::open(path).unwrap());
let client = BaseClient::new_with_state_store(Some(session.clone()), store).unwrap();
let client = BaseClient::new_with_state_store(store).unwrap();
client.restore_login(session.clone()).await.unwrap();
let mut response = sync_response("../test_data/sync.json");
@ -299,7 +370,8 @@ mod test {
// now syncing the client will update from the state store
let store = Box::new(JsonStore::open(path).unwrap());
let client = BaseClient::new_with_state_store(Some(session.clone()), store).unwrap();
let client = BaseClient::new_with_state_store(store).unwrap();
client.restore_login(session.clone()).await.unwrap();
client.sync_with_state_store().await.unwrap();
// assert the synced client and the logged in client are equal

View File

@ -50,6 +50,11 @@ impl PartialEq for ClientState {
}
impl ClientState {
/// Create a JSON serialize-able `ClientState`.
///
/// This enables non sensitive information to be saved by `JsonStore`.
#[allow(clippy::eval_order_dependence)]
// TODO is this ok ^^^?? https://github.com/rust-lang/rust-clippy/issues/4637
pub async fn from_base_client(client: &BaseClient) -> ClientState {
let BaseClient {
sync_token,
@ -86,14 +91,22 @@ pub trait StateStore: Send + Sync {
/// An `Option::None` should be returned only if the `StateStore` tries to
/// load but no state has been stored.
async fn load_client_state(&self, _: &Session) -> Result<Option<ClientState>>;
/// Load the state of all `Room`s.
///
/// This will be mapped over in the client in order to store `Room`s in an async safe way.
async fn load_all_rooms(&self) -> Result<AllRooms>;
/// Save the current state of the `BaseClient` using the `StateStore::Store` type.
async fn store_client_state(&self, _: ClientState) -> Result<()>;
/// Save the state a single `Room`.
async fn store_room_state(&self, _: RoomState<&Room>) -> Result<()>;
/// Remove state for a room.
///
/// This is used when a user leaves a room or rejects an invitation.
async fn delete_room_state(&self, _room: RoomState<&RoomId>) -> Result<()>;
}
#[cfg(test)]

View File

@ -14,9 +14,9 @@ version = "0.1.0"
js_int = "0.1.5"
ruma-api = "0.16.1"
ruma-client-api = "0.8.0"
ruma-events = "0.21.1"
ruma-events = "0.21.2"
ruma-identifiers = "0.16.1"
instant = { version = "0.1.3", features = ["wasm-bindgen", "now"] }
instant = { version = "0.1.4", features = ["wasm-bindgen", "now"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
uuid = { version = "0.8.1", features = ["v4"] }

View File

@ -27,7 +27,7 @@ zeroize = { version = "1.1.0", features = ["zeroize_derive"] }
url = "2.1.1"
# Misc dependencies
thiserror = "1.0.17"
thiserror = "1.0.18"
tracing = "0.1.14"
atomic = "0.4.5"
dashmap = "3.11.1"

View File

@ -13,7 +13,7 @@
// limitations under the License.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryInto;
use std::convert::{TryFrom, TryInto};
use std::mem;
#[cfg(feature = "sqlite-cryptostore")]
use std::path::Path;
@ -1404,6 +1404,20 @@ impl OlmMachine {
let count: u64 = one_time_key_count.map_or(0, |c| (*c).into());
self.update_key_count(count);
if let Some(device_list) = &response.device_lists {
for user_id in &device_list.changed {
let user_id = if let Ok(u) = UserId::try_from(user_id.to_owned()) {
u
} else {
continue;
};
if let Err(e) = self.mark_user_as_changed(&user_id).await {
error!("Error marking a tracked user as changed {:?}", e);
}
}
}
for event_result in &mut response.to_device.events {
let event = if let Ok(e) = event_result.deserialize() {
e

View File

@ -0,0 +1,64 @@
{
"account_data": {
"events": []
},
"to_device": {
"events": []
},
"device_lists": {
"changed": [],
"left": []
},
"presence": {
"events": []
},
"rooms": {
"join": {},
"invite": {},
"leave": {
"!SVkFJHzfwvuaIEawgC:localhost": {
"timeline": {
"events": [
{
"content": {
"membership": "leave"
},
"origin_server_ts": 1589578095276,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"type": "m.room.member",
"unsigned": {
"replaces_state": "$blahblah",
"prev_content": {
"avatar_url": null,
"displayname": "me",
"membership": "invite"
},
"prev_sender": "@2example:localhost",
"age": 1757
},
"event_id": "$lQQ116Y-XqcjpSUGpuz36rNntUvOSpTjuaIvmtQ2AwA"
}
],
"prev_batch": "tokenTOKEN",
"limited": false
},
"state": {
"events": []
},
"account_data": {
"events": []
}
}
}
},
"groups": {
"join": {},
"invite": {},
"leave": {}
},
"device_one_time_keys_count": {
"signed_curve25519": 50
},
"next_batch": "s1380317562_757269739_1655566_503953763_334052043_1209862_55290918_65705002_101146"
}