From fc92385f8f4aeb288a0713c8a29664523fb470b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Commaille?= Date: Wed, 24 Sep 2025 12:15:09 +0200 Subject: [PATCH] utils: Create OneshotNotifier Allows to use a simpler API around oneshot channels at the cost of ignoring potential developer errors where the sender or receiver is dropped too soon. --- src/account_chooser_dialog/mod.rs | 59 +++++----- src/components/dialogs/auth/mod.rs | 31 +++--- src/login/advanced_dialog.rs | 22 ++-- src/session/model/notifications/mod.rs | 17 ++- .../room_details/upgrade_dialog/mod.rs | 41 +++---- .../message_toolbar/attachment_dialog.rs | 34 +++--- src/utils/media/audio.rs | 32 ++---- src/utils/media/mod.rs | 17 +-- src/utils/media/video.rs | 53 +++------ src/utils/mod.rs | 105 +++++++++++++++++- 10 files changed, 232 insertions(+), 179 deletions(-) diff --git a/src/account_chooser_dialog/mod.rs b/src/account_chooser_dialog/mod.rs index 447ab090..c51240a6 100644 --- a/src/account_chooser_dialog/mod.rs +++ b/src/account_chooser_dialog/mod.rs @@ -1,15 +1,16 @@ use adw::{prelude::*, subclass::prelude::*}; -use futures_channel::oneshot; use gtk::glib; -use tracing::error; mod account_row; use self::account_row::AccountRow; -use crate::session_list::{SessionInfo, SessionList}; +use crate::{ + session_list::{SessionInfo, SessionList}, + utils::OneshotNotifier, +}; mod imp { - use std::cell::RefCell; + use std::cell::OnceCell; use glib::subclass::InitializingObject; @@ -24,7 +25,7 @@ mod imp { /// The list of logged-in sessions. #[property(get, set = Self::set_session_list, construct)] pub session_list: glib::WeakRef, - pub sender: RefCell>>>, + notifier: OnceCell>>, } #[glib::object_subclass] @@ -50,17 +51,19 @@ mod imp { impl AdwDialogImpl for AccountChooserDialog { fn closed(&self) { - if self - .sender - .take() - .is_some_and(|sender| sender.send(None).is_err()) - { - error!("Could not send selected session"); + if let Some(notifier) = self.notifier.get() { + notifier.notify(); } } } impl AccountChooserDialog { + /// The notifier for sending the response. + pub(super) fn notifier(&self) -> &OneshotNotifier> { + self.notifier + .get_or_init(|| OneshotNotifier::new("AccountChooserDialog")) + } + /// Set the list of logged-in sessions. fn set_session_list(&self, session_list: &SessionList) { self.accounts.bind_model(Some(session_list), |session| { @@ -92,34 +95,28 @@ impl AccountChooserDialog { /// Open this dialog to choose an account. pub async fn choose_account(&self, parent: &impl IsA) -> Option { - let (sender, receiver) = oneshot::channel(); - self.imp().sender.replace(Some(sender)); + let receiver = self.imp().notifier().listen(); self.present(Some(parent)); - receiver.await.ok().flatten() + receiver.await } /// Select the given row in the session list. #[template_callback] fn select_row(&self, row: >k::ListBoxRow) { - if let Some(sender) = self.imp().sender.take() { - let index = row - .index() - .try_into() - .expect("selected row should have an index"); - - let session_id = self - .session_list() - .and_then(|l| l.item(index)) - .and_downcast::() - .map(|s| s.session_id()); - - if sender.send(session_id).is_err() { - error!("Could not send selected session"); - } - } - + let index = row + .index() + .try_into() + .expect("selected row should have an index"); + + let session_id = self + .session_list() + .and_then(|l| l.item(index)) + .and_downcast::() + .map(|s| s.session_id()); + + self.imp().notifier().notify_value(session_id); self.close(); } } diff --git a/src/components/dialogs/auth/mod.rs b/src/components/dialogs/auth/mod.rs index 4ddeea07..34f36492 100644 --- a/src/components/dialogs/auth/mod.rs +++ b/src/components/dialogs/auth/mod.rs @@ -1,7 +1,6 @@ use std::{fmt::Debug, future::Future}; use adw::{prelude::*, subclass::prelude::*}; -use futures_channel::oneshot; use gettextrs::gettext; use gtk::{glib, glib::clone}; use matrix_sdk::{Error, encryption::CrossSigningResetAuthType}; @@ -22,11 +21,14 @@ mod in_browser_page; mod password_page; use self::{in_browser_page::AuthDialogInBrowserPage, password_page::AuthDialogPasswordPage}; -use crate::{components::ToastableDialog, prelude::*, session::model::Session, spawn_tokio, toast}; +use crate::{ + components::ToastableDialog, prelude::*, session::model::Session, spawn_tokio, toast, + utils::OneshotNotifier, +}; mod imp { use std::{ - cell::{Cell, RefCell}, + cell::{Cell, OnceCell, RefCell}, rc::Rc, sync::Arc, }; @@ -53,8 +55,8 @@ mod imp { state: RefCell>, /// The page for the current stage. current_page: RefCell>, - /// The sender to get the signal to perform the current stage. - sender: RefCell>>, + /// The notifier to signal to perform the current stage. + notifier: OnceCell>>, /// The handle to abort the current future. abort_handle: RefCell>, } @@ -69,9 +71,7 @@ mod imp { Self::bind_template(klass); klass.install_action("auth-dialog.continue", None, |obj, _, _| { - if let Some(sender) = obj.imp().sender.take() { - let _ = sender.send(()); - } + obj.imp().notifier().notify_value(Some(())); }); klass.install_action("auth-dialog.close", None, |obj, _, _| { @@ -98,6 +98,12 @@ mod imp { impl ToastableDialogImpl for AuthDialog {} impl AuthDialog { + /// The notifier to signal to perform the current stage. + fn notifier(&self) -> &OneshotNotifier> { + self.notifier + .get_or_init(|| OneshotNotifier::new("AuthDialog")) + } + /// Authenticate the user to the server via an interactive /// authentication flow. /// @@ -270,8 +276,7 @@ mod imp { return self.current_stage_auth_data(); } - let (sender, receiver) = futures_channel::oneshot::channel(); - self.sender.replace(Some(sender)); + let receiver = self.notifier().listen(); // If the stage didn't succeed, we get the same state again. let is_same_state = self @@ -288,13 +293,11 @@ mod imp { self.state.replace(Some(next_state)); } - if receiver.await.is_err() { + if receiver.await.is_none() { // The sender was dropped, which means that the user closed the dialog. return Err(AuthError::UserCancelled); } - self.sender.take(); - self.current_stage_auth_data() } @@ -483,7 +486,7 @@ mod imp { abort_handle.abort(); } - self.sender.take(); + self.notifier().notify(); } } } diff --git a/src/login/advanced_dialog.rs b/src/login/advanced_dialog.rs index 344c9321..ca480a11 100644 --- a/src/login/advanced_dialog.rs +++ b/src/login/advanced_dialog.rs @@ -1,9 +1,10 @@ use adw::{prelude::*, subclass::prelude::*}; -use futures_channel::oneshot; use gtk::glib; +use crate::utils::OneshotNotifier; + mod imp { - use std::cell::{Cell, RefCell}; + use std::cell::{Cell, OnceCell}; use glib::subclass::InitializingObject; @@ -16,7 +17,7 @@ mod imp { /// Whether auto-discovery is enabled. #[property(get, set, default = true)] autodiscovery: Cell, - sender: RefCell>>, + notifier: OnceCell, } #[glib::object_subclass] @@ -41,8 +42,8 @@ mod imp { impl AdwDialogImpl for LoginAdvancedDialog { fn closed(&self) { - if let Some(sender) = self.sender.take() { - sender.send(()).expect("receiver was not dropped"); + if let Some(notifier) = self.notifier.get() { + notifier.notify(); } } } @@ -50,15 +51,20 @@ mod imp { impl PreferencesDialogImpl for LoginAdvancedDialog {} impl LoginAdvancedDialog { + /// Get the notifier for the close signal. + fn notifier(&self) -> &OneshotNotifier { + self.notifier + .get_or_init(|| OneshotNotifier::new("LoginAdvancedDialog")) + } + /// Present this dialog. /// /// Returns when the dialog is closed. pub(super) async fn run_future(&self, parent: >k::Widget) { - let (sender, receiver) = oneshot::channel(); - self.sender.replace(Some(sender)); + let receiver = self.notifier().listen(); self.obj().present(Some(parent)); - receiver.await.expect("sender was not dropped"); + receiver.await; } } } diff --git a/src/session/model/notifications/mod.rs b/src/session/model/notifications/mod.rs index 2091c487..fcf170af 100644 --- a/src/session/model/notifications/mod.rs +++ b/src/session/model/notifications/mod.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, cell::Cell, time::Duration}; +use std::{borrow::Cow, time::Duration}; use gettextrs::gettext; use gtk::{gdk, gio, glib, prelude::*, subclass::prelude::*}; @@ -26,8 +26,9 @@ use crate::{ intent::SessionIntent, prelude::*, spawn_tokio, - utils::matrix::{ - AnySyncOrStrippedTimelineEvent, MatrixEventIdUri, MatrixIdUri, MatrixRoomIdUri, + utils::{ + OneshotNotifier, + matrix::{AnySyncOrStrippedTimelineEvent, MatrixEventIdUri, MatrixIdUri, MatrixRoomIdUri}, }, }; @@ -193,16 +194,14 @@ impl Notifications { if !room.is_room_info_initialized() { // Wait for the room to finish initializing, otherwise we will not have the // display name or the avatar. - let (sender, receiver) = futures_channel::oneshot::channel(); + let notifier = OneshotNotifier::<()>::new("Notifications::show_push"); + let receiver = notifier.listen(); - let sender_cell = Cell::new(Some(sender)); let handler_id = room.connect_is_room_info_initialized_notify(move |_| { - if let Some(sender) = sender_cell.take() { - let _ = sender.send(()); - } + notifier.notify(); }); - let _ = receiver.await; + receiver.await; room.disconnect(handler_id); } diff --git a/src/session/view/content/room_details/upgrade_dialog/mod.rs b/src/session/view/content/room_details/upgrade_dialog/mod.rs index f82d3369..c064adbf 100644 --- a/src/session/view/content/room_details/upgrade_dialog/mod.rs +++ b/src/session/view/content/room_details/upgrade_dialog/mod.rs @@ -1,7 +1,6 @@ use std::cmp::Ordering; use adw::{prelude::*, subclass::prelude::*}; -use futures_channel::oneshot; use gettextrs::{gettext, ngettext}; use gtk::{gio, glib, pango}; use ruma::{ @@ -13,10 +12,10 @@ use tracing::error; mod room_version; use self::room_version::RoomVersion; -use crate::session::model::JoinRuleValue; +use crate::{session::model::JoinRuleValue, utils::OneshotNotifier}; mod imp { - use std::cell::{OnceCell, RefCell}; + use std::cell::OnceCell; use glib::subclass::InitializingObject; @@ -34,8 +33,8 @@ mod imp { #[template_child] creators_warning_label: TemplateChild, header_factory: OnceCell, - /// The sender for the response of the user. - sender: RefCell>>>, + /// The notifier for the response of the user. + notifier: OnceCell>>, } #[glib::object_subclass] @@ -67,18 +66,20 @@ mod imp { impl AdwDialogImpl for UpgradeDialog { fn closed(&self) { - let Some(sender) = self.sender.take() else { - return; - }; - - if sender.send(None).is_err() { - error!("Could not cancel upgrade: receiver was dropped"); + if let Some(notifier) = self.notifier.get() { + notifier.notify(); } } } #[gtk::template_callbacks] impl UpgradeDialog { + /// The notifier for the response of the user. + fn notifier(&self) -> &OneshotNotifier> { + self.notifier + .get_or_init(|| OneshotNotifier::new("UpgradeDialog")) + } + /// The header factory to separate stable from experimental versions. fn header_factory(&self) -> >k::SignalListItemFactory { self.header_factory.get_or_init(|| { @@ -140,13 +141,11 @@ mod imp { self.update_invite_only_warning(info); self.update_creators_warning(info); - let (sender, receiver) = oneshot::channel(); - self.sender.replace(Some(sender)); + let receiver = self.notifier().listen(); self.obj().present(Some(parent)); - receiver - .await - .expect("sender should not have been dropped prematurely") + + receiver.await } /// Update the room versions combo row with the given details. @@ -222,21 +221,13 @@ mod imp { /// Confirm the upgrade. #[template_callback] fn upgrade(&self) { - let Some(sender) = self.sender.take() else { - error!("Could not confirm upgrade: response was already sent"); - return; - }; - let room_version = self .version_combo .selected_item() .and_downcast::() .map(|v| v.id().clone()); - if sender.send(room_version).is_err() { - error!("Could not confirm upgrade: receiver was dropped"); - } - + self.notifier().notify_value(room_version); self.obj().close(); } diff --git a/src/session/view/content/room_history/message_toolbar/attachment_dialog.rs b/src/session/view/content/room_history/message_toolbar/attachment_dialog.rs index 0ca48d73..e9691c48 100644 --- a/src/session/view/content/room_history/message_toolbar/attachment_dialog.rs +++ b/src/session/view/content/room_history/message_toolbar/attachment_dialog.rs @@ -1,12 +1,10 @@ use adw::{prelude::*, subclass::prelude::*}; -use futures_channel::oneshot; use gtk::{gdk, gio, glib, glib::clone}; -use tracing::error; -use crate::{components::MediaContentViewer, spawn}; +use crate::{components::MediaContentViewer, spawn, utils::OneshotNotifier}; mod imp { - use std::cell::RefCell; + use std::cell::OnceCell; use super::*; @@ -21,7 +19,7 @@ mod imp { send_button: TemplateChild, #[template_child] media: TemplateChild, - sender: RefCell>>, + notifier: OnceCell>>, } #[glib::object_subclass] @@ -62,7 +60,7 @@ mod imp { impl AdwDialogImpl for AttachmentDialog { fn closed(&self) { - self.send_response(gtk::ResponseType::Cancel); + self.notifier().notify(); } } @@ -74,15 +72,10 @@ mod imp { self.grab_focus(); } - /// Sent the given response. - fn send_response(&self, response: gtk::ResponseType) { - if self - .sender - .take() - .is_some_and(|sender| sender.send(response).is_err()) - { - error!("Could not send attachment dialog response {response:?}"); - } + /// The notifier to send the response. + fn notifier(&self) -> &OneshotNotifier> { + self.notifier + .get_or_init(|| OneshotNotifier::new("AttachmentDialog")) } /// Set the image to preview. @@ -106,7 +99,7 @@ mod imp { /// Emit the signal that the user wants to send the attachment. #[template_callback] fn send(&self) { - self.send_response(gtk::ResponseType::Ok); + self.notifier().notify_value(Some(())); self.obj().close(); } @@ -115,12 +108,15 @@ mod imp { /// The response is [`gtk::ResponseType::Ok`] if the user clicked on /// send, otherwise it is [`gtk::ResponseType::Cancel`]. pub(super) async fn response_future(&self, parent: >k::Widget) -> gtk::ResponseType { - let (sender, receiver) = oneshot::channel(); - self.sender.replace(Some(sender)); + let receiver = self.notifier().listen(); self.obj().present(Some(parent)); - receiver.await.unwrap_or(gtk::ResponseType::Cancel) + if receiver.await.is_some() { + gtk::ResponseType::Ok + } else { + gtk::ResponseType::Cancel + } } } } diff --git a/src/utils/media/audio.rs b/src/utils/media/audio.rs index a2dea4f2..02d8e1a5 100644 --- a/src/utils/media/audio.rs +++ b/src/utils/media/audio.rs @@ -5,14 +5,13 @@ use std::{ time::Duration, }; -use futures_channel::oneshot; use gst::prelude::*; -use gtk::{gio, glib, prelude::*}; +use gtk::{gdk, gio, glib, prelude::*}; use matrix_sdk::attachment::BaseAudioInfo; use tracing::warn; use super::load_gstreamer_media_info; -use crate::utils::resample_slice; +use crate::utils::{OneshotNotifier, resample_slice}; /// Load information for the audio in the given file. pub(crate) async fn load_audio_info(file: &gio::File) -> BaseAudioInfo { @@ -63,8 +62,8 @@ pub(crate) async fn generate_waveform( } }; - let (sender, receiver) = oneshot::channel(); - let sender = Arc::new(Mutex::new(Some(sender))); + let notifier = OneshotNotifier::>::new("generate_waveform"); + let receiver = notifier.listen(); let samples = Arc::new(Mutex::new(vec![])); let bus = pipeline.bus().expect("GstPipeline should have a GstBus"); @@ -74,12 +73,12 @@ pub(crate) async fn generate_waveform( match message.view() { gst::MessageView::Eos(_) => { // We are done collecting the samples. - send_empty_signal(&sender); + notifier.notify(); glib::ControlFlow::Break } gst::MessageView::Error(error) => { warn!("Could not generate audio waveform: {error}"); - send_empty_signal(&sender); + notifier.notify(); glib::ControlFlow::Break } gst::MessageView::Element(element) => { @@ -116,7 +115,7 @@ pub(crate) async fn generate_waveform( match pipeline.set_state(gst::State::Playing) { Ok(_) => { - let _ = receiver.await; + receiver.await; } Err(error) => { warn!("Could not start GstPipeline for audio waveform: {error}"); @@ -138,23 +137,6 @@ pub(crate) async fn generate_waveform( Some(normalize_waveform(waveform)).filter(|waveform| !waveform.is_empty()) } -/// Try to send an empty signal through the given sender. -fn send_empty_signal(sender: &Mutex>>) { - let mut sender = match sender.lock() { - Ok(sender) => sender, - Err(error) => { - warn!("Failed to lock audio waveform signal mutex: {error}"); - return; - } - }; - - if let Some(sender) = sender.take() - && sender.send(()).is_err() - { - warn!("Failed to send audio waveform end through channel"); - } -} - /// Normalize the given waveform to have between 30 and 120 samples with a value /// between 0 and 1. /// diff --git a/src/utils/media/mod.rs b/src/utils/media/mod.rs index f7e0db7c..bcac4fab 100644 --- a/src/utils/media/mod.rs +++ b/src/utils/media/mod.rs @@ -1,12 +1,14 @@ //! Collection of methods for media. -use std::{cell::Cell, str::FromStr, sync::Mutex, time::Duration}; +use std::{str::FromStr, time::Duration}; use gettextrs::gettext; use gtk::{gio, glib, prelude::*}; use mime::Mime; use ruma::UInt; +use crate::utils::OneshotNotifier; + pub(crate) mod audio; pub(crate) mod image; pub(crate) mod video; @@ -100,21 +102,20 @@ async fn load_gstreamer_media_info(file: &gio::File) -> Option>; +use crate::utils::OneshotNotifier; /// Load information and try to generate a thumbnail for the video in the given /// file. @@ -63,10 +58,10 @@ async fn generate_video_thumbnail_and_blurhash( return None; }; - let (sender, receiver) = oneshot::channel(); - let sender = Arc::new(Mutex::new(Some(sender))); + let notifier = OneshotNotifier::new("generate_video_thumbnail_and_blurhash"); + let receiver = notifier.listen(); - let pipeline = match create_thumbnailer_pipeline(&file.uri(), sender.clone()) { + let pipeline = match create_thumbnailer_pipeline(&file.uri(), notifier.clone()) { Ok(pipeline) => pipeline, Err(error) => { warn!("Could not create pipeline for video thumbnail: {error}"); @@ -95,7 +90,7 @@ async fn generate_video_thumbnail_and_blurhash( // AsyncDone means that the pipeline has started now. if pipeline.set_state(gst::State::Playing).is_err() { warn!("Could not start pipeline for video thumbnail"); - send_video_thumbnail_result(&sender, Err(())); + notifier.notify(); return glib::ControlFlow::Break; } @@ -111,7 +106,7 @@ async fn generate_video_thumbnail_and_blurhash( } gst::MessageView::Error(error) => { warn!("Could not generate video thumbnail: {error}"); - send_video_thumbnail_result(&sender, Err(())); + notifier.notify(); glib::ControlFlow::Break } @@ -127,7 +122,7 @@ async fn generate_video_thumbnail_and_blurhash( let _ = pipeline.set_state(gst::State::Null); bus.set_flushing(true); - let texture = texture.ok()?.ok()?; + let texture = texture?; let thumbnail_blurhash = TextureThumbnailer(texture) .generate_thumbnail_and_blurhash(widget.scale_factor(), &renderer); @@ -141,7 +136,7 @@ async fn generate_video_thumbnail_and_blurhash( /// Create a pipeline to get a thumbnail of the first frame. fn create_thumbnailer_pipeline( uri: &str, - sender: Arc>>, + notifier: OneshotNotifier>, ) -> Result { // Create our pipeline from a pipeline description string. let pipeline = gst::parse::launch(&format!( @@ -177,7 +172,7 @@ fn create_thumbnailer_pipeline( let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let Some(buffer) = sample.buffer() else { warn!("Could not get buffer from appsink"); - send_video_thumbnail_result(&sender, Err(())); + notifier.notify(); return Err(gst::FlowError::Error); }; @@ -190,13 +185,13 @@ fn create_thumbnailer_pipeline( let Some(caps) = sample.caps() else { warn!("Got video sample without caps"); - send_video_thumbnail_result(&sender, Err(())); + notifier.notify(); return Err(gst::FlowError::Error); }; let Ok(info) = gst_video::VideoInfo::from_caps(caps) else { warn!("Could not parse video caps"); - send_video_thumbnail_result(&sender, Err(())); + notifier.notify(); return Err(gst::FlowError::Error); }; @@ -204,17 +199,17 @@ fn create_thumbnailer_pipeline( let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, &info) .map_err(|_| { warn!("Could not map video buffer readable"); - send_video_thumbnail_result(&sender, Err(())); + notifier.notify(); gst::FlowError::Error })?; if let Some(texture) = video_frame_to_texture(&frame) { - send_video_thumbnail_result(&sender, Ok(texture)); + notifier.notify_value(Some(texture)); Err(gst::FlowError::Eos) } else { warn!("Could not convert video frame to GdkTexture"); - send_video_thumbnail_result(&sender, Err(())); + notifier.notify(); Err(gst::FlowError::Error) } }) @@ -224,26 +219,6 @@ fn create_thumbnailer_pipeline( Ok(pipeline) } -/// Try to send the given video thumbnail result through the given sender. -fn send_video_thumbnail_result( - sender: &Mutex>, - result: Result, -) { - let mut sender = match sender.lock() { - Ok(sender) => sender, - Err(error) => { - warn!("Failed to lock video thumbnail mutex: {error}"); - return; - } - }; - - if let Some(sender) = sender.take() - && sender.send(result).is_err() - { - warn!("Failed to send video thumbnail result through channel"); - } -} - /// Convert the given video frame to a `GdkTexture`. fn video_frame_to_texture( frame: &gst_video::VideoFrameRef<&gst::BufferRef>, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index c42146d5..1f329700 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -8,14 +8,17 @@ use std::{ ops::Deref, path::{Path, PathBuf}, rc::{Rc, Weak}, - sync::{Arc, LazyLock}, + sync::{Arc, LazyLock, Mutex}, }; use adw::prelude::*; +use futures_channel::oneshot; +use futures_util::future::BoxFuture; use gtk::{gio, glib}; use regex::Regex; use tempfile::NamedTempFile; use tokio::task::{AbortHandle, JoinHandle}; +use tracing::error; pub(crate) mod expression; mod expression_list_model; @@ -750,3 +753,103 @@ pub(crate) fn resample_slice(slice: &[f32], new_len: usize) -> Cow<'_, [f32]> { Cow::Owned(result) } + +/// A helper type to wait for a notification that can occur only one time. +/// +/// [`OneshotNotifier::listen()`] must be called to initialize it and get a +/// receiver. The receiver must then be `.await`ed and the future will resolve +/// when it is notified. +/// +/// The receiver will receive a signal the first time that +/// [`OneshotNotifier::notify_value()`] is called. Further calls to this +/// function will be noops until a new receiver is created.The value to return +/// must implement `Default`, as this is the value that will be sent to the +/// receiver when the notifier is dropped before a value is notified. +/// +/// This notifier can be cloned freely and moved between threads. +/// +/// It is also possible to share this notifier between tasks to make sure that a +/// single task is running at a time. If [`OneshotNotifier::listen()`] is called +/// while there is already a receiver waiting, it will be notified as if the +/// notifier was dropped. +#[derive(Debug, Clone)] +pub(crate) struct OneshotNotifier { + /// The context used to identify the notifier in logs. + context: &'static str, + /// The sender for the notification signal. + sender: Arc>>>, +} + +impl OneshotNotifier { + /// Get a new `OneshotNotifier` for the given context. + pub(crate) fn new(context: &'static str) -> Self { + Self { + sender: Default::default(), + context, + } + } +} + +impl OneshotNotifier +where + T: Default + Send + 'static, +{ + /// Initialize this `OneshotNotifier` and get a receiver. + pub(crate) fn listen(&self) -> OneshotNotifierReceiver { + let (sender, receiver) = oneshot::channel(); + + match self.sender.lock() { + Ok(mut guard) => *guard = Some(sender), + Err(error) => { + error!( + context = self.context, + "Failed to lock oneshot notifier: {error}" + ); + } + } + + OneshotNotifierReceiver(receiver) + } + + /// Notify the receiver with the given value, if any receiver is still + /// listening. + pub(crate) fn notify_value(&self, value: T) { + match self.sender.lock() { + Ok(mut guard) => { + if let Some(sender) = guard.take() { + let _ = sender.send(value); + } + } + Err(error) => { + error!( + context = self.context, + "Failed to lock oneshot notifier: {error}" + ); + } + } + } + + /// Notify the receiver with the default value, if any receiver is still + /// listening. + pub(crate) fn notify(&self) { + self.notify_value(T::default()); + } +} + +/// A notification receiver associated to a [`OneshotNotifier`]. +/// +/// This should be `.await`ed to wait for a notification. +#[derive(Debug)] +pub(crate) struct OneshotNotifierReceiver(oneshot::Receiver); + +impl IntoFuture for OneshotNotifierReceiver +where + T: Default + Send + 'static, +{ + type Output = T; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { self.0.await.unwrap_or_default() }) + } +}