Browse Source

Use black

pull/440/head
Simon Conseil 5 years ago
parent
commit
67689730a6
  1. 4
      .pre-commit-config.yaml
  2. 14
      docs/changelog.py
  3. 11
      docs/conf.py
  4. 6
      pyproject.toml
  5. 5
      setup.cfg
  6. 94
      sigal/__init__.py
  7. 226
      sigal/gallery.py
  8. 65
      sigal/image.py
  9. 7
      sigal/log.py
  10. 3
      sigal/plugins/adjust.py
  11. 44
      sigal/plugins/compress_assets.py
  12. 8
      sigal/plugins/copyright.py
  13. 86
      sigal/plugins/encrypt/encrypt.py
  14. 15
      sigal/plugins/encrypt/endec.py
  15. 19
      sigal/plugins/feeds.py
  16. 40
      sigal/plugins/media_page.py
  17. 84
      sigal/plugins/nomedia.py
  18. 35
      sigal/plugins/nonmedia_files.py
  19. 47
      sigal/plugins/upload_s3.py
  20. 11
      sigal/plugins/watermark.py
  21. 15
      sigal/plugins/zip_gallery.py
  22. 30
      sigal/settings.py
  23. 13
      sigal/utils.py
  24. 108
      sigal/video.py
  25. 90
      sigal/writer.py
  26. 48
      tests/sample/sigal.conf.py
  27. 33
      tests/test_cli.py
  28. 57
      tests/test_compress_assets_plugin.py
  29. 21
      tests/test_encrypt.py
  30. 3
      tests/test_extended_caching.py
  31. 50
      tests/test_gallery.py
  32. 125
      tests/test_image.py
  33. 13
      tests/test_plugins.py
  34. 19
      tests/test_settings.py
  35. 3
      tests/test_utils.py
  36. 23
      tests/test_video.py
  37. 14
      tests/test_zip.py

4
.pre-commit-config.yaml

@ -17,3 +17,7 @@ repos:
rev: 5.8.0
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 21.6b0
hooks:
- id: black

14
docs/changelog.py

@ -56,10 +56,8 @@ def get_authors(revision_range):
lst_release, cur_release = (r.strip() for r in revision_range.split('..'))
# authors, in current release and previous to current release.
cur = set(re.findall(pat, this_repo.git.shortlog('-s', revision_range),
re.M))
pre = set(re.findall(pat, this_repo.git.shortlog('-s', lst_release),
re.M))
cur = set(re.findall(pat, this_repo.git.shortlog('-s', revision_range), re.M))
pre = set(re.findall(pat, this_repo.git.shortlog('-s', lst_release), re.M))
# Append '+' to new authors.
authors = [s + ' +' for s in cur - pre] + [s for s in cur & pre]
@ -71,18 +69,18 @@ def get_pull_requests(repo, revision_range):
prnums = []
# From regular merges
merges = this_repo.git.log(
'--oneline', '--merges', revision_range)
merges = this_repo.git.log('--oneline', '--merges', revision_range)
issues = re.findall("Merge pull request \\#(\\d*)", merges)
prnums.extend(int(s) for s in issues)
# From Homu merges (Auto merges)
issues = re. findall("Auto merge of \\#(\\d*)", merges)
issues = re.findall("Auto merge of \\#(\\d*)", merges)
prnums.extend(int(s) for s in issues)
# From fast forward squash-merges
commits = this_repo.git.log(
'--oneline', '--no-merges', '--first-parent', revision_range)
'--oneline', '--no-merges', '--first-parent', revision_range
)
issues = re.findall('^.*\\(\\#(\\d+)\\)$', commits, re.M)
prnums.extend(int(s) for s in issues)

11
docs/conf.py

