|
|
|
|
@ -3,9 +3,9 @@ use std::sync::Arc;
|
|
|
|
|
|
|
|
|
|
pub use data::Data; |
|
|
|
|
use ruma::{ |
|
|
|
|
api::client::relations::get_relating_events, |
|
|
|
|
api::{client::relations::get_relating_events, Direction}, |
|
|
|
|
events::{relation::RelationType, TimelineEventType}, |
|
|
|
|
EventId, RoomId, UserId, |
|
|
|
|
EventId, RoomId, UInt, UserId, |
|
|
|
|
}; |
|
|
|
|
use serde::Deserialize; |
|
|
|
|
|
|
|
|
|
@ -48,37 +48,57 @@ impl Service {
|
|
|
|
|
target: &EventId, |
|
|
|
|
filter_event_type: Option<TimelineEventType>, |
|
|
|
|
filter_rel_type: Option<RelationType>, |
|
|
|
|
from: PduCount, |
|
|
|
|
to: Option<PduCount>, |
|
|
|
|
limit: usize, |
|
|
|
|
from: Option<String>, |
|
|
|
|
to: Option<String>, |
|
|
|
|
limit: Option<UInt>, |
|
|
|
|
recurse: bool, |
|
|
|
|
dir: &Direction, |
|
|
|
|
) -> Result<get_relating_events::v1::Response> { |
|
|
|
|
let from = match from { |
|
|
|
|
Some(from) => PduCount::try_from_string(&from)?, |
|
|
|
|
None => match dir { |
|
|
|
|
Direction::Forward => PduCount::min(), |
|
|
|
|
Direction::Backward => PduCount::max(), |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let to = to.as_ref().and_then(|t| PduCount::try_from_string(t).ok()); |
|
|
|
|
|
|
|
|
|
// Use limit or else 10, with maximum 100
|
|
|
|
|
let limit = limit |
|
|
|
|
.and_then(|u| u32::try_from(u).ok()) |
|
|
|
|
.map_or(10_usize, |u| u as usize) |
|
|
|
|
.min(100); |
|
|
|
|
|
|
|
|
|
let next_token; |
|
|
|
|
|
|
|
|
|
//TODO: Fix ruma: match body.dir {
|
|
|
|
|
match ruma::api::Direction::Backward { |
|
|
|
|
ruma::api::Direction::Forward => { |
|
|
|
|
let events_after: Vec<_> = services() |
|
|
|
|
.rooms |
|
|
|
|
.pdu_metadata |
|
|
|
|
.relations_until(sender_user, room_id, target, from)? // TODO: should be relations_after
|
|
|
|
|
.filter(|r| { |
|
|
|
|
r.as_ref().map_or(true, |(_, pdu)| { |
|
|
|
|
filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) |
|
|
|
|
&& if let Ok(content) = |
|
|
|
|
serde_json::from_str::<ExtractRelatesToEventId>( |
|
|
|
|
pdu.content.get(), |
|
|
|
|
) |
|
|
|
|
{ |
|
|
|
|
filter_rel_type |
|
|
|
|
.as_ref() |
|
|
|
|
.map_or(true, |r| &content.relates_to.rel_type == r) |
|
|
|
|
} else { |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
// Spec (v1.10) recommends depth of at least 3
|
|
|
|
|
let depth: u8 = if recurse { 3 } else { 1 }; |
|
|
|
|
|
|
|
|
|
match dir { |
|
|
|
|
Direction::Forward => { |
|
|
|
|
let relations_until = &services().rooms.pdu_metadata.relations_until( |
|
|
|
|
sender_user, |
|
|
|
|
room_id, |
|
|
|
|
target, |
|
|
|
|
from, |
|
|
|
|
depth, |
|
|
|
|
)?; |
|
|
|
|
let events_after: Vec<_> = relations_until // TODO: should be relations_after
|
|
|
|
|
.iter() |
|
|
|
|
.filter(|(_, pdu)| { |
|
|
|
|
filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) |
|
|
|
|
&& if let Ok(content) = |
|
|
|
|
serde_json::from_str::<ExtractRelatesToEventId>(pdu.content.get()) |
|
|
|
|
{ |
|
|
|
|
filter_rel_type |
|
|
|
|
.as_ref() |
|
|
|
|
.map_or(true, |r| &content.relates_to.rel_type == r) |
|
|
|
|
} else { |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.take(limit) |
|
|
|
|
.filter_map(|r| r.ok()) // Filter out buggy events
|
|
|
|
|
.filter(|(_, pdu)| { |
|
|
|
|
services() |
|
|
|
|
.rooms |
|
|
|
|
@ -86,7 +106,7 @@ impl Service {
|
|
|
|
|
.user_can_see_event(sender_user, room_id, &pdu.event_id) |
|
|
|
|
.unwrap_or(false) |
|
|
|
|
}) |
|
|
|
|
.take_while(|&(k, _)| Some(k) != to) // Stop at `to`
|
|
|
|
|
.take_while(|(k, _)| Some(k) != to.as_ref()) // Stop at `to`
|
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
next_token = events_after.last().map(|(count, _)| count).copied(); |
|
|
|
|
@ -101,31 +121,32 @@ impl Service {
|
|
|
|
|
chunk: events_after, |
|
|
|
|
next_batch: next_token.map(|t| t.stringify()), |
|
|
|
|
prev_batch: Some(from.stringify()), |
|
|
|
|
recursion_depth: if recurse { Some(depth.into()) } else { None }, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
ruma::api::Direction::Backward => { |
|
|
|
|
let events_before: Vec<_> = services() |
|
|
|
|
.rooms |
|
|
|
|
.pdu_metadata |
|
|
|
|
.relations_until(sender_user, room_id, target, from)? |
|
|
|
|
.filter(|r| { |
|
|
|
|
r.as_ref().map_or(true, |(_, pdu)| { |
|
|
|
|
filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) |
|
|
|
|
&& if let Ok(content) = |
|
|
|
|
serde_json::from_str::<ExtractRelatesToEventId>( |
|
|
|
|
pdu.content.get(), |
|
|
|
|
) |
|
|
|
|
{ |
|
|
|
|
filter_rel_type |
|
|
|
|
.as_ref() |
|
|
|
|
.map_or(true, |r| &content.relates_to.rel_type == r) |
|
|
|
|
} else { |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
Direction::Backward => { |
|
|
|
|
let relations_until = &services().rooms.pdu_metadata.relations_until( |
|
|
|
|
sender_user, |
|
|
|
|
room_id, |
|
|
|
|
target, |
|
|
|
|
from, |
|
|
|
|
depth, |
|
|
|
|
)?; |
|
|
|
|
let events_before: Vec<_> = relations_until |
|
|
|
|
.iter() |
|
|
|
|
.filter(|(_, pdu)| { |
|
|
|
|
filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) |
|
|
|
|
&& if let Ok(content) = |
|
|
|
|
serde_json::from_str::<ExtractRelatesToEventId>(pdu.content.get()) |
|
|
|
|
{ |
|
|
|
|
filter_rel_type |
|
|
|
|
.as_ref() |
|
|
|
|
.map_or(true, |r| &content.relates_to.rel_type == r) |
|
|
|
|
} else { |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.take(limit) |
|
|
|
|
.filter_map(|r| r.ok()) // Filter out buggy events
|
|
|
|
|
.filter(|(_, pdu)| { |
|
|
|
|
services() |
|
|
|
|
.rooms |
|
|
|
|
@ -133,7 +154,7 @@ impl Service {
|
|
|
|
|
.user_can_see_event(sender_user, room_id, &pdu.event_id) |
|
|
|
|
.unwrap_or(false) |
|
|
|
|
}) |
|
|
|
|
.take_while(|&(k, _)| Some(k) != to) // Stop at `to`
|
|
|
|
|
.take_while(|&(k, _)| Some(k) != to.as_ref()) // Stop at `to`
|
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
next_token = events_before.last().map(|(count, _)| count).copied(); |
|
|
|
|
@ -147,6 +168,7 @@ impl Service {
|
|
|
|
|
chunk: events_before, |
|
|
|
|
next_batch: next_token.map(|t| t.stringify()), |
|
|
|
|
prev_batch: Some(from.stringify()), |
|
|
|
|
recursion_depth: if recurse { Some(depth.into()) } else { None }, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -158,14 +180,44 @@ impl Service {
|
|
|
|
|
room_id: &'a RoomId, |
|
|
|
|
target: &'a EventId, |
|
|
|
|
until: PduCount, |
|
|
|
|
) -> Result<impl Iterator<Item = Result<(PduCount, PduEvent)>> + 'a> { |
|
|
|
|
max_depth: u8, |
|
|
|
|
) -> Result<Vec<(PduCount, PduEvent)>> { |
|
|
|
|
let room_id = services().rooms.short.get_or_create_shortroomid(room_id)?; |
|
|
|
|
let target = match services().rooms.timeline.get_pdu_count(target)? { |
|
|
|
|
Some(PduCount::Normal(c)) => c, |
|
|
|
|
// TODO: Support backfilled relations
|
|
|
|
|
_ => 0, // This will result in an empty iterator
|
|
|
|
|
}; |
|
|
|
|
self.db.relations_until(user_id, room_id, target, until) |
|
|
|
|
|
|
|
|
|
self.db |
|
|
|
|
.relations_until(user_id, room_id, target, until) |
|
|
|
|
.map(|mut relations| { |
|
|
|
|
let mut pdus: Vec<_> = (*relations).into_iter().filter_map(Result::ok).collect(); |
|
|
|
|
let mut stack: Vec<_> = |
|
|
|
|
pdus.clone().iter().map(|pdu| (pdu.to_owned(), 1)).collect(); |
|
|
|
|
|
|
|
|
|
while let Some(stack_pdu) = stack.pop() { |
|
|
|
|
let target = match stack_pdu.0 .0 { |
|
|
|
|
PduCount::Normal(c) => c, |
|
|
|
|
// TODO: Support backfilled relations
|
|
|
|
|
PduCount::Backfilled(_) => 0, // This will result in an empty iterator
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if let Ok(relations) = self.db.relations_until(user_id, room_id, target, until) |
|
|
|
|
{ |
|
|
|
|
for relation in relations.flatten() { |
|
|
|
|
if stack_pdu.1 < max_depth { |
|
|
|
|
stack.push((relation.clone(), stack_pdu.1 + 1)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pdus.push(relation); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pdus.sort_by(|a, b| a.0.cmp(&b.0)); |
|
|
|
|
pdus |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip(self, room_id, event_ids))] |
|
|
|
|
|