@ -1,7 +1,11 @@
mod timeline_item ;
mod virtual_item ;
use std ::{ collections ::HashMap , ops ::ControlFlow , sync ::Arc } ;
use std ::{
collections ::{ HashMap , VecDeque } ,
ops ::ControlFlow ,
sync ::Arc ,
} ;
use futures_util ::StreamExt ;
use gtk ::{
@ -11,7 +15,7 @@ use gtk::{
subclass ::prelude ::* ,
} ;
use matrix_sdk_ui ::{
eyeball_im ::VectorDiff ,
eyeball_im ::{ Vector , VectorDiff } ,
timeline ::{
default_event_filter , RoomExt , Timeline as SdkTimeline , TimelineEventItemId ,
TimelineItem as SdkTimelineItem ,
@ -28,7 +32,7 @@ use tokio::task::AbortHandle;
use tracing ::error ;
pub ( crate ) use self ::{
timeline_item ::{ TimelineItem , TimelineItemImpl } ,
timeline_item ::{ TimelineItem , TimelineItemExt , TimelineItem Impl } ,
virtual_item ::{ VirtualItem , VirtualItemKind } ,
} ;
use super ::{ Event , Room } ;
@ -208,16 +212,16 @@ mod imp {
let matrix_timeline = Arc ::new ( matrix_timeline ) ;
self . matrix_timeline
. set ( matrix_timeline . clone ( ) )
. expect ( "matrix timeline wa s uninitialized" ) ;
. expect ( "matrix timeline i s uninitialized" ) ;
let ( values , timeline_stream ) = matrix_timeline . subscribe ( ) . await ;
let ( values , timeline_stream ) = matrix_timeline . subscribe_batched ( ) . await ;
if ! values . is_empty ( ) {
self . update ( VectorDiff ::Append { values } ) ;
self . update_with_single_diff ( VectorDiff ::Append { values } , & room ) ;
}
let obj_weak = glib ::SendWeakRef ::from ( self . obj ( ) . downgrade ( ) ) ;
let fut = timeline_stream . for_each ( move | diff | {
let fut = timeline_stream . for_each ( move | diff_list | {
let obj_weak = obj_weak . clone ( ) ;
let room_id = room_id . clone ( ) ;
async move {
@ -225,7 +229,7 @@ mod imp {
ctx . spawn ( async move {
spawn ! ( async move {
if let Some ( obj ) = obj_weak . upgrade ( ) {
obj . imp ( ) . update ( diff ) ;
obj . imp ( ) . update_with_diff_list ( diff_list ) ;
} else {
error ! (
" Could not send timeline diff for room { room_id } : \
@ -238,7 +242,9 @@ mod imp {
} ) ;
let diff_handle = spawn_tokio ! ( fut ) ;
self . diff_handle . set ( diff_handle . abort_handle ( ) ) . unwrap ( ) ;
self . diff_handle
. set ( diff_handle . abort_handle ( ) )
. expect ( "handle is uninitialized" ) ;
self . watch_read_receipts ( ) . await ;
self . set_state ( TimelineState ::Ready ) ;
@ -266,15 +272,148 @@ mod imp {
self . obj ( ) . notify_has_room_create ( ) ;
}
/// Update this `Timeline` with the given diff.
#[ allow(clippy::too_many_lines) ]
fn update ( & self , diff : VectorDiff < Arc < SdkTimelineItem > > ) {
/// Update this timeline with the given diff list.
fn update_with_diff_list ( & self , diff_list : Vec < VectorDiff < Arc < SdkTimelineItem > > > ) {
let Some ( room ) = self . room . upgrade ( ) else {
return ;
} ;
let sdk_items = & self . sdk_items ;
let was_empty = self . is_empty ( ) ;
let diff_list = self . minimize_diff_list ( diff_list ) ;
for diff in diff_list {
self . update_with_single_diff ( diff , & room ) ;
}
let obj = self . obj ( ) ;
if self . is_empty ( ) ! = was_empty {
obj . notify_is_empty ( ) ;
}
obj . emit_read_change_trigger ( ) ;
}
/// Attempt to minimize the given list of diffs.
///
/// This is necessary because the SDK diffs are not always optimized,
/// e.g. an item is removed then re-added, which creates jumps in the
/// room history.
fn minimize_diff_list (
& self ,
diff_list : Vec < VectorDiff < Arc < SdkTimelineItem > > > ,
) -> Vec < VectorDiff < Arc < SdkTimelineItem > > > {
let mut old_items = Vec ::with_capacity ( self . sdk_items . n_items ( ) as usize ) ;
// Construct the current state.
for item in self . sdk_items . iter ::< TimelineItem > ( ) {
let Ok ( item ) = item else {
// The list changed, return early.
return diff_list ;
} ;
old_items . push ( item . timeline_id ( ) ) ;
}
// Get the new state by applying the diffs.
let mut value_map = HashMap ::with_capacity ( diff_list . len ( ) ) ;
let mut new_items = old_items . iter ( ) . cloned ( ) . collect ::< VecDeque < _ > > ( ) ;
for diff in & diff_list {
match diff {
VectorDiff ::Append { values } = > {
new_items . reserve ( values . len ( ) ) ;
for value in values {
let timeline_id = value . unique_id ( ) . 0. clone ( ) ;
value_map . insert ( timeline_id . clone ( ) , value . clone ( ) ) ;
new_items . push_back ( timeline_id ) ;
}
}
VectorDiff ::PushFront { value } = > {
let timeline_id = value . unique_id ( ) . 0. clone ( ) ;
value_map . insert ( timeline_id . clone ( ) , value . clone ( ) ) ;
new_items . push_front ( timeline_id ) ;
}
VectorDiff ::PushBack { value } = > {
let timeline_id = value . unique_id ( ) . 0. clone ( ) ;
value_map . insert ( timeline_id . clone ( ) , value . clone ( ) ) ;
new_items . push_back ( timeline_id ) ;
}
VectorDiff ::PopFront = > {
new_items . pop_front ( ) ;
}
VectorDiff ::PopBack = > {
new_items . pop_back ( ) ;
}
VectorDiff ::Insert { index , value } = > {
let timeline_id = value . unique_id ( ) . 0. clone ( ) ;
value_map . insert ( timeline_id . clone ( ) , value . clone ( ) ) ;
new_items . insert ( * index , timeline_id ) ;
}
VectorDiff ::Set { index , value } = > {
let timeline_id = value . unique_id ( ) . 0. clone ( ) ;
value_map . insert ( timeline_id . clone ( ) , value . clone ( ) ) ;
let item = new_items . get_mut ( * index ) . expect ( "item exists" ) ;
* item = timeline_id ;
}
VectorDiff ::Remove { index } = > {
new_items . remove ( * index ) ;
}
VectorDiff ::Clear | VectorDiff ::Truncate { .. } | VectorDiff ::Reset { .. } = > {
// Minimizing this will probably make a worse diff, so we return early.
return diff_list ;
}
}
}
let new_items = Vec ::from ( new_items ) ;
let mut min_diff_list = Vec ::with_capacity ( diff_list . len ( ) ) ;
let mut index = 0 ;
let mut n_items = old_items . len ( ) ;
for result in diff ::slice ( & old_items , & new_items ) {
match result {
diff ::Result ::Left ( _ ) = > {
min_diff_list . push ( VectorDiff ::Remove { index } ) ;
n_items - = 1 ;
}
diff ::Result ::Both ( timeline_id , _ ) = > {
if let Some ( value ) = value_map . get ( timeline_id ) . cloned ( ) {
min_diff_list . push ( VectorDiff ::Set { index , value } ) ;
}
index + = 1 ;
}
diff ::Result ::Right ( timeline_id ) = > {
let value = value_map
. get ( timeline_id )
. expect ( "value exists for added item" )
. clone ( ) ;
if index = = n_items {
if let Some ( VectorDiff ::Append { values } ) = min_diff_list . last_mut ( ) {
values . push_back ( value ) ;
} else {
min_diff_list . push ( VectorDiff ::Append {
values : Vector ::from ( [ value ] ) ,
} ) ;
}
} else {
min_diff_list . push ( VectorDiff ::Insert { index , value } ) ;
}
index + = 1 ;
n_items + = 1 ;
}
}
}
min_diff_list
}
/// Update this timeline with the given diff.
#[ allow(clippy::too_many_lines) ]
fn update_with_single_diff ( & self , diff : VectorDiff < Arc < SdkTimelineItem > > , room : & Room ) {
match diff {
VectorDiff ::Append { values } = > {
let new_list = values
@ -282,10 +421,10 @@ mod imp {
. map ( | item | self . create_item ( & item ) )
. collect ::< Vec < _ > > ( ) ;
let pos = sdk_items . n_items ( ) ;
let pos = self . sdk_items . n_items ( ) ;
let added = new_list . len ( ) as u32 ;
sdk_items . extend_from_slice ( & new_list ) ;
self . sdk_items . extend_from_slice ( & new_list ) ;
self . update_items_headers ( pos , added . max ( 1 ) ) ;
// Try to update the latest unread message.
@ -294,14 +433,14 @@ mod imp {
) ;
}
VectorDiff ::Clear = > {
sdk_items . remove_all ( ) ;
self . sdk_items . remove_all ( ) ;
self . event_map . borrow_mut ( ) . clear ( ) ;
self . set_has_room_create ( false ) ;
}
VectorDiff ::PushFront { value } = > {
let item = self . create_item ( & value ) ;
sdk_items . insert ( 0 , & item ) ;
self . sdk_items . insert ( 0 , & item ) ;
self . update_items_headers ( 0 , 1 ) ;
// Try to update the latest unread message.
@ -311,8 +450,8 @@ mod imp {
}
VectorDiff ::PushBack { value } = > {
let item = self . create_item ( & value ) ;
let pos = sdk_items . n_items ( ) ;
sdk_items . append ( & item ) ;
let pos = self . sdk_items . n_items ( ) ;
self . sdk_items . append ( & item ) ;
self . update_items_headers ( pos , 1 ) ;
// Try to update the latest unread message.
@ -321,24 +460,32 @@ mod imp {
}
}
VectorDiff ::PopFront = > {
let item = sdk_items . item ( 0 ) . and_downcast ( ) . unwrap ( ) ;
let item = self
. sdk_items
. item ( 0 )
. and_downcast ( )
. expect ( "all items are TimelineItems" ) ;
self . remove_item ( & item ) ;
sdk_items . remove ( 0 ) ;
self . sdk_items . remove ( 0 ) ;
self . update_items_headers ( 0 , 1 ) ;
}
VectorDiff ::PopBack = > {
let pos = sdk_items . n_items ( ) - 1 ;
let item = sdk_items . item ( pos ) . and_downcast ( ) . unwrap ( ) ;
let pos = self . sdk_items . n_items ( ) - 1 ;
let item = self
. sdk_items
. item ( pos )
. and_downcast ( )
. expect ( "all items are TimelineItems" ) ;
self . remove_item ( & item ) ;
sdk_items . remove ( pos ) ;
self . sdk_items . remove ( pos ) ;
}
VectorDiff ::Insert { index , value } = > {
let pos = index as u32 ;
let item = self . create_item ( & value ) ;
sdk_items . insert ( pos , & item ) ;
self . sdk_items . insert ( pos , & item ) ;
self . update_items_headers ( pos , 1 ) ;
// Try to update the latest unread message.
@ -348,7 +495,11 @@ mod imp {
}
VectorDiff ::Set { index , value } = > {
let pos = index as u32 ;
let prev_item = sdk_items . item ( pos ) . and_downcast ::< TimelineItem > ( ) . unwrap ( ) ;
let prev_item = self
. sdk_items
. item ( pos )
. and_downcast ::< TimelineItem > ( )
. expect ( "all items are TimelineItems" ) ;
let item = if prev_item . try_update_with ( & value ) {
if let Some ( event ) = prev_item . downcast_ref ::< Event > ( ) {
@ -364,7 +515,7 @@ mod imp {
self . remove_item ( & prev_item ) ;
let item = self . create_item ( & value ) ;
sdk_items . splice ( pos , 1 , & [ item . clone ( ) ] ) ;
self . sdk_items . splice ( pos , 1 , & [ item . clone ( ) ] ) ;
item
} ;
@ -379,22 +530,31 @@ mod imp {
}
VectorDiff ::Remove { index } = > {
let pos = index as u32 ;
let item = sdk_items . item ( pos ) . and_downcast ( ) . unwrap ( ) ;
let item = self
. sdk_items
. item ( pos )
. and_downcast ( )
. expect ( "all items are TimelineItems" ) ;
self . remove_item ( & item ) ;
sdk_items . remove ( pos ) ;
self . sdk_items . remove ( pos ) ;
self . update_items_headers ( pos , 1 ) ;
}
VectorDiff ::Truncate { length } = > {
let new_len = length as u32 ;
let old_len = sdk_items . n_items ( ) ;
let old_len = self . sdk_items . n_items ( ) ;
for pos in new_len .. old_len {
let item = sdk_items . item ( pos ) . and_downcast ( ) . unwrap ( ) ;
let item = self
. sdk_items
. item ( pos )
. and_downcast ( )
. expect ( "all items are TimelineItems" ) ;
self . remove_item ( & item ) ;
}
sdk_items . splice ( new_len , old_len - new_len , & [ ] as & [ glib ::Object ] ) ;
self . sdk_items
. splice ( new_len , old_len - new_len , & [ ] as & [ glib ::Object ] ) ;
}
VectorDiff ::Reset { values } = > {
// Reset the state.
@ -406,10 +566,10 @@ mod imp {
. map ( | item | self . create_item ( & item ) )
. collect ::< Vec < _ > > ( ) ;
let removed = sdk_items . n_items ( ) ;
let removed = self . sdk_items . n_items ( ) ;
let added = new_list . len ( ) as u32 ;
sdk_items . splice ( 0 , removed , & new_list ) ;
self . sdk_items . splice ( 0 , removed , & new_list ) ;
self . update_items_headers ( 0 , added . max ( 1 ) ) ;
// Try to update the latest unread message.
@ -418,13 +578,6 @@ mod imp {
) ;
}
}
let obj = self . obj ( ) ;
if self . is_empty ( ) ! = was_empty {
obj . notify_is_empty ( ) ;
}
obj . emit_read_change_trigger ( ) ;
}
/// Update `nb` items' headers starting at `pos`.
@ -625,7 +778,7 @@ mod imp {
let handle = spawn_tokio ! ( fut ) ;
self . read_receipts_changed_handle
. set ( handle . abort_handle ( ) )
. unwrap ( ) ;
. expect ( "handle is uninitialized" ) ;
}
}
}
@ -738,7 +891,7 @@ impl Timeline {
. await
} )
. await
. unwrap ( ) ;
. expect ( "task was not aborted" ) ;
let sdk_items = & self . imp ( ) . sdk_items ;
let count = sdk_items . n_items ( ) ;