clean up presence updating and member state, add_presence_callback method for AsyncClient
This commit is contained in:
parent
0147e8c0ac
commit
4c7acd4b18
5 changed files with 202 additions and 127 deletions
|
@ -32,6 +32,7 @@ use ruma_api::{Endpoint, Outgoing};
|
|||
use ruma_events::collections::all::RoomEvent;
|
||||
use ruma_events::room::message::MessageEventContent;
|
||||
use ruma_events::EventResult;
|
||||
use ruma_events::presence::PresenceEvent;
|
||||
pub use ruma_events::EventType;
|
||||
use ruma_identifiers::RoomId;
|
||||
|
||||
|
@ -46,6 +47,10 @@ type RoomEventCallback = Box<
|
|||
dyn FnMut(Arc<SyncLock<Room>>, Arc<EventResult<RoomEvent>>) -> BoxFuture<'static, ()> + Send,
|
||||
>;
|
||||
|
||||
type PresenceEventCallback = Box<
|
||||
dyn FnMut(Arc<SyncLock<Room>>, Arc<EventResult<PresenceEvent>>) -> BoxFuture<'static, ()> + Send,
|
||||
>;
|
||||
|
||||
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -61,6 +66,7 @@ pub struct AsyncClient {
|
|||
transaction_id: Arc<AtomicU64>,
|
||||
/// Event callbacks
|
||||
event_callbacks: Arc<Mutex<Vec<RoomEventCallback>>>,
|
||||
presence_callbacks: Arc<Mutex<Vec<PresenceEventCallback>>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for AsyncClient {
|
||||
|
@ -245,6 +251,7 @@ impl AsyncClient {
|
|||
base_client: Arc::new(RwLock::new(BaseClient::new(session)?)),
|
||||
transaction_id: Arc::new(AtomicU64::new(0)),
|
||||
event_callbacks: Arc::new(Mutex::new(Vec::new())),
|
||||
presence_callbacks: Arc::new(Mutex::new(Vec::new())),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -339,6 +346,76 @@ impl AsyncClient {
|
|||
futures.push(Box::new(future));
|
||||
}
|
||||
|
||||
/// Add a callback that will be called every time the client receives a room
|
||||
/// event
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `callback` - The callback that should be called once a RoomEvent is
|
||||
/// received.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// # use matrix_sdk::events::{
|
||||
/// # collections::all::RoomEvent,
|
||||
/// # room::message::{MessageEvent, MessageEventContent, TextMessageEventContent},
|
||||
/// # presence::{PresenceEvent, PresenceEventContent},
|
||||
/// # EventResult,
|
||||
/// # };
|
||||
/// # use matrix_sdk::Room;
|
||||
/// # use std::sync::{Arc, RwLock};
|
||||
/// # use matrix_sdk::AsyncClient;
|
||||
/// # use url::Url;
|
||||
///
|
||||
/// async fn async_cb(room: Arc<RwLock<Room>>, event: Arc<EventResult<PresenceEvent>>) {
|
||||
/// let room = room.read().unwrap();
|
||||
/// let event = if let EventResult::Ok(event) = &*event {
|
||||
/// event
|
||||
/// } else {
|
||||
/// return;
|
||||
/// };
|
||||
/// let PresenceEvent {
|
||||
/// content: PresenceEventContent {
|
||||
/// avatar_url,
|
||||
/// currently_active,
|
||||
/// displayname,
|
||||
/// last_active_ago,
|
||||
/// presence,
|
||||
/// status_msg,
|
||||
/// },
|
||||
/// sender,
|
||||
/// } = event;
|
||||
/// {
|
||||
/// let member = room.members.get(&sender.to_string()).unwrap();
|
||||
/// println!(
|
||||
/// "{} is {}",
|
||||
/// displayname.as_deref().unwrap_or(&sender.to_string()),
|
||||
/// status_msg.as_deref().unwrap_or("not here")
|
||||
/// );
|
||||
/// }
|
||||
/// }
|
||||
/// # fn main() -> Result<(), matrix_sdk::Error> {
|
||||
/// let homeserver = Url::parse("http://localhost:8080")?;
|
||||
///
|
||||
/// let mut client = AsyncClient::new(homeserver, None)?;
|
||||
///
|
||||
/// client.add_presence_callback(async_cb);
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn add_presence_callback<C: 'static>(
|
||||
&mut self,
|
||||
mut callback: impl FnMut(Arc<SyncLock<Room>>, Arc<EventResult<PresenceEvent>>) -> C + 'static + Send,
|
||||
) where
|
||||
C: Future<Output = ()> + Send,
|
||||
{
|
||||
let mut futures = self.presence_callbacks.lock().unwrap();
|
||||
|
||||
let future = move |room, event| callback(room, event).boxed();
|
||||
|
||||
futures.push(Box::new(future));
|
||||
}
|
||||
|
||||
/// Login to the server.
|
||||
///
|
||||
/// # Arguments
|
||||
|
@ -424,13 +501,6 @@ impl AsyncClient {
|
|||
*event = e;
|
||||
}
|
||||
|
||||
for presence in &response.presence.events {
|
||||
let mut client = self.base_client.write().await;
|
||||
if let EventResult::Ok(e) = presence {
|
||||
client.receive_presence_event(&room_id_string, e);
|
||||
}
|
||||
}
|
||||
|
||||
let callbacks = {
|
||||
let mut cb_futures = self.event_callbacks.lock().unwrap();
|
||||
|
||||
|
@ -453,6 +523,34 @@ impl AsyncClient {
|
|||
cb.await;
|
||||
}
|
||||
}
|
||||
|
||||
// After the room has been created and state/timeline events accounted for we use the room_id of the newly created
|
||||
// to add any presence events that relate to a user in the current room. This is not super
|
||||
// efficient but we need a room_id so we would loop through now or later.
|
||||
for presence in &response.presence.events {
|
||||
let mut client = self.base_client.write().await;
|
||||
if let EventResult::Ok(e) = presence {
|
||||
client.receive_presence_event(&room_id_string, e);
|
||||
}
|
||||
|
||||
let callbacks = {
|
||||
let mut cb_futures = self.presence_callbacks.lock().unwrap();
|
||||
let event = if !cb_futures.is_empty() {
|
||||
Arc::new(presence.clone())
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
let mut callbacks = Vec::new();
|
||||
for cb in &mut cb_futures.iter_mut() {
|
||||
callbacks.push(cb(matrix_room.clone(), Arc::clone(&event)));
|
||||
}
|
||||
|
||||
callbacks
|
||||
};
|
||||
for cb in callbacks {
|
||||
cb.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut client = self.base_client.write().await;
|
||||
|
|
|
@ -236,6 +236,7 @@ impl Client {
|
|||
///
|
||||
/// * `event` - The event that should be handled by the client.
|
||||
pub fn receive_presence_event(&mut self, room_id: &str, event: &PresenceEvent) -> bool {
|
||||
// this should be guaranteed to find the room that was just created in the `Client::sync` loop.
|
||||
let mut room = self.get_or_create_room(room_id).write().unwrap();
|
||||
room.receive_presence_event(event)
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ use crate::events::collections::all::{RoomEvent, StateEvent};
|
|||
use crate::events::room::{
|
||||
aliases::AliasesEvent,
|
||||
canonical_alias::CanonicalAliasEvent,
|
||||
member::{MemberEvent, MembershipState},
|
||||
member::{MemberEvent, MembershipChange},
|
||||
name::NameEvent,
|
||||
power_levels::PowerLevelsEvent,
|
||||
};
|
||||
|
@ -148,48 +148,14 @@ impl Room {
|
|||
true
|
||||
}
|
||||
|
||||
// fn remove_member(&mut self, event: &MemberEvent) -> bool {
|
||||
// if let Some(member) = self.members.get_mut(&event.sender.to_string()) {
|
||||
// let changed = member.membership == event.content.membership;
|
||||
// member.membership = event.content.membership;
|
||||
// changed
|
||||
// } else {
|
||||
// false
|
||||
// }
|
||||
// }
|
||||
|
||||
// fn update_joined_member(&mut self, event: &MemberEvent) -> bool {
|
||||
// if let Some(member) = self.members.get_mut(&event.state_key) {
|
||||
// member.update(event);
|
||||
// }
|
||||
|
||||
// false
|
||||
// }
|
||||
|
||||
// fn handle_join(&mut self, event: &MemberEvent) -> bool {
|
||||
// match &event.prev_content {
|
||||
// Some(c) => match c.membership {
|
||||
// MembershipState::Join => self.update_joined_member(event),
|
||||
// MembershipState::Invite => self.add_member(event),
|
||||
// MembershipState::Leave => self.remove_member(event),
|
||||
// _ => false,
|
||||
// },
|
||||
// None => self.add_member(event),
|
||||
// }
|
||||
// }
|
||||
|
||||
// fn handle_leave(&mut self, event: &MemberEvent) -> bool {
|
||||
|
||||
// }
|
||||
|
||||
/// Handle a room.member updating the room state if necessary.
|
||||
///
|
||||
/// Returns true if the joined member list changed, false otherwise.
|
||||
pub fn handle_membership(&mut self, event: &MemberEvent) -> bool {
|
||||
match &event.content.membership {
|
||||
MembershipState::Invite | MembershipState::Join => self.add_member(event),
|
||||
match event.membership_change() {
|
||||
MembershipChange::Invited | MembershipChange::Joined => self.add_member(event),
|
||||
_ => {
|
||||
if let Some(member) = self.members.get_mut(&event.state_key) {
|
||||
if let Some(member) = self.members.get_mut(&event.sender.to_string()) {
|
||||
member.update_member(event)
|
||||
} else {
|
||||
false
|
||||
|
@ -297,98 +263,27 @@ impl Room {
|
|||
|
||||
/// Receive a presence event from an `IncomingResponse` and updates the client state.
|
||||
///
|
||||
/// Returns true if the joined member list changed, false otherwise.
|
||||
/// Returns true if the specific users presence has changed, false otherwise.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `event` - The event of the room.
|
||||
/// * `event` - The presence event for a specified room member.
|
||||
pub fn receive_presence_event(&mut self, event: &PresenceEvent) -> bool {
|
||||
let PresenceEvent {
|
||||
content:
|
||||
PresenceEventContent {
|
||||
avatar_url,
|
||||
currently_active,
|
||||
displayname,
|
||||
last_active_ago,
|
||||
presence,
|
||||
status_msg,
|
||||
},
|
||||
sender,
|
||||
} = event;
|
||||
|
||||
if let Some(user) = self
|
||||
.members
|
||||
.get_mut(&sender.to_string())
|
||||
.get_mut(&event.sender.to_string())
|
||||
.map(|m| &mut m.user)
|
||||
{
|
||||
if user.display_name == *displayname
|
||||
&& user.avatar_url == *avatar_url
|
||||
&& user.presence.as_ref() == Some(presence)
|
||||
&& user.status_msg == *status_msg
|
||||
&& user.last_active_ago == *last_active_ago
|
||||
&& user.currently_active == *currently_active
|
||||
{
|
||||
if user.did_update_presence(event) {
|
||||
false
|
||||
} else {
|
||||
user.presence_events.push(event.clone());
|
||||
*user = User {
|
||||
display_name: displayname.clone(),
|
||||
avatar_url: avatar_url.clone(),
|
||||
presence: Some(presence.clone()),
|
||||
status_msg: status_msg.clone(),
|
||||
last_active_ago: *last_active_ago,
|
||||
currently_active: *currently_active,
|
||||
// TODO better way of moving vec over
|
||||
events: user.events.clone(),
|
||||
presence_events: user.presence_events.clone(),
|
||||
};
|
||||
user.update_presence(event);
|
||||
true
|
||||
}
|
||||
} else {
|
||||
// this is probably an error as we have a `PresenceEvent` for a user
|
||||
// we dont know about
|
||||
// we don't know about
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pub struct User {
|
||||
// /// The human readable name of the user.
|
||||
// pub display_name: Option<String>,
|
||||
// /// The matrix url of the users avatar.
|
||||
// pub avatar_url: Option<String>,
|
||||
// /// The presence of the user, if found.
|
||||
// pub presence: Option<PresenceState>,
|
||||
// /// The presence status message, if found.
|
||||
// pub status_msg: Option<String>,
|
||||
// /// The time, in ms, since the user interacted with the server.
|
||||
// pub last_active_ago: Option<UInt>,
|
||||
// /// If the user should be considered active.
|
||||
// pub currently_active: Option<bool>,
|
||||
// /// The events that created the state of the current user.
|
||||
// // TODO do we want to hold the whole state or just update our structures.
|
||||
// pub events: Vec<Event>,
|
||||
// /// The `PresenceEvent`s connected to this user.
|
||||
// pub presence_events: Vec<PresenceEvent>,
|
||||
// }
|
||||
|
||||
// pub struct RoomMember {
|
||||
// /// The unique mxid of the user.
|
||||
// pub user_id: UserId,
|
||||
// /// The unique id of the room.
|
||||
// pub room_id: Option<RoomId>,
|
||||
// /// If the member is typing.
|
||||
// pub typing: Option<bool>,
|
||||
// /// The user data for this room member.
|
||||
// pub user: User,
|
||||
// /// The users power level.
|
||||
// pub power_level: Option<Int>,
|
||||
// /// The normalized power level of this `RoomMember` (0-100).
|
||||
// pub power_level_norm: Option<Int>,
|
||||
// /// The `MembershipState` of this `RoomMember`.
|
||||
// pub membership: MembershipState,
|
||||
// /// The human readable name of this room member.
|
||||
// pub name: String,
|
||||
// /// The events that created the state of this room member.
|
||||
// pub events: Vec<Event>
|
||||
// }
|
||||
|
|
|
@ -22,7 +22,7 @@ use crate::events::collections::all::{Event, RoomEvent, StateEvent};
|
|||
use crate::events::room::{
|
||||
aliases::AliasesEvent,
|
||||
canonical_alias::CanonicalAliasEvent,
|
||||
member::{MemberEvent, MemberEventContent, MembershipState},
|
||||
member::{MemberEvent, MemberEventContent, MembershipChange, MembershipState},
|
||||
name::NameEvent,
|
||||
power_levels::PowerLevelsEvent,
|
||||
};
|
||||
|
@ -73,15 +73,33 @@ impl RoomMember {
|
|||
power_level: None,
|
||||
power_level_norm: None,
|
||||
membership: event.content.membership,
|
||||
// TODO should this be `sender` ??
|
||||
name: event.state_key.clone(),
|
||||
events: vec![Event::RoomMember(event.clone())],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_member(&mut self, event: &MemberEvent) -> bool {
|
||||
let changed = self.membership == event.content.membership;
|
||||
self.membership = event.content.membership;
|
||||
changed
|
||||
use MembershipChange::*;
|
||||
|
||||
match event.membership_change() {
|
||||
ProfileChanged => {
|
||||
self.user.display_name = event.content.displayname.clone();
|
||||
self.user.avatar_url = event.content.avatar_url.clone();
|
||||
true
|
||||
},
|
||||
Banned | Kicked | KickedAndBanned
|
||||
| InvitationRejected | InvitationRevoked
|
||||
| Left | Unbanned | Joined | Invited => {
|
||||
self.membership = event.content.membership;
|
||||
true
|
||||
},
|
||||
NotImplemented => false,
|
||||
None => false,
|
||||
// TODO should this be handled somehow ??
|
||||
Error => false,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_power(&mut self, event: &PowerLevelsEvent) -> bool {
|
||||
|
|
|
@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock};
|
|||
use super::UserId;
|
||||
use crate::api::r0 as api;
|
||||
use crate::events::collections::all::{Event, RoomEvent, StateEvent};
|
||||
use crate::events::presence::{PresenceEvent, PresenceState};
|
||||
use crate::events::presence::{PresenceEvent, PresenceEventContent, PresenceState};
|
||||
use crate::events::room::{
|
||||
aliases::AliasesEvent,
|
||||
canonical_alias::CanonicalAliasEvent,
|
||||
|
@ -74,4 +74,67 @@ impl User {
|
|||
presence_events: Vec::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// If the current `PresenceEvent` updated the state of this `User`.
|
||||
///
|
||||
/// Returns true if the specific users presence has changed, false otherwise.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `presence` - The presence event for a this room member.
|
||||
pub fn did_update_presence(&self, presence: &PresenceEvent) -> bool {
|
||||
let PresenceEvent {
|
||||
content:
|
||||
PresenceEventContent {
|
||||
avatar_url,
|
||||
currently_active,
|
||||
displayname,
|
||||
last_active_ago,
|
||||
presence,
|
||||
status_msg,
|
||||
},
|
||||
..
|
||||
} = presence;
|
||||
self.display_name == *displayname
|
||||
&& self.avatar_url == *avatar_url
|
||||
&& self.presence.as_ref() == Some(presence)
|
||||
&& self.status_msg == *status_msg
|
||||
&& self.last_active_ago == *last_active_ago
|
||||
&& self.currently_active == *currently_active
|
||||
}
|
||||
|
||||
/// Updates the `User`s presence.
|
||||
///
|
||||
/// This should only be used if `did_update_presence` was true.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `presence` - The presence event for a this room member.
|
||||
pub fn update_presence(&mut self, presence_ev: &PresenceEvent) {
|
||||
let PresenceEvent {
|
||||
content:
|
||||
PresenceEventContent {
|
||||
avatar_url,
|
||||
currently_active,
|
||||
displayname,
|
||||
last_active_ago,
|
||||
presence,
|
||||
status_msg,
|
||||
},
|
||||
..
|
||||
} = presence_ev;
|
||||
|
||||
self.presence_events.push(presence_ev.clone());
|
||||
*self = User {
|
||||
display_name: displayname.clone(),
|
||||
avatar_url: avatar_url.clone(),
|
||||
presence: Some(presence.clone()),
|
||||
status_msg: status_msg.clone(),
|
||||
last_active_ago: *last_active_ago,
|
||||
currently_active: *currently_active,
|
||||
// TODO better way of moving vec over
|
||||
events: self.events.clone(),
|
||||
presence_events: self.presence_events.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue