diff --git a/po/POTFILES.in b/po/POTFILES.in index 5eacf937..1dd94e96 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -186,6 +186,6 @@ src/shortcuts.ui src/user_facing_error.rs src/utils/matrix/media_message.rs src/utils/matrix/mod.rs -src/utils/media/image.rs +src/utils/media/image/mod.rs src/utils/media/mod.rs src/window.ui diff --git a/src/components/avatar/editable.rs b/src/components/avatar/editable.rs index 5e0a0d1d..eab568f2 100644 --- a/src/components/avatar/editable.rs +++ b/src/components/avatar/editable.rs @@ -8,7 +8,7 @@ use gtk::{ prelude::*, CompositeTemplate, }; -use tracing::{debug, error, warn}; +use tracing::{debug, error}; use super::{AvatarData, AvatarImage}; use crate::{ @@ -16,7 +16,7 @@ use crate::{ toast, utils::{ expression, - media::image::{load_image, ImageDimensions, ImageError}, + media::image::{ImageDimensions, ImageError, IMAGE_QUEUE}, CountedRef, }, }; @@ -295,13 +295,10 @@ mod imp { /// Load the temporary paintable from the given file. pub(super) async fn set_temp_paintable_from_file(&self, file: gio::File) { - let paintable = load_image(file, Some(self.avatar_dimensions())) - .await - .map(Some) - .map_err(|error| { - warn!("Could not load avatar: {error}"); - error.into() - }); + let handle = IMAGE_QUEUE + .add_file_request(file, Some(self.avatar_dimensions())) + .await; + let paintable = handle.await.map(|image| Some(image.into())); self.set_temp_paintable(paintable); } diff --git a/src/components/avatar/image.rs b/src/components/avatar/image.rs index 75e5cef6..1b9a570a 100644 --- a/src/components/avatar/image.rs +++ b/src/components/avatar/image.rs @@ -3,13 +3,12 @@ use ruma::{ api::client::media::get_content_thumbnail::v3::Method, events::room::avatar::ImageInfo, OwnedMxcUri, }; -use tracing::{error, warn}; use crate::{ session::model::Session, spawn, utils::media::image::{ - load_image, ImageDimensions, ImageError, ImageSource, ThumbnailDownloader, + ImageDimensions, ImageError, ImageRequestPriority, ImageSource, ThumbnailDownloader, ThumbnailSettings, }, }; @@ -246,23 +245,10 @@ impl AvatarImage { prefer_thumbnail: true, }; - match downloader.download_to_file(&client, settings).await { - Ok(file) => { - let paintable = - load_image(file, Some(dimensions)) - .await - .map(Some) - .map_err(|error| { - warn!("Could not load avatar: {error}"); - error.into() - }); - - imp.set_paintable(paintable); - } - Err(error) => { - error!("Could not retrieve avatar: {error}"); - imp.set_paintable(Err(ImageError::Download)); - } - }; + // TODO: Change priority depending on size? + let result = downloader + .download(client, settings, ImageRequestPriority::Low) + .await; + imp.set_paintable(result.map(|image| Some(image.into()))); } } diff --git a/src/components/media/animated_image_paintable.rs b/src/components/media/animated_image_paintable.rs index 139dd94a..2a5ea713 100644 --- a/src/components/media/animated_image_paintable.rs +++ b/src/components/media/animated_image_paintable.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use glycin::{Frame, Image}; use gtk::{gdk, glib, glib::clone, graphene, prelude::*, subclass::prelude::*}; use tracing::error; @@ -14,12 +16,12 @@ mod imp { #[derive(Default)] pub struct AnimatedImagePaintable { - /// The source image. - image: OnceCell>>, + /// The image loader. + image_loader: OnceCell>>, /// The current frame that is displayed. - pub current_frame: RefCell>, + pub(super) current_frame: RefCell>>, /// The next frame of the animation, if any. - next_frame: RefCell>, + next_frame: RefCell>>, /// The source ID of the timeout to load the next frame, if any. timeout_source_id: RefCell>, /// The counted reference for the animation. @@ -43,7 +45,7 @@ mod imp { .borrow() .as_ref() .map(|f| f.height()) - .unwrap_or_else(|| self.image().info().height) as i32 + .unwrap_or_else(|| self.image_loader().info().height) as i32 } fn intrinsic_width(&self) -> i32 { @@ -51,7 +53,7 @@ mod imp { .borrow() .as_ref() .map(|f| f.width()) - .unwrap_or_else(|| self.image().info().width) as i32 + .unwrap_or_else(|| self.image_loader().info().width) as i32 } fn snapshot(&self, snapshot: &gdk::Snapshot, width: f64, height: f64) { @@ -85,14 +87,18 @@ mod imp { } impl AnimatedImagePaintable { - /// The source image. - fn image(&self) -> &Arc> { - self.image.get().unwrap() + /// The image loader. + fn image_loader(&self) -> &Arc> { + self.image_loader + .get() + .expect("image loader is initialized") } /// Initialize the image. - pub(super) fn init(&self, image: Image<'static>, first_frame: Frame) { - self.image.set(Arc::new(image)).unwrap(); + pub(super) fn init(&self, image_loader: Arc>, first_frame: Arc) { + self.image_loader + .set(image_loader) + .expect("image loader is uninitialized"); self.current_frame.replace(Some(first_frame)); self.update_animation(); @@ -179,7 +185,7 @@ mod imp { } async fn load_next_frame_inner(&self) { - let image = self.image().clone(); + let image = self.image_loader().clone(); let result = spawn_tokio!(async move { image.next_frame().await }) .await @@ -187,7 +193,7 @@ mod imp { match result { Ok(next_frame) => { - self.next_frame.replace(Some(next_frame)); + self.next_frame.replace(Some(next_frame.into())); // In case loading the frame took longer than the delay between frames. if self.timeout_source_id.borrow().is_none() { @@ -210,22 +216,23 @@ glib::wrapper! { } impl AnimatedImagePaintable { - /// Load an image from the given file. - pub fn new(image: Image<'static>, first_frame: Frame) -> Self { + /// Construct an `AnimatedImagePaintable` with the given loader and first + /// frame. + pub(crate) fn new(image_loader: Arc>, first_frame: Arc) -> Self { let obj = glib::Object::new::(); - obj.imp().init(image, first_frame); + obj.imp().init(image_loader, first_frame); obj } /// Get the current `GdkTexture` of this paintable, if any. - pub fn current_texture(&self) -> Option { + pub(crate) fn current_texture(&self) -> Option { Some(self.imp().current_frame.borrow().as_ref()?.texture()) } /// Get an animation ref. - pub fn animation_ref(&self) -> CountedRef { + pub(crate) fn animation_ref(&self) -> CountedRef { self.imp().animation_ref().clone() } } diff --git a/src/components/media/content_viewer.rs b/src/components/media/content_viewer.rs index 03d851de..429c4771 100644 --- a/src/components/media/content_viewer.rs +++ b/src/components/media/content_viewer.rs @@ -2,14 +2,13 @@ use adw::{prelude::*, subclass::prelude::*}; use geo_uri::GeoUri; use gettextrs::gettext; use gtk::{gdk, gio, glib, glib::clone, CompositeTemplate}; -use tracing::warn; use super::{AnimatedImagePaintable, AudioPlayer, LocationViewer}; use crate::{ components::ContextMenuBin, prelude::*, spawn, - utils::{media::image::load_image, CountedRef}, + utils::{media::image::IMAGE_QUEUE, CountedRef}, }; #[derive(Debug, Default, Clone, Copy)] @@ -243,15 +242,13 @@ impl MediaContentViewer { .unwrap_or_default(); match content_type { - ContentType::Image => match load_image(file, None).await { - Ok(texture) => { - self.view_image(&texture); + ContentType::Image => { + let handle = IMAGE_QUEUE.add_file_request(file, None).await; + if let Ok(image) = handle.await { + self.view_image(&gdk::Paintable::from(image)); return; } - Err(error) => { - warn!("Could not load image from file: {error}"); - } - }, + } ContentType::Audio => { let audio = if let Some(audio) = imp.viewer.child().and_downcast::() { audio diff --git a/src/session/view/content/room_details/history_viewer/visual_media_item.rs b/src/session/view/content/room_details/history_viewer/visual_media_item.rs index 5aaab794..b9435670 100644 --- a/src/session/view/content/room_details/history_viewer/visual_media_item.rs +++ b/src/session/view/content/room_details/history_viewer/visual_media_item.rs @@ -1,6 +1,5 @@ -use gtk::{glib, glib::clone, prelude::*, subclass::prelude::*, CompositeTemplate}; +use gtk::{gdk, glib, glib::clone, prelude::*, subclass::prelude::*, CompositeTemplate}; use ruma::api::client::media::get_content_thumbnail::v3::Method; -use tracing::warn; use super::{HistoryViewerEvent, VisualMediaHistoryViewer}; use crate::{ @@ -8,7 +7,7 @@ use crate::{ utils::{ add_activate_binding_action, matrix::VisualMediaMessage, - media::image::{load_image, ImageDimensions, ThumbnailSettings}, + media::image::{ImageDimensions, ImageRequestPriority, ThumbnailSettings}, }, }; @@ -174,22 +173,12 @@ mod imp { prefer_thumbnail: false, }; - let file = match media_message.thumbnail_tmp_file(&client, settings).await { - Ok(Some(file)) => file, - Ok(None) => return, - Err(error) => { - warn!("Could not retrieve media file: {error}"); - return; - } - }; - - match load_image(file, Some(dimensions)).await { - Ok(paintable) => { - self.picture.set_paintable(Some(&paintable)); - } - Err(error) => { - warn!("Image file not supported: {error}"); - } + if let Ok(Some(image)) = media_message + .thumbnail(client, settings, ImageRequestPriority::Default) + .await + { + self.picture + .set_paintable(Some(&gdk::Paintable::from(image))); } } } diff --git a/src/session/view/content/room_history/message_row/visual_media.rs b/src/session/view/content/room_history/message_row/visual_media.rs index 4f79674a..6bff7f67 100644 --- a/src/session/view/content/room_history/message_row/visual_media.rs +++ b/src/session/view/content/room_history/message_row/visual_media.rs @@ -17,7 +17,7 @@ use crate::{ spawn, utils::{ matrix::VisualMediaMessage, - media::image::{load_image, ImageDimensions, ImageError, ThumbnailSettings}, + media::image::{ImageDimensions, ImageRequestPriority, ThumbnailSettings}, CountedRef, LoadingState, }, }; @@ -340,13 +340,13 @@ impl MessageVisualMedia { #[weak(rename_to = obj)] self, async move { - obj.build_inner(media_message, &client).await; + obj.build_inner(media_message, client).await; } ) ); } - async fn build_inner(&self, media_message: VisualMediaMessage, client: &Client) { + async fn build_inner(&self, media_message: VisualMediaMessage, client: Client) { let imp = self.imp(); match &media_message { @@ -365,49 +365,38 @@ impl MessageVisualMedia { prefer_thumbnail: false, }; - let file = match media_message.thumbnail_tmp_file(client, settings).await { - Ok(Some(file)) => file, + let image = match media_message + .thumbnail(client, settings, ImageRequestPriority::Default) + .await + { + Ok(Some(image)) => image, Ok(None) => unreachable!("Image messages should always have a fallback"), Err(error) => { - warn!("Could not retrieve media file: {error}"); - imp.overlay_error - .set_tooltip_text(Some(&ImageError::Download.to_string())); + imp.overlay_error.set_tooltip_text(Some(&error.to_string())); imp.set_state(LoadingState::Error); return; } }; - match load_image(file, None).await { - Ok(paintable) => { - let child = - if let Some(child) = imp.media.child().and_downcast::() { - child - } else { - let child = gtk::Picture::new(); - imp.media.set_child(Some(&child)); - child - }; - child.set_paintable(Some(&paintable)); - - child.set_tooltip_text(Some(&filename)); - if is_sticker { - imp.media.remove_css_class("opaque-bg"); - } else { - imp.media.add_css_class("opaque-bg"); - } - } - Err(error) => { - warn!("Could not load image: {error}"); - let image_error = ImageError::from(error); - imp.overlay_error - .set_tooltip_text(Some(&image_error.to_string())); - imp.set_state(LoadingState::Error); - } + let child = if let Some(child) = imp.media.child().and_downcast::() { + child + } else { + let child = gtk::Picture::new(); + imp.media.set_child(Some(&child)); + child + }; + child.set_paintable(Some(&gdk::Paintable::from(image))); + + child.set_tooltip_text(Some(&filename)); + if is_sticker { + imp.media.remove_css_class("opaque-bg"); + } else { + imp.media.add_css_class("opaque-bg"); } } VisualMediaMessage::Video(_) => { - let file = match media_message.into_tmp_file(client).await { + let file = match media_message.into_tmp_file(&client).await { Ok(file) => file, Err(error) => { warn!("Could not retrieve media file: {error}"); diff --git a/src/utils/matrix/media_message.rs b/src/utils/matrix/media_message.rs index c585d4c8..08f60fdc 100644 --- a/src/utils/matrix/media_message.rs +++ b/src/utils/matrix/media_message.rs @@ -1,5 +1,5 @@ use gettextrs::gettext; -use gtk::{gio, glib, prelude::*}; +use gtk::{gio, prelude::*}; use matrix_sdk::Client; use ruma::{ events::{ @@ -17,7 +17,13 @@ use crate::{ prelude::*, toast, utils::{ - media::image::{ImageSource, ThumbnailDownloader, ThumbnailSettings}, + media::{ + image::{ + Image, ImageError, ImageRequestPriority, ImageSource, ThumbnailDownloader, + ThumbnailSettings, + }, + MediaFileError, + }, save_data_to_tmp_file, }, }; @@ -254,7 +260,7 @@ impl VisualMediaMessage { } /// Fetch a thumbnail of the media with the given client and thumbnail - /// settings and write it to a temporary file. + /// settings. /// /// This might not return a thumbnail at the requested size, depending on /// the message and the homeserver. @@ -263,12 +269,13 @@ impl VisualMediaMessage { /// could be downloaded. This only applies to video messages. /// /// Returns an error if something occurred while fetching the content or - /// saving the content to a file. - pub async fn thumbnail_tmp_file( + /// loading it. + pub async fn thumbnail( &self, - client: &Client, + client: Client, settings: ThumbnailSettings, - ) -> Result, MediaFileError> { + priority: ImageRequestPriority, + ) -> Result, ImageError> { let downloader = match &self { Self::Image(c) => { let image_info = c.info.as_deref(); @@ -316,9 +323,10 @@ impl VisualMediaMessage { } }; - let file = downloader.download_to_file(client, settings).await?; - - Ok(Some(file)) + downloader + .download(client, settings, priority) + .await + .map(Some) } /// Fetch the content of the media with the given client. @@ -372,13 +380,3 @@ impl From for MediaMessage { } } } - -/// All errors that can occur when downloading a media to a file. -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum MediaFileError { - /// An error occurred when downloading the media. - Sdk(#[from] matrix_sdk::Error), - /// An error occurred when writing the media to a file. - File(#[from] glib::Error), -} diff --git a/src/utils/matrix/mod.rs b/src/utils/matrix/mod.rs index aa39ade0..b6718f50 100644 --- a/src/utils/matrix/mod.rs +++ b/src/utils/matrix/mod.rs @@ -31,7 +31,7 @@ use thiserror::Error; mod media_message; -pub use self::media_message::{MediaFileError, MediaMessage, VisualMediaMessage}; +pub use self::media_message::{MediaMessage, VisualMediaMessage}; use crate::{ components::Pill, gettext_f, diff --git a/src/utils/media/image.rs b/src/utils/media/image/mod.rs similarity index 87% rename from src/utils/media/image.rs rename to src/utils/media/image/mod.rs index da15da96..6cc2bdac 100644 --- a/src/utils/media/image.rs +++ b/src/utils/media/image/mod.rs @@ -1,6 +1,6 @@ //! Collection of methods for images. -use std::{fmt, str::FromStr}; +use std::{fmt, str::FromStr, sync::Arc}; use gettextrs::gettext; use gtk::{gdk, gio, glib, prelude::*}; @@ -21,14 +21,13 @@ use ruma::{ }, OwnedMxcUri, }; -use tracing::warn; -use crate::{ - components::AnimatedImagePaintable, - spawn_tokio, - utils::{matrix::MediaFileError, save_data_to_tmp_file}, - DISABLE_GLYCIN_SANDBOX, -}; +mod queue; + +pub(crate) use queue::{ImageRequestPriority, IMAGE_QUEUE}; + +use super::MediaFileError; +use crate::{components::AnimatedImagePaintable, spawn_tokio, DISABLE_GLYCIN_SANDBOX}; /// The default width of a generated thumbnail. const THUMBNAIL_DEFAULT_WIDTH: u32 = 800; @@ -60,8 +59,8 @@ const THUMBNAIL_MAX_FILESIZE_THRESHOLD: u32 = 1024 * 1024; /// assume it's worth it to generate a thumbnail. const THUMBNAIL_DIMENSIONS_THRESHOLD: u32 = 200; -/// Get an image reader for the given file. -async fn image_reader(file: gio::File) -> Result, glycin::ErrorCtx> { +/// Get an image loader for the given file. +async fn image_loader(file: gio::File) -> Result, glycin::ErrorCtx> { let mut loader = glycin::Loader::new(file); if DISABLE_GLYCIN_SANDBOX { @@ -77,14 +76,14 @@ async fn image_reader(file: gio::File) -> Result, glycin: /// /// Set `request_dimensions` if the image will be shown at specific dimensions. /// To show the image at its natural size, set it to `None`. -pub async fn load_image( +async fn load_image( file: gio::File, request_dimensions: Option, -) -> Result { - let image = image_reader(file).await?; +) -> Result { + let image_loader = image_loader(file).await?; let frame_request = request_dimensions.map(|request| { - let image_info = image.info(); + let image_info = image_loader.info(); let original_dimensions = ImageDimensions { width: image_info.width, @@ -94,24 +93,44 @@ pub async fn load_image( original_dimensions.to_image_loader_request(request) }); - let (image, first_frame) = spawn_tokio!(async move { + spawn_tokio!(async move { let first_frame = if let Some(frame_request) = frame_request { - image.specific_frame(frame_request).await? + image_loader.specific_frame(frame_request).await? } else { - image.next_frame().await? + image_loader.next_frame().await? }; - Ok((image, first_frame)) + Ok(Image { + loader: image_loader.into(), + first_frame: first_frame.into(), + }) }) .await - .unwrap()?; + .expect("task was not aborted") +} - let paintable = if first_frame.delay().is_some() { - AnimatedImagePaintable::new(image, first_frame).upcast() - } else { - first_frame.texture().upcast() - }; +/// An image that was just loaded. +#[derive(Clone)] +pub struct Image { + /// The image loader. + loader: Arc>, + /// The first frame of the image. + first_frame: Arc, +} - Ok(paintable) +impl fmt::Debug for Image { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Image").finish_non_exhaustive() + } +} + +impl From for gdk::Paintable { + fn from(value: Image) -> Self { + if value.first_frame.delay().is_some() { + AnimatedImagePaintable::new(value.loader, value.first_frame).upcast() + } else { + value.first_frame.texture().upcast() + } + } } /// An API to load image information. @@ -130,8 +149,8 @@ impl ImageInfoLoader { async fn into_first_frame(self) -> Option { match self { Self::File(file) => { - let image_reader = image_reader(file).await.ok()?; - let handle = spawn_tokio!(async move { image_reader.next_frame().await }); + let image_loader = image_loader(file).await.ok()?; + let handle = spawn_tokio!(async move { image_loader.next_frame().await }); Some(Frame::Glycin(handle.await.unwrap().ok()?)) } Self::Texture(texture) => Some(Frame::Texture(gdk::TextureDownloader::new(&texture))), @@ -341,17 +360,17 @@ impl ImageDimensions { /// /// Returns `true` if either `width` or `height` is bigger than or equal to /// the given dimensions. - pub fn is_bigger_than(&self, other: ImageDimensions) -> bool { + fn is_bigger_than(&self, other: ImageDimensions) -> bool { self.width >= other.width || self.height >= other.height } /// Whether these dimensions should be resized to generate a thumbnail. - pub fn should_resize_for_thumbnail(&self, thumbnail_dimensions: ImageDimensions) -> bool { + fn should_resize_for_thumbnail(&self, thumbnail_dimensions: ImageDimensions) -> bool { self.is_bigger_than(thumbnail_dimensions.increase_by(THUMBNAIL_DIMENSIONS_THRESHOLD)) } /// Increase both these dimensions by the given value. - pub const fn increase_by(mut self, value: u32) -> Self { + const fn increase_by(mut self, value: u32) -> Self { self.width = self.width.saturating_add(value); self.height = self.height.saturating_add(value); self @@ -360,7 +379,7 @@ impl ImageDimensions { /// Compute the new dimensions for resizing to the requested dimensions /// while preserving the aspect ratio of these dimensions and respecting /// the given strategy. - pub fn resize(self, requested_dimensions: ImageDimensions, strategy: ResizeStrategy) -> Self { + fn resize(self, requested_dimensions: ImageDimensions, strategy: ResizeStrategy) -> Self { let w_ratio = self.width as f64 / requested_dimensions.width as f64; let h_ratio = self.height as f64 / requested_dimensions.height as f64; @@ -387,7 +406,7 @@ impl ImageDimensions { /// /// Returns `None` if these dimensions are smaller than the wanted /// dimensions. - pub fn resize_for_thumbnail(self) -> Option { + pub(super) fn resize_for_thumbnail(self) -> Option { let thumbnail_dimensions = THUMBNAIL_DEFAULT_DIMENSIONS; if !self.should_resize_for_thumbnail(thumbnail_dimensions) { @@ -399,7 +418,7 @@ impl ImageDimensions { /// Convert these dimensions to a request for the image loader with the /// requested dimensions. - pub fn to_image_loader_request( + fn to_image_loader_request( self, requested_dimensions: ImageDimensions, ) -> glycin::FrameRequest { @@ -502,14 +521,14 @@ impl<'a> ThumbnailDownloader<'a> { /// /// This might not return a thumbnail at the requested size, depending on /// the sources and the homeserver. - /// - /// Returns `Ok(None)` if no thumbnail could be retrieved. Returns an error - /// if something occurred while fetching the content. - pub async fn download_to_file( + pub async fn download( self, - client: &Client, + client: Client, settings: ThumbnailSettings, - ) -> Result { + priority: ImageRequestPriority, + ) -> Result { + let dimensions = settings.dimensions; + // First, select which source we are going to download from. let source = if let Some(alt) = self.alt { if !self.main.can_be_thumbnailed() @@ -527,42 +546,31 @@ impl<'a> ThumbnailDownloader<'a> { self.main }; - let data = if source.should_thumbnail(settings.prefer_thumbnail, settings.dimensions) { + if source.should_thumbnail(settings.prefer_thumbnail, settings.dimensions) { // Try to get a thumbnail. - let media = client.media(); let request = MediaRequest { source: source.source.to_common_media_source(), format: MediaFormat::Thumbnail(settings.into()), }; - let handle = spawn_tokio!(async move { media.get_media_content(&request, true).await }); - - match handle.await.unwrap() { - Ok(data) => Some(data), - Err(error) => { - warn!("Could not retrieve media thumbnail: {error}"); - None - } + let handle = IMAGE_QUEUE + .add_download_request(client.clone(), request, Some(dimensions), priority) + .await; + + if let Ok(image) = handle.await { + return Ok(image); } - } else { - None - }; + } // Fallback to downloading the full source. - let data = if let Some(data) = data { - data - } else { - let media = client.media(); - let request = MediaRequest { - source: source.source.to_common_media_source(), - format: MediaFormat::File, - }; - - spawn_tokio!(async move { media.get_media_content(&request, true).await }) - .await - .unwrap()? + let request = MediaRequest { + source: source.source.to_common_media_source(), + format: MediaFormat::File, }; + let handle = IMAGE_QUEUE + .add_download_request(client, request, Some(dimensions), priority) + .await; - Ok(save_data_to_tmp_file(&data)?) + handle.await } } @@ -780,10 +788,16 @@ pub enum ImageError { None, /// Could not download the image. Download, + /// Could not save the image to a temporary file. + File, /// The image uses an unsupported format. - Unsupported, + UnsupportedFormat, + /// An I/O error occurred when loading the image with glycin. + Io, /// An unexpected error occurred. Unknown, + /// The request for the image was aborted. + Aborted, } impl ImageError { @@ -796,20 +810,31 @@ impl ImageError { impl fmt::Display for ImageError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { - Self::None => unimplemented!(), + Self::None | Self::Aborted => unimplemented!(), Self::Download => gettext("Could not retrieve media"), - Self::Unsupported => gettext("Image format not supported"), - Self::Unknown => gettext("An unexpected error occurred"), + Self::UnsupportedFormat => gettext("Image format not supported"), + Self::File | Self::Io | Self::Unknown => gettext("An unexpected error occurred"), }; f.write_str(&s) } } +impl From for ImageError { + fn from(value: MediaFileError) -> Self { + match value { + MediaFileError::Sdk(_) => Self::Download, + MediaFileError::File(_) => Self::File, + } + } +} + impl From for ImageError { fn from(value: glycin::ErrorCtx) -> Self { if value.unsupported_format().is_some() { - Self::Unsupported + Self::UnsupportedFormat + } else if matches!(value.error(), glycin::Error::StdIoError { .. }) { + Self::Io } else { Self::Unknown } diff --git a/src/utils/media/image/queue.rs b/src/utils/media/image/queue.rs new file mode 100644 index 00000000..c6bd708a --- /dev/null +++ b/src/utils/media/image/queue.rs @@ -0,0 +1,752 @@ +use std::{ + collections::{HashMap, HashSet, VecDeque}, + fmt, + future::IntoFuture, + path::PathBuf, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + +use futures_util::future::BoxFuture; +use gtk::{gio, glib, prelude::*}; +use matrix_sdk::{ + media::{MediaRequest, UniqueKey}, + Client, +}; +use once_cell::sync::Lazy; +use tokio::{ + sync::{broadcast, Mutex as AsyncMutex}, + task::{spawn_blocking, AbortHandle}, +}; +use tracing::{debug, trace, warn}; + +use super::{load_image, Image, ImageDimensions, ImageError}; +use crate::{ + spawn_tokio, + utils::{media::MediaFileError, save_data_to_tmp_file}, +}; + +/// The default image request queue. +pub static IMAGE_QUEUE: Lazy = Lazy::new(ImageRequestQueue::new); + +/// The default limit of the [`ImageRequestQueue`], aka the maximum number of +/// concurrent image requests. +const DEFAULT_QUEUE_LIMIT: usize = 20; +/// The maximum number of retries for a single request. +const MAX_REQUEST_RETRY_COUNT: u8 = 2; +/// The time after which a request is considered to be stalled, 10 +/// seconds. +const STALLED_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +/// A queue for image requests. +/// +/// This implements the following features: +/// - Limit the number of concurrent requests, +/// - Prioritize requests according to their importance, +/// - Avoid duplicate requests, +/// - Watch requests that fail with I/O errors to: +/// - Reinsert them at the end of the queue to retry them later, +/// - Reduce the pool capacity temporarily to avoid more similar errors and +/// let the system recover. +/// - Watch requests that take too long to: +/// - Log them, +/// - Ignore them in the count of ongoing requests. +pub struct ImageRequestQueue { + inner: Arc>, +} + +struct ImageRequestQueueInner { + /// The current limit of the ongoing requests count. + /// + /// This may change if an error is encountered, to let the system recover. + limit: usize, + /// The image requests in the queue. + requests: HashMap, + /// The ongoing requests. + ongoing: HashSet, + /// The stalled requests. + stalled: HashSet, + /// The queue of requests with default priority. + queue_default: VecDeque, + /// The queue of requests with low priority. + queue_low: VecDeque, +} + +impl ImageRequestQueue { + /// Construct an empty `ImageRequestQueue` with the default settings. + fn new() -> Self { + Self { + inner: AsyncMutex::new(ImageRequestQueueInner { + limit: DEFAULT_QUEUE_LIMIT, + requests: Default::default(), + ongoing: Default::default(), + stalled: Default::default(), + queue_default: Default::default(), + queue_low: Default::default(), + }) + .into(), + } + } + + /// Add a request to download an image. + /// + /// If another request for the same image already exists, this will reuse + /// the same request. + pub async fn add_download_request( + &self, + client: Client, + settings: MediaRequest, + dimensions: Option, + priority: ImageRequestPriority, + ) -> ImageRequestHandle { + let inner = self.inner.clone(); + spawn_tokio!(async move { + inner + .lock() + .await + .add_download_request(client, settings, dimensions, priority) + }) + .await + .expect("task was not aborted") + } + + /// Add a request to load an image from a file. + /// + /// If another request for the same file already exists, this will reuse the + /// same request. + pub async fn add_file_request( + &self, + file: gio::File, + dimensions: Option, + ) -> ImageRequestHandle { + let inner = self.inner.clone(); + spawn_tokio!(async move { inner.lock().await.add_file_request(file, dimensions) }) + .await + .expect("task was not aborted") + } + + /// Mark the request with the given ID as stalled. + async fn mark_as_stalled(&self, request_id: ImageRequestId) { + self.inner.lock().await.mark_as_stalled(request_id); + } + + /// Retry the request with the given ID. + /// + /// If `lower_limit` is `true`, we will also lower the limit of the queue. + async fn retry_request(&self, request_id: ImageRequestId, lower_limit: bool) { + self.inner + .lock() + .await + .retry_request(request_id, lower_limit); + } + + /// Remove the request with the given ID. + async fn remove_request(&self, request_id: &ImageRequestId) { + self.inner.lock().await.remove_request(request_id); + } +} + +impl ImageRequestQueueInner { + /// Print the stats of the queue. + fn print_stats(&self) { + trace!( + "{} ongoing requests, {} total requests", + self.ongoing.len(), + self.requests.len() + ); + } + + /// Whether we have reache the current limit of concurrent requests. + fn is_limit_reached(&self) -> bool { + self.ongoing.len() >= self.limit + } + + /// Add the given request to the queue. + fn add_request(&mut self, request_id: ImageRequestId, request: ImageRequest) { + let is_limit_reached = self.is_limit_reached(); + if !is_limit_reached || request.priority == ImageRequestPriority::High { + // Spawn the request right away. + self.ongoing.insert(request_id.clone()); + request.spawn(); + trace!("Request {request_id} spawned"); + } else { + // Queue the request. + let queue = if request.priority == ImageRequestPriority::Default { + &mut self.queue_default + } else { + &mut self.queue_low + }; + + queue.push_back(request_id.clone()); + trace!("Request {request_id} queued"); + } + self.requests.insert(request_id, request); + self.print_stats(); + } + + /// Add a request to download an image. + /// + /// If another request for the same image already exists, this will reuse + /// the same request. + fn add_download_request( + &mut self, + client: Client, + settings: MediaRequest, + dimensions: Option, + priority: ImageRequestPriority, + ) -> ImageRequestHandle { + let data = DownloadRequestData { + client, + settings, + dimensions, + }; + let request_id = data.request_id(); + + // If the request already exists, use the existing one. + if let Some(request) = self.requests.get(&request_id) { + let result_receiver = request.result_sender.subscribe(); + trace!( + "Added receiver for {request_id}, new receiver count: {}", + request.result_sender.receiver_count() + ); + return ImageRequestHandle::new(result_receiver); + } + + // Build and add the request. + let (request, result_receiver) = ImageRequest::new(data, priority); + self.add_request(request_id.clone(), request); + + ImageRequestHandle::new(result_receiver) + } + + /// Add a request to load an image from a file. + /// + /// If another request for the same file already exists, this will reuse the + /// same request. + fn add_file_request( + &mut self, + file: gio::File, + dimensions: Option, + ) -> ImageRequestHandle { + let data = FileRequestData { file, dimensions }; + let request_id = data.request_id(); + + // If the request already exists, use the existing one. + if let Some(request) = self.requests.get(&request_id) { + let result_receiver = request.result_sender.subscribe(); + trace!( + "Added receiver for {request_id}, new receiver count: {}", + request.result_sender.receiver_count() + ); + return ImageRequestHandle::new(result_receiver); + } + + // Build and add the request. + // Always use high priority because file requests should always be for + // previewing a local image. + let (request, result_receiver) = ImageRequest::new(data, ImageRequestPriority::High); + + self.add_request(request_id.clone(), request); + + ImageRequestHandle::new(result_receiver) + } + + /// Mark the request with the given ID as stalled. + fn mark_as_stalled(&mut self, request_id: ImageRequestId) { + self.ongoing.remove(&request_id); + self.stalled.insert(request_id); + + self.spawn_next(); + } + + /// Retry the request with the given ID. + /// + /// If `lower_limit` is `true`, we will also lower the limit of the queue. + fn retry_request(&mut self, request_id: ImageRequestId, lower_limit: bool) { + self.ongoing.remove(&request_id); + + if lower_limit { + // Only one request at a time until the problem is likely fixed. + self.limit = 1; + } + + let is_limit_reached = self.is_limit_reached(); + + match self.requests.get_mut(&request_id) { + Some(request) => { + request.retries_count += 1; + + // For fairness, only re-spawn the request right away if there is no other + // request waiting with a priority higher or equal to this one. + let can_spawn_request = if request.priority == ImageRequestPriority::High { + true + } else { + !is_limit_reached + && self.queue_default.is_empty() + && (request.priority == ImageRequestPriority::Default + || self.queue_low.is_empty()) + }; + + if can_spawn_request { + // Re-spawn the request right away. + self.ongoing.insert(request_id.clone()); + request.spawn(); + trace!("Request {request_id} spawned"); + } else { + // Queue the request. + let queue = if request.priority == ImageRequestPriority::Default { + &mut self.queue_default + } else { + &mut self.queue_low + }; + + queue.push_back(request_id.clone()); + trace!("Request {request_id} queued"); + } + } + None => { + // This should not happen. + trace!("Could not find request {request_id} to retry"); + } + } + + self.spawn_next(); + } + + /// Remove the request with the given ID. + fn remove_request(&mut self, request_id: &ImageRequestId) { + self.ongoing.remove(request_id); + self.stalled.remove(request_id); + self.queue_default.retain(|id| id != request_id); + self.queue_low.retain(|id| id != request_id); + self.requests.remove(request_id); + trace!("Request {request_id} removed"); + + self.spawn_next(); + } + + /// Spawn as many requests as possible. + fn spawn_next(&mut self) { + while !self.is_limit_reached() { + let Some(request_id) = self + .queue_default + .pop_front() + .or_else(|| self.queue_low.pop_front()) + else { + // No request to spawn. + self.print_stats(); + return; + }; + let Some(request) = self.requests.get(&request_id) else { + // The queues and requests are out of sync, this should not happen. + trace!("Missing image request {request_id}"); + continue; + }; + + self.ongoing.insert(request_id.clone()); + request.spawn(); + trace!("Request {request_id} spawned"); + } + + // If there are no ongoing requests, restore the limit to its default value. + if self.ongoing.is_empty() { + self.limit = DEFAULT_QUEUE_LIMIT; + } + + self.print_stats(); + } +} + +/// A request for an image. +struct ImageRequest { + /// The data of the request. + data: ImageRequestData, + /// The priority of the request. + priority: ImageRequestPriority, + /// The sender of the channel to use to send the result. + result_sender: broadcast::Sender>, + /// The number of retries for this request. + retries_count: u8, + /// The handle for aborting the current task of this request. + task_handle: Arc>>, + /// The timeout source for marking this request as stalled. + stalled_timeout_source: Arc>>, +} + +impl ImageRequest { + /// Construct an image request with the given data and priority. + fn new( + data: impl Into, + priority: ImageRequestPriority, + ) -> (Self, broadcast::Receiver>) { + let (result_sender, result_receiver) = broadcast::channel(1); + ( + Self { + data: data.into(), + priority, + result_sender, + retries_count: 0, + task_handle: Default::default(), + stalled_timeout_source: Default::default(), + }, + result_receiver, + ) + } + + /// Whether we can retry a request with the given retries count and after + /// the given error. + fn can_retry(retries_count: u8, error: &ImageError) -> bool { + // Retry if we have not the max retry count && if it's a glycin error. + // We assume that the download requests have already been retried by the client. + retries_count < MAX_REQUEST_RETRY_COUNT && *error == ImageError::Unknown + } + + /// Spawn this request. + fn spawn(&self) { + let data = self.data.clone(); + let result_sender = self.result_sender.clone(); + let retries_count = self.retries_count; + let task_handle = self.task_handle.clone(); + let stalled_timeout_source = self.stalled_timeout_source.clone(); + + let abort_handle = spawn_tokio!(async move { + let request_id = data.request_id(); + let start_time = Instant::now(); + + let stalled_timeout_source_clone = stalled_timeout_source.clone(); + let request_id_clone = request_id.clone(); + let source = glib::timeout_add_once(STALLED_REQUEST_TIMEOUT, move || { + spawn_tokio!(async move { + // Drop the timeout source. + let _ = stalled_timeout_source_clone.lock().map(|mut s| s.take()); + + IMAGE_QUEUE.mark_as_stalled(request_id_clone.clone()).await; + debug!("Request {request_id_clone} is taking longer than {} seconds, it is now marked as stalled", STALLED_REQUEST_TIMEOUT.as_secs()); + }); + }); + if let Ok(Some(source)) = stalled_timeout_source.lock().map(|mut s| s.replace(source)) { + // This should not happen, but cancel the old timeout if we have one. + source.remove(); + }; + + let result = data.await; + let duration = Instant::now() - start_time; + trace!( + "Request {request_id} took {} ms, result: {result:?}", + duration.as_millis() + ); + + // Cancel the timeout. + if let Ok(Some(source)) = stalled_timeout_source.lock().map(|mut s| s.take()) { + source.remove(); + } + + // Now that we have the result, do not offer to abort the task anymore. + let _ = task_handle.lock().map(|mut s| s.take()); + + // If it is an error, maybe we can retry it. + if let Some(error) = result + .as_ref() + .err() + .filter(|error| Self::can_retry(retries_count, error)) + { + // Lower the limit of the queue if it is an I/O error, usually it means that glycin cannot spawn a sandbox. + let lower_limit = *error == ImageError::Io; + IMAGE_QUEUE + .retry_request(request_id.clone(), lower_limit) + .await; + return; + } + + // Send the result. + match result_sender.send(result) { + Ok(_) => trace!("Request result of {request_id} sent"), + Err(error) => trace!("Failed to send result of {request_id}: {error}"), + }; + IMAGE_QUEUE.remove_request(&request_id).await; + }).abort_handle(); + + if let Ok(Some(handle)) = self.task_handle.lock().map(|mut s| s.replace(abort_handle)) { + // This should not happen, but cancel the old task if we have one. + handle.abort(); + }; + } +} + +impl Drop for ImageRequest { + fn drop(&mut self) { + if let Ok(Some(source)) = self.stalled_timeout_source.lock().map(|mut s| s.take()) { + source.remove(); + } + if let Ok(Some(handle)) = self.task_handle.lock().map(|mut s| s.take()) { + handle.abort(); + + // Broadcast that the request was aborted. + let request_id = self.data.request_id(); + let result_sender = self.result_sender.clone(); + spawn_tokio!(async move { + match result_sender.send(Err(ImageError::Aborted)) { + Ok(_) => trace!("Request {request_id} was aborted"), + Err(error) => trace!("Failed to abort request {request_id}: {error}"), + }; + }); + } + } +} + +/// The data of a request to download an image. +#[derive(Clone)] +struct DownloadRequestData { + /// The Matrix client to use to make the request. + client: Client, + /// The settings of the request. + settings: MediaRequest, + /// The dimensions to request. + dimensions: Option, +} + +impl DownloadRequestData { + /// The ID of the image request with this data. + fn request_id(&self) -> ImageRequestId { + ImageRequestId::Download(self.settings.source.unique_key()) + } +} + +impl IntoFuture for DownloadRequestData { + type Output = Result; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let Self { + client, settings, .. + } = self; + + Box::pin(async move { + let media = client.media(); + let data = match media.get_media_content(&settings, true).await { + Ok(data) => data, + Err(error) => { + return Err(MediaFileError::from(error)); + } + }; + + let file = spawn_blocking(move || save_data_to_tmp_file(&data)) + .await + .expect("task was not aborted")?; + Ok(file) + }) + } +} + +/// The data of a request to load an image file into a paintable. +#[derive(Clone)] +struct FileRequestData { + /// The image file to load. + file: gio::File, + /// The dimensions to request. + dimensions: Option, +} + +impl FileRequestData { + /// The ID of the image request with this data. + fn request_id(&self) -> ImageRequestId { + ImageRequestId::File(self.file.path().expect("file has a path")) + } +} + +impl IntoFuture for FileRequestData { + type Output = Result; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let Self { file, dimensions } = self; + + Box::pin(async move { load_image(file, dimensions).await }) + } +} + +/// The data of an image request. +#[derive(Clone)] +enum ImageRequestData { + /// The data for a download request. + Download { + /// The data to download the image. + download_data: DownloadRequestData, + /// The data to load the image into a paintable, after it was + /// downloaded. + file_data: Option, + }, + /// The data for a file request. + File(FileRequestData), +} + +impl ImageRequestData { + /// The ID of the image request with this data. + fn request_id(&self) -> ImageRequestId { + match self { + ImageRequestData::Download { download_data, .. } => download_data.request_id(), + ImageRequestData::File(file_data) => file_data.request_id(), + } + } + + /// The data for the next request with this image request data. + fn into_next_request_data(self) -> DownloadOrFileRequestData { + match self { + Self::Download { + download_data, + file_data, + } => { + if let Some(file_data) = file_data { + file_data.into() + } else { + download_data.into() + } + } + Self::File(file_data) => file_data.into(), + } + } +} + +impl IntoFuture for ImageRequestData { + type Output = Result; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { + let file_data = match self.into_next_request_data() { + DownloadOrFileRequestData::Download(download_data) => { + let dimensions = download_data.dimensions; + + // Download the image to a file. + match download_data.await { + Ok(file) => FileRequestData { file, dimensions }, + Err(error) => { + warn!("Could not retrieve image: {error}"); + return Err(error.into()); + } + } + } + DownloadOrFileRequestData::File(file_data) => file_data, + }; + + // Load the image from the file. + match file_data.clone().await { + Ok(image) => Ok(image), + Err(error) => { + warn!("Could not load image from file: {error}"); + Err(error.into()) + } + } + }) + } +} + +impl From for ImageRequestData { + fn from(download_data: DownloadRequestData) -> Self { + Self::Download { + download_data, + file_data: None, + } + } +} + +impl From for ImageRequestData { + fn from(value: FileRequestData) -> Self { + Self::File(value) + } +} + +/// The data of a download request or a file request. +#[derive(Clone)] +enum DownloadOrFileRequestData { + /// The data for a download request. + Download(DownloadRequestData), + /// The data for a file request. + File(FileRequestData), +} + +impl From for DownloadOrFileRequestData { + fn from(download_data: DownloadRequestData) -> Self { + Self::Download(download_data) + } +} + +impl From for DownloadOrFileRequestData { + fn from(value: FileRequestData) -> Self { + Self::File(value) + } +} + +/// A unique identifier for an image request. +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +enum ImageRequestId { + /// The identifier for a download request. + Download(String), + /// The identifier for a file request. + File(PathBuf), +} + +impl fmt::Display for ImageRequestId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Download(id) => id.fmt(f), + Self::File(path) => path.to_string_lossy().fmt(f), + } + } +} + +/// The priority of an image request. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum ImageRequestPriority { + /// The highest priority. + /// + /// A request with this priority will be spawned right away and will not be + /// limited by the capacity of the pool. + /// + /// Should be used for images presented in the image viewer, the user avatar + /// in the account settings or the room avatar in the room details. + High, + /// The default priority. + /// + /// Should be used for images in messages in the room history, or in the + /// media history. + #[default] + Default, + /// The lowest priority. + /// + /// Should be used for avatars in the sidebar, the room history or the + /// members list. + Low, +} + +/// A handle for `await`ing an image request. +pub struct ImageRequestHandle { + receiver: broadcast::Receiver>, +} + +impl ImageRequestHandle { + /// Construct a new `ImageRequestHandle` with the given request ID. + fn new(receiver: broadcast::Receiver>) -> Self { + Self { receiver } + } +} + +impl IntoFuture for ImageRequestHandle { + type Output = Result; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let mut receiver = self.receiver; + Box::pin(async move { + let handle = spawn_tokio!(async move { receiver.recv().await }); + match handle.await.expect("task was not aborted") { + Ok(Ok(image)) => Ok(image), + Ok(err) => err, + Err(error) => { + warn!("Could not load image: {error}"); + Err(ImageError::Unknown) + } + } + }) + } +} diff --git a/src/utils/media/mod.rs b/src/utils/media/mod.rs index 1c3e7d39..f430383c 100644 --- a/src/utils/media/mod.rs +++ b/src/utils/media/mod.rs @@ -133,3 +133,13 @@ pub async fn load_audio_info(file: &gio::File) -> BaseAudioInfo { info.duration = media_info.duration().map(Into::into); info } + +/// All errors that can occur when downloading a media to a file. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum MediaFileError { + /// An error occurred when downloading the media. + Sdk(#[from] matrix_sdk::Error), + /// An error occurred when writing the media to a file. + File(#[from] glib::Error), +}