diff --git a/Cargo.lock b/Cargo.lock index 8e563096..daad96f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1442,6 +1442,7 @@ dependencies = [ "strum", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "url", diff --git a/Cargo.toml b/Cargo.toml index dc143728..b690f65a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ serde_json = "1" strum = { version = "0.26", features = ["derive"] } thiserror = "1" tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync"] } +tokio-stream = { version = "0.1", features = ["sync"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2" diff --git a/src/session/model/room/mod.rs b/src/session/model/room/mod.rs index 21041985..1c4e9c9e 100644 --- a/src/session/model/room/mod.rs +++ b/src/session/model/room/mod.rs @@ -12,10 +12,12 @@ use matrix_sdk::{ deserialized_responses::{AmbiguityChange, MemberEvent, SyncTimelineEvent}, event_handler::EventHandlerDropGuard, room::Room as MatrixRoom, + send_queue::RoomSendQueueUpdate, sync::{JoinedRoomUpdate, LeftRoomUpdate}, DisplayName, Result as MatrixResult, RoomInfo, RoomMemberships, RoomState, }; use ruma::{ + api::client::error::{ErrorKind, RetryAfter}, events::{ receipt::{ReceiptEventContent, ReceiptType}, relation::Annotation, @@ -30,6 +32,7 @@ use ruma::{ }, EventId, MatrixToUri, MatrixUri, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, }; +use tokio_stream::wrappers::BroadcastStream; use tracing::{debug, error, warn}; mod aliases; @@ -67,10 +70,15 @@ use crate::{ utils::BoundObjectWeakRef, }; +/// The default duration in seconds that we wait for to retry failed sending +/// requests. +const DEFAULT_RETRY_AFTER: u32 = 30; + mod imp { use std::{ cell::{Cell, OnceCell}, marker::PhantomData, + time::SystemTime, }; use glib::subclass::Signal; @@ -364,6 +372,75 @@ mod imp { self.notifications_setting.set(setting); self.obj().notify_notifications_setting(); } + + /// Watch errors in the send queue to try to handle them. + pub(super) async fn watch_send_queue(&self) { + let send_queue = self.matrix_room().send_queue(); + + let room_weak = glib::SendWeakRef::from(self.obj().downgrade()); + spawn_tokio!(async move { + let subscriber = match send_queue.subscribe().await { + Ok((_, subscriber)) => BroadcastStream::new(subscriber), + Err(error) => { + warn!("Failed to listen to room send queue: {error}"); + return; + } + }; + + subscriber + .for_each(move |update| { + let room_weak = room_weak.clone(); + async move { + let Ok(RoomSendQueueUpdate::SendError { + error, + is_recoverable: true, + .. + }) = update + else { + return; + }; + + let ctx = glib::MainContext::default(); + ctx.spawn(async move { + spawn!(async move { + let Some(obj) = room_weak.upgrade() else { + return; + }; + let Some(session) = obj.session() else { + return; + }; + + if session.is_offline() { + // The queue will be restarted when the session is back + // online. + return; + } + + let duration = match error.client_api_error_kind() { + Some(ErrorKind::LimitExceeded { + retry_after: Some(retry_after), + }) => match retry_after { + RetryAfter::Delay(duration) => Some(*duration), + RetryAfter::DateTime(time) => { + time.duration_since(SystemTime::now()).ok() + } + }, + _ => None, + }; + let retry_after = duration + .and_then(|d| d.as_secs().try_into().ok()) + .unwrap_or(DEFAULT_RETRY_AFTER); + + glib::timeout_add_seconds_local_once(retry_after, move || { + obj.matrix_room().send_queue().set_enabled(true); + }); + }); + }); + } + }) + .await; + }); + } } } @@ -497,6 +574,17 @@ impl Room { } ) ); + + spawn!( + glib::Priority::DEFAULT_IDLE, + clone!( + #[weak(rename_to = obj)] + self, + async move { + obj.imp().watch_send_queue().await; + } + ) + ); } fn init_timeline(&self) {