|
|
|
|
@ -1,6 +1,10 @@
|
|
|
|
|
use crate::{database::DatabaseGuard, ConduitResult, Database, Error, Result, Ruma, RumaResponse}; |
|
|
|
|
use ruma::{ |
|
|
|
|
api::client::r0::{sync::sync_events, uiaa::UiaaResponse}, |
|
|
|
|
api::client::r0::{ |
|
|
|
|
filter::{IncomingFilterDefinition, LazyLoadOptions}, |
|
|
|
|
sync::sync_events, |
|
|
|
|
uiaa::UiaaResponse, |
|
|
|
|
}, |
|
|
|
|
events::{ |
|
|
|
|
room::member::{MembershipState, RoomMemberEventContent}, |
|
|
|
|
AnySyncEphemeralRoomEvent, EventType, |
|
|
|
|
@ -36,13 +40,15 @@ use rocket::{get, tokio};
|
|
|
|
|
/// Calling this endpoint with a `since` parameter from a previous `next_batch` returns:
|
|
|
|
|
/// For joined rooms:
|
|
|
|
|
/// - Some of the most recent events of each timeline that happened after since
|
|
|
|
|
/// - If user joined the room after since: All state events and device list updates in that room
|
|
|
|
|
/// - If user joined the room after since: All state events (unless lazy loading is activated) and
|
|
|
|
|
/// all device list updates in that room
|
|
|
|
|
/// - If the user was already in the room: A list of all events that are in the state now, but were
|
|
|
|
|
/// not in the state at `since`
|
|
|
|
|
/// - If the state we send contains a member event: Joined and invited member counts, heroes
|
|
|
|
|
/// - Device list updates that happened after `since`
|
|
|
|
|
/// - If there are events in the timeline we send or the user send updated his read mark: Notification counts
|
|
|
|
|
/// - EDUs that are active now (read receipts, typing updates, presence)
|
|
|
|
|
/// - TODO: Allow multiple sync streams to support Pantalaimon
|
|
|
|
|
///
|
|
|
|
|
/// For invited rooms:
|
|
|
|
|
/// - If the user was invited after `since`: A subset of the state of the room at the point of the invite
|
|
|
|
|
@ -77,34 +83,32 @@ pub async fn sync_events_route(
|
|
|
|
|
Entry::Vacant(v) => { |
|
|
|
|
let (tx, rx) = tokio::sync::watch::channel(None); |
|
|
|
|
|
|
|
|
|
v.insert((body.since.clone(), rx.clone())); |
|
|
|
|
|
|
|
|
|
tokio::spawn(sync_helper_wrapper( |
|
|
|
|
Arc::clone(&arc_db), |
|
|
|
|
sender_user.clone(), |
|
|
|
|
sender_device.clone(), |
|
|
|
|
body.since.clone(), |
|
|
|
|
body.full_state, |
|
|
|
|
body.timeout, |
|
|
|
|
body, |
|
|
|
|
tx, |
|
|
|
|
)); |
|
|
|
|
|
|
|
|
|
v.insert((body.since.clone(), rx)).1.clone() |
|
|
|
|
rx |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(mut o) => { |
|
|
|
|
if o.get().0 != body.since { |
|
|
|
|
let (tx, rx) = tokio::sync::watch::channel(None); |
|
|
|
|
|
|
|
|
|
o.insert((body.since.clone(), rx.clone())); |
|
|
|
|
|
|
|
|
|
tokio::spawn(sync_helper_wrapper( |
|
|
|
|
Arc::clone(&arc_db), |
|
|
|
|
sender_user.clone(), |
|
|
|
|
sender_device.clone(), |
|
|
|
|
body.since.clone(), |
|
|
|
|
body.full_state, |
|
|
|
|
body.timeout, |
|
|
|
|
body, |
|
|
|
|
tx, |
|
|
|
|
)); |
|
|
|
|
|
|
|
|
|
o.insert((body.since.clone(), rx.clone())); |
|
|
|
|
|
|
|
|
|
rx |
|
|
|
|
} else { |
|
|
|
|
o.get().1.clone() |
|
|
|
|
@ -135,18 +139,16 @@ async fn sync_helper_wrapper(
|
|
|
|
|
db: Arc<DatabaseGuard>, |
|
|
|
|
sender_user: Box<UserId>, |
|
|
|
|
sender_device: Box<DeviceId>, |
|
|
|
|
since: Option<String>, |
|
|
|
|
full_state: bool, |
|
|
|
|
timeout: Option<Duration>, |
|
|
|
|
body: sync_events::IncomingRequest, |
|
|
|
|
tx: Sender<Option<ConduitResult<sync_events::Response>>>, |
|
|
|
|
) { |
|
|
|
|
let since = body.since.clone(); |
|
|
|
|
|
|
|
|
|
let r = sync_helper( |
|
|
|
|
Arc::clone(&db), |
|
|
|
|
sender_user.clone(), |
|
|
|
|
sender_device.clone(), |
|
|
|
|
since.clone(), |
|
|
|
|
full_state, |
|
|
|
|
timeout, |
|
|
|
|
body, |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
@ -179,9 +181,7 @@ async fn sync_helper(
|
|
|
|
|
db: Arc<DatabaseGuard>, |
|
|
|
|
sender_user: Box<UserId>, |
|
|
|
|
sender_device: Box<DeviceId>, |
|
|
|
|
since: Option<String>, |
|
|
|
|
full_state: bool, |
|
|
|
|
timeout: Option<Duration>, |
|
|
|
|
body: sync_events::IncomingRequest, |
|
|
|
|
// bool = caching allowed
|
|
|
|
|
) -> Result<(sync_events::Response, bool), Error> { |
|
|
|
|
// TODO: match body.set_presence {
|
|
|
|
|
@ -193,8 +193,26 @@ async fn sync_helper(
|
|
|
|
|
let next_batch = db.globals.current_count()?; |
|
|
|
|
let next_batch_string = next_batch.to_string(); |
|
|
|
|
|
|
|
|
|
// Load filter
|
|
|
|
|
let filter = match body.filter { |
|
|
|
|
None => IncomingFilterDefinition::default(), |
|
|
|
|
Some(sync_events::IncomingFilter::FilterDefinition(filter)) => filter, |
|
|
|
|
Some(sync_events::IncomingFilter::FilterId(filter_id)) => db |
|
|
|
|
.users |
|
|
|
|
.get_filter(&sender_user, &filter_id)? |
|
|
|
|
.unwrap_or_default(), |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options { |
|
|
|
|
LazyLoadOptions::Enabled { |
|
|
|
|
include_redundant_members: redundant, |
|
|
|
|
} => (true, redundant), |
|
|
|
|
_ => (false, false), |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let mut joined_rooms = BTreeMap::new(); |
|
|
|
|
let since = since |
|
|
|
|
let since = body |
|
|
|
|
.since |
|
|
|
|
.clone() |
|
|
|
|
.and_then(|string| string.parse().ok()) |
|
|
|
|
.unwrap_or(0); |
|
|
|
|
@ -264,6 +282,14 @@ async fn sync_helper(
|
|
|
|
|
// limited unless there are events in non_timeline_pdus
|
|
|
|
|
let limited = non_timeline_pdus.next().is_some(); |
|
|
|
|
|
|
|
|
|
let mut timeline_users = HashSet::new(); |
|
|
|
|
for (_, event) in &timeline_pdus { |
|
|
|
|
timeline_users.insert(event.sender.as_str().to_owned()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
db.rooms |
|
|
|
|
.lazy_load_confirm_delivery(&sender_user, &sender_device, &room_id, since)?; |
|
|
|
|
|
|
|
|
|
// Database queries:
|
|
|
|
|
|
|
|
|
|
let current_shortstatehash = db |
|
|
|
|
@ -344,14 +370,58 @@ async fn sync_helper(
|
|
|
|
|
state_events, |
|
|
|
|
) = if since_shortstatehash.is_none() { |
|
|
|
|
// Probably since = 0, we will do an initial sync
|
|
|
|
|
|
|
|
|
|
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?; |
|
|
|
|
|
|
|
|
|
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?; |
|
|
|
|
let state_events: Vec<_> = current_state_ids |
|
|
|
|
.iter() |
|
|
|
|
.map(|(_, id)| db.rooms.get_pdu(id)) |
|
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
let mut state_events = Vec::new(); |
|
|
|
|
let mut lazy_loaded = HashSet::new(); |
|
|
|
|
|
|
|
|
|
for (shortstatekey, id) in current_state_ids { |
|
|
|
|
let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?; |
|
|
|
|
|
|
|
|
|
if event_type != EventType::RoomMember { |
|
|
|
|
let pdu = match db.rooms.get_pdu(&id)? { |
|
|
|
|
Some(pdu) => pdu, |
|
|
|
|
None => { |
|
|
|
|
error!("Pdu in state not found: {}", id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
state_events.push(pdu); |
|
|
|
|
} else if !lazy_load_enabled |
|
|
|
|
|| body.full_state |
|
|
|
|
|| timeline_users.contains(&state_key) |
|
|
|
|
{ |
|
|
|
|
let pdu = match db.rooms.get_pdu(&id)? { |
|
|
|
|
Some(pdu) => pdu, |
|
|
|
|
None => { |
|
|
|
|
error!("Pdu in state not found: {}", id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
lazy_loaded.insert( |
|
|
|
|
UserId::parse(state_key.as_ref()) |
|
|
|
|
.expect("they are in timeline_users, so they should be correct"), |
|
|
|
|
); |
|
|
|
|
state_events.push(pdu); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Reset lazy loading because this is an initial sync
|
|
|
|
|
db.rooms |
|
|
|
|
.lazy_load_reset(&sender_user, &sender_device, &room_id)?; |
|
|
|
|
|
|
|
|
|
// The state_events above should contain all timeline_users, let's mark them as lazy
|
|
|
|
|
// loaded.
|
|
|
|
|
db.rooms.lazy_load_mark_sent( |
|
|
|
|
&sender_user, |
|
|
|
|
&sender_device, |
|
|
|
|
&room_id, |
|
|
|
|
lazy_loaded, |
|
|
|
|
next_batch, |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
( |
|
|
|
|
heroes, |
|
|
|
|
@ -387,20 +457,67 @@ async fn sync_helper(
|
|
|
|
|
|
|
|
|
|
let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?; |
|
|
|
|
|
|
|
|
|
let state_events = if joined_since_last_sync { |
|
|
|
|
current_state_ids |
|
|
|
|
.iter() |
|
|
|
|
.map(|(_, id)| db.rooms.get_pdu(id)) |
|
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
|
.collect::<Vec<_>>() |
|
|
|
|
} else { |
|
|
|
|
current_state_ids |
|
|
|
|
.iter() |
|
|
|
|
.filter(|(key, id)| since_state_ids.get(key) != Some(id)) |
|
|
|
|
.map(|(_, id)| db.rooms.get_pdu(id)) |
|
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
|
.collect() |
|
|
|
|
}; |
|
|
|
|
let mut state_events = Vec::new(); |
|
|
|
|
let mut lazy_loaded = HashSet::new(); |
|
|
|
|
|
|
|
|
|
for (key, id) in current_state_ids { |
|
|
|
|
if body.full_state || since_state_ids.get(&key) != Some(&id) { |
|
|
|
|
let pdu = match db.rooms.get_pdu(&id)? { |
|
|
|
|
Some(pdu) => pdu, |
|
|
|
|
None => { |
|
|
|
|
error!("Pdu in state not found: {}", id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if pdu.kind == EventType::RoomMember { |
|
|
|
|
match UserId::parse( |
|
|
|
|
pdu.state_key |
|
|
|
|
.as_ref() |
|
|
|
|
.expect("State event has state key") |
|
|
|
|
.clone(), |
|
|
|
|
) { |
|
|
|
|
Ok(state_key_userid) => { |
|
|
|
|
lazy_loaded.insert(state_key_userid); |
|
|
|
|
} |
|
|
|
|
Err(e) => error!("Invalid state key for member event: {}", e), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
state_events.push(pdu); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (_, event) in &timeline_pdus { |
|
|
|
|
if lazy_loaded.contains(&event.sender) { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !db.rooms.lazy_load_was_sent_before( |
|
|
|
|
&sender_user, |
|
|
|
|
&sender_device, |
|
|
|
|
&room_id, |
|
|
|
|
&event.sender, |
|
|
|
|
)? || lazy_load_send_redundant |
|
|
|
|
{ |
|
|
|
|
if let Some(member_event) = db.rooms.room_state_get( |
|
|
|
|
&room_id, |
|
|
|
|
&EventType::RoomMember, |
|
|
|
|
event.sender.as_str(), |
|
|
|
|
)? { |
|
|
|
|
lazy_loaded.insert(event.sender.clone()); |
|
|
|
|
state_events.push(member_event); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
db.rooms.lazy_load_mark_sent( |
|
|
|
|
&sender_user, |
|
|
|
|
&sender_device, |
|
|
|
|
&room_id, |
|
|
|
|
lazy_loaded, |
|
|
|
|
next_batch, |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
let encrypted_room = db |
|
|
|
|
.rooms |
|
|
|
|
@ -765,7 +882,7 @@ async fn sync_helper(
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// TODO: Retry the endpoint instead of returning (waiting for #118)
|
|
|
|
|
if !full_state |
|
|
|
|
if !body.full_state |
|
|
|
|
&& response.rooms.is_empty() |
|
|
|
|
&& response.presence.is_empty() |
|
|
|
|
&& response.account_data.is_empty() |
|
|
|
|
@ -774,7 +891,7 @@ async fn sync_helper(
|
|
|
|
|
{ |
|
|
|
|
// Hang a few seconds so requests are not spammed
|
|
|
|
|
// Stop hanging if new info arrives
|
|
|
|
|
let mut duration = timeout.unwrap_or_default(); |
|
|
|
|
let mut duration = body.timeout.unwrap_or_default(); |
|
|
|
|
if duration.as_secs() > 30 { |
|
|
|
|
duration = Duration::from_secs(30); |
|
|
|
|
} |
|
|
|
|
|