|
|
|
|
@ -2,7 +2,10 @@
|
|
|
|
|
|
|
|
|
|
use crate::{ |
|
|
|
|
api::client_server::{self, claim_keys_helper, get_keys_helper}, |
|
|
|
|
service::pdu::{gen_event_id_canonical_json, PduBuilder}, |
|
|
|
|
service::{ |
|
|
|
|
globals::SigningKeys, |
|
|
|
|
pdu::{gen_event_id_canonical_json, PduBuilder}, |
|
|
|
|
}, |
|
|
|
|
services, utils, Error, PduEvent, Result, Ruma, |
|
|
|
|
}; |
|
|
|
|
use axum::{response::IntoResponse, Json}; |
|
|
|
|
@ -807,17 +810,78 @@ pub fn parse_incoming_pdu(
|
|
|
|
|
|
|
|
|
|
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { |
|
|
|
|
Ok(t) => t, |
|
|
|
|
Err(_) => { |
|
|
|
|
Err(e) => { |
|
|
|
|
// Event could not be converted to canonical json
|
|
|
|
|
return Err(Error::BadRequest( |
|
|
|
|
ErrorKind::InvalidParam, |
|
|
|
|
"Could not convert event to canonical json.", |
|
|
|
|
)); |
|
|
|
|
return Err(e); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
Ok((event_id, value, room_id)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Attempts to parse and append PDU to timeline.
|
|
|
|
|
/// If no event ID is returned, then the PDU was failed to be parsed.
|
|
|
|
|
/// If the Ok(()) is returned, then the PDU was successfully appended to the timeline.
|
|
|
|
|
async fn handle_pdu_in_transaction( |
|
|
|
|
origin: &ServerName, |
|
|
|
|
pub_key_map: &RwLock<BTreeMap<String, SigningKeys>>, |
|
|
|
|
pdu: &RawJsonValue, |
|
|
|
|
) -> (Option<OwnedEventId>, Result<()>) { |
|
|
|
|
let (event_id, value, room_id) = match parse_incoming_pdu(pdu) { |
|
|
|
|
Ok(t) => t, |
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Could not parse PDU: {e}"); |
|
|
|
|
warn!("Full PDU: {:?}", &pdu); |
|
|
|
|
return (None, Err(Error::BadServerResponse("Could not parse PDU"))); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Makes use of the m.room.create event. If we cannot fetch this event,
|
|
|
|
|
// we must have never been in that room.
|
|
|
|
|
if services().rooms.state.get_room_version(&room_id).is_err() { |
|
|
|
|
debug!("Room {room_id} is not known to this server"); |
|
|
|
|
return ( |
|
|
|
|
Some(event_id), |
|
|
|
|
Err(Error::BadServerResponse("Room is not known to this server")), |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
|
|
|
|
|
|
|
|
|
let mutex = Arc::clone( |
|
|
|
|
services() |
|
|
|
|
.globals |
|
|
|
|
.roomid_mutex_federation |
|
|
|
|
.write() |
|
|
|
|
.await |
|
|
|
|
.entry(room_id.to_owned()) |
|
|
|
|
.or_default(), |
|
|
|
|
); |
|
|
|
|
let mutex_lock = mutex.lock().await; |
|
|
|
|
let start_time = Instant::now(); |
|
|
|
|
|
|
|
|
|
if let Err(e) = services() |
|
|
|
|
.rooms |
|
|
|
|
.event_handler |
|
|
|
|
.handle_incoming_pdu(origin, &event_id, &room_id, value, true, pub_key_map) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
warn!("Error appending PDU to timeline: {}: {:?}", e, pdu); |
|
|
|
|
return (Some(event_id), Err(e)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
drop(mutex_lock); |
|
|
|
|
|
|
|
|
|
let elapsed = start_time.elapsed(); |
|
|
|
|
debug!( |
|
|
|
|
"Handling transaction of event {} took {}m{}s", |
|
|
|
|
event_id, |
|
|
|
|
elapsed.as_secs() / 60, |
|
|
|
|
elapsed.as_secs() % 60 |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
(Some(event_id), Ok(())) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
|
|
|
|
///
|
|
|
|
|
/// Push EDUs and PDUs to this server.
|
|
|
|
|
@ -842,77 +906,11 @@ pub async fn send_transaction_message_route(
|
|
|
|
|
// let mut auth_cache = EventMap::new();
|
|
|
|
|
|
|
|
|
|
for pdu in &body.pdus { |
|
|
|
|
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { |
|
|
|
|
warn!("Error parsing incoming event {:?}: {:?}", pdu, e); |
|
|
|
|
Error::BadServerResponse("Invalid PDU in server response") |
|
|
|
|
})?; |
|
|
|
|
let room_id: OwnedRoomId = value |
|
|
|
|
.get("room_id") |
|
|
|
|
.and_then(|id| RoomId::parse(id.as_str()?).ok()) |
|
|
|
|
.ok_or(Error::BadRequest( |
|
|
|
|
ErrorKind::InvalidParam, |
|
|
|
|
"Invalid room id in pdu", |
|
|
|
|
))?; |
|
|
|
|
let (event_id, result) = |
|
|
|
|
handle_pdu_in_transaction(sender_servername, &pub_key_map, pdu).await; |
|
|
|
|
|
|
|
|
|
if services().rooms.state.get_room_version(&room_id).is_err() { |
|
|
|
|
debug!("Server is not in room {room_id}"); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let r = parse_incoming_pdu(pdu); |
|
|
|
|
let (event_id, value, room_id) = match r { |
|
|
|
|
Ok(t) => t, |
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Could not parse PDU: {e}"); |
|
|
|
|
warn!("Full PDU: {:?}", &pdu); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
|
|
|
|
|
|
|
|
|
let mutex = Arc::clone( |
|
|
|
|
services() |
|
|
|
|
.globals |
|
|
|
|
.roomid_mutex_federation |
|
|
|
|
.write() |
|
|
|
|
.await |
|
|
|
|
.entry(room_id.to_owned()) |
|
|
|
|
.or_default(), |
|
|
|
|
); |
|
|
|
|
let mutex_lock = mutex.lock().await; |
|
|
|
|
let start_time = Instant::now(); |
|
|
|
|
resolved_map.insert( |
|
|
|
|
event_id.clone(), |
|
|
|
|
services() |
|
|
|
|
.rooms |
|
|
|
|
.event_handler |
|
|
|
|
.handle_incoming_pdu( |
|
|
|
|
sender_servername, |
|
|
|
|
&event_id, |
|
|
|
|
&room_id, |
|
|
|
|
value, |
|
|
|
|
true, |
|
|
|
|
&pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
.map(|_| ()), |
|
|
|
|
); |
|
|
|
|
drop(mutex_lock); |
|
|
|
|
|
|
|
|
|
let elapsed = start_time.elapsed(); |
|
|
|
|
debug!( |
|
|
|
|
"Handling transaction of event {} took {}m{}s", |
|
|
|
|
event_id, |
|
|
|
|
elapsed.as_secs() / 60, |
|
|
|
|
elapsed.as_secs() % 60 |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for pdu in &resolved_map { |
|
|
|
|
if let Err(e) = pdu.1 { |
|
|
|
|
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) { |
|
|
|
|
warn!("Incoming PDU failed {:?}", pdu); |
|
|
|
|
} |
|
|
|
|
if let Some(event_id) = event_id { |
|
|
|
|
resolved_map.insert(event_id.clone(), result.map_err(|e| e.sanitized_error())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -1081,12 +1079,7 @@ pub async fn send_transaction_message_route(
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Ok(send_transaction_message::v1::Response { |
|
|
|
|
pdus: resolved_map |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|(e, r)| (e, r.map_err(|e| e.sanitized_error()))) |
|
|
|
|
.collect(), |
|
|
|
|
}) |
|
|
|
|
Ok(send_transaction_message::v1::Response { pdus: resolved_map }) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// # `GET /_matrix/federation/v1/event/{eventId}`
|
|
|
|
|
@ -1829,7 +1822,7 @@ pub async fn create_invite_route(
|
|
|
|
|
let event_id = EventId::parse(format!( |
|
|
|
|
"${}", |
|
|
|
|
ruma::signatures::reference_hash(&signed_event, &body.room_version) |
|
|
|
|
.expect("ruma can calculate reference hashes") |
|
|
|
|
.expect("Event format validated when event was hashed") |
|
|
|
|
)) |
|
|
|
|
.expect("ruma's reference hashes are valid event ids"); |
|
|
|
|
|
|
|
|
|
|