@ -1,8 +1,29 @@
/// An async function that can recursively call itself.
type AsyncRecursiveType < ' a , T > = Pin < Box < dyn Future < Output = T > + ' a + Send > > ;
use crate ::service ::* ;
use std ::{
collections ::{ btree_map , hash_map , BTreeMap , HashMap , HashSet } ,
pin ::Pin ,
sync ::{ Arc , RwLock } ,
time ::{ Duration , Instant } ,
} ;
use futures_util ::Future ;
use ruma ::{
api ::{
client ::error ::ErrorKind ,
federation ::event ::{ get_event , get_room_state_ids } ,
} ,
events ::{ room ::create ::RoomCreateEventContent , StateEventType } ,
int ,
serde ::Base64 ,
signatures ::CanonicalJsonValue ,
state_res ::{ self , RoomVersion , StateMap } ,
uint , EventId , MilliSecondsSinceUnixEpoch , RoomId , ServerName ,
} ;
use tracing ::{ error , info , trace , warn } ;
use crate ::{ service ::* , services , Error , PduEvent } ;
pub struct Service ;
@ -31,45 +52,47 @@ impl Service {
/// it
/// 14. Use state resolution to find new room state
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
#[ tracing::instrument(skip(value, is_timeline_event, db, pub_key_map)) ]
#[ tracing::instrument(skip(value, is_timeline_event, pub_key_map)) ]
pub ( crate ) async fn handle_incoming_pdu < ' a > (
& self ,
origin : & ' a ServerName ,
event_id : & ' a EventId ,
room_id : & ' a RoomId ,
value : BTreeMap < String , CanonicalJsonValue > ,
is_timeline_event : bool ,
db : & ' a Database ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap < String , Base64 > > > ,
) -> Result < Option < Vec < u8 > > > {
db . rooms . exists ( room_id ) ? . ok_or ( Error ::BadRequest ( ErrorKind ::NotFound , "Room is unknown to this server" ) ) ? ;
services ( ) . rooms . exists ( room_id ) ? . ok_or ( Error ::BadRequest (
ErrorKind ::NotFound ,
"Room is unknown to this server" ,
) ) ? ;
services ( )
. rooms
. is_disabled ( room_id ) ?
. ok_or ( Error ::BadRequest (
ErrorKind ::Forbidden ,
"Federation of this room is currently disabled on this server." ,
) ) ? ;
db . rooms . is_disabled ( room_id ) ? . ok_or ( Error ::BadRequest ( ErrorKind ::Forbidden , "Federation of this room is currently disabled on this server." ) ) ? ;
// 1. Skip the PDU if we already have it as a timeline event
if let Some ( pdu_id ) = db . rooms . get_pdu_id ( event_id ) ? {
return Some ( pdu_id . to_vec ( ) ) ;
if let Some ( pdu_id ) = services ( ) . rooms . get_pdu_id ( event_id ) ? {
return Ok ( Some ( pdu_id . to_vec ( ) ) ) ;
}
let create_event = db
let create_event = services ( )
. rooms
. room_state_get ( room_id , & StateEventType ::RoomCreate , "" ) ?
. ok_or_else ( | | Error ::bad_database ( "Failed to find create event in db." ) ) ? ;
let first_pdu_in_room = db
let first_pdu_in_room = services ( )
. rooms
. first_pdu_in_room ( room_id ) ?
. ok_or_else ( | | Error ::bad_database ( "Failed to find first pdu in db." ) ) ? ;
let ( incoming_pdu , val ) = handle_outlier_pdu (
origin ,
& create_event ,
event_id ,
room_id ,
value ,
db ,
pub_key_map ,
)
. await ? ;
let ( incoming_pdu , val ) = self
. handle_outlier_pdu ( origin , & create_event , event_id , room_id , value , pub_key_map )
. await ? ;
// 8. if not timeline event: stop
if ! is_timeline_event {
@ -82,15 +105,27 @@ impl Service {
}
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
let sorted_prev_events = fetch_unknown_prev_events ( incoming_pdu . prev_events . clone ( ) ) ;
let ( sorted_prev_events , eventid_info ) = self . fetch_unknown_prev_events (
origin ,
& create_event ,
room_id ,
pub_key_map ,
incoming_pdu . prev_events . clone ( ) ,
) ;
let mut errors = 0 ;
for prev_id in dbg! ( sorted ) {
for prev_id in dbg! ( sorted_prev_events ) {
// Check for disabled again because it might have changed
db . rooms . is_disabled ( room_id ) ? . ok_or ( Error ::BadRequest ( ErrorKind ::Forbidden , " Federation of
this room is currently disabled on this server . " ) ) ? ;
if let Some ( ( time , tries ) ) = db
services ( )
. rooms
. is_disabled ( room_id ) ?
. ok_or ( Error ::BadRequest (
ErrorKind ::Forbidden ,
" Federation of
this room is currently disabled on this server . " ,
) ) ? ;
if let Some ( ( time , tries ) ) = services ( )
. globals
. bad_event_ratelimiter
. read ( )
@ -120,26 +155,27 @@ impl Service {
}
let start_time = Instant ::now ( ) ;
db . globals
services ( )
. globals
. roomid_federationhandletime
. write ( )
. unwrap ( )
. insert ( room_id . to_owned ( ) , ( ( * prev_id ) . to_owned ( ) , start_time ) ) ;
if let Err ( e ) = upgrade_outlier_to_timeline_pdu (
pdu ,
json ,
& create_event ,
origin ,
db ,
room_id ,
pub_key_map ,
)
. await
if let Err ( e ) = self
. upgrade_outlier_to_timeline_pdu (
pdu ,
json ,
& create_event ,
origin ,
room_id ,
pub_key_map ,
)
. await
{
errors + = 1 ;
warn ! ( "Prev event {} failed: {}" , prev_id , e ) ;
match db
match services ( )
. globals
. bad_event_ratelimiter
. write ( )
@ -155,7 +191,8 @@ impl Service {
}
}
let elapsed = start_time . elapsed ( ) ;
db . globals
services ( )
. globals
. roomid_federationhandletime
. write ( )
. unwrap ( )
@ -172,22 +209,23 @@ impl Service {
// Done with prev events, now handling the incoming event
let start_time = Instant ::now ( ) ;
db . globals
services ( )
. globals
. roomid_federationhandletime
. write ( )
. unwrap ( )
. insert ( room_id . to_owned ( ) , ( event_id . to_owned ( ) , start_time ) ) ;
let r = upgrade_outlier_to_timeline_pdu (
let r = services ( ) . rooms . event_handler . upgrade_outlier_to_timeline_pdu (
incoming_pdu ,
val ,
& create_event ,
origin ,
db ,
room_id ,
pub_key_map ,
)
. await ;
db . globals
services ( )
. globals
. roomid_federationhandletime
. write ( )
. unwrap ( )
@ -196,22 +234,23 @@ impl Service {
r
}
#[ tracing::instrument(skip(create_event, value, db, pub_key_map)) ]
#[ tracing::instrument(skip(create_event, value, pub_key_map)) ]
fn handle_outlier_pdu < ' a > (
& self ,
origin : & ' a ServerName ,
create_event : & ' a PduEvent ,
event_id : & ' a EventId ,
room_id : & ' a RoomId ,
value : BTreeMap < String , CanonicalJsonValue > ,
db : & ' a Database ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap < String , Base64 > > > ,
) -> AsyncRecursiveType < ' a , Result < ( Arc < PduEvent > , BTreeMap < String , CanonicalJsonValue > ) , String > > {
) -> AsyncRecursiveType < ' a , Result < ( Arc < PduEvent > , BTreeMap < String , CanonicalJsonValue > ) , String > >
{
Box ::pin ( async move {
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// We go through all the signatures we see on the value and fetch the corresponding signing
// keys
fetch_required_signing_keys ( & value , pub_key_map , db )
self . fetch_required_signing_keys ( & value , pub_key_map , db )
. await ? ;
// 2. Check signatures, otherwise drop
@ -223,7 +262,8 @@ impl Service {
} ) ? ;
let room_version_id = & create_event_content . room_version ;
let room_version = RoomVersion ::new ( room_version_id ) . expect ( "room version is supported" ) ;
let room_version =
RoomVersion ::new ( room_version_id ) . expect ( "room version is supported" ) ;
let mut val = match ruma ::signatures ::verify_event (
& * pub_key_map . read ( ) . map_err ( | _ | "RwLock is poisoned." ) ? ,
@ -261,8 +301,7 @@ impl Service {
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// NOTE: Step 5 is not applied anymore because it failed too often
warn ! ( "Fetching auth events for {}" , incoming_pdu . event_id ) ;
fetch_and_handle_outliers (
db ,
self . fetch_and_handle_outliers (
origin ,
& incoming_pdu
. auth_events
@ -284,7 +323,7 @@ impl Service {
// Build map of auth events
let mut auth_events = HashMap ::new ( ) ;
for id in & incoming_pdu . auth_events {
let auth_event = match db . rooms . get_pdu ( id ) ? {
let auth_event = match services ( ) . rooms . get_pdu ( id ) ? {
Some ( e ) = > e ,
None = > {
warn ! ( "Could not find auth event {}" , id ) ;
@ -303,8 +342,9 @@ impl Service {
v . insert ( auth_event ) ;
}
hash_map ::Entry ::Occupied ( _ ) = > {
return Err ( Error ::BadRequest ( ErrorKind ::InvalidParam ,
"Auth event's type and state_key combination exists multiple times."
return Err ( Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Auth event's type and state_key combination exists multiple times." ,
) ) ;
}
}
@ -316,7 +356,10 @@ impl Service {
. map ( | a | a . as_ref ( ) )
! = Some ( create_event )
{
return Err ( Error ::BadRequest ( ErrorKind ::InvalidParam ( "Incoming event refers to wrong create event." ) ) ) ;
return Err ( Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Incoming event refers to wrong create event." ,
) ) ;
}
if ! state_res ::event_auth ::auth_check (
@ -325,15 +368,21 @@ impl Service {
None ::< PduEvent > , // TODO: third party invite
| k , s | auth_events . get ( & ( k . to_string ( ) . into ( ) , s . to_owned ( ) ) ) ,
)
. map_err ( | e | { error ! ( e ) ; Error ::BadRequest ( ErrorKind ::InvalidParam , "Auth check failed" ) } ) ?
{
return Err ( Error ::BadRequest ( ErrorKind ::InvalidParam , "Auth check failed" ) ) ;
. map_err ( | e | {
error ! ( e ) ;
Error ::BadRequest ( ErrorKind ::InvalidParam , "Auth check failed" )
} ) ? {
return Err ( Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Auth check failed" ,
) ) ;
}
info ! ( "Validation successful." ) ;
// 7. Persist the event as an outlier.
db . rooms
services ( )
. rooms
. add_pdu_outlier ( & incoming_pdu . event_id , & val ) ? ;
info ! ( "Added pdu as outlier." ) ;
@ -342,22 +391,22 @@ impl Service {
} )
}
#[ tracing::instrument(skip(incoming_pdu, val, create_event, db, pub_key_map)) ]
async fn upgrade_outlier_to_timeline_pdu (
#[ tracing::instrument(skip(incoming_pdu, val, create_event, pub_key_map)) ]
pub async fn upgrade_outlier_to_timeline_pdu (
& self ,
incoming_pdu : Arc < PduEvent > ,
val : BTreeMap < String , CanonicalJsonValue > ,
create_event : & PduEvent ,
origin : & ServerName ,
db : & Database ,
room_id : & RoomId ,
pub_key_map : & RwLock < BTreeMap < String , BTreeMap < String , Base64 > > > ,
) -> Result < Option < Vec < u8 > > , String > {
// Skip the PDU if we already have it as a timeline event
if let Ok ( Some ( pduid ) ) = db . rooms . get_pdu_id ( & incoming_pdu . event_id ) {
if let Ok ( Some ( pduid ) ) = services ( ) . rooms . get_pdu_id ( & incoming_pdu . event_id ) {
return Ok ( Some ( pduid ) ) ;
}
if db
if services ( )
. rooms
. is_event_soft_failed ( & incoming_pdu . event_id )
. map_err ( | _ | "Failed to ask db for soft fail" . to_owned ( ) ) ?
@ -387,32 +436,32 @@ impl Service {
if incoming_pdu . prev_events . len ( ) = = 1 {
let prev_event = & * incoming_pdu . prev_events [ 0 ] ;
let prev_event_sstatehash = db
let prev_event_sstatehash = services ( )
. rooms
. pdu_shortstatehash ( prev_event )
. map_err ( | _ | "Failed talking to db" . to_owned ( ) ) ? ;
let state = if let Some ( shortstatehash ) = prev_event_sstatehash {
Some ( db . rooms . state_full_ids ( shortstatehash ) . await )
Some ( services ( ) . rooms . state_full_ids ( shortstatehash ) . await )
} else {
None
} ;
if let Some ( Ok ( mut state ) ) = state {
info ! ( "Using cached state" ) ;
let prev_pdu =
db . rooms . get_pdu ( prev_event ) . ok ( ) . flatten ( ) . ok_or_else ( | | {
let prev_pdu = services ( )
. rooms
. get_pdu ( prev_event )
. ok ( )
. flatten ( )
. ok_or_else ( | | {
"Could not find prev event, but we know the state." . to_owned ( )
} ) ? ;
if let Some ( state_key ) = & prev_pdu . state_key {
let shortstatekey = db
let shortstatekey = services ( )
. rooms
. get_or_create_shortstatekey (
& prev_pdu . kind . to_string ( ) . into ( ) ,
state_key ,
& db . globals ,
)
. get_or_create_shortstatekey ( & prev_pdu . kind . to_string ( ) . into ( ) , state_key )
. map_err ( | _ | "Failed to create shortstatekey." . to_owned ( ) ) ? ;
state . insert ( shortstatekey , Arc ::from ( prev_event ) ) ;
@ -427,19 +476,20 @@ impl Service {
let mut okay = true ;
for prev_eventid in & incoming_pdu . prev_events {
let prev_event = if let Ok ( Some ( pdu ) ) = db . rooms . get_pdu ( prev_eventid ) {
let prev_event = if let Ok ( Some ( pdu ) ) = services ( ) . rooms . get_pdu ( prev_eventid ) {
pdu
} else {
okay = false ;
break ;
} ;
let sstatehash = if let Ok ( Some ( s ) ) = db . rooms . pdu_shortstatehash ( prev_eventid ) {
s
} else {
okay = false ;
break ;
} ;
let sstatehash =
if let Ok ( Some ( s ) ) = services ( ) . rooms . pdu_shortstatehash ( prev_eventid ) {
s
} else {
okay = false ;
break ;
} ;
extremity_sstatehashes . insert ( sstatehash , prev_event ) ;
}
@ -449,19 +499,18 @@ impl Service {
let mut auth_chain_sets = Vec ::with_capacity ( extremity_sstatehashes . len ( ) ) ;
for ( sstatehash , prev_event ) in extremity_sstatehashes {
let mut leaf_state : BTreeMap < _ , _ > = db
let mut leaf_state : BTreeMap < _ , _ > = services ( )
. rooms
. state_full_ids ( sstatehash )
. await
. map_err ( | _ | "Failed to ask db for room state." . to_owned ( ) ) ? ;
if let Some ( state_key ) = & prev_event . state_key {
let shortstatekey = db
let shortstatekey = services ( )
. rooms
. get_or_create_shortstatekey (
& prev_event . kind . to_string ( ) . into ( ) ,
state_key ,
& db . globals ,
)
. map_err ( | _ | "Failed to create shortstatekey." . to_owned ( ) ) ? ;
leaf_state . insert ( shortstatekey , Arc ::from ( & * prev_event . event_id ) ) ;
@ -472,7 +521,7 @@ impl Service {
let mut starting_events = Vec ::with_capacity ( leaf_state . len ( ) ) ;
for ( k , id ) in leaf_state {
if let Ok ( ( ty , st_key ) ) = db . rooms . get_statekey_from_short ( k ) {
if let Ok ( ( ty , st_key ) ) = services ( ) . rooms . get_statekey_from_short ( k ) {
// FIXME: Undo .to_string().into() when StateMap
// is updated to use StateEventType
state . insert ( ( ty . to_string ( ) . into ( ) , st_key ) , id . clone ( ) ) ;
@ -483,7 +532,10 @@ impl Service {
}
auth_chain_sets . push (
get_auth_chain ( room_id , starting_events , db )
services ( )
. rooms
. auth_chain
. get_auth_chain ( room_id , starting_events , services ( ) )
. await
. map_err ( | _ | "Failed to load auth chain." . to_owned ( ) ) ?
. collect ( ) ,
@ -492,15 +544,16 @@ impl Service {
fork_states . push ( state ) ;
}
let lock = db . globals . stateres_mutex . lock ( ) ;
let lock = services ( ) . globals . stateres_mutex . lock ( ) ;
let result = state_res ::resolve ( room_version_id , & fork_states , auth_chain_sets , | id | {
let res = db . rooms . get_pdu ( id ) ;
if let Err ( e ) = & res {
error ! ( "LOOK AT ME Failed to fetch event: {}" , e ) ;
}
res . ok ( ) . flatten ( )
} ) ;
let result =
state_res ::resolve ( room_version_id , & fork_states , auth_chain_sets , | id | {
let res = services ( ) . rooms . get_pdu ( id ) ;
if let Err ( e ) = & res {
error ! ( "LOOK AT ME Failed to fetch event: {}" , e ) ;
}
res . ok ( ) . flatten ( )
} ) ;
drop ( lock ) ;
state_at_incoming_event = match result {
@ -508,14 +561,15 @@ impl Service {
new_state
. into_iter ( )
. map ( | ( ( event_type , state_key ) , event_id ) | {
let shortstatekey = db
let shortstatekey = services ( )
. rooms
. get_or_create_shortstatekey (
& event_type . to_string ( ) . into ( ) ,
& state_key ,
& db . globals ,
)
. map_err ( | _ | "Failed to get_or_create_shortstatekey" . to_owned ( ) ) ? ;
. map_err ( | _ | {
"Failed to get_or_create_shortstatekey" . to_owned ( )
} ) ? ;
Ok ( ( shortstatekey , event_id ) )
} )
. collect ::< Result < _ , String > > ( ) ? ,
@ -532,10 +586,9 @@ impl Service {
info ! ( "Calling /state_ids" ) ;
// Call /state_ids to find out what the state at this pdu is. We trust the server's
// response to some extend, but we still do a lot of checks on the events
match db
match services ( )
. sending
. send_federation_request (
& db . globals ,
origin ,
get_room_state_ids ::v1 ::Request {
room_id ,
@ -546,18 +599,18 @@ impl Service {
{
Ok ( res ) = > {
info ! ( "Fetching state events at event." ) ;
let state_vec = fetch_and_handle_outliers (
db ,
origin ,
& res . pdu_ids
. iter ( )
. map ( | x | Arc ::from ( & * * x ) )
. collect ::< Vec < _ > > ( ) ,
create_event ,
room_id ,
pub_key_map ,
)
. await ;
let state_vec = self
. fetch_and_handle_outliers (
origin ,
& res . pdu_ids
. iter ( )
. map ( | x | Arc ::from ( & * * x ) )
. collect ::< Vec < _ > > ( ) ,
create_event ,
room_id ,
pub_key_map ,
)
. await ;
let mut state : BTreeMap < _ , Arc < EventId > > = BTreeMap ::new ( ) ;
for ( pdu , _ ) in state_vec {
@ -566,13 +619,9 @@ impl Service {
. clone ( )
. ok_or_else ( | | "Found non-state pdu in state events." . to_owned ( ) ) ? ;
let shortstatekey = db
let shortstatekey = services ( )
. rooms
. get_or_create_shortstatekey (
& pdu . kind . to_string ( ) . into ( ) ,
& state_key ,
& db . globals ,
)
. get_or_create_shortstatekey ( & pdu . kind . to_string ( ) . into ( ) , & state_key )
. map_err ( | _ | "Failed to create shortstatekey." . to_owned ( ) ) ? ;
match state . entry ( shortstatekey ) {
@ -587,7 +636,7 @@ impl Service {
}
// The original create event must still be in the state
let create_shortstatekey = db
let create_shortstatekey = services ( )
. rooms
. get_shortstatekey ( & StateEventType ::RoomCreate , "" )
. map_err ( | _ | "Failed to talk to db." ) ?
@ -618,12 +667,13 @@ impl Service {
& incoming_pdu ,
None ::< PduEvent > , // TODO: third party invite
| k , s | {
db . rooms
services ( )
. rooms
. get_shortstatekey ( & k . to_string ( ) . into ( ) , s )
. ok ( )
. flatten ( )
. and_then ( | shortstatekey | state_at_incoming_event . get ( & shortstatekey ) )
. and_then ( | event_id | db . rooms . get_pdu ( event_id ) . ok ( ) . flatten ( ) )
. and_then ( | event_id | services ( ) . rooms . get_pdu ( event_id ) . ok ( ) . flatten ( ) )
} ,
)
. map_err ( | _e | "Auth check failed." . to_owned ( ) ) ? ;
@ -636,7 +686,8 @@ impl Service {
// We start looking at current room state now, so lets lock the room
let mutex_state = Arc ::clone (
db . globals
services ( )
. globals
. roomid_mutex_state
. write ( )
. unwrap ( )
@ -648,7 +699,7 @@ impl Service {
// Now we calculate the set of extremities this room has after the incoming event has been
// applied. We start with the previous extremities (aka leaves)
info ! ( "Calculating extremities" ) ;
let mut extremities = db
let mut extremities = services ( )
. rooms
. get_pdu_leaves ( room_id )
. map_err ( | _ | "Failed to load room leaves" . to_owned ( ) ) ? ;
@ -661,14 +712,16 @@ impl Service {
}
// Only keep those extremities were not referenced yet
extremities . retain ( | id | ! matches! ( db . rooms . is_event_referenced ( room_id , id ) , Ok ( true ) ) ) ;
extremities
. retain ( | id | ! matches! ( services ( ) . rooms . is_event_referenced ( room_id , id ) , Ok ( true ) ) ) ;
info ! ( "Compressing state at event" ) ;
let state_ids_compressed = state_at_incoming_event
. iter ( )
. map ( | ( shortstatekey , id ) | {
db . rooms
. compress_state_event ( * shortstatekey , id , & db . globals )
services ( )
. rooms
. compress_state_event ( * shortstatekey , id )
. map_err ( | _ | "Failed to compress_state_event" . to_owned ( ) )
} )
. collect ::< Result < _ , _ > > ( ) ? ;
@ -676,7 +729,7 @@ impl Service {
// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
info ! ( "Starting soft fail auth check" ) ;
let auth_events = db
let auth_events = services ( )
. rooms
. get_auth_events (
room_id ,
@ -696,11 +749,10 @@ impl Service {
. map_err ( | _e | "Auth check failed." . to_owned ( ) ) ? ;
if soft_fail {
append_incoming_pdu (
db ,
self . append_incoming_pdu (
& incoming_pdu ,
val ,
extremities . iter ( ) . map ( Deref ::deref ) ,
extremities . iter ( ) . map ( std ::ops ::Deref ::deref ) ,
state_ids_compressed ,
soft_fail ,
& state_lock ,
@ -712,7 +764,8 @@ impl Service {
// Soft fail, we keep the event as an outlier but don't add it to the timeline
warn ! ( "Event was soft failed: {:?}" , incoming_pdu ) ;
db . rooms
services ( )
. rooms
. mark_event_soft_failed ( & incoming_pdu . event_id )
. map_err ( | _ | "Failed to set soft failed flag" . to_owned ( ) ) ? ;
return Err ( "Event has been soft failed" . into ( ) ) ;
@ -720,13 +773,13 @@ impl Service {
if incoming_pdu . state_key . is_some ( ) {
info ! ( "Loading current room state ids" ) ;
let current_sstatehash = db
let current_sstatehash = services ( )
. rooms
. current_shortstatehash ( room_id )
. map_err ( | _ | "Failed to load current state hash." . to_owned ( ) ) ?
. expect ( "every room has state" ) ;
let current_state_ids = db
let current_state_ids = services ( )
. rooms
. state_full_ids ( current_sstatehash )
. await
@ -737,14 +790,14 @@ impl Service {
info ! ( "Loading extremities" ) ;
for id in dbg! ( & extremities ) {
match db
match services ( )
. rooms
. get_pdu ( id )
. map_err ( | _ | "Failed to ask db for pdu." . to_owned ( ) ) ?
{
Some ( leaf_pdu ) = > {
extremity_sstatehashes . insert (
db . rooms
services ( )
. pdu_shortstatehash ( & leaf_pdu . event_id )
. map_err ( | _ | "Failed to ask db for pdu state hash." . to_owned ( ) ) ?
. ok_or_else ( | | {
@ -777,13 +830,9 @@ impl Service {
// We also add state after incoming event to the fork states
let mut state_after = state_at_incoming_event . clone ( ) ;
if let Some ( state_key ) = & incoming_pdu . state_key {
let shortstatekey = db
let shortstatekey = services ( )
. rooms
. get_or_create_shortstatekey (
& incoming_pdu . kind . to_string ( ) . into ( ) ,
state_key ,
& db . globals ,
)
. get_or_create_shortstatekey ( & incoming_pdu . kind . to_string ( ) . into ( ) , state_key )
. map_err ( | _ | "Failed to create shortstatekey." . to_owned ( ) ) ? ;
state_after . insert ( shortstatekey , Arc ::from ( & * incoming_pdu . event_id ) ) ;
@ -801,8 +850,9 @@ impl Service {
fork_states [ 0 ]
. iter ( )
. map ( | ( k , id ) | {
db . rooms
. compress_state_event ( * k , id , & db . globals )
services ( )
. rooms
. compress_state_event ( * k , id )
. map_err ( | _ | "Failed to compress_state_event." . to_owned ( ) )
} )
. collect ::< Result < _ , _ > > ( ) ?
@ -814,14 +864,16 @@ impl Service {
let mut auth_chain_sets = Vec ::new ( ) ;
for state in & fork_states {
auth_chain_sets . push (
get_auth_chain (
room_id ,
state . iter ( ) . map ( | ( _ , id ) | id . clone ( ) ) . collect ( ) ,
db ,
)
. await
. map_err ( | _ | "Failed to load auth chain." . to_owned ( ) ) ?
. collect ( ) ,
services ( )
. rooms
. auth_chain
. get_auth_chain (
room_id ,
state . iter ( ) . map ( | ( _ , id ) | id . clone ( ) ) . collect ( ) ,
)
. await
. map_err ( | _ | "Failed to load auth chain." . to_owned ( ) ) ?
. collect ( ) ,
) ;
}
@ -832,7 +884,8 @@ impl Service {
. map ( | map | {
map . into_iter ( )
. filter_map ( | ( k , id ) | {
db . rooms
services ( )
. rooms
. get_statekey_from_short ( k )
// FIXME: Undo .to_string().into() when StateMap
// is updated to use StateEventType
@ -846,13 +899,13 @@ impl Service {
info ! ( "Resolving state" ) ;
let lock = db . globals . stateres_mutex . lock ( ) ;
let lock = services ( ) . globals . stateres_mutex . lock ( ) ;
let state = match state_res ::resolve (
room_version_id ,
& fork_states ,
auth_chain_sets ,
| id | {
let res = db . rooms . get_pdu ( id ) ;
let res = services ( ) . rooms . get_pdu ( id ) ;
if let Err ( e ) = & res {
error ! ( "LOOK AT ME Failed to fetch event: {}" , e ) ;
}
@ -872,16 +925,13 @@ impl Service {
state
. into_iter ( )
. map ( | ( ( event_type , state_key ) , event_id ) | {
let shortstatekey = db
let shortstatekey = services ( )
. rooms
. get_or_create_shortstatekey (
& event_type . to_string ( ) . into ( ) ,
& state_key ,
& db . globals ,
)
. get_or_create_shortstatekey ( & event_type . to_string ( ) . into ( ) , & state_key )
. map_err ( | _ | "Failed to get_or_create_shortstatekey" . to_owned ( ) ) ? ;
db . rooms
. compress_state_event ( shortstatekey , & event_id , & db . globals )
services ( )
. rooms
. compress_state_event ( shortstatekey , & event_id )
. map_err ( | _ | "Failed to compress state event" . to_owned ( ) )
} )
. collect ::< Result < _ , _ > > ( ) ?
@ -890,8 +940,9 @@ impl Service {
// Set the new room state to the resolved state
if update_state {
info ! ( "Forcing new room state" ) ;
db . rooms
. force_state ( room_id , new_room_state , db )
services ( )
. rooms
. force_state ( room_id , new_room_state )
. map_err ( | _ | "Failed to set new room state." . to_owned ( ) ) ? ;
}
}
@ -903,19 +954,19 @@ impl Service {
// We use the `state_at_event` instead of `state_after` so we accurately
// represent the state for this event.
let pdu_id = append_incoming_pdu (
db ,
& incoming_pdu ,
val ,
extremities . iter ( ) . map ( Deref ::deref ) ,
state_ids_compressed ,
soft_fail ,
& state_lock ,
)
. map_err ( | e | {
warn ! ( "Failed to add pdu to db: {}" , e ) ;
"Failed to add pdu to db." . to_owned ( )
} ) ? ;
let pdu_id = self
. append_incoming_pdu (
& incoming_pdu ,
val ,
extremities . iter ( ) . map ( std ::ops ::Deref ::deref ) ,
state_ids_compressed ,
soft_fail ,
& state_lock ,
)
. map_err ( | e | {
warn ! ( "Failed to add pdu to db: {}" , e ) ;
"Failed to add pdu to db." . to_owned ( )
} ) ? ;
info ! ( "Appended incoming pdu" ) ;
@ -935,15 +986,22 @@ impl Service {
/// d. TODO: Ask other servers over federation?
#[ tracing::instrument(skip_all) ]
pub ( crate ) fn fetch_and_handle_outliers < ' a > (
db : & ' a Database ,
& self ,
origin : & ' a ServerName ,
events : & ' a [ Arc < EventId > ] ,
create_event : & ' a PduEvent ,
room_id : & ' a RoomId ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap < String , Base64 > > > ,
) -> AsyncRecursiveType < ' a , Vec < ( Arc < PduEvent > , Option < BTreeMap < String , CanonicalJsonValue > > ) > > {
) -> AsyncRecursiveType < ' a , Vec < ( Arc < PduEvent > , Option < BTreeMap < String , CanonicalJsonValue > > ) > >
{
Box ::pin ( async move {
let back_off = | id | match db . globals . bad_event_ratelimiter . write ( ) . unwrap ( ) . entry ( id ) {
let back_off = | id | match services ( )
. globals
. bad_event_ratelimiter
. write ( )
. unwrap ( )
. entry ( id )
{
hash_map ::Entry ::Vacant ( e ) = > {
e . insert ( ( Instant ::now ( ) , 1 ) ) ;
}
@ -952,10 +1010,16 @@ impl Service {
let mut pdus = vec! [ ] ;
for id in events {
if let Some ( ( time , tries ) ) = db . globals . bad_event_ratelimiter . read ( ) . unwrap ( ) . get ( & * * id )
if let Some ( ( time , tries ) ) = services ( )
. globals
. bad_event_ratelimiter
. read ( )
. unwrap ( )
. get ( & * * id )
{
// Exponential backoff
let mut min_elapsed_duration = Duration ::from_secs ( 5 * 60 ) * ( * tries ) * ( * tries ) ;
let mut min_elapsed_duration =
Duration ::from_secs ( 5 * 60 ) * ( * tries ) * ( * tries ) ;
if min_elapsed_duration > Duration ::from_secs ( 60 * 60 * 24 ) {
min_elapsed_duration = Duration ::from_secs ( 60 * 60 * 24 ) ;
}
@ -969,7 +1033,7 @@ impl Service {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok ( Some ( local_pdu ) ) = db . rooms . get_pdu ( id ) {
if let Ok ( Some ( local_pdu ) ) = services ( ) . rooms . get_pdu ( id ) {
trace ! ( "Found {} in db" , id ) ;
pdus . push ( ( local_pdu , None ) ) ;
continue ;
@ -992,16 +1056,15 @@ impl Service {
tokio ::task ::yield_now ( ) . await ;
}
if let Ok ( Some ( _ ) ) = db . rooms . get_pdu ( & next_id ) {
if let Ok ( Some ( _ ) ) = services ( ) . rooms . get_pdu ( & next_id ) {
trace ! ( "Found {} in db" , id ) ;
continue ;
}
info ! ( "Fetching {} over federation." , next_id ) ;
match db
match services ( )
. sending
. send_federation_request (
& db . globals ,
origin ,
get_event ::v1 ::Request { event_id : & next_id } ,
)
@ -1010,7 +1073,7 @@ impl Service {
Ok ( res ) = > {
info ! ( "Got {} over federation" , next_id ) ;
let ( calculated_event_id , value ) =
match crate ::pdu ::gen_event_id_canonical_json ( & res . pdu , & db ) {
match pdu ::gen_event_id_canonical_json ( & res . pdu ) {
Ok ( t ) = > t ,
Err ( _ ) = > {
back_off ( ( * next_id ) . to_owned ( ) ) ;
@ -1051,16 +1114,16 @@ impl Service {
}
for ( next_id , value ) in events_in_reverse_order . iter ( ) . rev ( ) {
match handle_outlier_pdu (
origin ,
create_event ,
next_id ,
room _id,
value . clone ( ) ,
db ,
pub_key_map ,
)
. await
match self
. handle_outlier_pdu (
origin ,
create_event ,
next _id ,
room_id ,
value . clone ( ) ,
pub_key_map ,
)
. await
{
Ok ( ( pdu , json ) ) = > {
if next_id = = id {
@ -1078,9 +1141,14 @@ impl Service {
} )
}
fn fetch_unknown_prev_events ( initial_set : Vec < Arc < EventId > > ) -> Vec < Arc < EventId > > {
async fn fetch_unknown_prev_events (
& self ,
origin : & ServerName ,
create_event : & PduEvent ,
room_id : & RoomId ,
pub_key_map : & RwLock < BTreeMap < String , BTreeMap < String , Base64 > > > ,
initial_set : Vec < Arc < EventId > > ,
) -> Vec < ( Arc < EventId > , HashMap < Arc < EventId > , ( Arc < PduEvent > , BTreeMap < String , CanonicalJsonValue > ) > ) > {
let mut graph : HashMap < Arc < EventId > , _ > = HashMap ::new ( ) ;
let mut eventid_info = HashMap ::new ( ) ;
let mut todo_outlier_stack : Vec < Arc < EventId > > = initial_set ;
@ -1088,16 +1156,16 @@ impl Service {
let mut amount = 0 ;
while let Some ( prev_event_id ) = todo_outlier_stack . pop ( ) {
if let Some ( ( pdu , json_opt ) ) = fetch_and_handle_outliers (
db ,
origin ,
& [ prev_event_id . clone ( ) ] ,
& create_event ,
room_id ,
pub_key_map ,
)
. await
. pop ( )
if let Some ( ( pdu , json_opt ) ) = self
. fetch_and_handle_outliers (
origin ,
& [ prev_event_id . clone ( ) ] ,
& create_event ,
room_id ,
pub_key_map ,
)
. await
. pop ( )
{
if amount > 100 {
// Max limit reached
@ -1106,9 +1174,13 @@ impl Service {
continue ;
}
if let Some ( json ) =
json_opt . or_else ( | | db . rooms . get_outlier_pdu_json ( & prev_event_id ) . ok ( ) . flatten ( ) )
{
if let Some ( json ) = json_opt . or_else ( | | {
services ( )
. rooms
. get_outlier_pdu_json ( & prev_event_id )
. ok ( )
. flatten ( )
} ) {
if pdu . origin_server_ts > first_pdu_in_room . origin_server_ts {
amount + = 1 ;
for prev_prev in & pdu . prev_events {
@ -1153,6 +1225,6 @@ impl Service {
} )
. map_err ( | _ | "Error sorting prev events" . to_owned ( ) ) ? ;
sorted
( sorted , eventid_info )
}
}