Browse Source

timeline: Clear timeline on broken forward_stream

merge-requests/1327/merge
Julian Sparber 4 years ago
parent
commit
e6259b5c50
  1. 38
      src/session/room/timeline.rs

38
src/session/room/timeline.rs

@ -9,13 +9,13 @@ use gtk::{gio, glib, prelude::*, subclass::prelude::*};
use log::{error, warn};
use matrix_sdk::{
deserialized_responses::SyncRoomEvent,
room::Room as MatrixRoom,
ruma::{
events::{room::message::MessageType, AnySyncMessageEvent, AnySyncRoomEvent},
identifiers::{EventId, TransactionId},
},
Error as MatrixError,
};
use tokio::task::JoinHandle;
use crate::{
session::{
@ -73,6 +73,7 @@ mod imp {
/// The most recent verification request event
pub verification: RefCell<Option<IdentityVerification>>,
pub backward_stream: Arc<Mutex<Option<BackwardStream>>>,
pub forward_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
#[glib::object_subclass]
@ -531,14 +532,26 @@ impl Timeline {
self.add_loading_spinner();
let matrix_room = self.room().matrix_room();
let room_weak = self.downgrade().into();
let timeline_weak = self.downgrade().into();
let backward_stream = priv_.backward_stream.clone();
let forward_handle = priv_.forward_handle.clone();
let handle: tokio::task::JoinHandle<matrix_sdk::Result<_>> = spawn_tokio!(async move {
let mut backward_stream_guard = backward_stream.lock().await;
let mut forward_handle_guard = forward_handle.lock().await;
if backward_stream_guard.is_none() {
let (forward_stream, backward_stream) = matrix_room.timeline().await?;
let forward_handle = tokio::spawn(async move {
handle_forward_stream(timeline_weak, forward_stream).await;
});
if let Some(old_forward_handle) = forward_handle_guard.replace(forward_handle) {
old_forward_handle.abort();
}
backward_stream_guard
.replace(create_streams_handler(room_weak, matrix_room).await?);
.replace(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE)));
}
Ok(backward_stream_guard.as_mut().unwrap().next().await)
@ -587,12 +600,18 @@ impl Timeline {
let mut backward_stream = priv_.backward_stream.lock().await;
backward_stream.take();
let mut forward_handle = priv_.forward_handle.lock().await;
if let Some(forward_handle) = forward_handle.take() {
forward_handle.abort();
}
let length = priv_.list.borrow().len();
priv_.relates_to_events.replace(HashMap::new());
priv_.list.replace(VecDeque::new());
priv_.event_map.replace(HashMap::new());
priv_.pending_events.replace(HashMap::new());
priv_.redacted_events.replace(HashSet::new());
self.set_state(TimelineState::Initial);
self.notify("empty");
self.upcast_ref::<gio::ListModel>()
@ -912,19 +931,6 @@ impl Timeline {
}
}
async fn create_streams_handler(
timeline: glib::SendWeakRef<Timeline>,
matrix_room: MatrixRoom,
) -> matrix_sdk::Result<BackwardStream> {
let (forward_stream, backward_stream) = matrix_room.timeline().await?;
tokio::spawn(async move {
handle_forward_stream(timeline, forward_stream).await;
});
Ok(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE)))
}
async fn handle_forward_stream(
timeline: glib::SendWeakRef<Timeline>,
stream: impl Stream<Item = SyncRoomEvent>,

Loading…
Cancel
Save