|
|
|
|
@ -42,19 +42,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
Ok(self |
|
|
|
|
.eventid_pduid |
|
|
|
|
.get(event_id.as_bytes())? |
|
|
|
|
.map(|pdu_id| Ok::<_, Error>(PduCount::Normal(pdu_count(&pdu_id)?))) |
|
|
|
|
.transpose()? |
|
|
|
|
.map_or_else( |
|
|
|
|
|| { |
|
|
|
|
Ok::<_, Error>( |
|
|
|
|
self.eventid_backfillpduid |
|
|
|
|
.get(event_id.as_bytes())? |
|
|
|
|
.map(|pdu_id| Ok::<_, Error>(PduCount::Backfilled(pdu_count(&pdu_id)?))) |
|
|
|
|
.transpose()?, |
|
|
|
|
) |
|
|
|
|
}, |
|
|
|
|
|x| Ok(Some(x)), |
|
|
|
|
)?) |
|
|
|
|
.map(|pdu_id| pdu_count(&pdu_id)) |
|
|
|
|
.transpose()?) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns the json of a pdu.
|
|
|
|
|
@ -83,21 +72,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid.")) |
|
|
|
|
}) |
|
|
|
|
.transpose()? |
|
|
|
|
.map_or_else( |
|
|
|
|
|| { |
|
|
|
|
Ok::<_, Error>( |
|
|
|
|
self.eventid_backfillpduid |
|
|
|
|
.get(event_id.as_bytes())? |
|
|
|
|
.map(|pduid| { |
|
|
|
|
self.pduid_backfillpdu.get(&pduid)?.ok_or_else(|| { |
|
|
|
|
Error::bad_database("Invalid pduid in eventid_pduid.") |
|
|
|
|
}) |
|
|
|
|
}) |
|
|
|
|
.transpose()?, |
|
|
|
|
) |
|
|
|
|
}, |
|
|
|
|
|x| Ok(Some(x)), |
|
|
|
|
)? |
|
|
|
|
.map(|pdu| { |
|
|
|
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) |
|
|
|
|
}) |
|
|
|
|
@ -106,10 +80,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
|
|
|
|
|
/// Returns the pdu's id.
|
|
|
|
|
fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> { |
|
|
|
|
Ok(self.eventid_pduid.get(event_id.as_bytes())?.map_or_else( |
|
|
|
|
|| self.eventid_backfillpduid.get(event_id.as_bytes()), |
|
|
|
|
|x| Ok(Some(x)), |
|
|
|
|
)?) |
|
|
|
|
Ok(self.eventid_pduid.get(event_id.as_bytes())?) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns the pdu.
|
|
|
|
|
@ -124,21 +95,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid.")) |
|
|
|
|
}) |
|
|
|
|
.transpose()? |
|
|
|
|
.map_or_else( |
|
|
|
|
|| { |
|
|
|
|
Ok::<_, Error>( |
|
|
|
|
self.eventid_backfillpduid |
|
|
|
|
.get(event_id.as_bytes())? |
|
|
|
|
.map(|pduid| { |
|
|
|
|
self.pduid_backfillpdu.get(&pduid)?.ok_or_else(|| { |
|
|
|
|
Error::bad_database("Invalid pduid in eventid_pduid.") |
|
|
|
|
}) |
|
|
|
|
}) |
|
|
|
|
.transpose()?, |
|
|
|
|
) |
|
|
|
|
}, |
|
|
|
|
|x| Ok(Some(x)), |
|
|
|
|
)? |
|
|
|
|
.map(|pdu| { |
|
|
|
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) |
|
|
|
|
}) |
|
|
|
|
@ -183,28 +139,22 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
///
|
|
|
|
|
/// This does __NOT__ check the outliers `Tree`.
|
|
|
|
|
fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result<Option<PduEvent>> { |
|
|
|
|
self.pduid_pdu |
|
|
|
|
.get(pdu_id)? |
|
|
|
|
.map_or_else(|| self.pduid_backfillpdu.get(pdu_id), |x| Ok(Some(x)))? |
|
|
|
|
.map_or(Ok(None), |pdu| { |
|
|
|
|
Ok(Some( |
|
|
|
|
serde_json::from_slice(&pdu) |
|
|
|
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, |
|
|
|
|
)) |
|
|
|
|
}) |
|
|
|
|
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { |
|
|
|
|
Ok(Some( |
|
|
|
|
serde_json::from_slice(&pdu) |
|
|
|
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, |
|
|
|
|
)) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
|
|
|
|
|
fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result<Option<CanonicalJsonObject>> { |
|
|
|
|
self.pduid_pdu |
|
|
|
|
.get(pdu_id)? |
|
|
|
|
.map_or_else(|| self.pduid_backfillpdu.get(pdu_id), |x| Ok(Some(x)))? |
|
|
|
|
.map_or(Ok(None), |pdu| { |
|
|
|
|
Ok(Some( |
|
|
|
|
serde_json::from_slice(&pdu) |
|
|
|
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, |
|
|
|
|
)) |
|
|
|
|
}) |
|
|
|
|
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { |
|
|
|
|
Ok(Some( |
|
|
|
|
serde_json::from_slice(&pdu) |
|
|
|
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, |
|
|
|
|
)) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn append_pdu( |
|
|
|
|
@ -236,13 +186,12 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
event_id: &EventId, |
|
|
|
|
json: &CanonicalJsonObject, |
|
|
|
|
) -> Result<()> { |
|
|
|
|
self.pduid_backfillpdu.insert( |
|
|
|
|
self.pduid_pdu.insert( |
|
|
|
|
pdu_id, |
|
|
|
|
&serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"), |
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
self.eventid_backfillpduid |
|
|
|
|
.insert(event_id.as_bytes(), pdu_id)?; |
|
|
|
|
self.eventid_pduid.insert(event_id.as_bytes(), pdu_id)?; |
|
|
|
|
self.eventid_outlierpdu.remove(event_id.as_bytes())?; |
|
|
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|
@ -272,64 +221,24 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
room_id: &RoomId, |
|
|
|
|
until: PduCount, |
|
|
|
|
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> { |
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
|
let prefix = services() |
|
|
|
|
.rooms |
|
|
|
|
.short |
|
|
|
|
.get_shortroomid(room_id)? |
|
|
|
|
.expect("room exists") |
|
|
|
|
.to_be_bytes() |
|
|
|
|
.to_vec(); |
|
|
|
|
|
|
|
|
|
let mut current_backfill = prefix.clone(); |
|
|
|
|
// +1 so we don't send the base event
|
|
|
|
|
let backfill_count = match until { |
|
|
|
|
PduCount::Backfilled(x) => x + 1, |
|
|
|
|
PduCount::Normal(_) => 0, |
|
|
|
|
}; |
|
|
|
|
current_backfill.extend_from_slice(&backfill_count.to_be_bytes()); |
|
|
|
|
let (prefix, current) = count_to_id(&room_id, until, 1, true)?; |
|
|
|
|
|
|
|
|
|
let user_id = user_id.to_owned(); |
|
|
|
|
let user_id2 = user_id.to_owned(); |
|
|
|
|
let prefix2 = prefix.clone(); |
|
|
|
|
|
|
|
|
|
let backfill_iter = self |
|
|
|
|
.pduid_backfillpdu |
|
|
|
|
.iter_from(¤t_backfill, false) |
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
|
let mut pdu = serde_json::from_slice::<PduEvent>(&v) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; |
|
|
|
|
if pdu.sender != user_id { |
|
|
|
|
pdu.remove_transaction_id()?; |
|
|
|
|
} |
|
|
|
|
let count = PduCount::Backfilled(pdu_count(&pdu_id)?); |
|
|
|
|
Ok((count, pdu)) |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
match until { |
|
|
|
|
PduCount::Backfilled(_) => Ok(Box::new(backfill_iter)), |
|
|
|
|
PduCount::Normal(x) => { |
|
|
|
|
let mut current_normal = prefix2.clone(); |
|
|
|
|
// -1 so we don't send the base event
|
|
|
|
|
current_normal.extend_from_slice(&x.saturating_sub(1).to_be_bytes()); |
|
|
|
|
let normal_iter = self |
|
|
|
|
.pduid_pdu |
|
|
|
|
.iter_from(¤t_normal, true) |
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix2)) |
|
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
|
let mut pdu = serde_json::from_slice::<PduEvent>(&v) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; |
|
|
|
|
if pdu.sender != user_id2 { |
|
|
|
|
pdu.remove_transaction_id()?; |
|
|
|
|
} |
|
|
|
|
let count = PduCount::Normal(pdu_count(&pdu_id)?); |
|
|
|
|
Ok((count, pdu)) |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
Ok(Box::new(normal_iter.chain(backfill_iter))) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(Box::new( |
|
|
|
|
self.pduid_pdu |
|
|
|
|
.iter_from(¤t, true) |
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
|
let mut pdu = serde_json::from_slice::<PduEvent>(&v) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; |
|
|
|
|
if pdu.sender != user_id { |
|
|
|
|
pdu.remove_transaction_id()?; |
|
|
|
|
} |
|
|
|
|
let count = pdu_count(&pdu_id)?; |
|
|
|
|
Ok((count, pdu)) |
|
|
|
|
}), |
|
|
|
|
)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn pdus_after<'a>( |
|
|
|
|
@ -338,64 +247,24 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
room_id: &RoomId, |
|
|
|
|
from: PduCount, |
|
|
|
|
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> { |
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
|
let prefix = services() |
|
|
|
|
.rooms |
|
|
|
|
.short |
|
|
|
|
.get_shortroomid(room_id)? |
|
|
|
|
.expect("room exists") |
|
|
|
|
.to_be_bytes() |
|
|
|
|
.to_vec(); |
|
|
|
|
|
|
|
|
|
let mut current_normal = prefix.clone(); |
|
|
|
|
// +1 so we don't send the base event
|
|
|
|
|
let normal_count = match from { |
|
|
|
|
PduCount::Normal(x) => x + 1, |
|
|
|
|
PduCount::Backfilled(_) => 0, |
|
|
|
|
}; |
|
|
|
|
current_normal.extend_from_slice(&normal_count.to_be_bytes()); |
|
|
|
|
let (prefix, current) = count_to_id(&room_id, from, 1, false)?; |
|
|
|
|
|
|
|
|
|
let user_id = user_id.to_owned(); |
|
|
|
|
let user_id2 = user_id.to_owned(); |
|
|
|
|
let prefix2 = prefix.clone(); |
|
|
|
|
|
|
|
|
|
let normal_iter = self |
|
|
|
|
.pduid_pdu |
|
|
|
|
.iter_from(¤t_normal, false) |
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
|
let mut pdu = serde_json::from_slice::<PduEvent>(&v) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; |
|
|
|
|
if pdu.sender != user_id { |
|
|
|
|
pdu.remove_transaction_id()?; |
|
|
|
|
} |
|
|
|
|
let count = PduCount::Normal(pdu_count(&pdu_id)?); |
|
|
|
|
Ok((count, pdu)) |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
match from { |
|
|
|
|
PduCount::Normal(_) => Ok(Box::new(normal_iter)), |
|
|
|
|
PduCount::Backfilled(x) => { |
|
|
|
|
let mut current_backfill = prefix2.clone(); |
|
|
|
|
// -1 so we don't send the base event
|
|
|
|
|
current_backfill.extend_from_slice(&x.saturating_sub(1).to_be_bytes()); |
|
|
|
|
let backfill_iter = self |
|
|
|
|
.pduid_backfillpdu |
|
|
|
|
.iter_from(¤t_backfill, true) |
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix2)) |
|
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
|
let mut pdu = serde_json::from_slice::<PduEvent>(&v) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; |
|
|
|
|
if pdu.sender != user_id2 { |
|
|
|
|
pdu.remove_transaction_id()?; |
|
|
|
|
} |
|
|
|
|
let count = PduCount::Backfilled(pdu_count(&pdu_id)?); |
|
|
|
|
Ok((count, pdu)) |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
Ok(Box::new(backfill_iter.chain(normal_iter))) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(Box::new( |
|
|
|
|
self.pduid_pdu |
|
|
|
|
.iter_from(¤t, false) |
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
|
let mut pdu = serde_json::from_slice::<PduEvent>(&v) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; |
|
|
|
|
if pdu.sender != user_id { |
|
|
|
|
pdu.remove_transaction_id()?; |
|
|
|
|
} |
|
|
|
|
let count = pdu_count(&pdu_id)?; |
|
|
|
|
Ok((count, pdu)) |
|
|
|
|
}), |
|
|
|
|
)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn increment_notification_counts( |
|
|
|
|
@ -428,7 +297,58 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns the `count` of this pdu's id.
|
|
|
|
|
fn pdu_count(pdu_id: &[u8]) -> Result<u64> { |
|
|
|
|
utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..]) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU has invalid count bytes.")) |
|
|
|
|
fn pdu_count(pdu_id: &[u8]) -> Result<PduCount> { |
|
|
|
|
let last_u64 = utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..]) |
|
|
|
|
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?; |
|
|
|
|
let second_last_u64 = utils::u64_from_bytes( |
|
|
|
|
&pdu_id[pdu_id.len() - 2 * size_of::<u64>()..pdu_id.len() - size_of::<u64>()], |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
if matches!(second_last_u64, Ok(0)) { |
|
|
|
|
Ok(PduCount::Backfilled(u64::MAX - last_u64)) |
|
|
|
|
} else { |
|
|
|
|
Ok(PduCount::Normal(last_u64)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn count_to_id( |
|
|
|
|
room_id: &RoomId, |
|
|
|
|
count: PduCount, |
|
|
|
|
offset: u64, |
|
|
|
|
subtract: bool, |
|
|
|
|
) -> Result<(Vec<u8>, Vec<u8>)> { |
|
|
|
|
let prefix = services() |
|
|
|
|
.rooms |
|
|
|
|
.short |
|
|
|
|
.get_shortroomid(room_id)? |
|
|
|
|
.expect("room exists") |
|
|
|
|
.to_be_bytes() |
|
|
|
|
.to_vec(); |
|
|
|
|
let mut pdu_id = prefix.clone(); |
|
|
|
|
// +1 so we don't send the base event
|
|
|
|
|
let count_raw = match count { |
|
|
|
|
PduCount::Normal(x) => { |
|
|
|
|
if subtract { |
|
|
|
|
x - offset |
|
|
|
|
} else { |
|
|
|
|
x + offset |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
PduCount::Backfilled(x) => { |
|
|
|
|
pdu_id.extend_from_slice(&0_u64.to_be_bytes()); |
|
|
|
|
let num = u64::MAX - x; |
|
|
|
|
if subtract { |
|
|
|
|
if num > 0 { |
|
|
|
|
num - offset |
|
|
|
|
} else { |
|
|
|
|
num |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
num + offset |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
pdu_id.extend_from_slice(&count_raw.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
Ok((prefix, pdu_id)) |
|
|
|
|
} |
|
|
|
|
|