|
|
|
|
@ -3,8 +3,9 @@ mod data;
|
|
|
|
|
pub use data::Data; |
|
|
|
|
|
|
|
|
|
use std::{ |
|
|
|
|
collections::{BTreeMap, HashMap, HashSet}, |
|
|
|
|
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, |
|
|
|
|
fmt::Debug, |
|
|
|
|
future::Future, |
|
|
|
|
sync::Arc, |
|
|
|
|
time::{Duration, Instant}, |
|
|
|
|
}; |
|
|
|
|
@ -40,34 +41,34 @@ use ruma::{
|
|
|
|
|
}; |
|
|
|
|
use tokio::{ |
|
|
|
|
select, |
|
|
|
|
sync::{mpsc, Mutex, Semaphore}, |
|
|
|
|
sync::{mpsc, oneshot, Mutex, Semaphore}, |
|
|
|
|
}; |
|
|
|
|
use tracing::{debug, error, warn}; |
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)] |
|
|
|
|
pub enum OutgoingKind { |
|
|
|
|
pub enum OutgoingDestination { |
|
|
|
|
Appservice(String), |
|
|
|
|
Push(OwnedUserId, String), // user and pushkey
|
|
|
|
|
Normal(OwnedServerName), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl OutgoingKind { |
|
|
|
|
impl OutgoingDestination { |
|
|
|
|
#[tracing::instrument(skip(self))] |
|
|
|
|
pub fn get_prefix(&self) -> Vec<u8> { |
|
|
|
|
let mut prefix = match self { |
|
|
|
|
OutgoingKind::Appservice(server) => { |
|
|
|
|
OutgoingDestination::Appservice(server) => { |
|
|
|
|
let mut p = b"+".to_vec(); |
|
|
|
|
p.extend_from_slice(server.as_bytes()); |
|
|
|
|
p |
|
|
|
|
} |
|
|
|
|
OutgoingKind::Push(user, pushkey) => { |
|
|
|
|
OutgoingDestination::Push(user, pushkey) => { |
|
|
|
|
let mut p = b"$".to_vec(); |
|
|
|
|
p.extend_from_slice(user.as_bytes()); |
|
|
|
|
p.push(0xff); |
|
|
|
|
p.extend_from_slice(pushkey.as_bytes()); |
|
|
|
|
p |
|
|
|
|
} |
|
|
|
|
OutgoingKind::Normal(server) => { |
|
|
|
|
OutgoingDestination::Normal(server) => { |
|
|
|
|
let mut p = Vec::new(); |
|
|
|
|
p.extend_from_slice(server.as_bytes()); |
|
|
|
|
p |
|
|
|
|
@ -77,6 +78,47 @@ impl OutgoingKind {
|
|
|
|
|
|
|
|
|
|
prefix |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This wraps the OutgoingDestination key in an interruptible sleep future.
|
|
|
|
|
//
|
|
|
|
|
// The first return value is the future, the second is the oneshot that interrupts that future,
|
|
|
|
|
// and causes it to return instantly.
|
|
|
|
|
fn wrap_in_interruptible_sleep( |
|
|
|
|
self, |
|
|
|
|
at: Instant, |
|
|
|
|
) -> (impl Future<Output = Self>, oneshot::Sender<()>) { |
|
|
|
|
let (tx, rx) = oneshot::channel(); |
|
|
|
|
let at = tokio::time::Instant::from_std(at); |
|
|
|
|
|
|
|
|
|
( |
|
|
|
|
async move { |
|
|
|
|
let _ = tokio::time::timeout_at(at, rx).await; |
|
|
|
|
|
|
|
|
|
self |
|
|
|
|
}, |
|
|
|
|
tx, |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl std::fmt::Display for OutgoingDestination { |
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
|
|
|
|
match self { |
|
|
|
|
OutgoingDestination::Appservice(appservice_id) => { |
|
|
|
|
write!(f, "Appservice (ID {:?})", appservice_id) |
|
|
|
|
} |
|
|
|
|
OutgoingDestination::Push(user, push_key) => { |
|
|
|
|
write!( |
|
|
|
|
f, |
|
|
|
|
"User Push Service (for {:?}, with key {:?})", |
|
|
|
|
user, push_key |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
OutgoingDestination::Normal(server) => { |
|
|
|
|
write!(f, "Matrix Server ({:?})", server) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)] |
|
|
|
|
@ -90,14 +132,29 @@ pub struct Service {
|
|
|
|
|
|
|
|
|
|
/// The state for a given state hash.
|
|
|
|
|
pub(super) maximum_requests: Arc<Semaphore>, |
|
|
|
|
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, |
|
|
|
|
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, |
|
|
|
|
pub sender: mpsc::UnboundedSender<(OutgoingDestination, SendingEventType, Vec<u8>)>, |
|
|
|
|
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingDestination, SendingEventType, Vec<u8>)>>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
enum TransactionStatus { |
|
|
|
|
// Currently running (for the first time)
|
|
|
|
|
Running, |
|
|
|
|
Failed(u32, Instant), // number of times failed, time of last failure
|
|
|
|
|
Retrying(u32), // number of times failed
|
|
|
|
|
// Failed, backing off for a retry
|
|
|
|
|
Failed { |
|
|
|
|
failures: u32, |
|
|
|
|
waker: Option<oneshot::Sender<()>>, |
|
|
|
|
}, |
|
|
|
|
// Currently retrying
|
|
|
|
|
Retrying { |
|
|
|
|
failures: u32, |
|
|
|
|
}, // number of times failed
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// A control-flow enum to dictate what the handler should do after (trying to) prepare a transaction
|
|
|
|
|
enum TransactionPrepOutcome { |
|
|
|
|
Send(Vec<SendingEventType>), |
|
|
|
|
Wake(OutgoingDestination), |
|
|
|
|
Nothing, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Service { |
|
|
|
|
@ -119,14 +176,17 @@ impl Service {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn handler(&self) -> Result<()> { |
|
|
|
|
let mut receiver = self.receiver.lock().await; |
|
|
|
|
let mut new_transactions = self.receiver.lock().await; |
|
|
|
|
let (waking_sender, mut waking_receiver) = mpsc::unbounded_channel(); |
|
|
|
|
|
|
|
|
|
let mut futures = FuturesUnordered::new(); |
|
|
|
|
let mut outgoing = FuturesUnordered::new(); |
|
|
|
|
let mut retrying = FuturesUnordered::new(); |
|
|
|
|
|
|
|
|
|
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); |
|
|
|
|
let mut current_transaction_status = |
|
|
|
|
HashMap::<OutgoingDestination, TransactionStatus>::new(); |
|
|
|
|
|
|
|
|
|
// Retry requests we could not finish yet
|
|
|
|
|
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new(); |
|
|
|
|
let mut initial_transactions = HashMap::<OutgoingDestination, Vec<SendingEventType>>::new(); |
|
|
|
|
|
|
|
|
|
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok()) { |
|
|
|
|
let entry = initial_transactions |
|
|
|
|
@ -147,12 +207,34 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
for (outgoing_kind, events) in initial_transactions { |
|
|
|
|
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); |
|
|
|
|
futures.push(Self::handle_events(outgoing_kind.clone(), events)); |
|
|
|
|
outgoing.push(Self::handle_events(outgoing_kind.clone(), events)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
loop { |
|
|
|
|
select! { |
|
|
|
|
Some(response) = futures.next() => { |
|
|
|
|
// New transactions to be sent out (from server/user activity)
|
|
|
|
|
Some((dest, event, key)) = new_transactions.recv() => { |
|
|
|
|
match self.prepare_transaction( |
|
|
|
|
&dest, |
|
|
|
|
vec![(event, key)], |
|
|
|
|
&mut current_transaction_status, |
|
|
|
|
true, |
|
|
|
|
) { |
|
|
|
|
Ok(TransactionPrepOutcome::Send(events)) => { |
|
|
|
|
outgoing.push(Self::handle_events(dest, events)); |
|
|
|
|
}, |
|
|
|
|
Ok(TransactionPrepOutcome::Wake(dest)) => { |
|
|
|
|
waking_sender.send(dest).expect("nothing closes this channel but ourselves"); |
|
|
|
|
}, |
|
|
|
|
Ok(TransactionPrepOutcome::Nothing) => {}, |
|
|
|
|
Err(err) => { |
|
|
|
|
error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
|
Some(response) = outgoing.next() => { |
|
|
|
|
// Outgoing transaction succeeded
|
|
|
|
|
match response { |
|
|
|
|
Ok(outgoing_kind) => { |
|
|
|
|
self.db.delete_all_active_requests_for(&outgoing_kind)?; |
|
|
|
|
@ -164,9 +246,12 @@ impl Service {
|
|
|
|
|
// Insert pdus we found
|
|
|
|
|
self.db.mark_as_active(&new_events)?; |
|
|
|
|
|
|
|
|
|
futures.push( |
|
|
|
|
// Clear retries
|
|
|
|
|
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); |
|
|
|
|
|
|
|
|
|
outgoing.push( |
|
|
|
|
Self::handle_events( |
|
|
|
|
outgoing_kind.clone(), |
|
|
|
|
outgoing_kind, |
|
|
|
|
new_events.into_iter().map(|(event, _)| event).collect(), |
|
|
|
|
) |
|
|
|
|
); |
|
|
|
|
@ -174,72 +259,160 @@ impl Service {
|
|
|
|
|
current_transaction_status.remove(&outgoing_kind); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err((outgoing_kind, _)) => { |
|
|
|
|
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { |
|
|
|
|
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), |
|
|
|
|
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), |
|
|
|
|
TransactionStatus::Failed(_, _) => { |
|
|
|
|
error!("Request that was not even running failed?!"); |
|
|
|
|
return |
|
|
|
|
}, |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
// Outgoing transaction failed
|
|
|
|
|
Err((destination, err)) => { |
|
|
|
|
// Set status to Failed, create timer
|
|
|
|
|
let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone()); |
|
|
|
|
|
|
|
|
|
// Add timer to loop
|
|
|
|
|
retrying.push(timer); |
|
|
|
|
|
|
|
|
|
warn!("Outgoing request to {} failed: {}", destination, err); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
}, |
|
|
|
|
Some((outgoing_kind, event, key)) = receiver.recv() => { |
|
|
|
|
if let Ok(Some(events)) = self.select_events( |
|
|
|
|
&outgoing_kind, |
|
|
|
|
vec![(event, key)], |
|
|
|
|
|
|
|
|
|
// Transaction retry timers firing
|
|
|
|
|
Some(dest) = retrying.next() => { |
|
|
|
|
// Transition Failed => Retrying, return pending old transaction events
|
|
|
|
|
match self.prepare_transaction( |
|
|
|
|
&dest, |
|
|
|
|
vec![], // will be ignored because fresh == false
|
|
|
|
|
&mut current_transaction_status, |
|
|
|
|
false, |
|
|
|
|
) { |
|
|
|
|
futures.push(Self::handle_events(outgoing_kind, events)); |
|
|
|
|
Ok(TransactionPrepOutcome::Send(events)) => { |
|
|
|
|
outgoing.push(Self::handle_events(dest, events)); |
|
|
|
|
} |
|
|
|
|
Ok(_) => { |
|
|
|
|
// Unreachable because fresh == false
|
|
|
|
|
unreachable!("prepare_transaction on a stale transaction {} did not return ::Send", dest) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Err(err) => { |
|
|
|
|
error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err); |
|
|
|
|
|
|
|
|
|
// transaction dropped, so drop destination as well.
|
|
|
|
|
current_transaction_status.remove(&dest); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
|
// Explicit wakeups, makes a backoff timer return immediately
|
|
|
|
|
Some(outgoing) = waking_receiver.recv() => { |
|
|
|
|
if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) { |
|
|
|
|
if let Some(waker) = waker.take() { |
|
|
|
|
let _ = waker.send(()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Generates timer/oneshot, alters status to reflect Failed
|
|
|
|
|
///
|
|
|
|
|
/// Returns timer/oneshot future to wake up loop for next retry
|
|
|
|
|
fn mark_failed_and_backoff( |
|
|
|
|
status: &mut HashMap<OutgoingDestination, TransactionStatus>, |
|
|
|
|
dest: OutgoingDestination, |
|
|
|
|
) -> impl Future<Output = OutgoingDestination> { |
|
|
|
|
let now = Instant::now(); |
|
|
|
|
|
|
|
|
|
let entry = status |
|
|
|
|
.get_mut(&dest) |
|
|
|
|
.expect("guaranteed to be set before this function"); |
|
|
|
|
|
|
|
|
|
let failures = match entry { |
|
|
|
|
// Running -> Failed
|
|
|
|
|
TransactionStatus::Running => 1, |
|
|
|
|
// Retrying -> Failed
|
|
|
|
|
TransactionStatus::Retrying { failures } => *failures + 1, |
|
|
|
|
|
|
|
|
|
// The transition of Failed -> Retrying is handled by handle_events
|
|
|
|
|
TransactionStatus::Failed { .. } => { |
|
|
|
|
unreachable!("TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, bailing...") |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24); |
|
|
|
|
|
|
|
|
|
// Exponential backoff, clamp upper value to one day
|
|
|
|
|
let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY); |
|
|
|
|
|
|
|
|
|
let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup); |
|
|
|
|
|
|
|
|
|
*entry = TransactionStatus::Failed { |
|
|
|
|
failures, |
|
|
|
|
waker: Some(waker), |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
fut |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This prepares a transaction, checks the transaction state, and selects appropriate events.
|
|
|
|
|
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] |
|
|
|
|
fn select_events( |
|
|
|
|
fn prepare_transaction( |
|
|
|
|
&self, |
|
|
|
|
outgoing_kind: &OutgoingKind, |
|
|
|
|
outgoing_kind: &OutgoingDestination, |
|
|
|
|
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
|
|
|
|
|
current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>, |
|
|
|
|
) -> Result<Option<Vec<SendingEventType>>> { |
|
|
|
|
let mut retry = false; |
|
|
|
|
let mut allow = true; |
|
|
|
|
current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, |
|
|
|
|
fresh: bool, // Wether or not this transaction came from server activity.
|
|
|
|
|
) -> Result<TransactionPrepOutcome> { |
|
|
|
|
let mut allowed = true; |
|
|
|
|
let mut retrying = false; |
|
|
|
|
let mut wake_up = false; |
|
|
|
|
|
|
|
|
|
let entry = current_transaction_status.entry(outgoing_kind.clone()); |
|
|
|
|
|
|
|
|
|
entry |
|
|
|
|
.and_modify(|e| match e { |
|
|
|
|
TransactionStatus::Running | TransactionStatus::Retrying(_) => { |
|
|
|
|
allow = false; // already running
|
|
|
|
|
} |
|
|
|
|
TransactionStatus::Failed(tries, time) => { |
|
|
|
|
// Fail if a request has failed recently (exponential backoff)
|
|
|
|
|
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); |
|
|
|
|
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { |
|
|
|
|
min_elapsed_duration = Duration::from_secs(60 * 60 * 24); |
|
|
|
|
if fresh { |
|
|
|
|
// If its fresh, we initialise the status if we need to.
|
|
|
|
|
//
|
|
|
|
|
// We do nothing if it is already running or retrying.
|
|
|
|
|
//
|
|
|
|
|
// We return with a wake if it is in the Failed state.
|
|
|
|
|
entry |
|
|
|
|
.and_modify(|e| match e { |
|
|
|
|
TransactionStatus::Running | TransactionStatus::Retrying { .. } => { |
|
|
|
|
// already running
|
|
|
|
|
allowed = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if time.elapsed() < min_elapsed_duration { |
|
|
|
|
allow = false; |
|
|
|
|
} else { |
|
|
|
|
retry = true; |
|
|
|
|
*e = TransactionStatus::Retrying(*tries); |
|
|
|
|
TransactionStatus::Failed { .. } => { |
|
|
|
|
// Currently sleeping, time to call the kool-aid man
|
|
|
|
|
wake_up = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.or_insert(TransactionStatus::Running); |
|
|
|
|
}) |
|
|
|
|
.or_insert(TransactionStatus::Running); |
|
|
|
|
} else { |
|
|
|
|
// If it's not fresh, we expect an entry.
|
|
|
|
|
//
|
|
|
|
|
// We also expect us to be the only one who are touching this destination right now, and its a stale transaction, so it must be in the Failed state
|
|
|
|
|
match entry { |
|
|
|
|
Entry::Occupied(mut e) => { |
|
|
|
|
let e = e.get_mut(); |
|
|
|
|
match e { |
|
|
|
|
TransactionStatus::Failed { failures, .. } => { |
|
|
|
|
*e = TransactionStatus::Retrying { failures: *failures }; |
|
|
|
|
retrying = true; |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
|
_ => unreachable!("Encountered bad state when preparing stale transaction: expected Failed state, got Running or Retrying") |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
Entry::Vacant(_) => unreachable!("Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !allow { |
|
|
|
|
return Ok(None); |
|
|
|
|
if wake_up { |
|
|
|
|
return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone())); |
|
|
|
|
} else if !allowed { |
|
|
|
|
return Ok(TransactionPrepOutcome::Nothing); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let mut events = Vec::new(); |
|
|
|
|
|
|
|
|
|
if retry { |
|
|
|
|
if retrying { |
|
|
|
|
// We retry the previous transaction
|
|
|
|
|
for (_, e) in self |
|
|
|
|
.db |
|
|
|
|
@ -254,7 +427,7 @@ impl Service {
|
|
|
|
|
events.push(e); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if let OutgoingKind::Normal(server_name) = outgoing_kind { |
|
|
|
|
if let OutgoingDestination::Normal(server_name) = outgoing_kind { |
|
|
|
|
if let Ok((select_edus, last_count)) = self.select_edus(server_name) { |
|
|
|
|
events.extend(select_edus.into_iter().map(SendingEventType::Edu)); |
|
|
|
|
|
|
|
|
|
@ -263,7 +436,7 @@ impl Service {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Ok(Some(events)) |
|
|
|
|
Ok(TransactionPrepOutcome::Send(events)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip(self, server_name))] |
|
|
|
|
@ -371,7 +544,7 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip(self, pdu_id, user, pushkey))] |
|
|
|
|
pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { |
|
|
|
|
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); |
|
|
|
|
let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey); |
|
|
|
|
let event = SendingEventType::Pdu(pdu_id.to_owned()); |
|
|
|
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; |
|
|
|
|
self.sender |
|
|
|
|
@ -391,7 +564,7 @@ impl Service {
|
|
|
|
|
.into_iter() |
|
|
|
|
.map(|server| { |
|
|
|
|
( |
|
|
|
|
OutgoingKind::Normal(server), |
|
|
|
|
OutgoingDestination::Normal(server), |
|
|
|
|
SendingEventType::Pdu(pdu_id.to_owned()), |
|
|
|
|
) |
|
|
|
|
}) |
|
|
|
|
@ -418,7 +591,7 @@ impl Service {
|
|
|
|
|
serialized: Vec<u8>, |
|
|
|
|
id: u64, |
|
|
|
|
) -> Result<()> { |
|
|
|
|
let outgoing_kind = OutgoingKind::Normal(server.to_owned()); |
|
|
|
|
let outgoing_kind = OutgoingDestination::Normal(server.to_owned()); |
|
|
|
|
let event = SendingEventType::Edu(serialized); |
|
|
|
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; |
|
|
|
|
self.sender |
|
|
|
|
@ -430,7 +603,7 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip(self))] |
|
|
|
|
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> { |
|
|
|
|
let outgoing_kind = OutgoingKind::Appservice(appservice_id); |
|
|
|
|
let outgoing_kind = OutgoingDestination::Appservice(appservice_id); |
|
|
|
|
let event = SendingEventType::Pdu(pdu_id); |
|
|
|
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; |
|
|
|
|
self.sender |
|
|
|
|
@ -446,18 +619,18 @@ impl Service {
|
|
|
|
|
#[tracing::instrument(skip(self))] |
|
|
|
|
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> { |
|
|
|
|
self.db |
|
|
|
|
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; |
|
|
|
|
.delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?; |
|
|
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip(events, kind))] |
|
|
|
|
async fn handle_events( |
|
|
|
|
kind: OutgoingKind, |
|
|
|
|
kind: OutgoingDestination, |
|
|
|
|
events: Vec<SendingEventType>, |
|
|
|
|
) -> Result<OutgoingKind, (OutgoingKind, Error)> { |
|
|
|
|
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> { |
|
|
|
|
match &kind { |
|
|
|
|
OutgoingKind::Appservice(id) => { |
|
|
|
|
OutgoingDestination::Appservice(id) => { |
|
|
|
|
let mut pdu_jsons = Vec::new(); |
|
|
|
|
|
|
|
|
|
for event in &events { |
|
|
|
|
@ -522,7 +695,7 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
response |
|
|
|
|
} |
|
|
|
|
OutgoingKind::Push(userid, pushkey) => { |
|
|
|
|
OutgoingDestination::Push(userid, pushkey) => { |
|
|
|
|
let mut pdus = Vec::new(); |
|
|
|
|
|
|
|
|
|
for event in &events { |
|
|
|
|
@ -561,14 +734,16 @@ impl Service {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let pusher = match services() |
|
|
|
|
.pusher |
|
|
|
|
.get_pusher(userid, pushkey) |
|
|
|
|
.map_err(|e| (OutgoingKind::Push(userid.clone(), pushkey.clone()), e))? |
|
|
|
|
{ |
|
|
|
|
Some(pusher) => pusher, |
|
|
|
|
None => continue, |
|
|
|
|
}; |
|
|
|
|
let pusher = |
|
|
|
|
match services().pusher.get_pusher(userid, pushkey).map_err(|e| { |
|
|
|
|
( |
|
|
|
|
OutgoingDestination::Push(userid.clone(), pushkey.clone()), |
|
|
|
|
e, |
|
|
|
|
) |
|
|
|
|
})? { |
|
|
|
|
Some(pusher) => pusher, |
|
|
|
|
None => continue, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let rules_for_user = services() |
|
|
|
|
.account_data |
|
|
|
|
@ -601,9 +776,9 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
drop(permit); |
|
|
|
|
} |
|
|
|
|
Ok(OutgoingKind::Push(userid.clone(), pushkey.clone())) |
|
|
|
|
Ok(OutgoingDestination::Push(userid.clone(), pushkey.clone())) |
|
|
|
|
} |
|
|
|
|
OutgoingKind::Normal(server) => { |
|
|
|
|
OutgoingDestination::Normal(server) => { |
|
|
|
|
let mut edu_jsons = Vec::new(); |
|
|
|
|
let mut pdu_jsons = Vec::new(); |
|
|
|
|
|
|
|
|
|
@ -615,11 +790,11 @@ impl Service {
|
|
|
|
|
services().rooms |
|
|
|
|
.timeline |
|
|
|
|
.get_pdu_json_from_id(pdu_id) |
|
|
|
|
.map_err(|e| (OutgoingKind::Normal(server.clone()), e))? |
|
|
|
|
.map_err(|e| (OutgoingDestination::Normal(server.clone()), e))? |
|
|
|
|
.ok_or_else(|| { |
|
|
|
|
error!("event not found: {server} {pdu_id:?}"); |
|
|
|
|
( |
|
|
|
|
OutgoingKind::Normal(server.clone()), |
|
|
|
|
OutgoingDestination::Normal(server.clone()), |
|
|
|
|
Error::bad_database( |
|
|
|
|
"[Normal] Event in servernamevent_data not found in db.", |
|
|
|
|
), |
|
|
|
|
|