|
|
|
|
@ -13,7 +13,7 @@ use ruma::{
|
|
|
|
|
}, |
|
|
|
|
assign, |
|
|
|
|
}; |
|
|
|
|
use tokio::task::AbortHandle; |
|
|
|
|
use tokio::{task::AbortHandle, time::sleep}; |
|
|
|
|
use tokio_stream::wrappers::BroadcastStream; |
|
|
|
|
use tracing::{debug, error, info}; |
|
|
|
|
|
|
|
|
|
@ -38,7 +38,13 @@ use crate::{
|
|
|
|
|
const SESSION_PROFILE_KEY: &str = "session_profile"; |
|
|
|
|
/// The number of consecutive missed synchronizations before the session is
|
|
|
|
|
/// marked as offline.
|
|
|
|
|
const MISSED_SYNC_MAX_COUNT: u8 = 3; |
|
|
|
|
///
|
|
|
|
|
/// Note that this is set to `2`, but the count begins at `0` so this would
|
|
|
|
|
/// match the third missed synchronization.
|
|
|
|
|
const MISSED_SYNC_OFFLINE_COUNT: usize = 2; |
|
|
|
|
/// The delays in seconds to wait for when a sync fails, depending on the number
|
|
|
|
|
/// of missed attempts.
|
|
|
|
|
const MISSED_SYNC_DELAYS: &[u64] = &[1, 5, 10, 20, 30]; |
|
|
|
|
|
|
|
|
|
/// The state of the session.
|
|
|
|
|
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, glib::Enum)] |
|
|
|
|
@ -104,8 +110,8 @@ mod imp {
|
|
|
|
|
homeserver_reachable_source: RefCell<Option<glib::SourceId>>, |
|
|
|
|
/// The number of missed synchonizations in a row.
|
|
|
|
|
///
|
|
|
|
|
/// Capped at `MISSED_SYNC_MAX_COUNT - 1`.
|
|
|
|
|
missed_sync_count: Cell<u8>, |
|
|
|
|
/// Capped at `MISSED_SYNC_DELAYS.len() - 1`.
|
|
|
|
|
missed_sync_count: Cell<usize>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[glib::object_subclass] |
|
|
|
|
@ -272,6 +278,8 @@ mod imp {
|
|
|
|
|
#[weak(rename_to = imp)] |
|
|
|
|
self, |
|
|
|
|
move || { |
|
|
|
|
imp.homeserver_reachable_source.take(); |
|
|
|
|
|
|
|
|
|
spawn!(async move { |
|
|
|
|
imp.update_homeserver_reachable().await; |
|
|
|
|
}); |
|
|
|
|
@ -429,13 +437,24 @@ mod imp {
|
|
|
|
|
while let Some(response) = sync_stream.next().await { |
|
|
|
|
let obj_weak = obj_weak.clone(); |
|
|
|
|
let ctx = glib::MainContext::default(); |
|
|
|
|
ctx.spawn(async move { |
|
|
|
|
spawn!(async move { |
|
|
|
|
if let Some(obj) = obj_weak.upgrade() { |
|
|
|
|
obj.imp().handle_sync_response(response); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
let delay = ctx |
|
|
|
|
.spawn(async move { |
|
|
|
|
spawn!(async move { |
|
|
|
|
if let Some(obj) = obj_weak.upgrade() { |
|
|
|
|
obj.imp().handle_sync_response(response) |
|
|
|
|
} else { |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.await |
|
|
|
|
.expect("task was not aborted") |
|
|
|
|
}) |
|
|
|
|
.await |
|
|
|
|
.expect("task was not aborted"); |
|
|
|
|
|
|
|
|
|
if let Some(delay) = delay { |
|
|
|
|
sleep(delay).await; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.abort_handle(); |
|
|
|
|
@ -444,7 +463,13 @@ mod imp {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Handle the response received via sync.
|
|
|
|
|
fn handle_sync_response(&self, response: Result<SyncResponse, matrix_sdk::Error>) { |
|
|
|
|
///
|
|
|
|
|
/// Returns the delay to wait for before making the next sync, if
|
|
|
|
|
/// necessary.
|
|
|
|
|
fn handle_sync_response( |
|
|
|
|
&self, |
|
|
|
|
response: Result<SyncResponse, matrix_sdk::Error>, |
|
|
|
|
) -> Option<Duration> { |
|
|
|
|
let obj = self.obj(); |
|
|
|
|
let session_id = obj.session_id(); |
|
|
|
|
debug!(session = session_id, "Received sync response"); |
|
|
|
|
@ -460,16 +485,27 @@ mod imp {
|
|
|
|
|
|
|
|
|
|
self.set_offline(false); |
|
|
|
|
self.missed_sync_count.set(0); |
|
|
|
|
|
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
Err(error) => { |
|
|
|
|
let missed_sync_count = self.missed_sync_count.get() + 1; |
|
|
|
|
let missed_sync_count = self.missed_sync_count.get(); |
|
|
|
|
|
|
|
|
|
if missed_sync_count >= MISSED_SYNC_MAX_COUNT { |
|
|
|
|
// If there are too many failed attempts, mark the session as offline.
|
|
|
|
|
if missed_sync_count == MISSED_SYNC_OFFLINE_COUNT { |
|
|
|
|
self.set_offline(true); |
|
|
|
|
} else { |
|
|
|
|
self.missed_sync_count.set(missed_sync_count); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Increase the count of missed syncs, if we have not reached the maximum value.
|
|
|
|
|
if missed_sync_count < 4 { |
|
|
|
|
self.missed_sync_count.set(missed_sync_count + 1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
error!(session = session_id, "Could not perform sync: {error}"); |
|
|
|
|
|
|
|
|
|
// Sleep a little between attempts.
|
|
|
|
|
let delay = MISSED_SYNC_DELAYS[missed_sync_count]; |
|
|
|
|
Some(Duration::from_secs(delay)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|