|
|
|
|
@ -119,7 +119,15 @@ impl Service {
|
|
|
|
|
.ok_or_else(|| Error::bad_database("Failed to find first pdu in db."))?; |
|
|
|
|
|
|
|
|
|
let (incoming_pdu, val) = self |
|
|
|
|
.handle_outlier_pdu(origin, &create_event, event_id, room_id, value, pub_key_map) |
|
|
|
|
.handle_outlier_pdu( |
|
|
|
|
origin, |
|
|
|
|
&create_event, |
|
|
|
|
event_id, |
|
|
|
|
room_id, |
|
|
|
|
value, |
|
|
|
|
false, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
self.check_room_id(room_id, &incoming_pdu)?; |
|
|
|
|
|
|
|
|
|
@ -276,6 +284,7 @@ impl Service {
|
|
|
|
|
event_id: &'a EventId, |
|
|
|
|
room_id: &'a RoomId, |
|
|
|
|
mut value: BTreeMap<String, CanonicalJsonValue>, |
|
|
|
|
auth_events_known: bool, |
|
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, |
|
|
|
|
) -> AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>> { |
|
|
|
|
Box::pin(async move { |
|
|
|
|
@ -343,23 +352,25 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
self.check_room_id(room_id, &incoming_pdu)?; |
|
|
|
|
|
|
|
|
|
// 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
|
|
|
|
|
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
|
|
|
|
|
// NOTE: Step 5 is not applied anymore because it failed too often
|
|
|
|
|
debug!(event_id = ?incoming_pdu.event_id, "Fetching auth events"); |
|
|
|
|
self.fetch_and_handle_outliers( |
|
|
|
|
origin, |
|
|
|
|
&incoming_pdu |
|
|
|
|
.auth_events |
|
|
|
|
.iter() |
|
|
|
|
.map(|x| Arc::from(&**x)) |
|
|
|
|
.collect::<Vec<_>>(), |
|
|
|
|
create_event, |
|
|
|
|
room_id, |
|
|
|
|
room_version_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
if !auth_events_known { |
|
|
|
|
// 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
|
|
|
|
|
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
|
|
|
|
|
// NOTE: Step 5 is not applied anymore because it failed too often
|
|
|
|
|
debug!(event_id = ?incoming_pdu.event_id, "Fetching auth events"); |
|
|
|
|
self.fetch_and_handle_outliers( |
|
|
|
|
origin, |
|
|
|
|
&incoming_pdu |
|
|
|
|
.auth_events |
|
|
|
|
.iter() |
|
|
|
|
.map(|x| Arc::from(&**x)) |
|
|
|
|
.collect::<Vec<_>>(), |
|
|
|
|
create_event, |
|
|
|
|
room_id, |
|
|
|
|
room_version_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
|
|
|
|
|
debug!( |
|
|
|
|
@ -1015,26 +1026,6 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
let mut pdus = vec![]; |
|
|
|
|
for id in events { |
|
|
|
|
if let Some((time, tries)) = services() |
|
|
|
|
.globals |
|
|
|
|
.bad_event_ratelimiter |
|
|
|
|
.read() |
|
|
|
|
.unwrap() |
|
|
|
|
.get(&**id) |
|
|
|
|
{ |
|
|
|
|
// Exponential backoff
|
|
|
|
|
let mut min_elapsed_duration = |
|
|
|
|
Duration::from_secs(5 * 60) * (*tries) * (*tries); |
|
|
|
|
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { |
|
|
|
|
min_elapsed_duration = Duration::from_secs(60 * 60 * 24); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if time.elapsed() < min_elapsed_duration { |
|
|
|
|
info!("Backing off from {}", id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// a. Look in the main timeline (pduid_pdu tree)
|
|
|
|
|
// b. Look at outlier pdu tree
|
|
|
|
|
// (get_pdu_json checks both)
|
|
|
|
|
@ -1052,6 +1043,26 @@ impl Service {
|
|
|
|
|
let mut events_all = HashSet::new(); |
|
|
|
|
let mut i = 0; |
|
|
|
|
while let Some(next_id) = todo_auth_events.pop() { |
|
|
|
|
if let Some((time, tries)) = services() |
|
|
|
|
.globals |
|
|
|
|
.bad_event_ratelimiter |
|
|
|
|
.read() |
|
|
|
|
.unwrap() |
|
|
|
|
.get(&*next_id) |
|
|
|
|
{ |
|
|
|
|
// Exponential backoff
|
|
|
|
|
let mut min_elapsed_duration = |
|
|
|
|
Duration::from_secs(5 * 60) * (*tries) * (*tries); |
|
|
|
|
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { |
|
|
|
|
min_elapsed_duration = Duration::from_secs(60 * 60 * 24); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if time.elapsed() < min_elapsed_duration { |
|
|
|
|
info!("Backing off from {}", next_id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if events_all.contains(&next_id) { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
@ -1062,7 +1073,7 @@ impl Service {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if let Ok(Some(_)) = services().rooms.timeline.get_pdu(&next_id) { |
|
|
|
|
trace!("Found {} in db", id); |
|
|
|
|
trace!("Found {} in db", next_id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -1121,6 +1132,26 @@ impl Service {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (next_id, value) in events_in_reverse_order.iter().rev() { |
|
|
|
|
if let Some((time, tries)) = services() |
|
|
|
|
.globals |
|
|
|
|
.bad_event_ratelimiter |
|
|
|
|
.read() |
|
|
|
|
.unwrap() |
|
|
|
|
.get(&**next_id) |
|
|
|
|
{ |
|
|
|
|
// Exponential backoff
|
|
|
|
|
let mut min_elapsed_duration = |
|
|
|
|
Duration::from_secs(5 * 60) * (*tries) * (*tries); |
|
|
|
|
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { |
|
|
|
|
min_elapsed_duration = Duration::from_secs(60 * 60 * 24); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if time.elapsed() < min_elapsed_duration { |
|
|
|
|
info!("Backing off from {}", next_id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
match self |
|
|
|
|
.handle_outlier_pdu( |
|
|
|
|
origin, |
|
|
|
|
@ -1128,6 +1159,7 @@ impl Service {
|
|
|
|
|
next_id, |
|
|
|
|
room_id, |
|
|
|
|
value.clone(), |
|
|
|
|
true, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
|