@ -16,11 +16,7 @@ sys.path.append(os.path.abspath('..'))
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.extlinks',
'alabaster'
]
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.extlinks', 'alabaster']
extlinks = {'issue': ('https://github.com/saimn/sigal/issues/%s', '#')}
@ -88,7 +84,10 @@ html_theme_path = [alabaster.get_path()]
html_theme = 'alabaster'
html_sidebars = {
'**': [
'about.html', 'navigation.html', 'searchbox.html', 'donate.html',
'about.html',
'navigation.html',
'searchbox.html',
'donate.html',
]
}
html_theme_options = {

6
pyproject.toml

@ -4,3 +4,9 @@ build-backend = "setuptools.build_meta"
[tool.setuptools_scm]
write_to = "sigal/version.py"
[tool.black]
line-length = 88
target-version = ['py37', 'py38', 'py39']
experimental-string-processing = true
skip-string-normalization = true

5
setup.cfg

@ -56,7 +56,10 @@ ignore =
readthedocs.yml
[flake8]
ignore = E731,W504,E501
# ignore = E731,W504,E501
max-line-length = 88
ignore = E203,W503,E731
[isort]
profile = black
known_third_party=blinker,click,jinja2,markdown,natsort,PIL,Pillow,pilkit

94
sigal/__init__.py

@ -69,6 +69,7 @@ def init(path):
sys.exit(1)
from pkg_resources import resource_string
conf = resource_string(__name__, 'templates/sigal.conf.py')
with open(path, 'w', encoding='utf-8') as f:
@ -79,21 +80,37 @@ def init(path):
@main.command()
@argument('source', required=False)
@argument('destination', required=False)
@option('-f', '--force', is_flag=True,
help="Force the reprocessing of existing images")
@option('-f', '--force', is_flag=True, help="Force the reprocessing of existing images")
@option('-v', '--verbose', is_flag=True, help="Show all messages")
@option('-d', '--debug', is_flag=True,
help="Show all messages, including debug messages. Also raise "
"exception if an error happen when processing files.")
@option(
'-d',
'--debug',
is_flag=True,
help=(
"Show all messages, including debug messages. Also raise "
"exception if an error happen when processing files."
),
)
@option('-q', '--quiet', is_flag=True, help="Show only error messages")
@option('-c', '--config', default=_DEFAULT_CONFIG_FILE, show_default=True,
help="Configuration file")
@option('-t', '--theme', help="Specify a theme directory, or a theme name for "
"the themes included with Sigal")
@option(
'-c',
'--config',
default=_DEFAULT_CONFIG_FILE,
show_default=True,
help="Configuration file",
)
@option(
'-t',
'--theme',
help=(
"Specify a theme directory, or a theme name for the themes included with Sigal"
),
)
@option('--title', help="Title of the gallery (overrides the title setting.")
@option('-n', '--ncpu', help="Number of cpu to use (default: all)")
def build(source, destination, debug, verbose, quiet, force, config, theme,
title, ncpu):
def build(
source, destination, debug, verbose, quiet, force, config, theme, title, ncpu
):
"""Run sigal to process a directory.
If provided, 'source', 'destination' and 'theme' will override the
@ -137,14 +154,14 @@ def build(source, destination, debug, verbose, quiet, force, config, theme,
# paths are anyway not relative
relative_check = True
try:
relative_check = os.path.relpath(settings['destination'],
settings['source']).startswith('..')
relative_check = os.path.relpath(
settings['destination'], settings['source']
).startswith('..')
except ValueError:
pass
if not relative_check:
logger.error("Output directory should be outside of the input "
"directory.")
logger.error("Output directory should be outside of the input directory.")
sys.exit(1)
if title:
@ -166,9 +183,11 @@ def build(source, destination, debug, verbose, quiet, force, config, theme,
stats = gal.stats
def format_stats(_type):
opt = ["{} {}".format(stats[_type + '_' + subtype], subtype)
for subtype in ('skipped', 'failed')
if stats[_type + '_' + subtype] > 0]
opt = [
"{} {}".format(stats[_type + '_' + subtype], subtype)
for subtype in ('skipped', 'failed')
if stats[_type + '_' + subtype] > 0
]
opt = ' ({})'.format(', '.join(opt)) if opt else ''
return f'{stats[_type]} {_type}s{opt}'
@ -178,8 +197,8 @@ def build(source, destination, debug, verbose, quiet, force, config, theme,
for t in types[:-1]:
stats_str += f'{format_stats(t)} and '
stats_str += f'{format_stats(types[-1])}'
print('Done, processed {} in {:.2f} seconds.'
.format(stats_str, time.time() - start_time))
end_time = time.time() - start_time
print(f'Done, processed {stats_str} in {end_time:.2f} seconds.')
def init_plugins(settings):
@ -209,8 +228,13 @@ def init_plugins(settings):
@main.command()
@argument('destination', default='_build')
@option('-p', '--port', help="Port to use", default=8000)
@option('-c', '--config', default=_DEFAULT_CONFIG_FILE,
show_default=True, help='Configuration file')
@option(
'-c',
'--config',
default=_DEFAULT_CONFIG_FILE,
show_default=True,
help='Configuration file',
)
def serve(destination, port, config):
"""Run a simple web server."""
if os.path.exists(destination):
@ -219,13 +243,16 @@ def serve(destination, port, config):
settings = read_settings(config)
destination = settings.get('destination')
if not os.path.exists(destination):
sys.stderr.write("The '{}' directory doesn't exist, maybe try "
"building first?\n".format(destination))
sys.stderr.write(
f"The '{destination}' directory doesn't exist, maybe try building"
" first?\n"
)
sys.exit(1)
else:
sys.stderr.write("The {destination} directory doesn't exist "
"and the config file ({config}) could not be read.\n"
.format(destination=destination, config=config))
sys.stderr.write(
f"The {destination} directory doesn't exist "
f"and the config file ({config}) could not be read.\n"
)
sys.exit(2)
print(f'DESTINATION : {destination}')
@ -246,8 +273,9 @@ def serve(destination, port, config):
@main.command()
@argument('target')
@argument('keys', nargs=-1)
@option('-o', '--overwrite', default=False, is_flag=True,
help='Overwrite existing .md file')
@option(
'-o', '--overwrite', default=False, is_flag=True, help='Overwrite existing .md file'
)
def set_meta(target, keys, overwrite=False):
"""Write metadata keys to .md file.
@ -270,12 +298,14 @@ def set_meta(target, keys, overwrite=False):
else:
descfile = os.path.splitext(target)[0] + '.md'
if os.path.exists(descfile) and not overwrite:
sys.stderr.write("Description file '{}' already exists. "
"Use --overwrite to overwrite it.\n".format(descfile))
sys.stderr.write(
f"Description file '{descfile}' already exists. "
"Use --overwrite to overwrite it.\n"
)
sys.exit(2)
with open(descfile, "w") as fp:
for i in range(len(keys) // 2):
k, v = keys[i * 2:(i + 1) * 2]
k, v = keys[i * 2 : (i + 1) * 2]
fp.write(f"{k.capitalize()}: {v}\n")
print(f"{len(keys) // 2} metadata key(s) written to {descfile}")

226
sigal/gallery.py

@ -43,9 +43,16 @@ from PIL import Image as PILImage
from . import image, signals, video
from .image import get_exif_tags, get_image_metadata, get_size, process_image
from .settings import Status, get_thumb
from .utils import (Devnull, cached_property, check_or_create_dir, copy,
get_mime, is_valid_html5_video, read_markdown,
url_from_path)
from .utils import (
Devnull,
cached_property,
check_or_create_dir,
copy,
get_mime,
is_valid_html5_video,
read_markdown,
url_from_path,
)
from .video import process_video
from .writer import AlbumListPageWriter, AlbumPageWriter
@ -139,8 +146,12 @@ class Media:
check_or_create_dir(orig_path)
big_path = join(orig_path, self.src_filename)
if not isfile(big_path):
copy(self.src_path, big_path, symlink=s['orig_link'],
rellink=self.settings['rel_link'])
copy(
self.src_path,
big_path,
symlink=s['orig_link'],
rellink=self.settings['rel_link'],
)
return join(s['orig_dir'], self.src_filename)
@property
@ -155,20 +166,23 @@ class Media:
if not isfile(self.thumb_path):
self.logger.debug('Generating thumbnail for %r', self)
path = (self.dst_path if os.path.exists(self.dst_path)
else self.src_path)
path = self.dst_path if os.path.exists(self.dst_path) else self.src_path
try:
# if thumbnail is missing (if settings['make_thumbs'] is False)
s = self.settings
if self.type == 'image':
image.generate_thumbnail(
path, self.thumb_path, s['thumb_size'],
fit=s['thumb_fit'])
path, self.thumb_path, s['thumb_size'], fit=s['thumb_fit']
)
elif self.type == 'video':
video.generate_thumbnail(
path, self.thumb_path, s['thumb_size'],
s['thumb_video_delay'], fit=s['thumb_fit'],
converter=s['video_converter'])
path,
self.thumb_path,
s['thumb_size'],
s['thumb_video_delay'],
fit=s['thumb_fit'],
converter=s['video_converter'],
)
except Exception as e:
self.logger.error('Failed to generate thumbnail: %s', e)
return
@ -217,8 +231,7 @@ class Image(Media):
def date(self):
"""The date from the EXIF DateTimeOriginal metadata if available, or
from the file date."""
return (self.exif and self.exif.get('dateobj', None) or
self._get_file_date())
return self.exif and self.exif.get('dateobj', None) or self._get_file_date()
@cached_property
def exif(self):
@ -226,8 +239,11 @@ class Image(Media):
information, see :ref:`simple-exif-data`.
"""
datetime_format = self.settings['datetime_format']
return (get_exif_tags(self.raw_exif, datetime_format=datetime_format)
if self.raw_exif and self.src_ext in ('.jpg', '.jpeg') else None)
return (
get_exif_tags(self.raw_exif, datetime_format=datetime_format)
if self.raw_exif and self.src_ext in ('.jpg', '.jpeg')
else None
)
def _get_metadata(self):
super()._get_metadata()
@ -330,8 +346,11 @@ class Album:
# optionally add index.html to the URLs
self.url_ext = self.output_file if settings['index_in_url'] else ''
self.index_url = url_from_path(os.path.relpath(
settings['destination'], self.dst_path)) + '/' + self.url_ext
self.index_url = (
url_from_path(os.path.relpath(settings['destination'], self.dst_path))
+ '/'
+ self.url_ext
)
#: List of all medias in the album (:class:`~sigal.gallery.Image` and
#: :class:`~sigal.gallery.Video`).
@ -361,12 +380,13 @@ class Album:
def __repr__(self):
return "<{}>(path={!r}, title={!r})".format(
self.__class__.__name__, self.path, self.title)
self.__class__.__name__, self.path, self.title
)
def __str__(self):
return (f'{self.path} : ' +
', '.join(f'{count} {_type}s'
for _type, count in self.medias_count.items()))
return f'{self.path} : ' + ', '.join(
f'{count} {_type}s' for _type, count in self.medias_count.items()
)
def __len__(self):
return len(self.medias)
@ -384,8 +404,7 @@ class Album:
self.description = ''
self.meta = {}
# default: get title from directory name
self.title = os.path.basename(self.path if self.path != '.'
else self.src_path)
self.title = os.path.basename(self.path if self.path != '.' else self.src_path)
if isfile(descfile):
meta = read_markdown(descfile)
@ -402,8 +421,7 @@ class Album:
check_or_create_dir(self.dst_path)
if self.medias:
check_or_create_dir(join(self.dst_path,
self.settings['thumb_dir']))
check_or_create_dir(join(self.dst_path, self.settings['thumb_dir']))
if self.medias and self.settings['keep_orig']:
self.orig_path = join(self.dst_path, self.settings['orig_dir'])
@ -432,6 +450,7 @@ class Album:
return album.meta.get(meta_key, [''])[0]
else:
def sort_key(s):
album = self.gallery.albums[join(root_path, s)]
return getattr(album, albums_sort_attr)
@ -451,15 +470,14 @@ class Album:
elif medias_sort_attr.startswith('meta.'):
meta_key = medias_sort_attr.split(".", 1)[1]
key = natsort_keygen(
key=lambda s: s.meta.get(meta_key, [''])[0],
alg=ns.LOCALE)
key=lambda s: s.meta.get(meta_key, [''])[0], alg=ns.LOCALE
)
else:
key = natsort_keygen(
key=lambda s: getattr(s, medias_sort_attr),
alg=ns.LOCALE)
key=lambda s: getattr(s, medias_sort_attr), alg=ns.LOCALE
)
self.medias.sort(key=key,
reverse=self.settings['medias_sort_reverse'])
self.medias.sort(key=key, reverse=self.settings['medias_sort_reverse'])
signals.medias_sorted.send(self)
@ -483,8 +501,7 @@ class Album:
sub-directory.
"""
root_path = self.path if self.path != '.' else ''
return [self.gallery.albums[join(root_path, path)]
for path in self.subdirs]
return [self.gallery.albums[join(root_path, path)] for path in self.subdirs]
@property
def nbmedias(self):
@ -508,8 +525,9 @@ class Album:
thumbnail = self.meta.get('thumbnail', [''])[0]
if thumbnail and isfile(join(self.src_path, thumbnail)):
self._thumbnail = url_from_path(join(
self.name, get_thumb(self.settings, thumbnail)))
self._thumbnail = url_from_path(
join(self.name, get_thumb(self.settings, thumbnail))
)
self.logger.debug("Thumbnail for %r : %s", self, self._thumbnail)
return self._thumbnail
else:
@ -527,15 +545,17 @@ class Album:
if size['width'] > size['height']:
try:
self._thumbnail = (url_quote(self.name) + '/' +
f.thumbnail)
self._thumbnail = url_quote(self.name) + '/' + f.thumbnail
except Exception as e:
self.logger.info("Failed to get thumbnail for %s: %s",
f.dst_filename, e)
self.logger.info(
"Failed to get thumbnail for %s: %s", f.dst_filename, e
)
else:
self.logger.debug(
"Use 1st landscape image as thumbnail for %r : %s",
self, self._thumbnail)
self,
self._thumbnail,
)
return self._thumbnail
# else simply return the 1st media file
@ -543,12 +563,14 @@ class Album:
for media in self.medias:
if media.thumbnail is not None:
try:
self._thumbnail = (url_quote(self.name) + '/' +
media.thumbnail)
self._thumbnail = (
url_quote(self.name) + '/' + media.thumbnail
)
except Exception as e:
self.logger.info(
"Failed to get thumbnail for %s: %s",
media.dst_filename, e
media.dst_filename,
e,
)
else:
break
@ -556,19 +578,21 @@ class Album:
self.logger.warning("No thumbnail found for %r", self)
return
self.logger.debug("Use the 1st image as thumbnail for %r : %s",
self, self._thumbnail)
self.logger.debug(
"Use the 1st image as thumbnail for %r : %s", self, self._thumbnail
)
return self._thumbnail
# use the thumbnail of their sub-directories
if not self._thumbnail:
for path, album in self.gallery.get_albums(self.path):
if album.thumbnail:
self._thumbnail = (url_quote(self.name) + '/' +
album.thumbnail)
self._thumbnail = url_quote(self.name) + '/' + album.thumbnail
self.logger.debug(
"Using thumbnail from sub-directory for %r : %s",
self, self._thumbnail)
self,
self._thumbnail,
)
return self._thumbnail
self.logger.error('Thumbnail not found for %r', self)
@ -576,8 +600,7 @@ class Album:
@property
def random_thumbnail(self):
try:
return url_from_path(join(self.name,
random.choice(self.medias).thumbnail))
return url_from_path(join(self.name, random.choice(self.medias).thumbnail))
except IndexError:
return self.thumbnail
@ -597,8 +620,7 @@ class Album:
if path == '.':
break
url = (url_from_path(os.path.relpath(path, self.path)) + '/' +
self.url_ext)
url = url_from_path(os.path.relpath(path, self.path)) + '/' + self.url_ext
breadcrumb.append((url, self.gallery.albums[path].title))
breadcrumb.reverse()
@ -606,8 +628,7 @@ class Album:
@property
def show_map(self):
"""Check if we have at least one photo with GPS location in the album
"""
"""Check if we have at least one photo with GPS location in the album"""
return any(image.has_location() for image in self.images)
@cached_property
@ -618,7 +639,6 @@ class Album:
class Gallery:
def __init__(self, settings, ncpu=None, quiet=False):
self.settings = settings
self.logger = logging.getLogger(__name__)
@ -637,20 +657,22 @@ class Gallery:
ignore_files = settings['ignore_files']
progressChars = cycle(["/", "-", "\\", "|"])
show_progress = (not quiet and
self.logger.getEffectiveLevel() >= logging.WARNING and
os.isatty(sys.stdout.fileno()))
show_progress = (
not quiet
and self.logger.getEffectiveLevel() >= logging.WARNING
and os.isatty(sys.stdout.fileno())
)
self.progressbar_target = None if show_progress else Devnull()
for path, dirs, files in os.walk(src_path, followlinks=True,
topdown=False):
for path, dirs, files in os.walk(src_path, followlinks=True, topdown=False):
if show_progress:
print("\rCollecting albums " + next(progressChars), end="")
relpath = os.path.relpath(path, src_path)
# Test if the directory match the ignore_dirs settings
if ignore_dirs and any(fnmatch.fnmatch(relpath, ignore)
for ignore in ignore_dirs):
if ignore_dirs and any(
fnmatch.fnmatch(relpath, ignore) for ignore in ignore_dirs
):
self.logger.info('Ignoring %s', relpath)
continue
@ -683,13 +705,19 @@ class Gallery:
if show_progress:
print("\rCollecting albums, done.")
with progressbar(albums.values(), label="%16s" % "Sorting albums",
file=self.progressbar_target) as progress_albums:
with progressbar(
albums.values(),
label="%16s" % "Sorting albums",
file=self.progressbar_target,
) as progress_albums:
for album in progress_albums:
album.sort_subdirs(settings['albums_sort_attr'])
with progressbar(albums.values(), label="%16s" % "Sorting media",
file=self.progressbar_target) as progress_albums:
with progressbar(
albums.values(),
label="%16s" % "Sorting media",
file=self.progressbar_target,
) as progress_albums:
for album in progress_albums:
album.sort_medias(settings['medias_sort_attr'])
@ -718,11 +746,12 @@ class Gallery:
self.logger.info("Using %s cores", ncpu)
if ncpu > 1:
def pool_init():
if self.settings['max_img_pixels']:
PILImage.MAX_IMAGE_PIXELS = self.settings['max_img_pixels']
self.pool = multiprocessing.Pool(processes=ncpu,
initializer=pool_init)
self.pool = multiprocessing.Pool(processes=ncpu, initializer=pool_init)
else:
self.pool = None
@ -751,17 +780,24 @@ class Gallery:
return ""
try:
with progressbar(self.albums.values(), label="Collecting files",
item_show_func=log_func, show_eta=False,
file=self.progressbar_target) as albums:
media_list = [f for album in albums
for f in self.process_dir(album, force=force)]
with progressbar(
self.albums.values(),
label="Collecting files",
item_show_func=log_func,
show_eta=False,
file=self.progressbar_target,
) as albums:
media_list = [
f for album in albums for f in self.process_dir(album, force=force)
]
except KeyboardInterrupt:
sys.exit('Interrupted')
bar_opt = {'label': "Processing files",
'show_pos': True,
'file': self.progressbar_target}
bar_opt = {
'label': "Processing files",
'show_pos': True,
'file': self.progressbar_target,
}
if self.pool:
result = []
@ -778,7 +814,8 @@ class Gallery:
"Failed to process files with the multiprocessing feature."
" This can be caused by some module import or object "
"defined in the settings file, which can't be serialized.",
exc_info=True)
exc_info=True,
)
sys.exit('Abort')
finally:
self.pool.close()
@ -788,19 +825,23 @@ class Gallery:
result = [process_file(media_item) for media_item in medias]
if any(result):
failed_files = [media for status, media in zip(result, media_list)
if status != 0]
failed_files = [
media for status, media in zip(result, media_list) if status != 0
]
self.remove_files(failed_files)
if self.settings['write_html']:
album_writer = AlbumPageWriter(self.settings,
index_title=self.title)
album_list_writer = AlbumListPageWriter(self.settings,
index_title=self.title)
with progressbar(self.albums.values(),
label="%16s" % "Writing files",
item_show_func=log_func, show_eta=False,
file=self.progressbar_target) as albums:
album_writer = AlbumPageWriter(self.settings, index_title=self.title)
album_list_writer = AlbumListPageWriter(
self.settings, index_title=self.title
)
with progressbar(
self.albums.values(),
label="%16s" % "Writing files",
item_show_func=log_func,
show_eta=False,
file=self.progressbar_target,
) as albums:
for album in albums:
if album.albums:
if album.medias:
@ -808,7 +849,8 @@ class Gallery:
"Album %s contains sub-albums and images. "
"Please move images to their own sub-album. "
"Images in album %s will not be visible.",
album.title, album.title
album.title,
album.title,
)
album_list_writer.write(album)
else:
@ -827,8 +869,10 @@ class Gallery:
self.stats[f.type + '_failed'] += 1
album.medias.remove(f)
break
self.logger.error('You can run "sigal build" in verbose (--verbose) or'
' debug (--debug) mode to get more details.')
self.logger.error(
'You can run "sigal build" in verbose (--verbose) or'
' debug (--debug) mode to get more details.'
)
def process_dir(self, album, force=False):
"""Process a list of images in a directory."""

65
sigal/image.py

