Browse Source

Refactor subprocess calls for videos.

pull/107/head
Simon Conseil 12 years ago
parent
commit
054d1e2c00
  1. 12
      sigal/gallery.py
  2. 18
      sigal/utils.py
  3. 78
      sigal/video.py
  4. 10
      tests/test_utils.py

12
sigal/gallery.py

@ -32,6 +32,7 @@ import zipfile
from collections import defaultdict
from datetime import datetime
from functools import partial
from os.path import isfile, join, splitext
from PIL import Image as PILImage
@ -507,9 +508,15 @@ class Gallery(object):
else:
processor = process_file
if sys.stdout.isatty():
log_func = partial(print, colored('->', BLUE))
else:
log_func = self.logger.warn
try:
for album in self.albums.values():
if len(album) > 0:
log_func(str(album))
for files in self.process_dir(album, force=force):
processor(files)
except KeyboardInterrupt:
@ -539,11 +546,6 @@ class Gallery(object):
def process_dir(self, album, force=False):
"""Process a list of images in a directory."""
if sys.stdout.isatty():
print(colored('->', BLUE), str(album))
else:
self.logger.warn(album)
for f in album:
if isfile(f.dst_path) and not force:
self.logger.info("%s exists - skipping", f.filename)

18
sigal/utils.py

@ -21,9 +21,12 @@
# IN THE SOFTWARE.
import codecs
import markdown
import os
import shutil
from markdown import Markdown
from subprocess import Popen, PIPE
from . import compat
def copy(src, dst, symlink=False):
@ -55,7 +58,7 @@ def read_markdown(filename):
with codecs.open(filename, 'r', 'utf-8-sig') as f:
text = f.read()
md = markdown.Markdown(extensions=['meta'])
md = Markdown(extensions=['meta'])
html = md.convert(text)
return {
@ -63,3 +66,14 @@ def read_markdown(filename):
'description': html,
'meta': md.Meta.copy()
}
def call_subprocess(cmd):
"""Wrapper to call ``subprocess.Popen`` and return stdout & stderr."""
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
if not compat.PY2:
stderr = stderr.decode('utf8')
stdout = stdout.decode('utf8')
return p.returncode, stdout, stderr

78
sigal/video.py

@ -27,36 +27,32 @@ import logging
import os
import re
import shutil
import subprocess
from os.path import splitext
from . import compat, image
from . import image
from .settings import get_thumb
from .utils import call_subprocess
def call_subprocess(cmd):
"""Wrapper to call ``subprocess.Popen`` and return stdout & stderr."""
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if not compat.PY2:
stderr = stderr.decode('utf8')
stdout = stdout.decode('utf8')
return p.returncode, stdout, stderr
def check_subprocess(cmd, error_msg=''):
"""Wrapper to call ``subprocess.Popen`` which log errors and raise
a ``subprocess.CalledProcessError`` if the return code is not 0.
def check_subprocess(cmd, source, outname):
"""Run the command to resize the video and remove the output file if the
processing fails.
"""
returncode, stdout, stderr = call_subprocess(cmd)
logger = logging.getLogger(__name__)
try:
returncode, stdout, stderr = call_subprocess(cmd)
except KeyboardInterrupt:
logger.debug('Process terminated, removing file %s', outname)
os.remove(outname)
raise
if returncode:
logger = logging.getLogger(__name__)
logger.error(error_msg)
logger.error('Failed to process ' + source)
logger.debug('STDOUT:\n %s', stdout)
logger.debug('STDERR:\n %s', stderr)
raise subprocess.CalledProcessError(returncode, cmd)
logger.debug('Process failed, removing file %s', outname)
os.remove(outname)
def video_size(source):
@ -74,10 +70,11 @@ def video_size(source):
def generate_video(source, outname, size, options=None):
"""Video processor
"""Video processor.
:param source: path to a video
:param outname: name of the generated video
:param outname: path to the generated video
:param size: size of the resized video `(width, height)`
:param options: array of options passed to ffmpeg
"""
@ -89,8 +86,8 @@ def generate_video(source, outname, size, options=None):
w_dst, h_dst = size
logger.debug('Video size: %i, %i -> %i, %i', w_src, h_src, w_dst, h_dst)
base, src_ext = os.path.splitext(source)
base, dst_ext = os.path.splitext(outname)
base, src_ext = splitext(source)
base, dst_ext = splitext(outname)
if dst_ext == src_ext and w_src <= w_dst and h_src <= h_dst:
logger.debug('Video is smaller than the max size, copying it instead')
shutil.copy(source, outname)
@ -117,16 +114,7 @@ def generate_video(source, outname, size, options=None):
cmd += resize_opt + [outname]
logger.debug('Processing video: %s', ' '.join(cmd))
try:
check_subprocess(cmd, error_msg='Failed to process ' + source)
except subprocess.CalledProcessError:
logger.debug('Process failed, removing file %s', outname)
os.remove(outname)
return
except KeyboardInterrupt:
logger.debug('Process terminated, removing file %s', outname)
os.remove(outname)
raise
check_subprocess(cmd, source, outname)
def generate_thumbnail(source, outname, box, fit=True, options=None):
@ -135,21 +123,11 @@ def generate_thumbnail(source, outname, box, fit=True, options=None):
logger = logging.getLogger(__name__)
tmpfile = outname + ".tmp.jpg"
try:
# dump an image of the video
cmd = ['ffmpeg', '-i', source, '-an', '-r', '1',
'-vframes', '1', '-y', tmpfile]
logger.debug('Create thumbnail for video: %s', ' '.join(cmd))
check_subprocess(cmd, error_msg='Failed to create a thumbnail for ' +
source)
except subprocess.CalledProcessError:
logger.debug('Process failed, removing file %s', outname)
os.remove(outname)
return
except KeyboardInterrupt:
logger.debug('Process terminated, removing file %s', outname)
os.remove(outname)
raise
# dump an image of the video
cmd = ['ffmpeg', '-i', source, '-an', '-r', '1',
'-vframes', '1', '-y', tmpfile]
logger.debug('Create thumbnail for video: %s', ' '.join(cmd))
check_subprocess(cmd, source, outname)
# use the generate_thumbnail function from sigal.image
image.generate_thumbnail(tmpfile, outname, box, fit, options)
@ -161,7 +139,7 @@ def process_video(filepath, outpath, settings):
"""Process a video: resize, create thumbnail."""
filename = os.path.split(filepath)[1]
basename = os.path.splitext(filename)[0]
basename = splitext(filename)[0]
outname = os.path.join(outpath, basename + '.webm')
generate_video(filepath, outname, settings['video_size'],

10
tests/test_utils.py

@ -52,3 +52,13 @@ def test_read_markdown():
assert m['meta']['location'][0] == "Bavaria"
assert m['description'] == \
"<p>This is a funny description of this image</p>"
def test_call_subprocess():
returncode, stdout, stderr = utils.call_subprocess(['echo', 'ok'])
assert returncode == 0
assert stdout == 'ok\n'
assert stderr == ''
returncode, stdout, stderr = utils.call_subprocess(['/usr/bin/false'])
assert returncode == 1

Loading…
Cancel
Save