diff --git a/src/session/room/timeline.rs b/src/session/room/timeline.rs index 282f532a..0f4f322c 100644 --- a/src/session/room/timeline.rs +++ b/src/session/room/timeline.rs @@ -9,13 +9,13 @@ use gtk::{gio, glib, prelude::*, subclass::prelude::*}; use log::{error, warn}; use matrix_sdk::{ deserialized_responses::SyncRoomEvent, - room::Room as MatrixRoom, ruma::{ events::{room::message::MessageType, AnySyncMessageEvent, AnySyncRoomEvent}, identifiers::{EventId, TransactionId}, }, Error as MatrixError, }; +use tokio::task::JoinHandle; use crate::{ session::{ @@ -73,6 +73,7 @@ mod imp { /// The most recent verification request event pub verification: RefCell>, pub backward_stream: Arc>>, + pub forward_handle: Arc>>>, } #[glib::object_subclass] @@ -531,14 +532,26 @@ impl Timeline { self.add_loading_spinner(); let matrix_room = self.room().matrix_room(); - let room_weak = self.downgrade().into(); + let timeline_weak = self.downgrade().into(); let backward_stream = priv_.backward_stream.clone(); + let forward_handle = priv_.forward_handle.clone(); let handle: tokio::task::JoinHandle> = spawn_tokio!(async move { let mut backward_stream_guard = backward_stream.lock().await; + let mut forward_handle_guard = forward_handle.lock().await; if backward_stream_guard.is_none() { + let (forward_stream, backward_stream) = matrix_room.timeline().await?; + + let forward_handle = tokio::spawn(async move { + handle_forward_stream(timeline_weak, forward_stream).await; + }); + + if let Some(old_forward_handle) = forward_handle_guard.replace(forward_handle) { + old_forward_handle.abort(); + } + backward_stream_guard - .replace(create_streams_handler(room_weak, matrix_room).await?); + .replace(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE))); } Ok(backward_stream_guard.as_mut().unwrap().next().await) @@ -587,12 +600,18 @@ impl Timeline { let mut backward_stream = priv_.backward_stream.lock().await; backward_stream.take(); + let mut forward_handle = priv_.forward_handle.lock().await; + if let Some(forward_handle) = forward_handle.take() { + forward_handle.abort(); + } + let length = priv_.list.borrow().len(); priv_.relates_to_events.replace(HashMap::new()); priv_.list.replace(VecDeque::new()); priv_.event_map.replace(HashMap::new()); priv_.pending_events.replace(HashMap::new()); priv_.redacted_events.replace(HashSet::new()); + self.set_state(TimelineState::Initial); self.notify("empty"); self.upcast_ref::() @@ -912,19 +931,6 @@ impl Timeline { } } -async fn create_streams_handler( - timeline: glib::SendWeakRef, - matrix_room: MatrixRoom, -) -> matrix_sdk::Result { - let (forward_stream, backward_stream) = matrix_room.timeline().await?; - - tokio::spawn(async move { - handle_forward_stream(timeline, forward_stream).await; - }); - - Ok(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE))) -} - async fn handle_forward_stream( timeline: glib::SendWeakRef, stream: impl Stream,