|
|
|
|
@ -300,174 +300,177 @@ async fn sync_helper(
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let mut left_rooms = BTreeMap::new(); |
|
|
|
|
let all_left_rooms: Vec<_> = services() |
|
|
|
|
.rooms |
|
|
|
|
.state_cache |
|
|
|
|
.rooms_left(&sender_user) |
|
|
|
|
.collect(); |
|
|
|
|
for result in all_left_rooms { |
|
|
|
|
let (room_id, _) = result?; |
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
// Get and drop the lock to wait for remaining operations to finish
|
|
|
|
|
let mutex_insert = Arc::clone( |
|
|
|
|
services() |
|
|
|
|
.globals |
|
|
|
|
.roomid_mutex_insert |
|
|
|
|
.write() |
|
|
|
|
.await |
|
|
|
|
.entry(room_id.clone()) |
|
|
|
|
.or_default(), |
|
|
|
|
); |
|
|
|
|
let insert_lock = mutex_insert.lock().await; |
|
|
|
|
drop(insert_lock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let left_count = services() |
|
|
|
|
if filter.room.include_leave { |
|
|
|
|
let all_left_rooms: Vec<_> = services() |
|
|
|
|
.rooms |
|
|
|
|
.state_cache |
|
|
|
|
.get_left_count(&room_id, &sender_user)?; |
|
|
|
|
.rooms_left(&sender_user) |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
// Left before last sync
|
|
|
|
|
if Some(since) >= left_count { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
for result in all_left_rooms { |
|
|
|
|
let (room_id, _) = result?; |
|
|
|
|
|
|
|
|
|
if !services().rooms.metadata.exists(&room_id)? { |
|
|
|
|
// This is just a rejected invite, not a room we know
|
|
|
|
|
let event = PduEvent { |
|
|
|
|
event_id: EventId::new(services().globals.server_name()).into(), |
|
|
|
|
sender: sender_user.clone(), |
|
|
|
|
origin_server_ts: utils::millis_since_unix_epoch() |
|
|
|
|
.try_into() |
|
|
|
|
.expect("Timestamp is valid js_int value"), |
|
|
|
|
kind: TimelineEventType::RoomMember, |
|
|
|
|
content: serde_json::from_str(r#"{ "membership": "leave"}"#).unwrap(), |
|
|
|
|
state_key: Some(sender_user.to_string()), |
|
|
|
|
unsigned: None, |
|
|
|
|
// The following keys are dropped on conversion
|
|
|
|
|
room_id: room_id.clone(), |
|
|
|
|
prev_events: vec![], |
|
|
|
|
depth: uint!(1), |
|
|
|
|
auth_events: vec![], |
|
|
|
|
redacts: None, |
|
|
|
|
hashes: EventHash { |
|
|
|
|
sha256: String::new(), |
|
|
|
|
}, |
|
|
|
|
signatures: None, |
|
|
|
|
}; |
|
|
|
|
{ |
|
|
|
|
// Get and drop the lock to wait for remaining operations to finish
|
|
|
|
|
let mutex_insert = Arc::clone( |
|
|
|
|
services() |
|
|
|
|
.globals |
|
|
|
|
.roomid_mutex_insert |
|
|
|
|
.write() |
|
|
|
|
.await |
|
|
|
|
.entry(room_id.clone()) |
|
|
|
|
.or_default(), |
|
|
|
|
); |
|
|
|
|
let insert_lock = mutex_insert.lock().await; |
|
|
|
|
drop(insert_lock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
left_rooms.insert( |
|
|
|
|
room_id, |
|
|
|
|
LeftRoom { |
|
|
|
|
account_data: RoomAccountData { events: Vec::new() }, |
|
|
|
|
timeline: Timeline { |
|
|
|
|
limited: false, |
|
|
|
|
prev_batch: Some(next_batch_string.clone()), |
|
|
|
|
events: Vec::new(), |
|
|
|
|
let left_count = services() |
|
|
|
|
.rooms |
|
|
|
|
.state_cache |
|
|
|
|
.get_left_count(&room_id, &sender_user)?; |
|
|
|
|
|
|
|
|
|
// Left before last sync
|
|
|
|
|
if Some(since) >= left_count { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !services().rooms.metadata.exists(&room_id)? { |
|
|
|
|
// This is just a rejected invite, not a room we know
|
|
|
|
|
let event = PduEvent { |
|
|
|
|
event_id: EventId::new(services().globals.server_name()).into(), |
|
|
|
|
sender: sender_user.clone(), |
|
|
|
|
origin_server_ts: utils::millis_since_unix_epoch() |
|
|
|
|
.try_into() |
|
|
|
|
.expect("Timestamp is valid js_int value"), |
|
|
|
|
kind: TimelineEventType::RoomMember, |
|
|
|
|
content: serde_json::from_str(r#"{ "membership": "leave"}"#).unwrap(), |
|
|
|
|
state_key: Some(sender_user.to_string()), |
|
|
|
|
unsigned: None, |
|
|
|
|
// The following keys are dropped on conversion
|
|
|
|
|
room_id: room_id.clone(), |
|
|
|
|
prev_events: vec![], |
|
|
|
|
depth: uint!(1), |
|
|
|
|
auth_events: vec![], |
|
|
|
|
redacts: None, |
|
|
|
|
hashes: EventHash { |
|
|
|
|
sha256: String::new(), |
|
|
|
|
}, |
|
|
|
|
state: State { |
|
|
|
|
events: vec![event.to_sync_state_event()], |
|
|
|
|
signatures: None, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
left_rooms.insert( |
|
|
|
|
room_id, |
|
|
|
|
LeftRoom { |
|
|
|
|
account_data: RoomAccountData { events: Vec::new() }, |
|
|
|
|
timeline: Timeline { |
|
|
|
|
limited: false, |
|
|
|
|
prev_batch: Some(next_batch_string.clone()), |
|
|
|
|
events: Vec::new(), |
|
|
|
|
}, |
|
|
|
|
state: State { |
|
|
|
|
events: vec![event.to_sync_state_event()], |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
); |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let mut left_state_events = Vec::new(); |
|
|
|
|
let mut left_state_events = Vec::new(); |
|
|
|
|
|
|
|
|
|
let since_shortstatehash = services() |
|
|
|
|
.rooms |
|
|
|
|
.user |
|
|
|
|
.get_token_shortstatehash(&room_id, since)?; |
|
|
|
|
let since_shortstatehash = services() |
|
|
|
|
.rooms |
|
|
|
|
.user |
|
|
|
|
.get_token_shortstatehash(&room_id, since)?; |
|
|
|
|
|
|
|
|
|
let since_state_ids = match since_shortstatehash { |
|
|
|
|
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, |
|
|
|
|
None => HashMap::new(), |
|
|
|
|
}; |
|
|
|
|
let since_state_ids = match since_shortstatehash { |
|
|
|
|
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, |
|
|
|
|
None => HashMap::new(), |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let left_event_id = match services().rooms.state_accessor.room_state_get_id( |
|
|
|
|
&room_id, |
|
|
|
|
&StateEventType::RoomMember, |
|
|
|
|
sender_user.as_str(), |
|
|
|
|
)? { |
|
|
|
|
Some(e) => e, |
|
|
|
|
None => { |
|
|
|
|
error!("Left room but no left state event"); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let left_event_id = match services().rooms.state_accessor.room_state_get_id( |
|
|
|
|
&room_id, |
|
|
|
|
&StateEventType::RoomMember, |
|
|
|
|
sender_user.as_str(), |
|
|
|
|
)? { |
|
|
|
|
Some(e) => e, |
|
|
|
|
None => { |
|
|
|
|
error!("Left room but no left state event"); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let left_shortstatehash = match services() |
|
|
|
|
.rooms |
|
|
|
|
.state_accessor |
|
|
|
|
.pdu_shortstatehash(&left_event_id)? |
|
|
|
|
{ |
|
|
|
|
Some(s) => s, |
|
|
|
|
None => { |
|
|
|
|
error!("Leave event has no state"); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let left_shortstatehash = match services() |
|
|
|
|
.rooms |
|
|
|
|
.state_accessor |
|
|
|
|
.pdu_shortstatehash(&left_event_id)? |
|
|
|
|
{ |
|
|
|
|
Some(s) => s, |
|
|
|
|
None => { |
|
|
|
|
error!("Leave event has no state"); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let mut left_state_ids = services() |
|
|
|
|
.rooms |
|
|
|
|
.state_accessor |
|
|
|
|
.state_full_ids(left_shortstatehash) |
|
|
|
|
.await?; |
|
|
|
|
let mut left_state_ids = services() |
|
|
|
|
.rooms |
|
|
|
|
.state_accessor |
|
|
|
|
.state_full_ids(left_shortstatehash) |
|
|
|
|
.await?; |
|
|
|
|
|
|
|
|
|
let leave_shortstatekey = services() |
|
|
|
|
.rooms |
|
|
|
|
.short |
|
|
|
|
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?; |
|
|
|
|
let leave_shortstatekey = services() |
|
|
|
|
.rooms |
|
|
|
|
.short |
|
|
|
|
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?; |
|
|
|
|
|
|
|
|
|
left_state_ids.insert(leave_shortstatekey, left_event_id); |
|
|
|
|
left_state_ids.insert(leave_shortstatekey, left_event_id); |
|
|
|
|
|
|
|
|
|
let mut i = 0; |
|
|
|
|
for (key, id) in left_state_ids { |
|
|
|
|
if full_state || since_state_ids.get(&key) != Some(&id) { |
|
|
|
|
let (event_type, state_key) = |
|
|
|
|
services().rooms.short.get_statekey_from_short(key)?; |
|
|
|
|
let mut i = 0; |
|
|
|
|
for (key, id) in left_state_ids { |
|
|
|
|
if full_state || since_state_ids.get(&key) != Some(&id) { |
|
|
|
|
let (event_type, state_key) = |
|
|
|
|
services().rooms.short.get_statekey_from_short(key)?; |
|
|
|
|
|
|
|
|
|
if !lazy_load_enabled |
|
|
|
|
if !lazy_load_enabled |
|
|
|
|
|| event_type != StateEventType::RoomMember |
|
|
|
|
|| full_state |
|
|
|
|
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|
|
|
|
|
|| *sender_user == state_key |
|
|
|
|
{ |
|
|
|
|
let pdu = match services().rooms.timeline.get_pdu(&id)? { |
|
|
|
|
Some(pdu) => pdu, |
|
|
|
|
None => { |
|
|
|
|
error!("Pdu in state not found: {}", id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
{ |
|
|
|
|
let pdu = match services().rooms.timeline.get_pdu(&id)? { |
|
|
|
|
Some(pdu) => pdu, |
|
|
|
|
None => { |
|
|
|
|
error!("Pdu in state not found: {}", id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
left_state_events.push(pdu.to_sync_state_event()); |
|
|
|
|
left_state_events.push(pdu.to_sync_state_event()); |
|
|
|
|
|
|
|
|
|
i += 1; |
|
|
|
|
if i % 100 == 0 { |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
i += 1; |
|
|
|
|
if i % 100 == 0 { |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
left_rooms.insert( |
|
|
|
|
room_id.clone(), |
|
|
|
|
LeftRoom { |
|
|
|
|
account_data: RoomAccountData { events: Vec::new() }, |
|
|
|
|
timeline: Timeline { |
|
|
|
|
limited: false, |
|
|
|
|
prev_batch: Some(next_batch_string.clone()), |
|
|
|
|
events: Vec::new(), |
|
|
|
|
}, |
|
|
|
|
state: State { |
|
|
|
|
events: left_state_events, |
|
|
|
|
left_rooms.insert( |
|
|
|
|
room_id.clone(), |
|
|
|
|
LeftRoom { |
|
|
|
|
account_data: RoomAccountData { events: Vec::new() }, |
|
|
|
|
timeline: Timeline { |
|
|
|
|
limited: false, |
|
|
|
|
prev_batch: Some(next_batch_string.clone()), |
|
|
|
|
events: Vec::new(), |
|
|
|
|
}, |
|
|
|
|
state: State { |
|
|
|
|
events: left_state_events, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
); |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let mut invited_rooms = BTreeMap::new(); |
|
|
|
|
|