Browse Source

room: Restart the send queue regularly when an error that disabled it is encountered

merge-requests/1716/head
Kévin Commaille 2 years ago
parent
commit
28e8395182
No known key found for this signature in database
GPG Key ID: C971D9DBC9D678D
  1. 1
      Cargo.lock
  2. 1
      Cargo.toml
  3. 88
      src/session/model/room/mod.rs

1
Cargo.lock generated

@ -1442,6 +1442,7 @@ dependencies = [
"strum",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"url",

1
Cargo.toml

@ -44,6 +44,7 @@ serde_json = "1"
strum = { version = "0.26", features = ["derive"] }
thiserror = "1"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2"

88
src/session/model/room/mod.rs

@ -12,10 +12,12 @@ use matrix_sdk::{
deserialized_responses::{AmbiguityChange, MemberEvent, SyncTimelineEvent},
event_handler::EventHandlerDropGuard,
room::Room as MatrixRoom,
send_queue::RoomSendQueueUpdate,
sync::{JoinedRoomUpdate, LeftRoomUpdate},
DisplayName, Result as MatrixResult, RoomInfo, RoomMemberships, RoomState,
};
use ruma::{
api::client::error::{ErrorKind, RetryAfter},
events::{
receipt::{ReceiptEventContent, ReceiptType},
relation::Annotation,
@ -30,6 +32,7 @@ use ruma::{
},
EventId, MatrixToUri, MatrixUri, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{debug, error, warn};
mod aliases;
@ -67,10 +70,15 @@ use crate::{
utils::BoundObjectWeakRef,
};
/// The default duration in seconds that we wait for to retry failed sending
/// requests.
const DEFAULT_RETRY_AFTER: u32 = 30;
mod imp {
use std::{
cell::{Cell, OnceCell},
marker::PhantomData,
time::SystemTime,
};
use glib::subclass::Signal;
@ -364,6 +372,75 @@ mod imp {
self.notifications_setting.set(setting);
self.obj().notify_notifications_setting();
}
/// Watch errors in the send queue to try to handle them.
pub(super) async fn watch_send_queue(&self) {
let send_queue = self.matrix_room().send_queue();
let room_weak = glib::SendWeakRef::from(self.obj().downgrade());
spawn_tokio!(async move {
let subscriber = match send_queue.subscribe().await {
Ok((_, subscriber)) => BroadcastStream::new(subscriber),
Err(error) => {
warn!("Failed to listen to room send queue: {error}");
return;
}
};
subscriber
.for_each(move |update| {
let room_weak = room_weak.clone();
async move {
let Ok(RoomSendQueueUpdate::SendError {
error,
is_recoverable: true,
..
}) = update
else {
return;
};
let ctx = glib::MainContext::default();
ctx.spawn(async move {
spawn!(async move {
let Some(obj) = room_weak.upgrade() else {
return;
};
let Some(session) = obj.session() else {
return;
};
if session.is_offline() {
// The queue will be restarted when the session is back
// online.
return;
}
let duration = match error.client_api_error_kind() {
Some(ErrorKind::LimitExceeded {
retry_after: Some(retry_after),
}) => match retry_after {
RetryAfter::Delay(duration) => Some(*duration),
RetryAfter::DateTime(time) => {
time.duration_since(SystemTime::now()).ok()
}
},
_ => None,
};
let retry_after = duration
.and_then(|d| d.as_secs().try_into().ok())
.unwrap_or(DEFAULT_RETRY_AFTER);
glib::timeout_add_seconds_local_once(retry_after, move || {
obj.matrix_room().send_queue().set_enabled(true);
});
});
});
}
})
.await;
});
}
}
}
@ -497,6 +574,17 @@ impl Room {
}
)
);
spawn!(
glib::Priority::DEFAULT_IDLE,
clone!(
#[weak(rename_to = obj)]
self,
async move {
obj.imp().watch_send_queue().await;
}
)
);
}
fn init_timeline(&self) {

Loading…
Cancel
Save