@ -72,8 +72,10 @@ def _read_image(file_path):
im = PILImage.open(file_path)
for w in caught_warnings:
logger.warning(f'PILImage reported a warning for file {file_path}\n'
f'{w.category}: {w.message}')
logger.warning(
f'PILImage reported a warning for file {file_path}\n'
f'{w.category}: {w.message}'
)
return im
@ -97,9 +99,11 @@ def generate_image(source, outname, settings, options=None):
original_format = img.format
if settings['copy_exif_data'] and settings['autorotate_images']:
logger.warning("The 'autorotate_images' and 'copy_exif_data' settings "
"are not compatible because Sigal can't save the "
"modified Orientation tag.")
logger.warning(
"The 'autorotate_images' and 'copy_exif_data' settings "
"are not compatible because Sigal can't save the "
"modified Orientation tag."
)
# Preserve EXIF data
if settings['copy_exif_data'] and _has_exif_tags(img):
@ -120,8 +124,7 @@ def generate_image(source, outname, settings, options=None):
if settings['img_processor']:
try:
logger.debug('Processor: %s', settings['img_processor'])
processor_cls = getattr(pilkit.processors,
settings['img_processor'])
processor_cls = getattr(pilkit.processors, settings['img_processor'])
except AttributeError:
logger.error('Wrong processor name: %s', settings['img_processor'])
sys.exit()
@ -142,15 +145,15 @@ def generate_image(source, outname, settings, options=None):
# first, use hard-coded output format, or PIL format, or original image
# format, or fall back to JPEG
outformat = (settings.get('img_format') or img.format or
original_format or 'JPEG')
outformat = settings.get('img_format') or img.format or original_format or 'JPEG'
logger.debug('Save resized image to %s (%s)', outname, outformat)
save_image(img, outname, outformat, options=options, autoconvert=True)
def generate_thumbnail(source, outname, box, fit=True, options=None,
thumb_fit_centering=(0.5, 0.5)):
def generate_thumbnail(
source, outname, box, fit=True, options=None, thumb_fit_centering=(0.5, 0.5)
):
"""Create a thumbnail image."""
logger = logging.getLogger(__name__)
@ -159,8 +162,7 @@ def generate_thumbnail(source, outname, box, fit=True, options=None,
original_format = img.format
if fit:
img = ImageOps.fit(img, box, PILImage.ANTIALIAS,
centering=thumb_fit_centering)
img = ImageOps.fit(img, box, PILImage.ANTIALIAS, centering=thumb_fit_centering)
else:
img.thumbnail(box, PILImage.ANTIALIAS)
@ -183,8 +185,7 @@ def process_image(media):
options = {}
try:
generate_image(media.src_path, media.dst_path, media.settings,
options=options)
generate_image(media.src_path, media.dst_path, media.settings, options=options)
if media.settings['make_thumbs']:
generate_thumbnail(
@ -193,7 +194,7 @@ def process_image(media):
media.settings['thumb_size'],
fit=media.settings['thumb_fit'],
options=options,
thumb_fit_centering=media.settings["thumb_fit_centering"]
thumb_fit_centering=media.settings["thumb_fit_centering"],
)
except Exception as e:
logger.info('Failed to process: %r', e)
@ -232,17 +233,18 @@ def get_exif_data(filename):
return None
for w in caught_warnings:
fname = (filename.filename if isinstance(filename, PILImage.Image)
else filename)
logger.warning(f'PILImage reported a warning for file {fname}\n'
f'{w.category}: {w.message}')
fname = filename.filename if isinstance(filename, PILImage.Image) else filename
logger.warning(
f'PILImage reported a warning for file {fname}\n{w.category}: {w.message}'
)
data = {TAGS.get(tag, tag): value for tag, value in exif.items()}
if 'GPSInfo' in data:
try:
data['GPSInfo'] = {GPSTAGS.get(tag, tag): value
for tag, value in data['GPSInfo'].items()}
data['GPSInfo'] = {
GPSTAGS.get(tag, tag): value for tag, value in data['GPSInfo'].items()
}
except AttributeError:
logger.info('Failed to get GPS Info')
del data['GPSInfo']
@ -259,7 +261,8 @@ def get_iptc_data(filename):
# PILs IptcImagePlugin issues a SyntaxError in certain circumstances
# with malformed metadata, see PIL/IptcImagePlugin.py", line 71.
# ( https://github.com/python-pillow/Pillow/blob/9dd0348be2751beb2c617e32ff9985aa2f92ae5f/src/PIL/IptcImagePlugin.py#L71 )
# ( https://github.com/python-pillow/Pillow/blob/
# 9dd0348be2751beb2c617e32ff9985aa2f92ae5f/src/PIL/IptcImagePlugin.py#L71 )
try:
img = _read_image(filename)
raw_iptc = IptcImagePlugin.getiptcinfo(img)
@ -274,13 +277,11 @@ def get_iptc_data(filename):
# 2:120 is the IPTC description property
if raw_iptc and (2, 120) in raw_iptc:
iptc_data["description"] = raw_iptc[(2, 120)].decode('utf-8',
errors='replace')
iptc_data["description"] = raw_iptc[(2, 120)].decode('utf-8', errors='replace')
# 2:105 is the IPTC headline property
if raw_iptc and (2, 105) in raw_iptc:
iptc_data["headline"] = raw_iptc[(2, 105)].decode('utf-8',
errors='replace')
iptc_data["headline"] = raw_iptc[(2, 105)].decode('utf-8', errors='replace')
return iptc_data
@ -356,8 +357,7 @@ def get_exif_tags(data, datetime_format='%c'):
else:
simple['focal'] = round(float(focal[0]) / focal[1])
except Exception:
logger.debug('Skipped invalid FocalLength: %r', focal,
exc_info=True)
logger.debug('Skipped invalid FocalLength: %r', focal, exc_info=True)
if 'ExposureTime' in data:
exptime = data['ExposureTime']
@ -365,8 +365,7 @@ def get_exif_tags(data, datetime_format='%c'):
simple['exposure'] = str(exptime)
elif isinstance(exptime, tuple):
try:
simple['exposure'] = str(fractions.Fraction(exptime[0],
exptime[1]))
simple['exposure'] = str(fractions.Fraction(exptime[0], exptime[1]))
except ZeroDivisionError:
logger.info('Invalid ExposureTime: %r', exptime)
elif isinstance(exptime, int):
@ -402,8 +401,8 @@ def get_exif_tags(data, datetime_format='%c'):
logger.info('Failed to read GPS info')
else:
simple['gps'] = {
'lat': - lat if lat_ref_info != 'N' else lat,
'lon': - lon if lon_ref_info != 'E' else lon,
'lat': -lat if lat_ref_info != 'N' else lat,
'lon': -lon if lon_ref_info != 'E' else lon,
}
return simple

7
sigal/log.py

@ -25,8 +25,7 @@ from logging import Formatter
# The background is set with 40 plus the number of the color, and the
# foreground with 30
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = (30 + i
for i in range(8))
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = (30 + i for i in range(8))
COLORS = {
'DEBUG': BLUE,
@ -47,7 +46,6 @@ def colored(text, color):
class ColoredFormatter(Formatter):
def format(self, record):
level = record.levelname
return colored(level, COLORS[level]) + ': ' + record.getMessage()
@ -63,8 +61,7 @@ def init_logging(name, level=logging.INFO):
logger.setLevel(level)
try:
if os.isatty(sys.stdout.fileno()) and \
not sys.platform.startswith('win'):
if os.isatty(sys.stdout.fileno()) and not sys.platform.startswith('win'):
formatter = ColoredFormatter()
elif level == logging.DEBUG:
formatter = Formatter('%(levelname)s - %(message)s')

3
sigal/plugins/adjust.py

@ -2,7 +2,8 @@
Based on pilkit's Adjust_ processor.
.. _Adjust: https://github.com/matthewwithanm/pilkit/blob/master/pilkit/processors/base.py#L19
.. _Adjust: \
https://github.com/matthewwithanm/pilkit/blob/master/pilkit/processors/base.py#L19
Settings::

44
sigal/plugins/compress_assets.py

@ -48,8 +48,9 @@ class BaseCompressor:
suffix = None
def __init__(self, settings):
self.suffixes_to_compress = settings.get('suffixes',
DEFAULT_SETTINGS['suffixes'])
self.suffixes_to_compress = settings.get(
'suffixes', DEFAULT_SETTINGS['suffixes']
)
def do_compress(self, filename, compressed_filename):
"""
@ -92,9 +93,11 @@ class BaseCompressor:
pass
if file_stats and compressed_stats:
return (compressed_filename
if file_stats.st_mtime > compressed_stats.st_mtime
else False)
return (
compressed_filename
if file_stats.st_mtime > compressed_stats.st_mtime
else False
)
else:
return compressed_filename
@ -103,8 +106,9 @@ class GZipCompressor(BaseCompressor):
suffix = 'gz'
def do_compress(self, filename, compressed_filename):
with open(filename, 'rb') as f_in, \
gzip.open(compressed_filename, 'wb') as f_out:
with open(filename, 'rb') as f_in, gzip.open(
compressed_filename, 'wb'
) as f_out:
shutil.copyfileobj(f_in, f_out)
@ -113,8 +117,8 @@ class ZopfliCompressor(BaseCompressor):
def do_compress(self, filename, compressed_filename):
import zopfli.gzip
with open(filename, 'rb') as f_in, \
open(compressed_filename, 'wb') as f_out:
with open(filename, 'rb') as f_in, open(compressed_filename, 'wb') as f_out:
f_out.write(zopfli.gzip.compress(f_in.read()))
@ -123,8 +127,8 @@ class BrotliCompressor(BaseCompressor):
def do_compress(self, filename, compressed_filename):
import brotli
with open(filename, 'rb') as f_in, \
open(compressed_filename, 'wb') as f_out:
with open(filename, 'rb') as f_in, open(compressed_filename, 'wb') as f_out:
f_out.write(brotli.compress(f_in.read(), mode=brotli.MODE_TEXT))
@ -135,6 +139,7 @@ def get_compressor(settings):
elif name == 'zopfli':
try:
import zopfli.gzip # noqa
return ZopfliCompressor(settings)
except ImportError:
logging.error('Unable to import zopfli module')
@ -142,6 +147,7 @@ def get_compressor(settings):
elif name == 'brotli':
try:
import brotli # noqa
return BrotliCompressor(settings)
except ImportError:
logger.error('Unable to import brotli module')
@ -152,8 +158,9 @@ def get_compressor(settings):
def compress_gallery(gallery):
logging.info('Compressing assets for %s', gallery.title)
compress_settings = gallery.settings.get('compress_assets_options',
DEFAULT_SETTINGS)
compress_settings = gallery.settings.get(
'compress_assets_options', DEFAULT_SETTINGS
)
compressor = get_compressor(compress_settings)
if compressor is None:
@ -162,15 +169,16 @@ def compress_gallery(gallery):
# Collecting theme assets
theme_assets = []
for current_directory, _, filenames in os.walk(
os.path.join(gallery.settings['destination'], 'static')):
os.path.join(gallery.settings['destination'], 'static')
):
for filename in filenames:
theme_assets.append(os.path.join(current_directory, filename))
with progressbar(length=len(gallery.albums) + len(theme_assets),
label='Compressing static files') as bar:
with progressbar(
length=len(gallery.albums) + len(theme_assets), label='Compressing static files'
) as bar:
for album in gallery.albums.values():
compressor.compress(os.path.join(album.dst_path,
album.output_file))
compressor.compress(os.path.join(album.dst_path, album.output_file))
bar.update(1)
for theme_asset in theme_assets:

8
sigal/plugins/copyright.py

@ -32,20 +32,18 @@ def add_copyright(img, settings=None):
font_size = settings.get('copyright_text_font_size', 10)
assert font_size >= 0
color = settings.get('copyright_text_color', (0, 0, 0))
bottom_margin = 3 # bottom margin for text
bottom_margin = 3 # bottom margin for text
text_height = bottom_margin + 12 # default text height (of 15)
if font:
try:
font = ImageFont.truetype(font, font_size)
text_height = font.getsize(text)[1] + bottom_margin
except Exception: # load default font in case of any exception
logger.debug("Exception: Couldn't locate font %s, using "
"default font", font)
logger.debug("Exception: Couldn't locate font %s, using default font", font)
font = ImageFont.load_default()
else:
font = ImageFont.load_default()
left, top = settings.get('copyright_text_position',
(5, img.size[1] - text_height))
left, top = settings.get('copyright_text_position', (5, img.size[1] - text_height))
draw.text((left, top), text, fill=color, font=font)
return img

86
sigal/plugins/encrypt/encrypt.py

@ -37,7 +37,8 @@ from .endec import encrypt, kdf_gen_key
logger = logging.getLogger(__name__)
ASSETS_PATH = os.path.normpath(
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'static'))
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'static')
)
class Abort(Exception):
@ -45,9 +46,9 @@ class Abort(Exception):
def gen_rand_string(length=16):
return "".join(random.SystemRandom().choices(string.ascii_letters +
string.digits,
k=length))
return "".join(
random.SystemRandom().choices(string.ascii_letters + string.digits, k=length)
)
def get_options(settings, cache):
@ -62,8 +63,10 @@ def get_options(settings, cache):
options = settings["encrypt_options"]
table = str.maketrans({'"': r'\"', '\\': r'\\'})
if "password" not in settings["encrypt_options"] \
or len(settings["encrypt_options"]["password"]) == 0:
if (
"password" not in settings["encrypt_options"]
or len(settings["encrypt_options"]["password"]) == 0
):
logger.error("Encrypt: no password provided")
raise ValueError("no password provided")
else:
@ -71,10 +74,10 @@ def get_options(settings, cache):
options["escaped_password"] = options["password"].translate(table)
if "ask_password" not in options:
options["ask_password"] = settings["encrypt_options"].get(
"ask_password", False)
options["filtered_password"] = "" if options["ask_password"] else options[
"escaped_password"]
options["ask_password"] = settings["encrypt_options"].get("ask_password", False)
options["filtered_password"] = (
"" if options["ask_password"] else options["escaped_password"]
)
if "gcm_tag" not in options:
options["gcm_tag"] = gen_rand_string()
@ -96,7 +99,7 @@ def get_options(settings, cache):
"gcm_tag": options["gcm_tag"],
"kdf_salt": options["kdf_salt"],
"kdf_iters": options["kdf_iters"],
"galleryId": options["galleryId"]
"galleryId": options["galleryId"],
}
return options
@ -123,8 +126,7 @@ def get_encrypt_list(settings, media):
to_encrypt.append(get_thumb(settings, media.dst_filename)) # thumbnail
if media.big is not None and not settings["use_orig"]:
to_encrypt.append(media.big) # original image
to_encrypt = list(
map(lambda path: os.path.join(media.path, path), to_encrypt))
to_encrypt = list(map(lambda path: os.path.join(media.path, path), to_encrypt))
return to_encrypt
@ -145,16 +147,17 @@ def load_cache(settings):
try:
with open(cachePath, "rb") as cacheFile:
encryptCache = pickle.load(cacheFile)
logger.debug("Loaded encryption cache with %d entries",
len(encryptCache))
logger.debug("Loaded encryption cache with %d entries", len(encryptCache))
return encryptCache
except FileNotFoundError:
encryptCache = {}
return encryptCache
except Exception as e:
logger.error("Could not load encryption cache: %s", e)
logger.error("Giving up encryption. You may have to delete and "
"rebuild the entire gallery.")
logger.error(
"Giving up encryption. You may have to delete and "
"rebuild the entire gallery."
)
raise Abort
@ -187,18 +190,20 @@ def encrypt_files(settings, config, cache, albums, progressbar_target):
if settings["keep_orig"] and settings["orig_link"]:
logger.warning(
"Original images are symlinked! Encryption is aborted. "
"Please set 'orig_link' to False and restart gallery build.")
"Please set 'orig_link' to False and restart gallery build."
)
raise Abort
key = kdf_gen_key(config["password"], config["kdf_salt"],
config["kdf_iters"])
key = kdf_gen_key(config["password"], config["kdf_salt"], config["kdf_iters"])
gcm_tag = config["gcm_tag"].encode("utf-8")
medias = list(chain.from_iterable(albums.values()))
with progressbar(medias,
label="%16s" % "Encrypting files",
file=progressbar_target,
show_eta=True) as medias:
with progressbar(
medias,
label="%16s" % "Encrypting files",
file=progressbar_target,
show_eta=True,
) as medias:
for media in medias:
if media.type != "image":
logger.info("Skipping non-image file %s", media.src_filename)
@ -222,8 +227,7 @@ def encrypt_files(settings, config, cache, albums, progressbar_target):
save_cache(settings, cache)
raise Abort
key_check_path = os.path.join(settings["destination"], 'static',
'keycheck.txt')
key_check_path = os.path.join(settings["destination"], 'static', 'keycheck.txt')
encrypt_file("keycheck.txt", key_check_path, key, gcm_tag)
@ -248,18 +252,24 @@ def encrypt_file(filename, full_path, key, gcm_tag):
def copy_assets(settings):
theme_path = os.path.join(settings["destination"], 'static')
copy(os.path.join(ASSETS_PATH, "decrypt.js"),
theme_path,
symlink=False,
rellink=False)
copy(os.path.join(ASSETS_PATH, "keycheck.txt"),
theme_path,
symlink=False,
rellink=False)
copy(os.path.join(ASSETS_PATH, "sw.js"),
settings["destination"],
symlink=False,
rellink=False)
copy(
os.path.join(ASSETS_PATH, "decrypt.js"),
theme_path,
symlink=False,
rellink=False,
)
copy(
os.path.join(ASSETS_PATH, "keycheck.txt"),
theme_path,
symlink=False,
rellink=False,
)
copy(
os.path.join(ASSETS_PATH, "sw.js"),
settings["destination"],
symlink=False,
rellink=False,
)
def inject_scripts(context):

15
sigal/plugins/encrypt/endec.py

@ -40,11 +40,7 @@ def kdf_gen_key(password: str, salt: str, iters: int) -> bytes:
password = password.encode("utf-8")
salt = salt.encode("utf-8")
kdf = PBKDF2HMAC(
algorithm=hashes.SHA1(),
length=16,
salt=salt,
iterations=iters,
backend=backend
algorithm=hashes.SHA1(), length=16, salt=salt, iterations=iters, backend=backend
)
key = kdf.derive(password)
return key
@ -102,11 +98,16 @@ def decrypt(key: bytes, infile: BinaryIO, outfile: BinaryIO, tag: bytes):
if __name__ == "__main__":
import argparse as ap
parser = ap.ArgumentParser(description="Encrypt or decrypt using AES-128-GCM")
parser.add_argument("-k", "--key", help="Base64-encoded key")
parser.add_argument("-p", "--password", help="Password in plaintext")
parser.add_argument("--kdf-salt", help="PBKDF2 salt", default="saltysaltsweetysweet")
parser.add_argument("--kdf-iters", type=int, help="PBKDF2 iterations", default=10000)
parser.add_argument(
"--kdf-salt", help="PBKDF2 salt", default="saltysaltsweetysweet"
)
parser.add_argument(
"--kdf-iters", type=int, help="PBKDF2 iterations", default=10000
)
parser.add_argument("--gcm-tag", help="AES-GCM tag", default="AuTheNTiCatIoNtAG")
parser.add_argument("-i", "--infile", help="Input file")
parser.add_argument("-o", "--outfile", help="Output file")

19
sigal/plugins/feeds.py

@ -29,27 +29,31 @@ logger = logging.getLogger(__name__)
def generate_feeds(gallery):
# Get all images and videos and sort by date
medias = [med for album in gallery.albums.values()
for med in album.medias if med.date is not None]
medias = [
med
for album in gallery.albums.values()
for med in album.medias
if med.date is not None
]
medias.sort(key=lambda m: m.date, reverse=True)
settings = gallery.settings
if settings.get('rss_feed'):
generate_feed(gallery, medias, feed_type='rss', **settings['rss_feed'])
if settings.get('atom_feed'):
generate_feed(gallery, medias, feed_type='atom',
**settings['atom_feed'])
generate_feed(gallery, medias, feed_type='atom', **settings['atom_feed'])
def generate_feed(gallery, medias, feed_type=None, feed_url='', nb_items=0):
from feedgenerator import Atom1Feed, Rss201rev2Feed
root_album = gallery.albums['.']
cls = Rss201rev2Feed if feed_type == 'rss' else Atom1Feed
feed = cls(
title=Markup.escape(root_album.title),
link='/',
feed_url=feed_url,
description=Markup.escape(root_album.description).striptags()
description=Markup.escape(root_album.description).striptags(),
)
theme = gallery.settings['theme']
@ -69,8 +73,9 @@ def generate_feed(gallery, medias, feed_type=None, feed_url='', nb_items=0):
# unique_id='tag:%s,%s:%s' % (urlparse(link).netloc,
# item.date.date(),
# urlparse(link).path.lstrip('/')),
description='<img src="{}/{}/{}" />'.format(base_url, item.path,
item.thumbnail),
description='<img src="{}/{}/{}" />'.format(
base_url, item.path, item.thumbnail
),
# categories=item.tags if hasattr(item, 'tags') else None,
author_name=getattr(item, 'author', ''),
pubdate=item.date or datetime.now(),

40
sigal/plugins/media_page.py

@ -37,37 +37,39 @@ from sigal.writer import AbstractWriter
class PageWriter(AbstractWriter):
'''A writer for writing media pages, based on writer'''
"""A writer for writing media pages, based on writer"""
template_file = "media.html"
def write(self, album, media_group):
''' Generate the media page and save it '''
"""Generate the media page and save it"""
from sigal import __url__ as sigal_link
ctx = {
"album": album,
"media": media_group[0],
"previous_media": media_group[-1],
"next_media": media_group[1],
"index_title": self.index_title,
"settings": self.settings,
"sigal_link": sigal_link,
"theme": {
"name": os.path.basename(self.theme),
"url": url_from_path(os.path.relpath(self.theme_path, album.dst_path)),
},
}
page = self.template.render(ctx)
file_path = os.path.join(album.dst_path, media_group[0].dst_filename)
output_file = f"{file_path}.html"
page = self.template.render({
'album': album,
'media': media_group[0],
'previous_media': media_group[-1],
'next_media': media_group[1],
'index_title': self.index_title,
'settings': self.settings,
'sigal_link': sigal_link,
'theme': {'name': os.path.basename(self.theme),
'url': url_from_path(os.path.relpath(self.theme_path,
album.dst_path))},
})
output_file = "%s.html" % file_path
with open(output_file, 'w', encoding='utf-8') as f:
with open(output_file, "w", encoding="utf-8") as f:
f.write(page)
def generate_media_pages(gallery):
'''Generates and writes the media pages for all media in the gallery'''
"""Generates and writes the media pages for all media in the gallery"""
writer = PageWriter(gallery.settings, index_title=gallery.title)

84
sigal/plugins/nomedia.py

@ -62,13 +62,12 @@ def _remove_albums_with_subdirs(albums, keystoremove, prefix=""):
# remove them first
try:
album = albums[key]
settings = album.settings
if album.medias:
os.rmdir(os.path.join(album.dst_path,
album.settings['thumb_dir']))
os.rmdir(os.path.join(album.dst_path, settings["thumb_dir"]))
if album.medias and album.settings['keep_orig']:
os.rmdir(os.path.join(album.dst_path,
album.settings['orig_dir']))
if album.medias and settings["keep_orig"]:
os.rmdir(os.path.join(album.dst_path, settings["orig_dir"]))
os.rmdir(album.dst_path)
except OSError:
@ -84,41 +83,46 @@ def filter_nomedia(album, settings=None):
"""Removes all filtered Media and subdirs from an Album"""
nomediapath = os.path.join(album.src_path, ".nomedia")
if os.path.isfile(nomediapath):
if os.path.getsize(nomediapath) == 0:
logger.info("Ignoring album '%s' because of present 0-byte "
".nomedia file", album.name)
# subdirs have been added to the gallery already, remove them
# there, too
_remove_albums_with_subdirs(album.gallery.albums, [album.path])
try:
os.rmdir(album.dst_path)
except OSError:
# directory was created and populated with images in a
# previous run => keep it
pass
# cannot set albums => empty subdirs so that no albums are
# generated
album.subdirs = []
album.medias = []
else:
with open(nomediapath) as nomediaFile:
logger.info("Found a .nomedia file in %s, ignoring its "
"entries", album.name)
ignored = nomediaFile.read().split("\n")
album.medias = [media for media in album.medias
if media.src_filename not in ignored]
album.subdirs = [dirname for dirname in album.subdirs
if dirname not in ignored]
# subdirs have been added to the gallery already, remove
# them there, too
_remove_albums_with_subdirs(album.gallery.albums,
ignored, album.path + os.path.sep)
if not os.path.isfile(nomediapath):
return
if os.path.getsize(nomediapath) == 0:
logger.info(
"Ignoring album '%s' because of present 0-byte .nomedia file", album.name
)
# subdirs have been added to the gallery already, remove them
# there, too
_remove_albums_with_subdirs(album.gallery.albums, [album.path])
try:
os.rmdir(album.dst_path)
except OSError:
# directory was created and populated with images in a
# previous run => keep it
pass
# cannot set albums => empty subdirs so that no albums are
# generated
album.subdirs = []
album.medias = []
else:
with open(nomediapath) as nomediaFile:
logger.info("Found a .nomedia file in %s, ignoring its entries", album.name)
ignored = nomediaFile.read().split("\n")
album.medias = [
media for media in album.medias if media.src_filename not in ignored
]
album.subdirs = [
dirname for dirname in album.subdirs if dirname not in ignored
]
# subdirs have been added to the gallery already, remove
# them there, too
_remove_albums_with_subdirs(
album.gallery.albums, ignored, album.path + os.path.sep
)
def register(settings):

35
sigal/plugins/nonmedia_files.py

@ -92,19 +92,26 @@ class NonMedia(Media):
kwargs['font_color'] = plugin_settings['thumb_font_color']
if plugin_settings.get('thumb_font_size', None):
kwargs['font_size'] = plugin_settings['thumb_font_size']
generate_thumbnail(self.src_ext[1:].upper(), self.thumb_path,
self.settings['thumb_size'],
options=self.settings['jpg_options'],
**kwargs)
generate_thumbnail(
self.src_ext[1:].upper(),
self.thumb_path,
self.settings['thumb_size'],
options=self.settings['jpg_options'],
**kwargs,
)
return super().thumbnail
def generate_thumbnail(text, outname, box,
bg_color=DEFAULT_CONFIG['thumb_bg_color'],
font=DEFAULT_CONFIG['thumb_font'],
font_color=DEFAULT_CONFIG['thumb_font_color'],
font_size=DEFAULT_CONFIG['thumb_font_size'],
options=None):
def generate_thumbnail(
text,
outname,
box,
bg_color=DEFAULT_CONFIG['thumb_bg_color'],
font=DEFAULT_CONFIG['thumb_font'],
font_color=DEFAULT_CONFIG['thumb_font_color'],
font_size=DEFAULT_CONFIG['thumb_font_size'],
options=None,
):
"""Create a thumbnail image."""
kwargs = {}
@ -132,8 +139,7 @@ def process_nonmedia(media):
plugin_settings = settings.get('nonmedia_files_options', {})
try:
utils.copy(media.src_path, media.dst_path,
symlink=settings['orig_link'])
utils.copy(media.src_path, media.dst_path, symlink=settings['orig_link'])
except Exception:
if logger.getEffectiveLevel() == logging.DEBUG:
raise
@ -155,7 +161,7 @@ def process_nonmedia(media):
media.thumb_path,
settings['thumb_size'],
options=settings['jpg_options'],
**kwargs
**kwargs,
)
except Exception:
if logger.getEffectiveLevel() == logging.DEBUG:
@ -169,7 +175,8 @@ def album_file(album, filename, media=None):
if not media:
ext = os.path.splitext(filename)[1]
ext_ignore = album.settings.get('nonmedia_files_options', {}).get(
'ignore_ext', DEFAULT_CONFIG['ignore_ext'])
'ignore_ext', DEFAULT_CONFIG['ignore_ext']
)
if ext in ext_ignore:
logger.info('Ignoring non-media file: %s', filename)
else:

47
sigal/plugins/upload_s3.py

@ -34,33 +34,34 @@ logger = logging.getLogger(__name__)
def upload_s3(gallery, settings=None):
import boto
upload_files = []
# Get local files
for root, dirs, files in os.walk(gallery.settings['destination']):
for root, dirs, files in os.walk(gallery.settings["destination"]):
for f in files:
path = os.path.join(
root[len(gallery.settings['destination']) + 1:], f)
path = os.path.join(root[len(gallery.settings["destination"]) + 1 :], f)
size = os.path.getsize(os.path.join(root, f))
upload_files += [(path, size)]
# Connect to specified bucket
conn = boto.connect_s3()
bucket = conn.get_bucket(gallery.settings['upload_s3_options']['bucket'])
bucket = conn.get_bucket(gallery.settings["upload_s3_options"]["bucket"])
# Upload the files
with progressbar(upload_files, label="Uploading files to S3") as bar:
for (f, size) in bar:
if gallery.settings['upload_s3_options']['overwrite'] is False:
if gallery.settings["upload_s3_options"]["overwrite"] is False:
# Check if file was uploaded before
key = bucket.get_key(f)
if key is not None and key.size == size:
cache_metadata = generate_cache_metadata(gallery, f)
if key.get_metadata('Cache-Control') != cache_metadata:
key.set_remote_metadata({
'Cache-Control': cache_metadata}, {}, True)
logger.debug("Skipping file %s" % (f))
if key.get_metadata("Cache-Control") != cache_metadata:
key.set_remote_metadata(
{"Cache-Control": cache_metadata}, {}, True
)
logger.debug("Skipping file %s", f)
else:
upload_file(gallery, bucket, f)
else:
@ -69,37 +70,37 @@ def upload_s3(gallery, settings=None):
def generate_cache_metadata(gallery, f):
filename, file_extension = os.path.splitext(f)
filename, ext = os.path.splitext(f)
options = gallery.settings["upload_s3_options"]
proposed_cache_control = None
if 'media_max_age' in gallery.settings['upload_s3_options'] and \
file_extension in ['.jpg', '.png', '.webm', '.mp4']:
proposed_cache_control = "max-age=%s" % \
gallery.settings['upload_s3_options']['media_max_age']
elif 'max_age' in gallery.settings['upload_s3_options']:
proposed_cache_control = "max-age=%s" % \
gallery.settings['upload_s3_options']['max_age']
if "media_max_age" in options and ext in [".jpg", ".png", ".webm", ".mp4"]:
proposed_cache_control = "max-age=%s" % options["media_max_age"]
elif "max_age" in options:
proposed_cache_control = "max-age=%s" % options["max_age"]
return proposed_cache_control
def upload_file(gallery, bucket, f):
logger.debug("Uploading file %s" % (f))
logger.debug("Uploading file %s", f)
from boto.s3.key import Key
key = Key(bucket)
key.key = f
cache_metadata = generate_cache_metadata(gallery, f)
if cache_metadata:
key.set_metadata('Cache-Control', cache_metadata)
key.set_metadata("Cache-Control", cache_metadata)
key.set_contents_from_filename(
os.path.join(gallery.settings['destination'], f),
policy=gallery.settings['upload_s3_options']['policy'])
os.path.join(gallery.settings["destination"], f),
policy=gallery.settings["upload_s3_options"]["policy"],
)
def register(settings):
if settings.get('upload_s3_options'):
if settings.get("upload_s3_options"):
signals.gallery_build.connect(upload_s3)
else:
logger.warning('Upload to S3 is not configured.')
logger.warning("Upload to S3 is not configured.")

11
sigal/plugins/watermark.py

@ -1,4 +1,4 @@
# Copyright (c) 2005 - Shane Hathaway (http://code.activestate.com/recipes/362879-watermark-with-pil/)
# Copyright (c) 2005 - Shane Hathaway
# Copyright (c) 2015 - Abdul Qabiz
# Permission is hereby granted, free of charge, to any person obtaining a copy
@ -21,6 +21,9 @@
"""Plugin which adds a watermark to the image.
Based on http://code.activestate.com/recipes/362879-watermark-with-pil/
(Licensed under the PSF License).
Settings:
- ``watermark``: path to the watermark image.
@ -69,13 +72,11 @@ def watermark(im, mark, position, opacity=1):
layer.paste(mark, (x, y))
elif position == 'scale':
# scale, but preserve the aspect ratio
ratio = min(
float(im.size[0]) / mark.size[0], float(im.size[1]) / mark.size[1])
ratio = min(float(im.size[0]) / mark.size[0], float(im.size[1]) / mark.size[1])
w = int(mark.size[0] * ratio)
h = int(mark.size[1] * ratio)
mark = mark.resize((w, h))
layer.paste(mark, (int((im.size[0] - w) / 2),
int((im.size[1] - h) / 2)))
layer.paste(mark, (int((im.size[0] - w) / 2), int((im.size[1] - h) / 2)))
else:
layer.paste(mark, position)
# composite the watermark with the layer

15
sigal/plugins/zip_gallery.py

@ -61,14 +61,14 @@ def _generate_album_zip(album):
if zip_gallery and len(album) > 0:
zip_gallery = zip_gallery.format(album=album)
archive_path = join(album.dst_path, zip_gallery)
if (album.settings.get('zip_skip_if_exists', False) and
isfile(archive_path)):
if album.settings.get('zip_skip_if_exists', False) and isfile(archive_path):
logger.debug("Archive %s already created, passing", archive_path)
return zip_gallery
archive = zipfile.ZipFile(archive_path, 'w', allowZip64=True)
attr = ('src_path' if album.settings['zip_media_format'] == 'orig'
else 'dst_path')
attr = (
'src_path' if album.settings['zip_media_format'] == 'orig' else 'dst_path'
)
for p in album:
path = getattr(p, attr)
@ -95,8 +95,11 @@ def generate_album_zip(album):
# check if ZIP file generation as been disabled by .nozip_gallery file
if not _should_generate_album_zip(album):
logger.info("Ignoring ZIP gallery generation for album '%s' because of present "
".nozip_gallery file", album.name)
logger.info(
"Ignoring ZIP gallery generation for album '%s' because of present "
".nozip_gallery file",
album.name,
)
return False
return _generate_album_zip(album)

30
sigal/settings.py

@ -40,8 +40,7 @@ _DEFAULT_CONFIG = {
'google_tag_manager': '',
'ignore_directories': [],
'ignore_files': [],
'img_extensions': ['.jpg', '.jpeg', '.png', '.gif', '.tif', '.tiff',
'.webp'],
'img_extensions': ['.jpg', '.jpeg', '.png', '.gif', '.tif', '.tiff', '.webp'],
'img_processor': 'ResizeToFit',
'img_size': (640, 480),
'img_format': None,
@ -85,8 +84,7 @@ _DEFAULT_CONFIG = {
'video_always_convert': False,
'video_size': (480, 360),
'watermark': '',
'webm_options': ['-crf', '10', '-b:v', '1.6M',
'-qmin', '4', '-qmax', '63'],
'webm_options': ['-crf', '10', '-b:v', '1.6M', '-qmin', '4', '-qmax', '63'],
'webm_options_second_pass': None,
'write_html': True,
'zip_gallery': False,
@ -119,8 +117,11 @@ def get_thumb(settings, filename):
if ext.lower() in settings['video_extensions']:
ext = '.jpg'
return join(path, settings['thumb_dir'], settings['thumb_prefix'] +
name + settings['thumb_suffix'] + ext)
return join(
path,
settings['thumb_dir'],
settings['thumb_prefix'] + name + settings['thumb_suffix'] + ext,
)
def read_settings(filename=None):
@ -139,15 +140,16 @@ def read_settings(filename=None):
code = compile(f.read(), filename, 'exec')
exec(code, tempdict)
settings.update((k, v) for k, v in tempdict.items()
if k not in ['__builtins__'])
settings.update(
(k, v) for k, v in tempdict.items() if k not in ['__builtins__']
)
# Make the paths relative to the settings file
paths = ['source', 'destination', 'watermark']
if os.path.isdir(join(settings_path, settings['theme'])) and \
os.path.isdir(join(settings_path, settings['theme'],
'templates')):
if os.path.isdir(join(settings_path, settings['theme'])) and os.path.isdir(
join(settings_path, settings['theme'], 'templates')
):
paths.append('theme')
for p in paths:
@ -161,8 +163,10 @@ def read_settings(filename=None):
w, h = settings[key]
if h > w:
settings[key] = (h, w)
logger.warning("The %s setting should be specified with the "
"largest value first.", key)
logger.warning(
"The %s setting should be specified with the largest value first.",
key,
)
if not settings['img_processor']:
logger.info('No Processor, images will not be resized')

13
sigal/utils.py

@ -25,9 +25,7 @@ from urllib.parse import quote
from markdown import Markdown
from markupsafe import Markup
VIDEO_MIMES = {'.mp4': 'video/mp4',
'.webm': 'video/webm',
'.ogv': 'video/ogg'}
VIDEO_MIMES = {'.mp4': 'video/mp4', '.webm': 'video/webm', '.ogv': 'video/ogg'}
MD = None
@ -86,9 +84,10 @@ def read_markdown(filename):
text = f.read()
if MD is None:
MD = Markdown(extensions=['markdown.extensions.meta',
'markdown.extensions.tables'],
output_format='html5')
MD = Markdown(
extensions=['markdown.extensions.meta', 'markdown.extensions.tables'],
output_format='html5',
)
else:
MD.reset()
# When https://github.com/Python-Markdown/markdown/pull/672
@ -123,7 +122,7 @@ def get_mime(ext):
class cached_property:
""" A property that is only computed once per instance and then replaces
"""A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the
property.
Source:

108
sigal/video.py

@ -45,27 +45,27 @@ def check_subprocess(cmd, source, outname=None):
try:
res = subprocess.run(cmd, capture_output=True)
except KeyboardInterrupt:
logger.debug('Process terminated, removing file %s', outname)
logger.debug("Process terminated, removing file %s", outname)
if outname and os.path.isfile(outname):
os.remove(outname)
raise
if res.returncode:
logger.debug('STDOUT:\n %s', res.stdout.decode('utf8'))
logger.debug('STDERR:\n %s', res.stderr.decode('utf8'))
logger.debug("STDOUT:\n %s", res.stdout.decode("utf8"))
logger.debug("STDERR:\n %s", res.stderr.decode("utf8"))
if outname and os.path.isfile(outname):
logger.debug('Removing file %s', outname)
logger.debug("Removing file %s", outname)
os.remove(outname)
raise SubprocessException('Failed to process ' + source)
raise SubprocessException("Failed to process " + source)
def video_size(source, converter='ffmpeg'):
def video_size(source, converter="ffmpeg"):
"""Return the dimensions of the video."""
res = subprocess.run([converter, '-i', source], stderr=subprocess.PIPE)
stderr = res.stderr.decode('utf8', errors='ignore')
pattern = re.compile(r'Stream.*Video.* ([0-9]+)x([0-9]+)')
res = subprocess.run([converter, "-i", source], stderr=subprocess.PIPE)
stderr = res.stderr.decode("utf8", errors="ignore")
pattern = re.compile(r"Stream.*Video.* ([0-9]+)x([0-9]+)")
match = pattern.search(stderr)
rot_pattern = re.compile(r'rotate\s*:\s*-?(90|270)')
rot_pattern = re.compile(r"rotate\s*:\s*-?(90|270)")
rot_match = rot_pattern.search(stderr)
if match:
@ -87,7 +87,7 @@ def get_resize_options(source, converter, output_size):
logger = logging.getLogger(__name__)
w_src, h_src = video_size(source, converter=converter)
w_dst, h_dst = output_size
logger.debug('Video size: %i, %i -> %i, %i', w_src, h_src, w_dst, h_dst)
logger.debug("Video size: %i, %i -> %i, %i", w_src, h_src, w_dst, h_dst)
# do not resize if input dimensions are smaller than output dimensions
if w_src <= w_dst and h_src <= h_dst:
@ -97,10 +97,10 @@ def get_resize_options(source, converter, output_size):
# + I made a drawing on paper to figure this out
if h_dst * w_src < h_src * w_dst:
# biggest fitting dimension is height
resize_opt = ['-vf', "scale=trunc(oh*a/2)*2:%i" % h_dst]
resize_opt = ["-vf", "scale=trunc(oh*a/2)*2:%i" % h_dst]
else:
# biggest fitting dimension is width
resize_opt = ['-vf', "scale=%i:trunc(ow/a/2)*2" % w_dst]
resize_opt = ["-vf", "scale=%i:trunc(ow/a/2)*2" % w_dst]
return resize_opt
@ -120,9 +120,9 @@ def generate_video_pass(converter, source, options, outname=None):
outname_opt = [] if not outname else [outname]
# Encoding options improved, thanks to
# http://ffmpeg.org/trac/ffmpeg/wiki/vpxEncodingGuide
cmd = [converter, '-i', source, '-y'] # -y to overwrite output files
cmd = [converter, "-i", source, "-y"] # -y to overwrite output files
cmd += options + outname_opt
logger.debug('Processing video: %s', ' '.join(cmd))
logger.debug("Processing video: %s", " ".join(cmd))
check_subprocess(cmd, source, outname=outname)
@ -137,60 +137,62 @@ def generate_video(source, outname, settings):
"""
logger = logging.getLogger(__name__)
video_format = settings.get('video_format')
options = settings.get(video_format + '_options')
second_pass_options = settings.get(video_format + '_options_second_pass')
video_always_convert = settings.get('video_always_convert')
converter = settings['video_converter']
video_format = settings.get("video_format")
options = settings.get(video_format + "_options")
second_pass_options = settings.get(video_format + "_options_second_pass")
video_always_convert = settings.get("video_always_convert")
converter = settings["video_converter"]
resize_opt = []
if settings.get("video_size"):
resize_opt = get_resize_options(source, converter,
settings['video_size'])
resize_opt = get_resize_options(source, converter, settings["video_size"])
base, src_ext = splitext(source)
base, dst_ext = splitext(outname)
if dst_ext == src_ext and not resize_opt and not video_always_convert:
logger.debug('For %s, the source and destination extension are the '
'same, there is no resizing to be done, and '
'video_always_convert is False, so the output is '
' being copied', outname)
logger.debug(
"For %s, the source and destination extension are the "
"same, there is no resizing to be done, and "
"video_always_convert is False, so the output is "
" being copied",
outname,
)
shutil.copy(source, outname)
return
final_pass_options = _get_empty_if_none_else_variable(options) + resize_opt
if second_pass_options:
generate_video_pass(converter, source, final_pass_options)
final_second_pass_options = _get_empty_if_none_else_variable(
second_pass_options) + resize_opt
generate_video_pass(converter, source,
final_second_pass_options, outname)
final_second_pass_options = (
_get_empty_if_none_else_variable(second_pass_options) + resize_opt
)
generate_video_pass(converter, source, final_second_pass_options, outname)
else:
generate_video_pass(converter, source, final_pass_options, outname)
def generate_thumbnail(source, outname, box, delay, fit=True, options=None,
converter='ffmpeg'):
def generate_thumbnail(
source, outname, box, delay, fit=True, options=None, converter="ffmpeg"
):
"""Create a thumbnail image for the video source, based on ffmpeg."""
logger = logging.getLogger(__name__)
tmpfile = outname + ".tmp.jpg"
# dump an image of the video
cmd = [converter, '-i', source, '-an', '-r', '1',
'-ss', str(delay), '-vframes', '1', '-y', tmpfile]
logger.debug('Create thumbnail for video: %s', ' '.join(cmd))
cmd = [converter, "-i", source, "-an", "-r", "1"]
cmd += ["-ss", str(delay), "-vframes", "1", "-y", tmpfile]
logger.debug("Create thumbnail for video: %s", " ".join(cmd))
check_subprocess(cmd, source, outname)
# Sometimes ffmpeg fails with returncode zero but without producing an
# output file Thus, we need to check if an output file was created. If
# not, assume ffmpeg failed
if not os.path.isfile(tmpfile):
logger.debug('Thumbnail generation failed. Likely due to very short '
'video length.')
cmd = [converter, '-i', source, '-an', '-r', '1',
'-ss', '0', '-vframes', '1', '-y', tmpfile]
logger.debug('Retry to create thumbnail for video: %s', ' '.join(cmd))
logger.debug("Thumbnail generation failed. Likely due to short video length.")
cmd = [converter, "-i", source, "-an", "-r", "1"]
cmd += ["-ss", "0", "-vframes", "1", "-y", tmpfile]
logger.debug("Retry to create thumbnail for video: %s", " ".join(cmd))
check_subprocess(cmd, source, outname)
# use the generate_thumbnail function from sigal.image
@ -205,16 +207,16 @@ def process_video(media):
settings = media.settings
try:
if settings['use_orig'] and is_valid_html5_video(media.src_ext):
utils.copy(media.src_path, media.dst_path,
symlink=settings['orig_link'])
if settings["use_orig"] and is_valid_html5_video(media.src_ext):
utils.copy(media.src_path, media.dst_path, symlink=settings["orig_link"])
else:
valid_formats = ['mp4', 'webm']
video_format = settings['video_format']
valid_formats = ["mp4", "webm"]
video_format = settings["video_format"]
if video_format not in valid_formats:
logger.error('Invalid video_format. Please choose one of: %s',
valid_formats)
logger.error(
"Invalid video_format. Please choose one of: %s", valid_formats
)
raise ValueError
generate_video(media.src_path, media.dst_path, settings)
except Exception:
@ -223,16 +225,16 @@ def process_video(media):
else:
return Status.FAILURE
if settings['make_thumbs']:
if settings["make_thumbs"]:
try:
generate_thumbnail(
media.dst_path,
media.thumb_path,
settings['thumb_size'],
settings['thumb_video_delay'],
fit=settings['thumb_fit'],
options=settings['jpg_options'],
converter=settings['video_converter']
settings["thumb_size"],
settings["thumb_video_delay"],
fit=settings["thumb_fit"],
options=settings["jpg_options"],
converter=settings["video_converter"],
)
except Exception:
if logger.getEffectiveLevel() == logging.DEBUG:

90
sigal/writer.py

@ -34,51 +34,52 @@ from jinja2.exceptions import TemplateNotFound
from . import signals
from .utils import url_from_path
THEMES_PATH = os.path.normpath(os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'themes'))
THEMES_PATH = os.path.normpath(
os.path.join(os.path.abspath(os.path.dirname(__file__)), "themes")
)
class AbstractWriter:
template_file = None
def __init__(self, settings, index_title=''):
def __init__(self, settings, index_title=""):
self.settings = settings
self.output_dir = settings['destination']
self.theme = settings['theme']
self.output_dir = settings["destination"]
self.theme = settings["theme"]
self.index_title = index_title
self.logger = logging.getLogger(__name__)
# search the theme in sigal/theme if the given one does not exists
if not os.path.exists(self.theme) or \
not os.path.exists(os.path.join(self.theme, 'templates')):
if not os.path.exists(self.theme) or not os.path.exists(
os.path.join(self.theme, "templates")
):
self.theme = os.path.join(THEMES_PATH, self.theme)
if not os.path.exists(self.theme):
raise Exception("Impossible to find the theme %s" % self.theme)
self.logger.info("Theme : %s", self.theme)
theme_relpath = os.path.join(self.theme, 'templates')
default_loader = FileSystemLoader(os.path.join(THEMES_PATH, 'default',
'templates'))
theme_relpath = os.path.join(self.theme, "templates")
default_loader = FileSystemLoader(
os.path.join(THEMES_PATH, "default", "templates")
)
# setup jinja env
env_options = {'trim_blocks': True, 'autoescape': True}
env_options = {"trim_blocks": True, "autoescape": True}
try:
if tuple(int(x) for x in jinja2.__version__.split('.')) >= (2, 7):
env_options['lstrip_blocks'] = True
if tuple(int(x) for x in jinja2.__version__.split(".")) >= (2, 7):
env_options["lstrip_blocks"] = True
except ValueError:
pass
env = Environment(
loader=ChoiceLoader([
FileSystemLoader(theme_relpath),
default_loader, # implicit inheritance
PrefixLoader({'!default': default_loader}) # explicit one
]),
**env_options
)
loaders = [
FileSystemLoader(theme_relpath),
default_loader, # implicit inheritance
PrefixLoader({"!default": default_loader}), # explicit one
]
env = Environment(loader=ChoiceLoader(loaders), **env_options)
# handle optional filters.py
filters_py = os.path.join(self.theme, 'filters.py')
filters_py = os.path.join(self.theme, "filters.py")
if os.path.exists(filters_py):
mod = importlib.import_module(filters_py)
for name in dir(mod):
@ -89,40 +90,45 @@ class AbstractWriter:
self.template = env.get_template(self.template_file)
except TemplateNotFound:
self.logger.error(
'The template %s was not found in template folder %s.',
self.template_file, theme_relpath)
"The template %s was not found in template folder %s.",
self.template_file,
theme_relpath,
)
sys.exit(1)
# Copy the theme files in the output dir
self.theme_path = os.path.join(self.output_dir, 'static')
self.theme_path = os.path.join(self.output_dir, "static")
if os.path.isdir(self.theme_path):
shutil.rmtree(self.theme_path)
# FIXME: use dirs_exist_ok when minimum Python is 3.8
shutil.copytree(os.path.join(self.theme, 'static'), self.theme_path)
shutil.copytree(os.path.join(self.theme, "static"), self.theme_path)
if self.settings['user_css']:
if not os.path.exists(self.settings['user_css']):
self.logger.error('CSS file %s could not be found',
self.settings['user_css'])
if self.settings["user_css"]:
if not os.path.exists(self.settings["user_css"]):
self.logger.error(
"CSS file %s could not be found", self.settings["user_css"]
)
else:
shutil.copy(self.settings['user_css'], self.theme_path)
shutil.copy(self.settings["user_css"], self.theme_path)
def generate_context(self, album):
"""Generate the context dict for the given path."""
from . import __url__ as sigal_link
self.logger.info("Output album : %r", album)
ctx = {
'album': album,
'index_title': self.index_title,
'settings': self.settings,
'sigal_link': sigal_link,
'theme': {'name': os.path.basename(self.theme),
'url': url_from_path(os.path.relpath(self.theme_path,
album.dst_path))},
"album": album,
"index_title": self.index_title,
"settings": self.settings,
"sigal_link": sigal_link,
"theme": {
"name": os.path.basename(self.theme),
"url": url_from_path(os.path.relpath(self.theme_path, album.dst_path)),
},
}
if self.settings['user_css']:
ctx['user_css'] = os.path.basename(self.settings['user_css'])
if self.settings["user_css"]:
ctx["user_css"] = os.path.basename(self.settings["user_css"])
return ctx
def write(self, album):
@ -132,15 +138,17 @@ class AbstractWriter:
page = self.template.render(**context)
output_file = os.path.join(album.dst_path, album.output_file)
with open(output_file, 'w', encoding='utf-8') as f:
with open(output_file, "w", encoding="utf-8") as f:
f.write(page)
class AlbumListPageWriter(AbstractWriter):
"""Generate an html page for a directory of albums"""
template_file = "album_list.html"
class AlbumPageWriter(AbstractWriter):
"""Generate html pages for a directory of images."""
template_file = "album.html"

48
tests/sample/sigal.conf.py

@ -1,37 +1,43 @@
author = 'John Doe'
title = 'Sigal test gallery ☺'
source = 'pictures'
thumb_suffix = '.tn'
author = "John Doe"
title = "Sigal test gallery ☺"
source = "pictures"
thumb_suffix = ".tn"
keep_orig = True
thumb_video_delay = 5
# img_format = 'jpeg'
links = [('Example link', 'http://example.org'),
('Another link', 'http://example.org')]
links = [
("Example link", "http://example.org"),
("Another link", "http://example.org"),
]
files_to_copy = (('../watermark.png', 'watermark.png'),)
files_to_copy = (("../watermark.png", "watermark.png"),)
plugins = [
'sigal.plugins.adjust',
'sigal.plugins.copyright',
'sigal.plugins.extended_caching',
'sigal.plugins.feeds',
'sigal.plugins.nomedia',
'sigal.plugins.watermark',
'sigal.plugins.zip_gallery',
"sigal.plugins.adjust",
"sigal.plugins.copyright",
"sigal.plugins.extended_caching",
"sigal.plugins.feeds",
"sigal.plugins.nomedia",
"sigal.plugins.watermark",
"sigal.plugins.zip_gallery",
]
copyright = '© An example copyright message'
adjust_options = {'color': 0.9, 'brightness': 1.0,
'contrast': 1.0, 'sharpness': 0.0}
watermark = 'watermark.png'
copyright = "© An example copyright message"
adjust_options = {
"color": 0.9,
"brightness": 1.0,
"contrast": 1.0,
"sharpness": 0.0,
}
watermark = "watermark.png"
watermark_position = (10, 10)
watermark_opacity = 0.3
theme = 'colorbox'
theme = "colorbox"
thumb_size = (200, 150)
rss_feed = {'feed_url': 'http://127.0.0.1:8000/feed.rss', 'nb_items': 10}
atom_feed = {'feed_url': 'http://127.0.0.1:8000/feed.atom', 'nb_items': 10}
rss_feed = {"feed_url": "http://127.0.0.1:8000/feed.rss", "nb_items": 10}
atom_feed = {"feed_url": "http://127.0.0.1:8000/feed.atom", "nb_items": 10}
# theme = 'photoswipe'
# theme = 'galleria'

33
tests/test_cli.py

@ -19,8 +19,9 @@ def test_init(tmpdir):
result = runner.invoke(init, [config_file])
assert result.exit_code == 1
assert result.output == ("Found an existing config file, will abort to "
"keep it safe.\n")
assert (
result.output == "Found an existing config file, will abort to keep it safe.\n"
)
def test_build(tmpdir, disconnect_signals):
@ -33,10 +34,11 @@ def test_build(tmpdir, disconnect_signals):
try:
result = runner.invoke(init, [config_file])
assert result.exit_code == 0
os.symlink(join(TESTGAL, 'watermark.png'),
join(tmpdir, 'watermark.png'))
os.symlink(join(TESTGAL, 'pictures', 'dir2', 'KeckObservatory20071020.jpg'),
join(tmpdir, 'pictures', 'KeckObservatory20071020.jpg'))
os.symlink(join(TESTGAL, 'watermark.png'), join(tmpdir, 'watermark.png'))
os.symlink(
join(TESTGAL, 'pictures', 'dir2', 'KeckObservatory20071020.jpg'),
join(tmpdir, 'pictures', 'KeckObservatory20071020.jpg'),
)
result = runner.invoke(build, ['-n', 1, '--debug'])
assert result.exit_code == 1
@ -46,8 +48,7 @@ def test_build(tmpdir, disconnect_signals):
result = runner.invoke(build, ['foo', '-n', 1, '--debug'])
assert result.exit_code == 1
result = runner.invoke(build, ['pictures', 'pictures/out',
'-n', 1, '--debug'])
result = runner.invoke(build, ['pictures', 'pictures/out', '-n', 1, '--debug'])
assert result.exit_code == 1
with open(config_file) as f:
@ -72,12 +73,13 @@ atom_feed = {'feed_url': 'http://example.org/feed.atom', 'nb_items': 10}
with open(config_file, 'w') as f:
f.write(text)
result = runner.invoke(build, ['pictures', 'build',
'--title', 'Testing build',
'-n', 1, '--debug'])
result = runner.invoke(
build, ['pictures', 'build', '--title', 'Testing build', '-n', 1, '--debug']
)
assert result.exit_code == 0
assert os.path.isfile(join(tmpdir, 'build', 'thumbnails',
'KeckObservatory20071020.jpg'))
assert os.path.isfile(
join(tmpdir, 'build', 'thumbnails', 'KeckObservatory20071020.jpg')
)
assert os.path.isfile(join(tmpdir, 'build', 'feed.atom'))
assert os.path.isfile(join(tmpdir, 'build', 'feed.rss'))
assert os.path.isfile(join(tmpdir, 'build', 'watermark.png'))
@ -121,8 +123,9 @@ def test_set_meta(tmpdir):
result = runner.invoke(set_meta, [str(testdir), "title", "testing"])
assert result.exit_code == 2
result = runner.invoke(set_meta, [str(testdir.join("non-existant.jpg")),
"title", "testing"])
result = runner.invoke(
set_meta, [str(testdir.join("non-existant.jpg")), "title", "testing"]
)
assert result.exit_code == 1
result = runner.invoke(set_meta, [str(testfile), "title", "testing"])

57
tests/test_compress_assets_plugin.py

@ -36,44 +36,53 @@ def walk_destination(destination, suffixes, compress_suffix):
for path, dirs, files in os.walk(destination):
for file in files:
original_filename = os.path.join(path, file)
compressed_filename = '{}.{}'.format(os.path.join(path, file),
compress_suffix)
compressed_filename = '{}.{}'.format(
os.path.join(path, file), compress_suffix
)
path_exists = os.path.exists(compressed_filename)
file_ext = os.path.splitext(file)[1][1:]
if file_ext in suffixes:
assert path_exists
assert (os.stat(original_filename).st_mtime <=
os.stat(compressed_filename).st_mtime)
assert (
os.stat(original_filename).st_mtime
<= os.stat(compressed_filename).st_mtime
)
else:
assert not path_exists
@pytest.mark.parametrize("method,compress_suffix,test_import",
[('gzip', 'gz', None),
('zopfli', 'gz', 'zopfli.gzip'),
('brotli', 'br', 'brotli')])
def test_compress(disconnect_signals, settings, tmpdir, method,
compress_suffix, test_import):
@pytest.mark.parametrize(
"method,compress_suffix,test_import",
[('gzip', 'gz', None), ('zopfli', 'gz', 'zopfli.gzip'), ('brotli', 'br', 'brotli')],
)
def test_compress(
disconnect_signals, settings, tmpdir, method, compress_suffix, test_import
):
if test_import:
pytest.importorskip(test_import)
# Compress twice to test compression skip based on mtime
for _ in range(2):
compress_options = make_gallery(settings, tmpdir, method)
walk_destination(settings['destination'],
compress_options['suffixes'],
compress_suffix)
@pytest.mark.parametrize("method,compress_suffix,mask",
[('zopfli', 'gz', 'zopfli.gzip'),
('brotli', 'br', 'brotli'),
('__does_not_exist__', 'br', None)])
def test_failed_compress(disconnect_signals, settings, tmpdir,
method, compress_suffix, mask):
walk_destination(
settings['destination'], compress_options['suffixes'], compress_suffix
)
@pytest.mark.parametrize(
"method,compress_suffix,mask",
[
('zopfli', 'gz', 'zopfli.gzip'),
('brotli', 'br', 'brotli'),
('__does_not_exist__', 'br', None),
],
)
def test_failed_compress(
disconnect_signals, settings, tmpdir, method, compress_suffix, mask
):
# See https://medium.com/python-pandemonium/how-to-test-your-imports-1461c1113be1
with mock.patch.dict(sys.modules, {mask: None}):
make_gallery(settings, tmpdir, method)
walk_destination(settings['destination'],
[], # No file should be compressed
compress_suffix)
walk_destination(
settings['destination'], [], compress_suffix # No file should be compressed
)

21
tests/test_encrypt.py

@ -13,9 +13,7 @@ CURRENT_DIR = os.path.dirname(__file__)
def get_key_tag(settings):
options = settings["encrypt_options"]
key = endec.kdf_gen_key(
options["password"],
options["kdf_salt"],
options["kdf_iters"]
options["password"], options["kdf_salt"], options["kdf_iters"]
)
tag = options["gcm_tag"].encode("utf-8")
return (key, tag)
@ -32,7 +30,7 @@ def test_encrypt(settings, tmpdir, disconnect_signals):
'gcm_tag': 'AuTheNTiCatIoNtAG',
'kdf_salt': 'saltysaltsweetysweet',
'kdf_iters': 10000,
'encrypt_symlinked_originals': False
'encrypt_symlinked_originals': False,
}
init_plugins(settings)
@ -58,13 +56,11 @@ def test_encrypt(settings, tmpdir, disconnect_signals):
assert "thumb_size" in encryptCache[cache_key(media)]
assert "encrypted" in encryptCache[cache_key(media)]
encryptedImages = [
media.dst_path,
media.thumb_path
]
encryptedImages = [media.dst_path, media.thumb_path]
if settings["keep_orig"]:
encryptedImages.append(os.path.join(settings["destination"],
media.path, media.big))
encryptedImages.append(
os.path.join(settings["destination"], media.path, media.big)
)
# check if images are encrypted by trying to decrypt
for image in encryptedImages:
@ -79,7 +75,8 @@ def test_encrypt(settings, tmpdir, disconnect_signals):
assert os.path.isfile(os.path.join(settings["destination"], "sw.js"))
# check keycheck file
with open(os.path.join(settings["destination"],
'static', "keycheck.txt"), "rb") as infile:
with open(
os.path.join(settings["destination"], 'static', "keycheck.txt"), "rb"
) as infile:
with BytesIO() as outfile:
endec.decrypt(key, infile, outfile, tag)

3
tests/test_extended_caching.py

@ -37,8 +37,7 @@ def test_load_exif(settings, tmpdir):
settings['destination'] = str(tmpdir)
gal1 = Gallery(settings, ncpu=1)
gal1.albums["exifTest"].medias[2].exif = "blafoo"
gal1.exifCache = {"exifTest/21.jpg": "Foo",
"exifTest/22.jpg": "Bar"}
gal1.exifCache = {"exifTest/21.jpg": "Foo", "exifTest/22.jpg": "Bar"}
extended_caching.load_exif(gal1.albums["exifTest"])

50
tests/test_gallery.py

@ -24,9 +24,12 @@ REF = {
'name': 'test1',
'thumbnail': 'test1/thumbnails/11.tn.jpg',
'subdirs': [],
'medias': ['11.jpg', 'CMB_Timeline300_no_WMAP.jpg',
'flickr_jerquiaga_2394751088_cc-by-nc.jpg',
'example.gif'],
'medias': [
'11.jpg',
'CMB_Timeline300_no_WMAP.jpg',
'flickr_jerquiaga_2394751088_cc-by-nc.jpg',
'example.gif',
],
},
'dir1/test2': {
'title': 'test2',
@ -47,10 +50,12 @@ REF = {
'name': 'dir2',
'thumbnail': 'dir2/thumbnails/m57_the_ring_nebula-587px.tn.jpg',
'subdirs': [],
'medias': ['KeckObservatory20071020.jpg',
'Hubble Interacting Galaxy NGC 5257.jpg',
'Hubble ultra deep field.jpg',
'm57_the_ring_nebula-587px.jpg'],
'medias': [
'KeckObservatory20071020.jpg',
'Hubble Interacting Galaxy NGC 5257.jpg',
'Hubble ultra deep field.jpg',
'm57_the_ring_nebula-587px.jpg',
],
},
'accentué': {
'title': 'accentué',
@ -64,14 +69,14 @@ REF = {
'name': 'video',
'thumbnail': 'video/thumbnails/example%20video.tn.jpg',
'subdirs': [],
'medias': ['example video.ogv']
'medias': ['example video.ogv'],
},
'webp': {
'title': 'webp',
'name': 'webp',
'thumbnail': 'webp/thumbnails/_MG_7805_lossy80.tn.webp',
'subdirs': [],
'medias': ['_MG_7805_lossy80.webp', '_MG_7808_lossy80.webp']
'medias': ['_MG_7805_lossy80.webp', '_MG_7808_lossy80.webp'],
},
}
@ -122,10 +127,13 @@ def test_media_iptc_override(settings):
# Markdown parsing adds formatting. Let's just focus on content
assert "Markdown description beats iptc" in img_with_md.description
img_no_md = Image('1.jpg', 'iptcTest', settings)
assert img_no_md.title == ('Haemostratulus clouds over Canberra - '
'2005-12-28 at 03-25-07')
assert img_no_md.description == (
'"Haemo" because they look like haemoglobin '
assert (
img_no_md.title
== 'Haemostratulus clouds over Canberra - 2005-12-28 at 03-25-07'
)
assert (
img_no_md.description
== '"Haemo" because they look like haemoglobin '
'cells and "stratulus" because I can\'t work out whether '
'they\'re Stratus or Cumulus clouds.\nWe\'re driving down '
'the main drag in Canberra so it\'s Parliament House that '
@ -190,8 +198,9 @@ def test_album(path, album, settings, tmpdir):
assert a.thumbnail == album['thumbnail']
if path == 'video':
assert list(a.images) == []
assert [m.dst_filename for m in a.medias] == \
[album['medias'][0].replace('.ogv', '.webm')]
assert [m.dst_filename for m in a.medias] == [
album['medias'][0].replace('.ogv', '.webm')
]
else:
assert list(a.videos) == []
assert [m.dst_filename for m in a.medias] == album['medias']
@ -258,7 +267,10 @@ def test_medias_sort(settings):
a = Album('dir1/test2', settings, album['subdirs'], album['medias'], gal)
a.sort_medias(settings['medias_sort_attr'])
assert [im.dst_filename for im in a.images] == [
'21.tiff', '22.jpg', 'CMB_Timeline300_no_WMAP.jpg']
'21.tiff',
'22.jpg',
'CMB_Timeline300_no_WMAP.jpg',
]
def test_gallery(settings, tmpdir):
@ -340,7 +352,5 @@ def test_ignores(settings, tmpdir):
assert 'test2' not in os.listdir(join(tmp, 'dir1'))
assert 'accentué' not in os.listdir(tmp)
assert 'CMB_Timeline300_no_WMAP.jpg' not in os.listdir(
join(tmp, 'dir1', 'test1'))
assert 'Hubble Interacting Galaxy NGC 5257.jpg' not in os.listdir(
join(tmp, 'dir2'))
assert 'CMB_Timeline300_no_WMAP.jpg' not in os.listdir(join(tmp, 'dir1', 'test1'))
assert 'Hubble Interacting Galaxy NGC 5257.jpg' not in os.listdir(join(tmp, 'dir2'))

125
tests/test_image.py

@ -6,8 +6,15 @@ from PIL import Image as PILImage
from sigal import init_logging
from sigal.gallery import Image
from sigal.image import (generate_image, generate_thumbnail, get_exif_data,
get_exif_tags, get_iptc_data, get_size, process_image)
from sigal.image import (
generate_image,
generate_thumbnail,
get_exif_data,
get_exif_tags,
get_iptc_data,
get_size,
process_image,
)
from sigal.settings import Status, create_settings
CURRENT_DIR = os.path.dirname(__file__)
@ -25,10 +32,12 @@ def test_process_image(tmpdir):
status = process_image(Image('foo.txt', 'bar', create_settings()))
assert status == Status.FAILURE
settings = create_settings(img_processor='ResizeToFill',
make_thumbs=False,
source=os.path.join(SRCDIR, 'dir2'),
destination=str(tmpdir))
settings = create_settings(
img_processor='ResizeToFill',
make_thumbs=False,
source=os.path.join(SRCDIR, 'dir2'),
destination=str(tmpdir),
)
image = Image(TEST_IMAGE, '.', settings)
status = process_image(image)
assert status == Status.SUCCESS
@ -41,8 +50,9 @@ def test_generate_image(tmpdir):
dstfile = str(tmpdir.join(TEST_IMAGE))
for i, size in enumerate([(600, 600), (300, 200)]):
settings = create_settings(img_size=size, img_processor='ResizeToFill',
copy_exif_data=True)
settings = create_settings(
img_size=size, img_processor='ResizeToFill', copy_exif_data=True
)
options = None if i == 0 else {'quality': 85}
generate_image(SRCFILE, dstfile, settings, options=options)
im = PILImage.open(dstfile)
@ -54,10 +64,12 @@ def test_generate_image_imgformat(tmpdir):
dstfile = str(tmpdir.join(TEST_IMAGE))
for i, outfmt in enumerate(["JPEG", "PNG", "TIFF"]):
settings = create_settings(img_size=(300, 300),
img_processor='ResizeToFill',
copy_exif_data=True,
img_format=outfmt)
settings = create_settings(
img_size=(300, 300),
img_processor='ResizeToFill',
copy_exif_data=True,
img_format=outfmt,
)
options = {'quality': 85}
generate_image(SRCFILE, dstfile, settings, options=options)
im = PILImage.open(dstfile)
@ -70,7 +82,9 @@ def test_resize_image_portrait(tmpdir):
settings = create_settings(img_size=size)
portrait_image = 'm57_the_ring_nebula-587px.jpg'
portrait_src = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'dir2', portrait_image)
portrait_src = os.path.join(
CURRENT_DIR, 'sample', 'pictures', 'dir2', portrait_image
)
portrait_dst = str(tmpdir.join(portrait_image))
generate_image(portrait_src, portrait_dst, settings)
@ -82,7 +96,9 @@ def test_resize_image_portrait(tmpdir):
assert im.size[0] == 200
landscape_image = 'KeckObservatory20071020.jpg'
landscape_src = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'dir2', landscape_image)
landscape_src = os.path.join(
CURRENT_DIR, 'sample', 'pictures', 'dir2', landscape_image
)
landscape_dst = str(tmpdir.join(landscape_image))
generate_image(landscape_src, landscape_dst, settings)
@ -90,8 +106,9 @@ def test_resize_image_portrait(tmpdir):
assert im.size[1] == 200
@pytest.mark.parametrize(("image", "path"), [(TEST_IMAGE, SRCFILE),
(TEST_GIF_IMAGE, SRC_GIF_FILE)])
@pytest.mark.parametrize(
("image", "path"), [(TEST_IMAGE, SRCFILE), (TEST_GIF_IMAGE, SRC_GIF_FILE)]
)
def test_generate_image_passthrough(tmpdir, image, path):
"Test the generate_image function with use_orig=True."
@ -121,8 +138,7 @@ def test_generate_image_processor(tmpdir):
init_logging('sigal')
dstfile = str(tmpdir.join(TEST_IMAGE))
settings = create_settings(img_size=(200, 200),
img_processor='WrongMethod')
settings = create_settings(img_size=(200, 200), img_processor='WrongMethod')
with pytest.raises(SystemExit):
generate_image(SRCFILE, dstfile, settings)
@ -130,8 +146,11 @@ def test_generate_image_processor(tmpdir):
@pytest.mark.parametrize(
("image", "path", "wide_size", "high_size"),
[(TEST_IMAGE, SRCFILE, (200, 133), (150, 100)),
(TEST_GIF_IMAGE, SRC_GIF_FILE, (134, 150), (150, 168))])
[
(TEST_IMAGE, SRCFILE, (200, 133), (150, 100)),
(TEST_GIF_IMAGE, SRC_GIF_FILE, (134, 150), (150, 168)),
],
)
def test_generate_thumbnail(tmpdir, image, path, wide_size, high_size):
"Test the generate_thumbnail function."
@ -141,8 +160,7 @@ def test_generate_thumbnail(tmpdir, image, path, wide_size, high_size):
im = PILImage.open(dstfile)
assert im.size == size
for size, thumb_size in [((200, 150), wide_size),
((150, 200), high_size)]:
for size, thumb_size in [((200, 150), wide_size), ((150, 200), high_size)]:
generate_thumbnail(path, dstfile, size, fit=False)
im = PILImage.open(dstfile)
assert im.size == thumb_size
@ -150,8 +168,9 @@ def test_generate_thumbnail(tmpdir, image, path, wide_size, high_size):
def test_get_exif_tags():
test_image = '11.jpg'
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1',
test_image)
src_file = os.path.join(
CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1', test_image
)
data = get_exif_data(src_file)
simple = get_exif_tags(data, datetime_format='%d/%m/%Y')
assert simple['fstop'] == 3.9
@ -171,11 +190,16 @@ def test_get_exif_tags():
assert 'focal' not in simple
assert simple['exposure'] == '10'
data = {'ExposureTime': '--', 'DateTimeOriginal': '---',
'GPSInfo': {'GPSLatitude': ((34, 0), (1, 0), (4500, 100)),
'GPSLatitudeRef': 'N',
'GPSLongitude': ((116, 0), (8, 0), (3900, 100)),
'GPSLongitudeRef': 'W'}}
data = {
'ExposureTime': '--',
'DateTimeOriginal': '---',
'GPSInfo': {
'GPSLatitude': ((34, 0), (1, 0), (4500, 100)),
'GPSLatitudeRef': 'N',
'GPSLongitude': ((116, 0), (8, 0), (3900, 100)),
'GPSLongitudeRef': 'W',
},
}
simple = get_exif_tags(data)
assert 'exposure' not in simple
assert 'datetime' not in simple
@ -184,28 +208,30 @@ def test_get_exif_tags():
def test_get_iptc_data(caplog):
test_image = '1.jpg'
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'iptcTest',
test_image)
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'iptcTest', test_image)
data = get_iptc_data(src_file)
# Title
assert data["title"] == 'Haemostratulus clouds over Canberra - ' + \
'2005-12-28 at 03-25-07'
assert (
data["title"]
== 'Haemostratulus clouds over Canberra - ' + '2005-12-28 at 03-25-07'
)
# Description
assert data["description"] == '"Haemo" because they look like haemoglobin ' + \
'cells and "stratulus" because I can\'t work out whether ' + \
'they\'re Stratus or Cumulus clouds.\nWe\'re driving down ' + \
'the main drag in Canberra so it\'s Parliament House that ' + \
'you can see at the end of the road.'
assert (
data["description"]
== '"Haemo" because they look like haemoglobin '
+ 'cells and "stratulus" because I can\'t work out whether '
+ 'they\'re Stratus or Cumulus clouds.\nWe\'re driving down '
+ 'the main drag in Canberra so it\'s Parliament House that '
+ 'you can see at the end of the road.'
)
# This file has no IPTC data
test_image = '21.jpg'
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'exifTest',
test_image)
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'exifTest', test_image)
assert get_iptc_data(src_file) == {}
# Headline
test_image = '3.jpg'
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'iptcTest',
test_image)
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'iptcTest', test_image)
data = get_iptc_data(src_file)
assert data["headline"] == 'Ring Nebula, M57'
@ -239,8 +265,9 @@ def test_exif_copy(tmpdir):
"Test if EXIF data can transferred copied to the resized image."
test_image = '11.jpg'
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1',
test_image)
src_file = os.path.join(
CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1', test_image
)
dst_file = str(tmpdir.join(test_image))
settings = create_settings(img_size=(300, 400), copy_exif_data=True)
@ -258,8 +285,9 @@ def test_exif_gps(tmpdir):
"""Test reading out correct geo tags"""
test_image = 'flickr_jerquiaga_2394751088_cc-by-nc.jpg'
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1',
test_image)
src_file = os.path.join(
CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1', test_image
)
dst_file = str(tmpdir.join(test_image))
settings = create_settings(img_size=(400, 300), copy_exif_data=True)
@ -278,8 +306,9 @@ def test_get_size(tmpdir):
"""Test reading out image size"""
test_image = 'flickr_jerquiaga_2394751088_cc-by-nc.jpg'
src_file = os.path.join(CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1',
test_image)
src_file = os.path.join(
CURRENT_DIR, 'sample', 'pictures', 'dir1', 'test1', test_image
)
result = get_size(src_file)
assert result == {'height': 800, 'width': 600}

13
tests/test_plugins.py

@ -18,8 +18,9 @@ def test_plugins(settings, tmpdir, disconnect_signals):
gal = Gallery(settings)
gal.build()
out_html = os.path.join(settings['destination'],
'dir2', 'KeckObservatory20071020.jpg.html')
out_html = os.path.join(
settings['destination'], 'dir2', 'KeckObservatory20071020.jpg.html'
)
assert os.path.isfile(out_html)
for path, dirs, files in os.walk(os.path.join(str(tmpdir), "nomedia")):
@ -39,10 +40,10 @@ def test_nonmedia_files(settings, tmpdir, disconnect_signals):
gal = Gallery(settings)
gal.build()
outfile = os.path.join(settings['destination'],
'nonmedia_files', 'dummy.pdf')
outfile = os.path.join(settings['destination'], 'nonmedia_files', 'dummy.pdf')
assert os.path.isfile(outfile)
outthumb = os.path.join(settings['destination'],
'nonmedia_files', 'thumbnails', 'dummy.tn.jpg')
outthumb = os.path.join(
settings['destination'], 'nonmedia_files', 'thumbnails', 'dummy.tn.jpg'
)
assert os.path.isfile(outthumb)

19
tests/test_settings.py

@ -10,21 +10,24 @@ def test_read_settings(settings):
assert settings['img_size'] == (640, 480)
assert settings['thumb_size'] == (200, 150)
assert settings['thumb_suffix'] == '.tn'
assert settings['source'] == os.path.join(CURRENT_DIR, 'sample',
'pictures')
assert settings['source'] == os.path.join(CURRENT_DIR, 'sample', 'pictures')
def test_get_thumb(settings):
"""Test the get_thumb function."""
tests = [('example.jpg', 'thumbnails/example.tn.jpg'),
('test/example.jpg', 'test/thumbnails/example.tn.jpg'),
('test/t/example.jpg', 'test/t/thumbnails/example.tn.jpg')]
tests = [
('example.jpg', 'thumbnails/example.tn.jpg'),
('test/example.jpg', 'test/thumbnails/example.tn.jpg'),
('test/t/example.jpg', 'test/t/thumbnails/example.tn.jpg'),
]
for src, ref in tests:
assert get_thumb(settings, src) == ref
tests = [('example.webm', 'thumbnails/example.tn.jpg'),
('test/example.mp4', 'test/thumbnails/example.tn.jpg'),
('test/t/example.avi', 'test/t/thumbnails/example.tn.jpg')]
tests = [
('example.webm', 'thumbnails/example.tn.jpg'),
('test/example.mp4', 'test/thumbnails/example.tn.jpg'),
('test/t/example.avi', 'test/t/thumbnails/example.tn.jpg'),
]
for src, ref in tests:
assert get_thumb(settings, src) == ref

3
tests/test_utils.py

@ -66,8 +66,7 @@ def test_read_markdown():
m = utils.read_markdown(src)
assert m['title'] == "Foo Bar"
assert m['meta']['location'][0] == "Bavaria"
assert m['description'] == \
"<p>This is a funny description of this image</p>"
assert m['description'] == "<p>This is a funny description of this image</p>"
def test_read_markdown_empty_file(tmpdir):

23
tests/test_video.py

@ -5,8 +5,7 @@ import pytest
from sigal.gallery import Video
from sigal.settings import Status, create_settings
from sigal.video import (generate_thumbnail, generate_video, process_video,
video_size)
from sigal.video import generate_thumbnail, generate_video, process_video, video_size
CURRENT_DIR = os.path.dirname(__file__)
SRCDIR = os.path.join(CURRENT_DIR, 'sample', 'pictures')
@ -30,10 +29,13 @@ def test_generate_thumbnail(tmpdir):
def test_process_video(tmpdir):
base, ext = os.path.splitext(TEST_VIDEO)
settings = create_settings(video_format='ogv',
use_orig=True, orig_link=True,
source=os.path.join(SRCDIR, 'video'),
destination=str(tmpdir))
settings = create_settings(
video_format='ogv',
use_orig=True,
orig_link=True,
source=os.path.join(SRCDIR, 'video'),
destination=str(tmpdir),
)
video = Video(TEST_VIDEO, '.', settings)
process_video(video)
dstfile = str(tmpdir.join(base + '.ogv'))
@ -102,9 +104,12 @@ def test_second_pass_video(mock_generate_video_pass, fmt, tmpdir):
dstfile = str(tmpdir.join(base + '.' + fmt))
settings_1 = '-c:v libvpx-vp9 -b:v 0 -crf 30 -pass 1 -an -f null dev/null'
settings_2 = f'-c:v libvpx-vp9 -b:v 0 -crf 30 -pass 2 -f {fmt}'
settings_opts = {'video_size': (100, 50), 'video_format': fmt,
fmt + '_options': settings_1.split(" "),
fmt + '_options_second_pass': settings_2.split(" ")}
settings_opts = {
'video_size': (100, 50),
'video_format': fmt,
fmt + '_options': settings_1.split(" "),
fmt + '_options_second_pass': settings_2.split(" "),
}
settings = create_settings(**settings_opts)
generate_video(SRCFILE, dstfile, settings)

14
tests/test_zip.py

@ -28,9 +28,12 @@ def test_zipped_correctly(tmpdir):
assert os.path.isfile(zipf)
zip_file = zipfile.ZipFile(zipf, 'r')
expected = ('11.jpg', 'CMB_Timeline300_no_WMAP.jpg',
'flickr_jerquiaga_2394751088_cc-by-nc.jpg',
'example.gif')
expected = (
'11.jpg',
'CMB_Timeline300_no_WMAP.jpg',
'flickr_jerquiaga_2394751088_cc-by-nc.jpg',
'example.gif',
)
for filename in zip_file.namelist():
assert filename in expected
@ -44,8 +47,9 @@ def test_not_zipped(tmpdir):
# test that the zip file is not created when the .nozip_gallery file
# is present
outpath = str(tmpdir)
gallery = make_gallery(destination=outpath, zip_gallery='archive.zip',
source_dir='dir2')
gallery = make_gallery(
destination=outpath, zip_gallery='archive.zip', source_dir='dir2'
)
gallery.build()
assert not os.path.isfile(os.path.join(outpath, 'archive.zip'))

Loading…
Cancel
Save