Browse Source

utils: Create OneshotNotifier

Allows to use a simpler API around oneshot channels at the cost of
ignoring potential developer errors where the sender or receiver is
dropped too soon.
fractal-13
Kévin Commaille 6 months ago
parent
commit
fc92385f8f
No known key found for this signature in database
GPG Key ID: F26F4BE20A08255B
  1. 59
      src/account_chooser_dialog/mod.rs
  2. 31
      src/components/dialogs/auth/mod.rs
  3. 22
      src/login/advanced_dialog.rs
  4. 17
      src/session/model/notifications/mod.rs
  5. 41
      src/session/view/content/room_details/upgrade_dialog/mod.rs
  6. 34
      src/session/view/content/room_history/message_toolbar/attachment_dialog.rs
  7. 32
      src/utils/media/audio.rs
  8. 17
      src/utils/media/mod.rs
  9. 53
      src/utils/media/video.rs
  10. 105
      src/utils/mod.rs

59
src/account_chooser_dialog/mod.rs

@ -1,15 +1,16 @@
use adw::{prelude::*, subclass::prelude::*};
use futures_channel::oneshot;
use gtk::glib;
use tracing::error;
mod account_row;
use self::account_row::AccountRow;
use crate::session_list::{SessionInfo, SessionList};
use crate::{
session_list::{SessionInfo, SessionList},
utils::OneshotNotifier,
};
mod imp {
use std::cell::RefCell;
use std::cell::OnceCell;
use glib::subclass::InitializingObject;
@ -24,7 +25,7 @@ mod imp {
/// The list of logged-in sessions.
#[property(get, set = Self::set_session_list, construct)]
pub session_list: glib::WeakRef<SessionList>,
pub sender: RefCell<Option<oneshot::Sender<Option<String>>>>,
notifier: OnceCell<OneshotNotifier<Option<String>>>,
}
#[glib::object_subclass]
@ -50,17 +51,19 @@ mod imp {
impl AdwDialogImpl for AccountChooserDialog {
fn closed(&self) {
if self
.sender
.take()
.is_some_and(|sender| sender.send(None).is_err())
{
error!("Could not send selected session");
if let Some(notifier) = self.notifier.get() {
notifier.notify();
}
}
}
impl AccountChooserDialog {
/// The notifier for sending the response.
pub(super) fn notifier(&self) -> &OneshotNotifier<Option<String>> {
self.notifier
.get_or_init(|| OneshotNotifier::new("AccountChooserDialog"))
}
/// Set the list of logged-in sessions.
fn set_session_list(&self, session_list: &SessionList) {
self.accounts.bind_model(Some(session_list), |session| {
@ -92,34 +95,28 @@ impl AccountChooserDialog {
/// Open this dialog to choose an account.
pub async fn choose_account(&self, parent: &impl IsA<gtk::Widget>) -> Option<String> {
let (sender, receiver) = oneshot::channel();
self.imp().sender.replace(Some(sender));
let receiver = self.imp().notifier().listen();
self.present(Some(parent));
receiver.await.ok().flatten()
receiver.await
}
/// Select the given row in the session list.
#[template_callback]
fn select_row(&self, row: &gtk::ListBoxRow) {
if let Some(sender) = self.imp().sender.take() {
let index = row
.index()
.try_into()
.expect("selected row should have an index");
let session_id = self
.session_list()
.and_then(|l| l.item(index))
.and_downcast::<SessionInfo>()
.map(|s| s.session_id());
if sender.send(session_id).is_err() {
error!("Could not send selected session");
}
}
let index = row
.index()
.try_into()
.expect("selected row should have an index");
let session_id = self
.session_list()
.and_then(|l| l.item(index))
.and_downcast::<SessionInfo>()
.map(|s| s.session_id());
self.imp().notifier().notify_value(session_id);
self.close();
}
}

31
src/components/dialogs/auth/mod.rs

@ -1,7 +1,6 @@
use std::{fmt::Debug, future::Future};
use adw::{prelude::*, subclass::prelude::*};
use futures_channel::oneshot;
use gettextrs::gettext;
use gtk::{glib, glib::clone};
use matrix_sdk::{Error, encryption::CrossSigningResetAuthType};
@ -22,11 +21,14 @@ mod in_browser_page;
mod password_page;
use self::{in_browser_page::AuthDialogInBrowserPage, password_page::AuthDialogPasswordPage};
use crate::{components::ToastableDialog, prelude::*, session::model::Session, spawn_tokio, toast};
use crate::{
components::ToastableDialog, prelude::*, session::model::Session, spawn_tokio, toast,
utils::OneshotNotifier,
};
mod imp {
use std::{
cell::{Cell, RefCell},
cell::{Cell, OnceCell, RefCell},
rc::Rc,
sync::Arc,
};
@ -53,8 +55,8 @@ mod imp {
state: RefCell<Option<AuthState>>,
/// The page for the current stage.
current_page: RefCell<Option<gtk::Widget>>,
/// The sender to get the signal to perform the current stage.
sender: RefCell<Option<oneshot::Sender<()>>>,
/// The notifier to signal to perform the current stage.
notifier: OnceCell<OneshotNotifier<Option<()>>>,
/// The handle to abort the current future.
abort_handle: RefCell<Option<tokio::task::AbortHandle>>,
}
@ -69,9 +71,7 @@ mod imp {
Self::bind_template(klass);
klass.install_action("auth-dialog.continue", None, |obj, _, _| {
if let Some(sender) = obj.imp().sender.take() {
let _ = sender.send(());
}
obj.imp().notifier().notify_value(Some(()));
});
klass.install_action("auth-dialog.close", None, |obj, _, _| {
@ -98,6 +98,12 @@ mod imp {
impl ToastableDialogImpl for AuthDialog {}
impl AuthDialog {
/// The notifier to signal to perform the current stage.
fn notifier(&self) -> &OneshotNotifier<Option<()>> {
self.notifier
.get_or_init(|| OneshotNotifier::new("AuthDialog"))
}
/// Authenticate the user to the server via an interactive
/// authentication flow.
///
@ -270,8 +276,7 @@ mod imp {
return self.current_stage_auth_data();
}
let (sender, receiver) = futures_channel::oneshot::channel();
self.sender.replace(Some(sender));
let receiver = self.notifier().listen();
// If the stage didn't succeed, we get the same state again.
let is_same_state = self
@ -288,13 +293,11 @@ mod imp {
self.state.replace(Some(next_state));
}
if receiver.await.is_err() {
if receiver.await.is_none() {
// The sender was dropped, which means that the user closed the dialog.
return Err(AuthError::UserCancelled);
}
self.sender.take();
self.current_stage_auth_data()
}
@ -483,7 +486,7 @@ mod imp {
abort_handle.abort();
}
self.sender.take();
self.notifier().notify();
}
}
}

22
src/login/advanced_dialog.rs

@ -1,9 +1,10 @@
use adw::{prelude::*, subclass::prelude::*};
use futures_channel::oneshot;
use gtk::glib;
use crate::utils::OneshotNotifier;
mod imp {
use std::cell::{Cell, RefCell};
use std::cell::{Cell, OnceCell};
use glib::subclass::InitializingObject;
@ -16,7 +17,7 @@ mod imp {
/// Whether auto-discovery is enabled.
#[property(get, set, default = true)]
autodiscovery: Cell<bool>,
sender: RefCell<Option<oneshot::Sender<()>>>,
notifier: OnceCell<OneshotNotifier>,
}
#[glib::object_subclass]
@ -41,8 +42,8 @@ mod imp {
impl AdwDialogImpl for LoginAdvancedDialog {
fn closed(&self) {
if let Some(sender) = self.sender.take() {
sender.send(()).expect("receiver was not dropped");
if let Some(notifier) = self.notifier.get() {
notifier.notify();
}
}
}
@ -50,15 +51,20 @@ mod imp {
impl PreferencesDialogImpl for LoginAdvancedDialog {}
impl LoginAdvancedDialog {
/// Get the notifier for the close signal.
fn notifier(&self) -> &OneshotNotifier {
self.notifier
.get_or_init(|| OneshotNotifier::new("LoginAdvancedDialog"))
}
/// Present this dialog.
///
/// Returns when the dialog is closed.
pub(super) async fn run_future(&self, parent: &gtk::Widget) {
let (sender, receiver) = oneshot::channel();
self.sender.replace(Some(sender));
let receiver = self.notifier().listen();
self.obj().present(Some(parent));
receiver.await.expect("sender was not dropped");
receiver.await;
}
}
}

17
src/session/model/notifications/mod.rs

@ -1,4 +1,4 @@
use std::{borrow::Cow, cell::Cell, time::Duration};
use std::{borrow::Cow, time::Duration};
use gettextrs::gettext;
use gtk::{gdk, gio, glib, prelude::*, subclass::prelude::*};
@ -26,8 +26,9 @@ use crate::{
intent::SessionIntent,
prelude::*,
spawn_tokio,
utils::matrix::{
AnySyncOrStrippedTimelineEvent, MatrixEventIdUri, MatrixIdUri, MatrixRoomIdUri,
utils::{
OneshotNotifier,
matrix::{AnySyncOrStrippedTimelineEvent, MatrixEventIdUri, MatrixIdUri, MatrixRoomIdUri},
},
};
@ -193,16 +194,14 @@ impl Notifications {
if !room.is_room_info_initialized() {
// Wait for the room to finish initializing, otherwise we will not have the
// display name or the avatar.
let (sender, receiver) = futures_channel::oneshot::channel();
let notifier = OneshotNotifier::<()>::new("Notifications::show_push");
let receiver = notifier.listen();
let sender_cell = Cell::new(Some(sender));
let handler_id = room.connect_is_room_info_initialized_notify(move |_| {
if let Some(sender) = sender_cell.take() {
let _ = sender.send(());
}
notifier.notify();
});
let _ = receiver.await;
receiver.await;
room.disconnect(handler_id);
}

41
src/session/view/content/room_details/upgrade_dialog/mod.rs

@ -1,7 +1,6 @@
use std::cmp::Ordering;
use adw::{prelude::*, subclass::prelude::*};
use futures_channel::oneshot;
use gettextrs::{gettext, ngettext};
use gtk::{gio, glib, pango};
use ruma::{
@ -13,10 +12,10 @@ use tracing::error;
mod room_version;
use self::room_version::RoomVersion;
use crate::session::model::JoinRuleValue;
use crate::{session::model::JoinRuleValue, utils::OneshotNotifier};
mod imp {
use std::cell::{OnceCell, RefCell};
use std::cell::OnceCell;
use glib::subclass::InitializingObject;
@ -34,8 +33,8 @@ mod imp {
#[template_child]
creators_warning_label: TemplateChild<gtk::Label>,
header_factory: OnceCell<gtk::SignalListItemFactory>,
/// The sender for the response of the user.
sender: RefCell<Option<oneshot::Sender<Option<RoomVersionId>>>>,
/// The notifier for the response of the user.
notifier: OnceCell<OneshotNotifier<Option<RoomVersionId>>>,
}
#[glib::object_subclass]
@ -67,18 +66,20 @@ mod imp {
impl AdwDialogImpl for UpgradeDialog {
fn closed(&self) {
let Some(sender) = self.sender.take() else {
return;
};
if sender.send(None).is_err() {
error!("Could not cancel upgrade: receiver was dropped");
if let Some(notifier) = self.notifier.get() {
notifier.notify();
}
}
}
#[gtk::template_callbacks]
impl UpgradeDialog {
/// The notifier for the response of the user.
fn notifier(&self) -> &OneshotNotifier<Option<RoomVersionId>> {
self.notifier
.get_or_init(|| OneshotNotifier::new("UpgradeDialog"))
}
/// The header factory to separate stable from experimental versions.
fn header_factory(&self) -> &gtk::SignalListItemFactory {
self.header_factory.get_or_init(|| {
@ -140,13 +141,11 @@ mod imp {
self.update_invite_only_warning(info);
self.update_creators_warning(info);
let (sender, receiver) = oneshot::channel();
self.sender.replace(Some(sender));
let receiver = self.notifier().listen();
self.obj().present(Some(parent));
receiver
.await
.expect("sender should not have been dropped prematurely")
receiver.await
}
/// Update the room versions combo row with the given details.
@ -222,21 +221,13 @@ mod imp {
/// Confirm the upgrade.
#[template_callback]
fn upgrade(&self) {
let Some(sender) = self.sender.take() else {
error!("Could not confirm upgrade: response was already sent");
return;
};
let room_version = self
.version_combo
.selected_item()
.and_downcast::<RoomVersion>()
.map(|v| v.id().clone());
if sender.send(room_version).is_err() {
error!("Could not confirm upgrade: receiver was dropped");
}
self.notifier().notify_value(room_version);
self.obj().close();
}

34
src/session/view/content/room_history/message_toolbar/attachment_dialog.rs

@ -1,12 +1,10 @@
use adw::{prelude::*, subclass::prelude::*};
use futures_channel::oneshot;
use gtk::{gdk, gio, glib, glib::clone};
use tracing::error;
use crate::{components::MediaContentViewer, spawn};
use crate::{components::MediaContentViewer, spawn, utils::OneshotNotifier};
mod imp {
use std::cell::RefCell;
use std::cell::OnceCell;
use super::*;
@ -21,7 +19,7 @@ mod imp {
send_button: TemplateChild<gtk::Button>,
#[template_child]
media: TemplateChild<MediaContentViewer>,
sender: RefCell<Option<oneshot::Sender<gtk::ResponseType>>>,
notifier: OnceCell<OneshotNotifier<Option<()>>>,
}
#[glib::object_subclass]
@ -62,7 +60,7 @@ mod imp {
impl AdwDialogImpl for AttachmentDialog {
fn closed(&self) {
self.send_response(gtk::ResponseType::Cancel);
self.notifier().notify();
}
}
@ -74,15 +72,10 @@ mod imp {
self.grab_focus();
}
/// Sent the given response.
fn send_response(&self, response: gtk::ResponseType) {
if self
.sender
.take()
.is_some_and(|sender| sender.send(response).is_err())
{
error!("Could not send attachment dialog response {response:?}");
}
/// The notifier to send the response.
fn notifier(&self) -> &OneshotNotifier<Option<()>> {
self.notifier
.get_or_init(|| OneshotNotifier::new("AttachmentDialog"))
}
/// Set the image to preview.
@ -106,7 +99,7 @@ mod imp {
/// Emit the signal that the user wants to send the attachment.
#[template_callback]
fn send(&self) {
self.send_response(gtk::ResponseType::Ok);
self.notifier().notify_value(Some(()));
self.obj().close();
}
@ -115,12 +108,15 @@ mod imp {
/// The response is [`gtk::ResponseType::Ok`] if the user clicked on
/// send, otherwise it is [`gtk::ResponseType::Cancel`].
pub(super) async fn response_future(&self, parent: &gtk::Widget) -> gtk::ResponseType {
let (sender, receiver) = oneshot::channel();
self.sender.replace(Some(sender));
let receiver = self.notifier().listen();
self.obj().present(Some(parent));
receiver.await.unwrap_or(gtk::ResponseType::Cancel)
if receiver.await.is_some() {
gtk::ResponseType::Ok
} else {
gtk::ResponseType::Cancel
}
}
}
}

32
src/utils/media/audio.rs

@ -5,14 +5,13 @@ use std::{
time::Duration,
};
use futures_channel::oneshot;
use gst::prelude::*;
use gtk::{gio, glib, prelude::*};
use gtk::{gdk, gio, glib, prelude::*};
use matrix_sdk::attachment::BaseAudioInfo;
use tracing::warn;
use super::load_gstreamer_media_info;
use crate::utils::resample_slice;
use crate::utils::{OneshotNotifier, resample_slice};
/// Load information for the audio in the given file.
pub(crate) async fn load_audio_info(file: &gio::File) -> BaseAudioInfo {
@ -63,8 +62,8 @@ pub(crate) async fn generate_waveform(
}
};
let (sender, receiver) = oneshot::channel();
let sender = Arc::new(Mutex::new(Some(sender)));
let notifier = OneshotNotifier::<Option<gdk::Texture>>::new("generate_waveform");
let receiver = notifier.listen();
let samples = Arc::new(Mutex::new(vec![]));
let bus = pipeline.bus().expect("GstPipeline should have a GstBus");
@ -74,12 +73,12 @@ pub(crate) async fn generate_waveform(
match message.view() {
gst::MessageView::Eos(_) => {
// We are done collecting the samples.
send_empty_signal(&sender);
notifier.notify();
glib::ControlFlow::Break
}
gst::MessageView::Error(error) => {
warn!("Could not generate audio waveform: {error}");
send_empty_signal(&sender);
notifier.notify();
glib::ControlFlow::Break
}
gst::MessageView::Element(element) => {
@ -116,7 +115,7 @@ pub(crate) async fn generate_waveform(
match pipeline.set_state(gst::State::Playing) {
Ok(_) => {
let _ = receiver.await;
receiver.await;
}
Err(error) => {
warn!("Could not start GstPipeline for audio waveform: {error}");
@ -138,23 +137,6 @@ pub(crate) async fn generate_waveform(
Some(normalize_waveform(waveform)).filter(|waveform| !waveform.is_empty())
}
/// Try to send an empty signal through the given sender.
fn send_empty_signal(sender: &Mutex<Option<oneshot::Sender<()>>>) {
let mut sender = match sender.lock() {
Ok(sender) => sender,
Err(error) => {
warn!("Failed to lock audio waveform signal mutex: {error}");
return;
}
};
if let Some(sender) = sender.take()
&& sender.send(()).is_err()
{
warn!("Failed to send audio waveform end through channel");
}
}
/// Normalize the given waveform to have between 30 and 120 samples with a value
/// between 0 and 1.
///

17
src/utils/media/mod.rs

@ -1,12 +1,14 @@
//! Collection of methods for media.
use std::{cell::Cell, str::FromStr, sync::Mutex, time::Duration};
use std::{str::FromStr, time::Duration};
use gettextrs::gettext;
use gtk::{gio, glib, prelude::*};
use mime::Mime;
use ruma::UInt;
use crate::utils::OneshotNotifier;
pub(crate) mod audio;
pub(crate) mod image;
pub(crate) mod video;
@ -100,21 +102,20 @@ async fn load_gstreamer_media_info(file: &gio::File) -> Option<gst_pbutils::Disc
let timeout = gst::ClockTime::from_seconds(15);
let discoverer = gst_pbutils::Discoverer::new(timeout).ok()?;
let (sender, receiver) = futures_channel::oneshot::channel();
let sender = Mutex::new(Cell::new(Some(sender)));
let notifier = OneshotNotifier::new("load_gstreamer_media_info");
let receiver = notifier.listen();
discoverer.connect_discovered(move |_, info, _| {
if let Some(sender) = sender.lock().unwrap().take() {
sender.send(info.clone()).unwrap();
}
notifier.notify_value(Some(info.clone()));
});
discoverer.start();
discoverer.discover_uri_async(&file.uri()).ok()?;
let media_info = receiver.await.unwrap();
let media_info = receiver.await;
discoverer.stop();
Some(media_info)
media_info
}
/// All errors that can occur when downloading a media to a file.

53
src/utils/media/video.rs

@ -1,8 +1,5 @@
//! Collection of methods for videos.
use std::sync::{Arc, Mutex};
use futures_channel::oneshot;
use gst::prelude::*;
use gst_video::prelude::*;
use gtk::{gdk, gio, glib, glib::clone, prelude::*};
@ -13,9 +10,7 @@ use super::{
image::{Blurhash, TextureThumbnailer},
load_gstreamer_media_info,
};
/// A channel sender to send the result of a video thumbnail.
type ThumbnailResultSender = oneshot::Sender<Result<gdk::Texture, ()>>;
use crate::utils::OneshotNotifier;
/// Load information and try to generate a thumbnail for the video in the given
/// file.
@ -63,10 +58,10 @@ async fn generate_video_thumbnail_and_blurhash(
return None;
};
let (sender, receiver) = oneshot::channel();
let sender = Arc::new(Mutex::new(Some(sender)));
let notifier = OneshotNotifier::new("generate_video_thumbnail_and_blurhash");
let receiver = notifier.listen();
let pipeline = match create_thumbnailer_pipeline(&file.uri(), sender.clone()) {
let pipeline = match create_thumbnailer_pipeline(&file.uri(), notifier.clone()) {
Ok(pipeline) => pipeline,
Err(error) => {
warn!("Could not create pipeline for video thumbnail: {error}");
@ -95,7 +90,7 @@ async fn generate_video_thumbnail_and_blurhash(
// AsyncDone means that the pipeline has started now.
if pipeline.set_state(gst::State::Playing).is_err() {
warn!("Could not start pipeline for video thumbnail");
send_video_thumbnail_result(&sender, Err(()));
notifier.notify();
return glib::ControlFlow::Break;
}
@ -111,7 +106,7 @@ async fn generate_video_thumbnail_and_blurhash(
}
gst::MessageView::Error(error) => {
warn!("Could not generate video thumbnail: {error}");
send_video_thumbnail_result(&sender, Err(()));
notifier.notify();
glib::ControlFlow::Break
}
@ -127,7 +122,7 @@ async fn generate_video_thumbnail_and_blurhash(
let _ = pipeline.set_state(gst::State::Null);
bus.set_flushing(true);
let texture = texture.ok()?.ok()?;
let texture = texture?;
let thumbnail_blurhash = TextureThumbnailer(texture)
.generate_thumbnail_and_blurhash(widget.scale_factor(), &renderer);
@ -141,7 +136,7 @@ async fn generate_video_thumbnail_and_blurhash(
/// Create a pipeline to get a thumbnail of the first frame.
fn create_thumbnailer_pipeline(
uri: &str,
sender: Arc<Mutex<Option<ThumbnailResultSender>>>,
notifier: OneshotNotifier<Option<gdk::Texture>>,
) -> Result<gst::Pipeline, glib::Error> {
// Create our pipeline from a pipeline description string.
let pipeline = gst::parse::launch(&format!(
@ -177,7 +172,7 @@ fn create_thumbnailer_pipeline(
let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let Some(buffer) = sample.buffer() else {
warn!("Could not get buffer from appsink");
send_video_thumbnail_result(&sender, Err(()));
notifier.notify();
return Err(gst::FlowError::Error);
};
@ -190,13 +185,13 @@ fn create_thumbnailer_pipeline(
let Some(caps) = sample.caps() else {
warn!("Got video sample without caps");
send_video_thumbnail_result(&sender, Err(()));
notifier.notify();
return Err(gst::FlowError::Error);
};
let Ok(info) = gst_video::VideoInfo::from_caps(caps) else {
warn!("Could not parse video caps");
send_video_thumbnail_result(&sender, Err(()));
notifier.notify();
return Err(gst::FlowError::Error);
};
@ -204,17 +199,17 @@ fn create_thumbnailer_pipeline(
let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, &info)
.map_err(|_| {
warn!("Could not map video buffer readable");
send_video_thumbnail_result(&sender, Err(()));
notifier.notify();
gst::FlowError::Error
})?;
if let Some(texture) = video_frame_to_texture(&frame) {
send_video_thumbnail_result(&sender, Ok(texture));
notifier.notify_value(Some(texture));
Err(gst::FlowError::Eos)
} else {
warn!("Could not convert video frame to GdkTexture");
send_video_thumbnail_result(&sender, Err(()));
notifier.notify();
Err(gst::FlowError::Error)
}
})
@ -224,26 +219,6 @@ fn create_thumbnailer_pipeline(
Ok(pipeline)
}
/// Try to send the given video thumbnail result through the given sender.
fn send_video_thumbnail_result(
sender: &Mutex<Option<ThumbnailResultSender>>,
result: Result<gdk::Texture, ()>,
) {
let mut sender = match sender.lock() {
Ok(sender) => sender,
Err(error) => {
warn!("Failed to lock video thumbnail mutex: {error}");
return;
}
};
if let Some(sender) = sender.take()
&& sender.send(result).is_err()
{
warn!("Failed to send video thumbnail result through channel");
}
}
/// Convert the given video frame to a `GdkTexture`.
fn video_frame_to_texture(
frame: &gst_video::VideoFrameRef<&gst::BufferRef>,

105
src/utils/mod.rs

@ -8,14 +8,17 @@ use std::{
ops::Deref,
path::{Path, PathBuf},
rc::{Rc, Weak},
sync::{Arc, LazyLock},
sync::{Arc, LazyLock, Mutex},
};
use adw::prelude::*;
use futures_channel::oneshot;
use futures_util::future::BoxFuture;
use gtk::{gio, glib};
use regex::Regex;
use tempfile::NamedTempFile;
use tokio::task::{AbortHandle, JoinHandle};
use tracing::error;
pub(crate) mod expression;
mod expression_list_model;
@ -750,3 +753,103 @@ pub(crate) fn resample_slice(slice: &[f32], new_len: usize) -> Cow<'_, [f32]> {
Cow::Owned(result)
}
/// A helper type to wait for a notification that can occur only one time.
///
/// [`OneshotNotifier::listen()`] must be called to initialize it and get a
/// receiver. The receiver must then be `.await`ed and the future will resolve
/// when it is notified.
///
/// The receiver will receive a signal the first time that
/// [`OneshotNotifier::notify_value()`] is called. Further calls to this
/// function will be noops until a new receiver is created.The value to return
/// must implement `Default`, as this is the value that will be sent to the
/// receiver when the notifier is dropped before a value is notified.
///
/// This notifier can be cloned freely and moved between threads.
///
/// It is also possible to share this notifier between tasks to make sure that a
/// single task is running at a time. If [`OneshotNotifier::listen()`] is called
/// while there is already a receiver waiting, it will be notified as if the
/// notifier was dropped.
#[derive(Debug, Clone)]
pub(crate) struct OneshotNotifier<T = ()> {
/// The context used to identify the notifier in logs.
context: &'static str,
/// The sender for the notification signal.
sender: Arc<Mutex<Option<oneshot::Sender<T>>>>,
}
impl<T> OneshotNotifier<T> {
/// Get a new `OneshotNotifier` for the given context.
pub(crate) fn new(context: &'static str) -> Self {
Self {
sender: Default::default(),
context,
}
}
}
impl<T> OneshotNotifier<T>
where
T: Default + Send + 'static,
{
/// Initialize this `OneshotNotifier` and get a receiver.
pub(crate) fn listen(&self) -> OneshotNotifierReceiver<T> {
let (sender, receiver) = oneshot::channel();
match self.sender.lock() {
Ok(mut guard) => *guard = Some(sender),
Err(error) => {
error!(
context = self.context,
"Failed to lock oneshot notifier: {error}"
);
}
}
OneshotNotifierReceiver(receiver)
}
/// Notify the receiver with the given value, if any receiver is still
/// listening.
pub(crate) fn notify_value(&self, value: T) {
match self.sender.lock() {
Ok(mut guard) => {
if let Some(sender) = guard.take() {
let _ = sender.send(value);
}
}
Err(error) => {
error!(
context = self.context,
"Failed to lock oneshot notifier: {error}"
);
}
}
}
/// Notify the receiver with the default value, if any receiver is still
/// listening.
pub(crate) fn notify(&self) {
self.notify_value(T::default());
}
}
/// A notification receiver associated to a [`OneshotNotifier`].
///
/// This should be `.await`ed to wait for a notification.
#[derive(Debug)]
pub(crate) struct OneshotNotifierReceiver<T = ()>(oneshot::Receiver<T>);
impl<T> IntoFuture for OneshotNotifierReceiver<T>
where
T: Default + Send + 'static,
{
type Output = T;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move { self.0.await.unwrap_or_default() })
}
}

Loading…
Cancel
Save