Browse Source

room: Use timeline to watch read status change triggers

Should avoid a race condition where the data in the timeline is
not updated yet when we check if receipts have changed.
fractal-9
Kévin Commaille 2 years ago
parent
commit
b04471e693
No known key found for this signature in database
GPG Key ID: C971D9DBC9D678D
  1. 23
      Cargo.lock
  2. 8
      Cargo.toml
  3. 72
      src/session/model/room/mod.rs
  4. 88
      src/session/model/room/timeline/mod.rs

23
Cargo.lock generated

@ -2627,9 +2627,9 @@ dependencies = [
[[package]] [[package]]
name = "indexed_db_futures" name = "indexed_db_futures"
version = "0.4.2" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0704b71f13f81b5933d791abf2de26b33c40935143985220299a357721166706" checksum = "43315957678a70eb21fb0d2384fe86dde0d6c859a01e24ce127eb65a0143d28c"
dependencies = [ dependencies = [
"accessory", "accessory",
"cfg-if", "cfg-if",
@ -3137,7 +3137,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk" name = "matrix-sdk"
version = "0.7.1" version = "0.7.1"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"anymap2", "anymap2",
"aquamarine", "aquamarine",
@ -3187,7 +3187,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-base" name = "matrix-sdk-base"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"async-trait", "async-trait",
@ -3211,7 +3211,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-common" name = "matrix-sdk-common"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"futures-core", "futures-core",
@ -3232,7 +3232,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-crypto" name = "matrix-sdk-crypto"
version = "0.7.2" version = "0.7.2"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"aes", "aes",
"as_variant", "as_variant",
@ -3273,7 +3273,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-indexeddb" name = "matrix-sdk-indexeddb"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@ -3301,7 +3301,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-qrcode" name = "matrix-sdk-qrcode"
version = "0.7.1" version = "0.7.1"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"qrcode", "qrcode",
@ -3313,7 +3313,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-sqlite" name = "matrix-sdk-sqlite"
version = "0.7.1" version = "0.7.1"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"deadpool-sqlite", "deadpool-sqlite",
@ -3335,7 +3335,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-store-encryption" name = "matrix-sdk-store-encryption"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"base64", "base64",
"blake3", "blake3",
@ -3354,7 +3354,7 @@ dependencies = [
[[package]] [[package]]
name = "matrix-sdk-ui" name = "matrix-sdk-ui"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/matrix-org/matrix-rust-sdk.git?rev=7f4e79e2a3bb07fa189c62987a06b3f78f2e4286#7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" source = "git+https://github.com/zecakeh/matrix-rust-sdk.git?rev=2837564fea7018dfc1e355f6cc051e35ce86ff66#2837564fea7018dfc1e355f6cc051e35ce86ff66"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"async-once-cell", "async-once-cell",
@ -3382,6 +3382,7 @@ dependencies = [
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream",
"tracing", "tracing",
"unicode-normalization", "unicode-normalization",
] ]

8
Cargo.toml

@ -66,8 +66,8 @@ shumate = { package = "libshumate", version = "0.6" }
sourceview = { package = "sourceview5", version = "0.9" } sourceview = { package = "sourceview5", version = "0.9" }
[dependencies.matrix-sdk] [dependencies.matrix-sdk]
git = "https://github.com/matrix-org/matrix-rust-sdk.git" git = "https://github.com/zecakeh/matrix-rust-sdk.git"
rev = "7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" rev = "2837564fea7018dfc1e355f6cc051e35ce86ff66"
features = [ features = [
"socks", "socks",
"sso-login", "sso-login",
@ -76,8 +76,8 @@ features = [
] ]
[dependencies.matrix-sdk-ui] [dependencies.matrix-sdk-ui]
git = "https://github.com/matrix-org/matrix-rust-sdk.git" git = "https://github.com/zecakeh/matrix-rust-sdk.git"
rev = "7f4e79e2a3bb07fa189c62987a06b3f78f2e4286" rev = "2837564fea7018dfc1e355f6cc051e35ce86ff66"
default-features = false default-features = false
features = ["e2e-encryption", "native-tls"] features = ["e2e-encryption", "native-tls"]

72
src/session/model/room/mod.rs

@ -21,7 +21,6 @@ use matrix_sdk::{
use ruma::{ use ruma::{
api::client::error::{ErrorKind, RetryAfter}, api::client::error::{ErrorKind, RetryAfter},
events::{ events::{
receipt::{ReceiptEventContent, ReceiptType},
room::{ room::{
avatar::RoomAvatarEventContent, encryption::SyncRoomEncryptionEvent, avatar::RoomAvatarEventContent, encryption::SyncRoomEncryptionEvent,
guest_access::GuestAccess, history_visibility::HistoryVisibility, guest_access::GuestAccess, history_visibility::HistoryVisibility,
@ -217,7 +216,6 @@ mod imp {
#[property(get, builder(HistoryVisibilityValue::default()))] #[property(get, builder(HistoryVisibilityValue::default()))]
pub history_visibility: Cell<HistoryVisibilityValue>, pub history_visibility: Cell<HistoryVisibilityValue>,
pub typing_drop_guard: OnceCell<EventHandlerDropGuard>, pub typing_drop_guard: OnceCell<EventHandlerDropGuard>,
pub receipts_drop_guard: OnceCell<EventHandlerDropGuard>,
} }
#[glib::object_subclass] #[glib::object_subclass]
@ -519,7 +517,6 @@ impl Room {
self.load_predecessor(); self.load_predecessor();
self.load_tombstone(); self.load_tombstone();
self.load_category(); self.load_category();
self.set_up_receipts();
self.set_up_typing(); self.set_up_typing();
self.init_timeline(); self.init_timeline();
self.set_up_is_encrypted(); self.set_up_is_encrypted();
@ -628,15 +625,14 @@ impl Room {
} }
fn init_timeline(&self) { fn init_timeline(&self) {
let timeline = Timeline::new(self); let timeline = self.imp().timeline.get_or_init(|| Timeline::new(self));
self.imp().timeline.set(timeline.clone()).unwrap();
timeline.sdk_items().connect_items_changed(clone!( timeline.connect_read_change_trigger(clone!(
#[weak(rename_to = obj)] #[weak(rename_to = obj)]
self, self,
move |_, _, _, _| { move |_| {
spawn!(async move { spawn!(glib::Priority::DEFAULT_IDLE, async move {
obj.update_is_read().await; obj.handle_read_change_trigger().await
}); });
} }
)); ));
@ -1164,59 +1160,6 @@ impl Room {
imp.typing_drop_guard.set(drop_guard).unwrap(); imp.typing_drop_guard.set(drop_guard).unwrap();
} }
/// Start listening to read receipts events.
fn set_up_receipts(&self) {
let imp = self.imp();
if imp.receipts_drop_guard.get().is_some() {
// The event handler is already set up.
return;
}
// Listen to changes in the read receipts.
let matrix_room = self.matrix_room();
let room_weak = glib::SendWeakRef::from(self.downgrade());
let handle = matrix_room.add_event_handler(
move |event: SyncEphemeralRoomEvent<ReceiptEventContent>| {
let room_weak = room_weak.clone();
async move {
let ctx = glib::MainContext::default();
ctx.spawn(async move {
spawn!(async move {
if let Some(obj) = room_weak.upgrade() {
obj.handle_receipt_event(event.content)
}
});
});
}
},
);
let drop_guard = matrix_room.client().event_handler_drop_guard(handle);
imp.receipts_drop_guard.set(drop_guard).unwrap();
}
fn handle_receipt_event(&self, content: ReceiptEventContent) {
let Some(session) = self.session() else {
return;
};
let own_user_id = session.user_id();
for (_event_id, receipts) in content.iter() {
if let Some(users) = receipts.get(&ReceiptType::Read) {
if let Some(receipt) = users.get(own_user_id) {
tracing::trace!("{}: Got own receipt: {receipt:?}", self.human_readable_id());
spawn!(clone!(
#[weak(rename_to = obj)]
self,
async move {
obj.update_is_read().await;
}
));
}
}
}
}
fn handle_typing_event(&self, content: TypingEventContent) { fn handle_typing_event(&self, content: TypingEventContent) {
let Some(session) = self.session() else { let Some(session) = self.session() else {
return; return;
@ -1317,8 +1260,9 @@ impl Room {
self.notify_highlight(); self.notify_highlight();
} }
async fn update_is_read(&self) { /// Handle the trigger emitted when a read change might have occurred.
tracing::trace!("{}::update_is_read", self.human_readable_id()); async fn handle_read_change_trigger(&self) {
tracing::trace!("{}::handle_read_change_trigger", self.human_readable_id());
if let Some(has_unread) = self.timeline().has_unread_messages().await { if let Some(has_unread) = self.timeline().has_unread_messages().await {
self.set_is_read(!has_unread); self.set_is_read(!has_unread);
} }

88
src/session/model/room/timeline/mod.rs

@ -4,7 +4,12 @@ mod virtual_item;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use futures_util::StreamExt; use futures_util::StreamExt;
use gtk::{gio, glib, glib::clone, prelude::*, subclass::prelude::*}; use gtk::{
gio, glib,
glib::{clone, closure_local},
prelude::*,
subclass::prelude::*,
};
use matrix_sdk::Error as MatrixError; use matrix_sdk::Error as MatrixError;
use matrix_sdk_ui::{ use matrix_sdk_ui::{
eyeball_im::VectorDiff, eyeball_im::VectorDiff,
@ -72,6 +77,9 @@ mod imp {
marker::PhantomData, marker::PhantomData,
}; };
use glib::subclass::Signal;
use once_cell::sync::Lazy;
use super::*; use super::*;
#[derive(Debug, glib::Properties)] #[derive(Debug, glib::Properties)]
@ -98,14 +106,15 @@ mod imp {
pub state: Cell<TimelineState>, pub state: Cell<TimelineState>,
/// Whether this timeline has a typing row. /// Whether this timeline has a typing row.
pub has_typing: Cell<bool>, pub has_typing: Cell<bool>,
pub diff_handle: OnceCell<AbortHandle>,
pub back_pagination_status_handle: OnceCell<AbortHandle>,
/// Whether the timeline is empty. /// Whether the timeline is empty.
#[property(get = Self::is_empty)] #[property(get = Self::is_empty)]
pub empty: PhantomData<bool>, pub empty: PhantomData<bool>,
/// Whether the timeline has the `m.room.create` event of the room. /// Whether the timeline has the `m.room.create` event of the room.
#[property(get)] #[property(get)]
pub has_room_create: Cell<bool>, pub has_room_create: Cell<bool>,
pub diff_handle: OnceCell<AbortHandle>,
pub back_pagination_status_handle: OnceCell<AbortHandle>,
pub read_receipts_changed_handle: OnceCell<AbortHandle>,
} }
impl Default for Timeline { impl Default for Timeline {
@ -129,10 +138,11 @@ mod imp {
event_map: Default::default(), event_map: Default::default(),
state: Default::default(), state: Default::default(),
has_typing: Default::default(), has_typing: Default::default(),
diff_handle: Default::default(),
back_pagination_status_handle: Default::default(),
empty: Default::default(), empty: Default::default(),
has_room_create: Default::default(), has_room_create: Default::default(),
diff_handle: Default::default(),
back_pagination_status_handle: Default::default(),
read_receipts_changed_handle: Default::default(),
} }
} }
} }
@ -145,6 +155,12 @@ mod imp {
#[glib::derived_properties] #[glib::derived_properties]
impl ObjectImpl for Timeline { impl ObjectImpl for Timeline {
fn signals() -> &'static [Signal] {
static SIGNALS: Lazy<Vec<Signal>> =
Lazy::new(|| vec![Signal::builder("read-change-trigger").build()]);
SIGNALS.as_ref()
}
fn dispose(&self) { fn dispose(&self) {
if let Some(handle) = self.diff_handle.get() { if let Some(handle) = self.diff_handle.get() {
handle.abort(); handle.abort();
@ -152,6 +168,9 @@ mod imp {
if let Some(handle) = self.back_pagination_status_handle.get() { if let Some(handle) = self.back_pagination_status_handle.get() {
handle.abort(); handle.abort();
} }
if let Some(handle) = self.read_receipts_changed_handle.get() {
handle.abort();
}
} }
} }
@ -366,6 +385,8 @@ impl Timeline {
if self.empty() != was_empty { if self.empty() != was_empty {
self.notify_empty(); self.notify_empty();
} }
self.emit_read_change_trigger();
} }
/// Update `nb` items' headers starting at `pos`. /// Update `nb` items' headers starting at `pos`.
@ -633,6 +654,7 @@ impl Timeline {
imp.diff_handle.set(diff_handle.abort_handle()).unwrap(); imp.diff_handle.set(diff_handle.abort_handle()).unwrap();
self.setup_back_pagination_status().await; self.setup_back_pagination_status().await;
self.setup_read_receipts().await;
} }
/// Setup the back-pagination status. /// Setup the back-pagination status.
@ -675,6 +697,43 @@ impl Timeline {
.unwrap(); .unwrap();
} }
/// Listen to read receipts changes.
async fn setup_read_receipts(&self) {
let Some(room) = self.room() else {
return;
};
let room_id = room.room_id().to_owned();
let matrix_timeline = self.matrix_timeline();
let stream = matrix_timeline
.subscribe_own_user_read_receipts_changed()
.await;
let obj_weak = glib::SendWeakRef::from(self.downgrade());
let fut = stream.for_each(move |_| {
let obj_weak = obj_weak.clone();
let room_id = room_id.clone();
async move {
let ctx = glib::MainContext::default();
ctx.spawn(async move {
spawn!(async move {
if let Some(obj) = obj_weak.upgrade() {
obj.emit_read_change_trigger();
} else {
error!("Could not emit read change trigger for room {room_id}: could not upgrade weak reference");
}
});
});
}
});
let handle = spawn_tokio!(fut);
self.imp()
.read_receipts_changed_handle
.set(handle.abort_handle())
.unwrap();
}
/// The underlying SDK timeline. /// The underlying SDK timeline.
pub fn matrix_timeline(&self) -> Arc<SdkTimeline> { pub fn matrix_timeline(&self) -> Arc<SdkTimeline> {
self.imp().timeline.get().unwrap().clone() self.imp().timeline.get().unwrap().clone()
@ -812,6 +871,25 @@ impl Timeline {
events events
} }
/// Emit the trigger that a read change might have occurred.
fn emit_read_change_trigger(&self) {
self.emit_by_name::<()>("read-change-trigger", &[]);
}
/// Connect to the trigger emitted when a read change might have occurred.
pub fn connect_read_change_trigger<F: Fn(&Self) + 'static>(
&self,
f: F,
) -> glib::SignalHandlerId {
self.connect_closure(
"read-change-trigger",
true,
closure_local!(move |obj: Self| {
f(&obj);
}),
)
}
} }
/// Whether the given event is an `m.room.create` event. /// Whether the given event is an `m.room.create` event.

Loading…
Cancel
Save