|
|
|
|
@ -12,6 +12,7 @@ use ruma::{
|
|
|
|
|
client::error::{Error as RumaError, ErrorKind}, |
|
|
|
|
federation::{ |
|
|
|
|
authorization::get_event_authorization, |
|
|
|
|
backfill::get_backfill, |
|
|
|
|
device::get_devices::{self, v1::UserDevice}, |
|
|
|
|
directory::{get_public_rooms, get_public_rooms_filtered}, |
|
|
|
|
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, |
|
|
|
|
@ -42,8 +43,9 @@ use ruma::{
|
|
|
|
|
}, |
|
|
|
|
serde::{Base64, JsonObject, Raw}, |
|
|
|
|
to_device::DeviceIdOrAllDevices, |
|
|
|
|
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, |
|
|
|
|
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, |
|
|
|
|
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, |
|
|
|
|
OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, |
|
|
|
|
ServerName, |
|
|
|
|
}; |
|
|
|
|
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; |
|
|
|
|
use std::{ |
|
|
|
|
@ -123,6 +125,8 @@ where
|
|
|
|
|
return Err(Error::bad_config("Federation is disabled.")); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
debug!("Preparing to send request to {destination}"); |
|
|
|
|
|
|
|
|
|
let mut write_destination_to_cache = false; |
|
|
|
|
|
|
|
|
|
let cached_result = services() |
|
|
|
|
@ -229,11 +233,13 @@ where
|
|
|
|
|
|
|
|
|
|
let url = reqwest_request.url().clone(); |
|
|
|
|
|
|
|
|
|
debug!("Sending request to {destination} at {url}"); |
|
|
|
|
let response = services() |
|
|
|
|
.globals |
|
|
|
|
.federation_client() |
|
|
|
|
.execute(reqwest_request) |
|
|
|
|
.await; |
|
|
|
|
debug!("Received response from {destination} at {url}"); |
|
|
|
|
|
|
|
|
|
match response { |
|
|
|
|
Ok(mut response) => { |
|
|
|
|
@ -249,10 +255,12 @@ where
|
|
|
|
|
.expect("http::response::Builder is usable"), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
debug!("Getting response bytes from {destination}"); |
|
|
|
|
let body = response.bytes().await.unwrap_or_else(|e| { |
|
|
|
|
warn!("server error {}", e); |
|
|
|
|
Vec::new().into() |
|
|
|
|
}); // TODO: handle timeout
|
|
|
|
|
debug!("Got response bytes from {destination}"); |
|
|
|
|
|
|
|
|
|
if status != 200 { |
|
|
|
|
warn!( |
|
|
|
|
@ -271,6 +279,7 @@ where
|
|
|
|
|
.expect("reqwest body is valid http body"); |
|
|
|
|
|
|
|
|
|
if status == 200 { |
|
|
|
|
debug!("Parsing response bytes from {destination}"); |
|
|
|
|
let response = T::IncomingResponse::try_from_http_response(http_response); |
|
|
|
|
if response.is_ok() && write_destination_to_cache { |
|
|
|
|
services() |
|
|
|
|
@ -292,6 +301,7 @@ where
|
|
|
|
|
Error::BadServerResponse("Server returned bad 200 response.") |
|
|
|
|
}) |
|
|
|
|
} else { |
|
|
|
|
debug!("Returning error from {destination}"); |
|
|
|
|
Err(Error::FederationError( |
|
|
|
|
destination.to_owned(), |
|
|
|
|
RumaError::from_http_response(http_response), |
|
|
|
|
@ -330,36 +340,38 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest {
|
|
|
|
|
/// Implemented according to the specification at https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names
|
|
|
|
|
/// Numbers in comments below refer to bullet points in linked section of specification
|
|
|
|
|
async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) { |
|
|
|
|
debug!("Finding actual destination for {destination}"); |
|
|
|
|
let destination_str = destination.as_str().to_owned(); |
|
|
|
|
let mut hostname = destination_str.clone(); |
|
|
|
|
let actual_destination = match get_ip_with_port(&destination_str) { |
|
|
|
|
Some(host_port) => { |
|
|
|
|
// 1: IP literal with provided or default port
|
|
|
|
|
debug!("1: IP literal with provided or default port"); |
|
|
|
|
host_port |
|
|
|
|
} |
|
|
|
|
None => { |
|
|
|
|
if let Some(pos) = destination_str.find(':') { |
|
|
|
|
// 2: Hostname with included port
|
|
|
|
|
debug!("2: Hostname with included port"); |
|
|
|
|
let (host, port) = destination_str.split_at(pos); |
|
|
|
|
FedDest::Named(host.to_owned(), port.to_owned()) |
|
|
|
|
} else { |
|
|
|
|
debug!("Requesting well known for {destination}"); |
|
|
|
|
match request_well_known(destination.as_str()).await { |
|
|
|
|
// 3: A .well-known file is available
|
|
|
|
|
Some(delegated_hostname) => { |
|
|
|
|
debug!("3: A .well-known file is available"); |
|
|
|
|
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string(); |
|
|
|
|
match get_ip_with_port(&delegated_hostname) { |
|
|
|
|
Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
|
|
|
|
|
None => { |
|
|
|
|
if let Some(pos) = delegated_hostname.find(':') { |
|
|
|
|
// 3.2: Hostname with port in .well-known file
|
|
|
|
|
debug!("3.2: Hostname with port in .well-known file"); |
|
|
|
|
let (host, port) = delegated_hostname.split_at(pos); |
|
|
|
|
FedDest::Named(host.to_owned(), port.to_owned()) |
|
|
|
|
} else { |
|
|
|
|
// Delegated hostname has no port in this branch
|
|
|
|
|
debug!("Delegated hostname has no port in this branch"); |
|
|
|
|
if let Some(hostname_override) = |
|
|
|
|
query_srv_record(&delegated_hostname).await |
|
|
|
|
{ |
|
|
|
|
// 3.3: SRV lookup successful
|
|
|
|
|
debug!("3.3: SRV lookup successful"); |
|
|
|
|
let force_port = hostname_override.port(); |
|
|
|
|
|
|
|
|
|
if let Ok(override_ip) = services() |
|
|
|
|
@ -390,18 +402,18 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
|
|
|
|
|
add_port_to_hostname(&delegated_hostname) |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// 3.4: No SRV records, just use the hostname from .well-known
|
|
|
|
|
debug!("3.4: No SRV records, just use the hostname from .well-known"); |
|
|
|
|
add_port_to_hostname(&delegated_hostname) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// 4: No .well-known or an error occured
|
|
|
|
|
None => { |
|
|
|
|
debug!("4: No .well-known or an error occured"); |
|
|
|
|
match query_srv_record(&destination_str).await { |
|
|
|
|
// 4: SRV record found
|
|
|
|
|
Some(hostname_override) => { |
|
|
|
|
debug!("4: SRV record found"); |
|
|
|
|
let force_port = hostname_override.port(); |
|
|
|
|
|
|
|
|
|
if let Ok(override_ip) = services() |
|
|
|
|
@ -432,14 +444,17 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
|
|
|
|
|
add_port_to_hostname(&hostname) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// 5: No SRV record found
|
|
|
|
|
None => add_port_to_hostname(&destination_str), |
|
|
|
|
None => { |
|
|
|
|
debug!("5: No SRV record found"); |
|
|
|
|
add_port_to_hostname(&destination_str) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
debug!("Actual destination: {actual_destination:?}"); |
|
|
|
|
|
|
|
|
|
// Can't use get_ip_with_port here because we don't want to add a port
|
|
|
|
|
// to an IP address if it wasn't specified
|
|
|
|
|
@ -457,10 +472,11 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> { |
|
|
|
|
let hostname = hostname.trim_end_matches('.'); |
|
|
|
|
if let Ok(Some(host_port)) = services() |
|
|
|
|
.globals |
|
|
|
|
.dns_resolver() |
|
|
|
|
.srv_lookup(format!("_matrix._tcp.{hostname}")) |
|
|
|
|
.srv_lookup(format!("_matrix._tcp.{hostname}.")) |
|
|
|
|
.await |
|
|
|
|
.map(|srv| { |
|
|
|
|
srv.iter().next().map(|result| { |
|
|
|
|
@ -478,19 +494,16 @@ async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn request_well_known(destination: &str) -> Option<String> { |
|
|
|
|
let body: serde_json::Value = serde_json::from_str( |
|
|
|
|
&services() |
|
|
|
|
.globals |
|
|
|
|
.default_client() |
|
|
|
|
.get(&format!("https://{destination}/.well-known/matrix/server")) |
|
|
|
|
.send() |
|
|
|
|
.await |
|
|
|
|
.ok()? |
|
|
|
|
.text() |
|
|
|
|
.await |
|
|
|
|
.ok()?, |
|
|
|
|
) |
|
|
|
|
.ok()?; |
|
|
|
|
let response = services() |
|
|
|
|
.globals |
|
|
|
|
.default_client() |
|
|
|
|
.get(&format!("https://{destination}/.well-known/matrix/server")) |
|
|
|
|
.send() |
|
|
|
|
.await; |
|
|
|
|
debug!("Got well known response"); |
|
|
|
|
let text = response.ok()?.text().await; |
|
|
|
|
debug!("Got well known response text"); |
|
|
|
|
let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?; |
|
|
|
|
Some(body.get("m.server")?.as_str()?.to_owned()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -627,6 +640,37 @@ pub async fn get_public_rooms_route(
|
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn parse_incoming_pdu( |
|
|
|
|
pdu: &RawJsonValue, |
|
|
|
|
) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> { |
|
|
|
|
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { |
|
|
|
|
warn!("Error parsing incoming event {:?}: {:?}", pdu, e); |
|
|
|
|
Error::BadServerResponse("Invalid PDU in server response") |
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
let room_id: OwnedRoomId = value |
|
|
|
|
.get("room_id") |
|
|
|
|
.and_then(|id| RoomId::parse(id.as_str()?).ok()) |
|
|
|
|
.ok_or(Error::BadRequest( |
|
|
|
|
ErrorKind::InvalidParam, |
|
|
|
|
"Invalid room id in pdu", |
|
|
|
|
))?; |
|
|
|
|
|
|
|
|
|
let room_version_id = services().rooms.state.get_room_version(&room_id)?; |
|
|
|
|
|
|
|
|
|
let (event_id, value) = match gen_event_id_canonical_json(&pdu, &room_version_id) { |
|
|
|
|
Ok(t) => t, |
|
|
|
|
Err(_) => { |
|
|
|
|
// Event could not be converted to canonical json
|
|
|
|
|
return Err(Error::BadRequest( |
|
|
|
|
ErrorKind::InvalidParam, |
|
|
|
|
"Could not convert event to canonical json.", |
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
Ok((event_id, value, room_id)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
|
|
|
|
///
|
|
|
|
|
/// Push EDUs and PDUs to this server.
|
|
|
|
|
@ -655,33 +699,11 @@ pub async fn send_transaction_message_route(
|
|
|
|
|
// let mut auth_cache = EventMap::new();
|
|
|
|
|
|
|
|
|
|
for pdu in &body.pdus { |
|
|
|
|
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { |
|
|
|
|
warn!("Error parsing incoming event {:?}: {:?}", pdu, e); |
|
|
|
|
Error::BadServerResponse("Invalid PDU in server response") |
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
let room_id: OwnedRoomId = match value |
|
|
|
|
.get("room_id") |
|
|
|
|
.and_then(|id| RoomId::parse(id.as_str()?).ok()) |
|
|
|
|
{ |
|
|
|
|
Some(id) => id, |
|
|
|
|
None => { |
|
|
|
|
// Event is invalid
|
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let room_version_id = match services().rooms.state.get_room_version(&room_id) { |
|
|
|
|
Ok(v) => v, |
|
|
|
|
Err(_) => { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { |
|
|
|
|
let r = parse_incoming_pdu(&pdu); |
|
|
|
|
let (event_id, value, room_id) = match r { |
|
|
|
|
Ok(t) => t, |
|
|
|
|
Err(_) => { |
|
|
|
|
// Event could not be converted to canonical json
|
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Could not parse pdu: {e}"); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
@ -943,6 +965,17 @@ pub async fn get_event_route(
|
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !services().rooms.state_accessor.server_can_see_event( |
|
|
|
|
sender_servername, |
|
|
|
|
&room_id, |
|
|
|
|
&body.event_id, |
|
|
|
|
)? { |
|
|
|
|
return Err(Error::BadRequest( |
|
|
|
|
ErrorKind::Forbidden, |
|
|
|
|
"Server is not allowed to see event.", |
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Ok(get_event::v1::Response { |
|
|
|
|
origin: services().globals.server_name().to_owned(), |
|
|
|
|
origin_server_ts: MilliSecondsSinceUnixEpoch::now(), |
|
|
|
|
@ -950,6 +983,83 @@ pub async fn get_event_route(
|
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
|
|
|
|
|
///
|
|
|
|
|
/// Retrieves events from before the sender joined the room, if the room's
|
|
|
|
|
/// history visibility allows.
|
|
|
|
|
pub async fn get_backfill_route( |
|
|
|
|
body: Ruma<get_backfill::v1::Request>, |
|
|
|
|
) -> Result<get_backfill::v1::Response> { |
|
|
|
|
if !services().globals.allow_federation() { |
|
|
|
|
return Err(Error::bad_config("Federation is disabled.")); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let sender_servername = body |
|
|
|
|
.sender_servername |
|
|
|
|
.as_ref() |
|
|
|
|
.expect("server is authenticated"); |
|
|
|
|
|
|
|
|
|
info!("Got backfill request from: {}", sender_servername); |
|
|
|
|
|
|
|
|
|
if !services() |
|
|
|
|
.rooms |
|
|
|
|
.state_cache |
|
|
|
|
.server_in_room(sender_servername, &body.room_id)? |
|
|
|
|
{ |
|
|
|
|
return Err(Error::BadRequest( |
|
|
|
|
ErrorKind::Forbidden, |
|
|
|
|
"Server is not in room.", |
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
services() |
|
|
|
|
.rooms |
|
|
|
|
.event_handler |
|
|
|
|
.acl_check(sender_servername, &body.room_id)?; |
|
|
|
|
|
|
|
|
|
let until = body |
|
|
|
|
.v |
|
|
|
|
.iter() |
|
|
|
|
.map(|eventid| services().rooms.timeline.get_pdu_count(eventid)) |
|
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
|
.max() |
|
|
|
|
.ok_or(Error::BadRequest( |
|
|
|
|
ErrorKind::InvalidParam, |
|
|
|
|
"No known eventid in v", |
|
|
|
|
))?; |
|
|
|
|
|
|
|
|
|
let limit = body.limit.min(uint!(100)); |
|
|
|
|
|
|
|
|
|
let all_events = services() |
|
|
|
|
.rooms |
|
|
|
|
.timeline |
|
|
|
|
.pdus_until(&user_id!("@doesntmatter:conduit.rs"), &body.room_id, until)? |
|
|
|
|
.take(limit.try_into().unwrap()); |
|
|
|
|
|
|
|
|
|
let events = all_events |
|
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
|
.filter(|(_, e)| { |
|
|
|
|
matches!( |
|
|
|
|
services().rooms.state_accessor.server_can_see_event( |
|
|
|
|
sender_servername, |
|
|
|
|
&e.room_id, |
|
|
|
|
&e.event_id, |
|
|
|
|
), |
|
|
|
|
Ok(true), |
|
|
|
|
) |
|
|
|
|
}) |
|
|
|
|
.map(|(_, pdu)| services().rooms.timeline.get_pdu_json(&pdu.event_id)) |
|
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
|
.map(|pdu| PduEvent::convert_to_outgoing_federation_event(pdu)) |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
Ok(get_backfill::v1::Response { |
|
|
|
|
origin: services().globals.server_name().to_owned(), |
|
|
|
|
origin_server_ts: MilliSecondsSinceUnixEpoch::now(), |
|
|
|
|
pdus: events, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
|
|
|
|
|
///
|
|
|
|
|
/// Retrieves events that the sender is missing.
|
|
|
|
|
@ -1010,6 +1120,16 @@ pub async fn get_missing_events_route(
|
|
|
|
|
i += 1; |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !services().rooms.state_accessor.server_can_see_event( |
|
|
|
|
sender_servername, |
|
|
|
|
&body.room_id, |
|
|
|
|
&queued_events[i], |
|
|
|
|
)? { |
|
|
|
|
i += 1; |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
queued_events.extend_from_slice( |
|
|
|
|
&serde_json::from_value::<Vec<OwnedEventId>>( |
|
|
|
|
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { |
|
|
|
|
|