|
|
|
|
@ -1,16 +1,19 @@
|
|
|
|
|
use std::collections::HashMap; |
|
|
|
|
use std::{ |
|
|
|
|
collections::{HashMap, HashSet, VecDeque}, |
|
|
|
|
pin::Pin, |
|
|
|
|
sync::Arc, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
use gtk::{gio, glib, glib::clone, prelude::*, subclass::prelude::*}; |
|
|
|
|
use futures::{lock::Mutex, pin_mut, Stream, StreamExt}; |
|
|
|
|
use gtk::{gio, glib, prelude::*, subclass::prelude::*}; |
|
|
|
|
use log::{error, warn}; |
|
|
|
|
use matrix_sdk::{ |
|
|
|
|
deserialized_responses::SyncRoomEvent, |
|
|
|
|
room::Room as MatrixRoom, |
|
|
|
|
ruma::{ |
|
|
|
|
api::client::r0::message::get_message_events::Direction, |
|
|
|
|
events::{ |
|
|
|
|
room::message::MessageType, AnySyncMessageEvent, AnySyncRoomEvent, AnySyncStateEvent, |
|
|
|
|
}, |
|
|
|
|
identifiers::EventId, |
|
|
|
|
events::{room::message::MessageType, AnySyncMessageEvent, AnySyncRoomEvent}, |
|
|
|
|
identifiers::{EventId, TransactionId}, |
|
|
|
|
}, |
|
|
|
|
uuid::Uuid, |
|
|
|
|
Error as MatrixError, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
@ -20,7 +23,7 @@ use crate::{
|
|
|
|
|
user::UserExt, |
|
|
|
|
verification::{IdentityVerification, VERIFICATION_CREATION_TIMEOUT}, |
|
|
|
|
}, |
|
|
|
|
spawn, spawn_tokio, |
|
|
|
|
spawn_tokio, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy, glib::Enum)] |
|
|
|
|
@ -40,11 +43,12 @@ impl Default for TimelineState {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const MAX_BATCH_SIZE: usize = 20; |
|
|
|
|
type BackwardStream = |
|
|
|
|
Pin<Box<dyn Stream<Item = Vec<matrix_sdk::Result<SyncRoomEvent>>> + 'static + Send>>; |
|
|
|
|
|
|
|
|
|
mod imp { |
|
|
|
|
use std::{ |
|
|
|
|
cell::{Cell, RefCell}, |
|
|
|
|
collections::{HashSet, VecDeque}, |
|
|
|
|
}; |
|
|
|
|
use std::cell::{Cell, RefCell}; |
|
|
|
|
|
|
|
|
|
use glib::object::WeakRef; |
|
|
|
|
use once_cell::{sync::Lazy, unsync::OnceCell}; |
|
|
|
|
@ -62,13 +66,13 @@ mod imp {
|
|
|
|
|
pub event_map: RefCell<HashMap<Box<EventId>, Event>>, |
|
|
|
|
/// Maps the temporary `EventId` of the pending Event to the real
|
|
|
|
|
/// `EventId`
|
|
|
|
|
pub pending_events: RefCell<HashMap<String, Box<EventId>>>, |
|
|
|
|
pub pending_events: RefCell<HashMap<Box<TransactionId>, Box<EventId>>>, |
|
|
|
|
/// A Hashset of `EventId`s that where just redacted.
|
|
|
|
|
pub redacted_events: RefCell<HashSet<Box<EventId>>>, |
|
|
|
|
pub oldest_event: RefCell<Option<Box<EventId>>>, |
|
|
|
|
pub state: Cell<TimelineState>, |
|
|
|
|
/// The most recent verification request event
|
|
|
|
|
pub verification: RefCell<Option<IdentityVerification>>, |
|
|
|
|
pub backward_stream: Arc<Mutex<Option<BackwardStream>>>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[glib::object_subclass] |
|
|
|
|
@ -510,8 +514,92 @@ impl Timeline {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Load the timeline
|
|
|
|
|
/// This function should also be called to load more events
|
|
|
|
|
/// Returns `true` when messages where successfully added
|
|
|
|
|
pub async fn load(&self) -> bool { |
|
|
|
|
let priv_ = self.imp(); |
|
|
|
|
|
|
|
|
|
if matches!( |
|
|
|
|
self.state(), |
|
|
|
|
TimelineState::Loading | TimelineState::Complete |
|
|
|
|
) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
self.set_state(TimelineState::Loading); |
|
|
|
|
self.add_loading_spinner(); |
|
|
|
|
|
|
|
|
|
let matrix_room = self.room().matrix_room(); |
|
|
|
|
let room_weak = self.downgrade().into(); |
|
|
|
|
let backward_stream = priv_.backward_stream.clone(); |
|
|
|
|
|
|
|
|
|
let handle: tokio::task::JoinHandle<matrix_sdk::Result<_>> = spawn_tokio!(async move { |
|
|
|
|
let mut backward_stream_guard = backward_stream.lock().await; |
|
|
|
|
if backward_stream_guard.is_none() { |
|
|
|
|
backward_stream_guard |
|
|
|
|
.replace(create_streams_handler(room_weak, matrix_room).await?); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Ok(backward_stream_guard.as_mut().unwrap().next().await) |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
match handle.await.unwrap() { |
|
|
|
|
Ok(Some(events)) => { |
|
|
|
|
let events: Vec<Event> = events |
|
|
|
|
.into_iter() |
|
|
|
|
.filter_map(|event| match event { |
|
|
|
|
Ok(event) => Some(event), |
|
|
|
|
Err(error) => { |
|
|
|
|
error!("Failed to load messages: {}", error); |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.map(|event| Event::new(event, &self.room())) |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
self.remove_loading_spinner(); |
|
|
|
|
if events.is_empty() { |
|
|
|
|
self.set_state(TimelineState::Error); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
self.set_state(TimelineState::Ready); |
|
|
|
|
self.prepend(events); |
|
|
|
|
true |
|
|
|
|
} |
|
|
|
|
Ok(None) => { |
|
|
|
|
self.remove_loading_spinner(); |
|
|
|
|
self.set_state(TimelineState::Complete); |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
Err(error) => { |
|
|
|
|
error!("Failed to load timeline: {}", error); |
|
|
|
|
self.set_state(TimelineState::Error); |
|
|
|
|
self.remove_loading_spinner(); |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn clear(&self) { |
|
|
|
|
let priv_ = self.imp(); |
|
|
|
|
// Remove backward stream so that we create new streams
|
|
|
|
|
let mut backward_stream = priv_.backward_stream.lock().await; |
|
|
|
|
backward_stream.take(); |
|
|
|
|
|
|
|
|
|
let length = priv_.list.borrow().len(); |
|
|
|
|
priv_.relates_to_events.replace(HashMap::new()); |
|
|
|
|
priv_.list.replace(VecDeque::new()); |
|
|
|
|
priv_.event_map.replace(HashMap::new()); |
|
|
|
|
priv_.pending_events.replace(HashMap::new()); |
|
|
|
|
priv_.redacted_events.replace(HashSet::new()); |
|
|
|
|
|
|
|
|
|
self.notify("empty"); |
|
|
|
|
self.upcast_ref::<gio::ListModel>() |
|
|
|
|
.items_changed(0, length as u32, 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Append the new events
|
|
|
|
|
// TODO: This should be lazy, for inspiration see: https://blogs.gnome.org/ebassi/documentation/lazy-loading/
|
|
|
|
|
pub fn append(&self, batch: Vec<Event>) { |
|
|
|
|
let priv_ = self.imp(); |
|
|
|
|
|
|
|
|
|
@ -527,12 +615,6 @@ impl Timeline {
|
|
|
|
|
// multiple times
|
|
|
|
|
list.reserve(batch.len()); |
|
|
|
|
|
|
|
|
|
if list.is_empty() { |
|
|
|
|
priv_ |
|
|
|
|
.oldest_event |
|
|
|
|
.replace(batch.first().as_ref().map(|event| event.matrix_event_id())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
list.len() |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
@ -578,7 +660,7 @@ impl Timeline {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Append an event that wasn't yet fully sent and received via a sync
|
|
|
|
|
pub fn append_pending(&self, txn_id: Uuid, event: Event) { |
|
|
|
|
pub fn append_pending(&self, txn_id: &TransactionId, event: Event) { |
|
|
|
|
let priv_ = self.imp(); |
|
|
|
|
|
|
|
|
|
priv_ |
|
|
|
|
@ -589,7 +671,7 @@ impl Timeline {
|
|
|
|
|
priv_ |
|
|
|
|
.pending_events |
|
|
|
|
.borrow_mut() |
|
|
|
|
.insert(txn_id.to_string(), event.matrix_event_id()); |
|
|
|
|
.insert(txn_id.to_owned(), event.matrix_event_id()); |
|
|
|
|
|
|
|
|
|
let index = { |
|
|
|
|
let mut list = priv_.list.borrow_mut(); |
|
|
|
|
@ -634,7 +716,7 @@ impl Timeline {
|
|
|
|
|
let handle = |
|
|
|
|
spawn_tokio!(async move { matrix_room.event(event_id_clone.as_ref()).await }); |
|
|
|
|
match handle.await.unwrap() { |
|
|
|
|
Ok(room_event) => Ok(Event::new(room_event.event.into(), &room)), |
|
|
|
|
Ok(room_event) => Ok(Event::new(room_event.into(), &room)), |
|
|
|
|
Err(error) => { |
|
|
|
|
// TODO: Retry on connection error?
|
|
|
|
|
warn!("Could not fetch event {}: {}", event_id, error); |
|
|
|
|
@ -645,15 +727,10 @@ impl Timeline {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Prepends a batch of events
|
|
|
|
|
// TODO: This should be lazy, see: https://blogs.gnome.org/ebassi/documentation/lazy-loading/
|
|
|
|
|
pub fn prepend(&self, batch: Vec<Event>) { |
|
|
|
|
let priv_ = self.imp(); |
|
|
|
|
let mut added = batch.len(); |
|
|
|
|
|
|
|
|
|
priv_ |
|
|
|
|
.oldest_event |
|
|
|
|
.replace(batch.last().as_ref().map(|event| event.matrix_event_id())); |
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
let mut hidden_events: Vec<Event> = vec![]; |
|
|
|
|
// Extend the size of the list so that rust doesn't need to reallocate memory
|
|
|
|
|
@ -712,9 +789,6 @@ impl Timeline {
|
|
|
|
|
|| (priv_.list.borrow().len() == 1 && self.state() == TimelineState::Loading) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn oldest_event(&self) -> Option<Box<EventId>> { |
|
|
|
|
self.imp().oldest_event.borrow().clone() |
|
|
|
|
} |
|
|
|
|
fn add_loading_spinner(&self) { |
|
|
|
|
self.imp() |
|
|
|
|
.list |
|
|
|
|
@ -728,68 +802,6 @@ impl Timeline {
|
|
|
|
|
self.upcast_ref::<gio::ListModel>().items_changed(0, 1, 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn load_previous_events(&self) { |
|
|
|
|
if matches!( |
|
|
|
|
self.state(), |
|
|
|
|
TimelineState::Loading | TimelineState::Complete |
|
|
|
|
) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
self.set_state(TimelineState::Loading); |
|
|
|
|
self.add_loading_spinner(); |
|
|
|
|
|
|
|
|
|
let matrix_room = self.room().matrix_room(); |
|
|
|
|
let last_event = self.oldest_event(); |
|
|
|
|
let contains_last_event = last_event.is_some(); |
|
|
|
|
|
|
|
|
|
let handle = spawn_tokio!(async move { |
|
|
|
|
matrix_room |
|
|
|
|
.messages(last_event.as_deref(), None, 20, Direction::Backward) |
|
|
|
|
.await |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
spawn!( |
|
|
|
|
glib::PRIORITY_LOW, |
|
|
|
|
clone!(@weak self as obj => async move { |
|
|
|
|
obj.remove_loading_spinner(); |
|
|
|
|
|
|
|
|
|
// FIXME: If the request fails it's automatically restarted because the added events (none), didn't fill the screen.
|
|
|
|
|
// We should block the loading for some time before retrying
|
|
|
|
|
match handle.await.unwrap() { |
|
|
|
|
Ok(Some(events)) => { |
|
|
|
|
let events: Vec<Event> = if contains_last_event { |
|
|
|
|
events |
|
|
|
|
.into_iter() |
|
|
|
|
.skip(1) |
|
|
|
|
.map(|event| Event::new(event, &obj.room())).collect() |
|
|
|
|
} else { |
|
|
|
|
events |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|event| Event::new(event, &obj.room())).collect() |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if events.iter().any(|event| matches!(event.matrix_event(), Some(AnySyncRoomEvent::State(AnySyncStateEvent::RoomCreate(_))))) { |
|
|
|
|
obj.set_state(TimelineState::Complete); |
|
|
|
|
} else { |
|
|
|
|
obj.set_state(TimelineState::Ready); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
obj.prepend(events); |
|
|
|
|
}, |
|
|
|
|
Ok(None) => { |
|
|
|
|
error!("The start event wasn't found in the timeline for room {}.", obj.room().room_id()); |
|
|
|
|
obj.set_state(TimelineState::Error); |
|
|
|
|
}, |
|
|
|
|
Err(error) => { |
|
|
|
|
error!("Couldn't load previous events for room {}: {}", error, obj.room().room_id()); |
|
|
|
|
obj.set_state(TimelineState::Error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn set_verification(&self, verification: IdentityVerification) { |
|
|
|
|
self.imp().verification.replace(Some(verification)); |
|
|
|
|
self.notify("verification"); |
|
|
|
|
@ -899,3 +911,58 @@ impl Timeline {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn create_streams_handler( |
|
|
|
|
timeline: glib::SendWeakRef<Timeline>, |
|
|
|
|
matrix_room: MatrixRoom, |
|
|
|
|
) -> matrix_sdk::Result<BackwardStream> { |
|
|
|
|
let (forward_stream, backward_stream) = matrix_room.timeline().await?; |
|
|
|
|
|
|
|
|
|
tokio::spawn(async move { |
|
|
|
|
handle_forward_stream(timeline, forward_stream).await; |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
Ok(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE))) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn handle_forward_stream( |
|
|
|
|
timeline: glib::SendWeakRef<Timeline>, |
|
|
|
|
stream: impl Stream<Item = SyncRoomEvent>, |
|
|
|
|
) { |
|
|
|
|
let stream = stream.ready_chunks(MAX_BATCH_SIZE); |
|
|
|
|
pin_mut!(stream); |
|
|
|
|
|
|
|
|
|
while let Some(events) = stream.next().await { |
|
|
|
|
let timeline = timeline.clone(); |
|
|
|
|
let (sender, receiver) = futures::channel::oneshot::channel(); |
|
|
|
|
let ctx = glib::MainContext::default(); |
|
|
|
|
ctx.spawn(async move { |
|
|
|
|
let result = if let Some(timeline) = timeline.upgrade() { |
|
|
|
|
timeline.append( |
|
|
|
|
events |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|event| Event::new(event, &timeline.room())) |
|
|
|
|
.collect(), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
true |
|
|
|
|
} else { |
|
|
|
|
false |
|
|
|
|
}; |
|
|
|
|
sender.send(result).unwrap(); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
if !receiver.await.unwrap() { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let ctx = glib::MainContext::default(); |
|
|
|
|
ctx.spawn(async move { |
|
|
|
|
crate::spawn!(async move { |
|
|
|
|
if let Some(timeline) = timeline.upgrade() { |
|
|
|
|
timeline.clear().await; |
|
|
|
|
}; |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|