Source code for ciowarehouse.lib.utils

"""Some various utilities."""
# pylint: disable = too-many-lines

from logging import getLogger
from os import sep, makedirs, rename, remove, walk, scandir
from os.path import isfile, basename, dirname, exists, join, relpath, normpath
from os.path import splitext, commonprefix, isdir
from io import open as io_open, BytesIO
from shutil import rmtree, copy
from datetime import date, datetime
from collections import namedtuple
from unicodedata import normalize, combining
from zipfile import ZIP_DEFLATED, ZipFile, LargeZipFile
from tempfile import NamedTemporaryFile
from re import MULTILINE, UNICODE, sub as re_sub
from urllib.parse import urlparse
from hashlib import md5

from lxml import etree
from transaction import manager

from pyramid.response import FileResponse
from pyramid.httpexceptions import HTTPNotFound, HTTPForbidden

from chrysalio.lib.utils import EXCLUDED_FILES as CIOEXCLUDED_FILES
from chrysalio.lib.utils import copy_content, tostr, tounicode
from chrysalio.lib.utils import mimetype_get, convert_value
from chrysalio.lib.utils import make_id, execute
from chrysalio.lib.xml import load_xml2, validate_xml, relaxng4validation
from chrysalio.lib.restful import restful_call
from chrysalio.lib.log import log_error
from chrysalio.models import get_tm_dbsession
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.includes.cache import cache_namespace
from ..routes import WAREHOUSE_FULL_REFRESH_PATH, WAREHOUSE_REFRESH_PATH
from ..relaxng import RELAXNG_CIOWAREHOUSE, RELAXNG_METAFIELD_TYPES
from .i18n import _, translate

LOG = getLogger(__name__)
CIOWAREHOUSE_NS = 'ciowrh'
CACHE_REGION_USER = 'ciowarehouse_user'
CACHE_REGION_GLOBAL = 'ciowarehouse_global'
INFOS_DIR = '.infos'
LOCAL_DIR = '.local'
IGNORED_PREFIX = '('
IGNORED_SUFFIX = ')'
INDEX_DIR = join(LOCAL_DIR, 'Index')
THUMBNAILS_DIR = join(LOCAL_DIR, 'Thumbnails')
THUMBNAIL_LARGE = 'thumbnail_large'
THUMBNAIL_SMALL = 'thumbnail_small'
THUMBNAIL_MAX_WIDTH = 800
LOCKS_DIR = join(LOCAL_DIR, 'Locks')
REFRESHED_FILE = join(LOCAL_DIR, 'refreshed')
TOREFRESH_FILE = join(LOCAL_DIR, 'torefresh')
REFRESHING_FILE = join(LOCAL_DIR, 'refreshing')
HERE = '__here__'
EXCLUDED_FILES = CIOEXCLUDED_FILES + (INFOS_DIR, LOCAL_DIR, '.info')
AVAILABLE_FILE_TYPES = (
    'css', 'csv', 'directory', 'epub+zip', 'font-eot', 'font-sfnt', 'font-ttf',
    'font-woff', 'gif', 'html', 'indesign-indd', 'indesign-idml', 'javascript',
    'jpeg', 'json', 'mp4', 'mpeg', 'msword', 'ogg', 'pdf', 'plain', 'png',
    'postscript', 'regex', 'svg+xml', 'tiff', 'toml', 'vnd.lotus-organizer',
    'vnd.microsoft.icon', 'vnd.ms-excel', 'vnd.oasis.opendocument.text',
    'vnd.oasis.opendocument.spreadsheet', 'webm', 'x-canon-cr2', 'x-msvideo',
    'x-nikon-nef', 'x-org', 'x-photoshop', 'x-python', 'x-tex', 'x-tar',
    'x-wav', 'xhtml+xml', 'xml', 'xml-cioset', 'xml-dtd', 'xml-relaxng',
    'xslt+xml', 'zip')
FILE_RIGHTS_INDEX = 0
META_RIGHTS_INDEX = 1
MIMETYPES_DIR = normpath(join(dirname(__file__), '..', 'Static', 'MimeTypes'))
NOT_FOUND = normpath(
    join(dirname(__file__), '..', 'Static', 'Images', 'notfound.jpg'))
WKHTMLTOIMAGE = 'wkhtmltoimage'


# =============================================================================
[docs] def mimetype_icon(file_type): """Return the ID of the icon of file type ``file_type``. :param str file_type: File type. For instance ``plain`` . :rtype: str """ return file_type if file_type in AVAILABLE_FILE_TYPES else 'unknown'
# =============================================================================
[docs] def mimetype_url(theme, size, file_type): """Return the URL path of the icon of file type ``file_type``. :param str theme: Current theme, possibly ``''``. :param str size: Size of the icon: ``'normal'`` or ``'small'``. :param str file_type: File type. For instance ``plain`` . :rtype: str """ return '{theme}/ciowarehouse/mimetypes/{size}/{icon}.svg'.format( theme=theme, size=size, icon=mimetype_icon(file_type))
# =============================================================================
[docs] def flag_image(theme, flag_id, active): """Return an URL to an image ON or OFF according to ``active``. :param str theme: Current theme. :param str flag_id: ID of the flag. For instance, ``'favorite'``. :param bool active: State of the flag. :rtype: str """ return '{theme}/ciowarehouse/images/{flag_id}_{active}.png'.format( theme=theme, flag_id=flag_id, active='on' if active else 'off')
# =============================================================================
[docs] def sort_key(name): """Key for sorting according to a name. :param str name: Name to convert. :rtype: str """ if not name: return '' key = normalize('NFKD', name.lower()) return ''.join([k for k in key if not combining(k)])
# =============================================================================
[docs] def make_file_id(path): """Return a digest representation of the file. :param str path: The path of the file i.e. its warehouse_id and its relative path inside the warehouse. :rtype: str """ return md5( normpath(path).encode('utf8'), usedforsecurity=False).hexdigest()
# =============================================================================
[docs] def apply_regex(request, regex_file, content): """Apply al ist of regular expressions from a file. :type request: pyramid.request.Request :param request: Current request. :param str regex_file: Absolute path to the file containing regular expressions. :param str content: Content to process. :rtype: str """ if not regex_file: return content if not exists(regex_file): log_error( request, translate( _( 'Regular expression file "${f}" does not exist.', {'f': regex_file}), lang='en')) return content with io_open(regex_file, 'r', encoding='utf8') as lines: for line in lines: if not line or line[0] == '#' or line[0:7] == '[Regex]': continue pattern, replace = line.partition(' =')[::2] pattern = pattern.strip() if not pattern: continue if pattern[0] in '\'"' and pattern[-1] in '\'"': pattern = pattern[1:-1] replace = replace.strip() if replace and replace[0] in '\'"' and replace[-1] in '\'"': replace = replace[1:-1] # pylint: disable = eval-used if replace.startswith('lambda'): replace = eval(replace) content = re_sub( pattern, replace, content, flags=MULTILINE | UNICODE) return content
# =============================================================================
[docs] def normalize_filename(filename, mode='simple', is_dir=False): """Normalize file name. :param str filename: File name to normalize. :param str mode: (default='simple') Strategy to normalize file name (``'simple'`` or ``'strict'``). :param bool is_dir: (default=False) ``True`` if the file is a directory. """ result = re_sub('\\.+', '.', re_sub('[*?:<>]', '_', filename)) if not is_dir: result = '{0}{1}'.format( splitext(result)[0], splitext(result)[1].lower()) if mode == 'simple': return result result = result.split(sep) for i, chunk in enumerate(result): chunk = re_sub( '_+', '_', re_sub('[  !;:,"\'/«»()\\[\\]–&]', '_', chunk)) chunk = normalize('NFKD', chunk.encode('utf8').decode('utf8')) result[i] = ''.join([k for k in chunk if not combining(k)]) if not is_dir: result[-1] = result[-1].lower() return sep.join(result)
# =============================================================================
[docs] def files2response(request, filenames, common_path=None, download_name=None): """Prepare files for download and return a Pyramid response. :type request: pyramid.request.Request :param request: Current request. :param list filenames: List of absolute paths to files to download. :param str common_path: (optional) Root of all files. :param str download_name: (optional) Visible name during download. :rtype: pyramid.response.FileResponse :return: Return a FileResponse or raise a :class:`pyramid.httpexceptions.HTTPNotFound` exception. """ # Check list filenames = [ k for k in filenames if exists(k) and basename(k) not in EXCLUDED_FILES ] if not filenames: return None # Single file if len(filenames) == 1 and isfile(filenames[0]): return file2response(request, filenames[0], basename(filenames[0])) # Directory or several files common_path = commonprefix(filenames) if common_path is None \ else common_path with NamedTemporaryFile( dir=request.registry.settings.get('temporary')) as tmp: with ZipFile(tmp, 'w', ZIP_DEFLATED) as zip_file: for filename in filenames: # File if isfile(filename): try: zip_file.write( filename, relpath(filename, common_path)) except LargeZipFile: # pragma: nocover zip_file.close() tmp.close() raise HTTPForbidden(comment=_('This file is too big!')) continue # pragma: nocover # Directory for root, dirs, files in walk(filename): for name in dirs: if name in EXCLUDED_FILES: dirs.remove(name) for name in files: if name in EXCLUDED_FILES: continue name = join(root, name) try: zip_file.write(name, relpath(name, common_path)) except LargeZipFile: # pragma: nocover zip_file.close() raise HTTPForbidden( comment=_('This file is too big!')) except IOError as error: # pragma: nocover zip_file.close() tmp.close() raise HTTPNotFound(comment=error) download_name = download_name or '{0}{1}.zip'.format( len(filenames) == 1 and basename(filenames[0]) or basename(normpath(common_path)) or request.registry.settings['site.uid'], '-{0}'.format(datetime.now().isoformat(' ').partition(' ')[0]) if len(filenames) > 1 else '') response = file2response( request, tmp.name, download_name, 'application/zip') return response
# =============================================================================
[docs] def file2response(request, filename, download_name, content_type=None): """Return a Pyramid FileResponse containing the file ``filename``. :type request: pyramid.request.Request :param request: Current request. :param str filename: Absolute path to file to encapsulate. :param str download_name: Visible name during download. :rtype: pyramid.response.FileResponse """ response = FileResponse( filename, request=request, content_type=content_type or mimetype_get(filename)[0]) try: str(download_name).encode('latin-1') response.headerlist.append( ( 'Content-Disposition', 'attachment; filename="{0}"'.format(str(download_name)))) except UnicodeEncodeError: response.headerlist.append( ( 'Content-Disposition', 'attachment; filename="{0}"'.format( make_id(download_name, 'no_accent')))) return response
# =============================================================================
[docs] def file_check_move(warehouse1, file1, warehouse2, file2): """Check files for a move or copy. :type warehouse1: .lib.warehouse.Warehouse :param warehouse1: Warehouse of the source. :param tuple file1: Tuple describing the source file such as ``(directory1, filename1)``. :type warehouse2: .lib.warehouse.Warehouse :param warehouse2: Warehouse of the target. :param tuple file2: Tuple describing the target file such as ``(directory2, filename2)``. :rtype: tuple :return: A tuple such as ``(abs_path1, abs_path2, error)``. """ # Check old file abs_path1 = normpath(join(warehouse1.root, *file1)) if not exists(abs_path1): return None, None, _( 'File ${f} does not exist!', {'f': normpath(join(file1[0], file1[1]))}) # Check new file abs_path2 = normpath(join(warehouse2.root, *file2)) if abs_path1 == abs_path2 or abs_path1 == dirname(abs_path2): return None, None, None if not abs_path2.startswith(warehouse2.root): return None, None, _( 'File ${f} is outside warehouse!', {'f': normpath(join(file2[0], file2[1]))}) if exists(abs_path2): return None, None, _( 'File ${f} already exists!', {'f': normpath(join(file2[0], file2[1]))}) return abs_path1, abs_path2, None
# =============================================================================
[docs] def file_ids2fullnames(request, paging, file_ids): """Convert a list of file IDs into a list of absolute paths. :type request: pyramid.request.Request :param request: Current request. :type paging: chrysalio.lib.paging.Paging :param paging: Paging containing all the files. :param list file_ids: List of file IDs. :rtype: list """ abs_names = [] for pfile in paging: if pfile['file_id'] not in file_ids: continue root = request.registry['modules']['ciowarehouse']\ .warehouse_root(request, pfile['warehouse_id']) if root is not None: abs_names.append( normpath(join(root, pfile['directory'], pfile['file_name']))) return abs_names
# =============================================================================
[docs] def value2str(value): """Convert a value to a string for XML. :param value: Value to convert. :rtype: str """ if isinstance(value, bool): return 'true' if value else 'false' if isinstance(value, (int, float)): return str(value) if isinstance(value, datetime): return value.replace(microsecond=0, tzinfo=None).isoformat() if isinstance(value, date): return value.isoformat() return tostr(value)
# =============================================================================
[docs] def thumbnail_create(image, sizes, abs_thumb_large, watermark=None): """Create a large and a small thumbnail. :type image: PIL.Image :param image: Image to convert into thumbnails. :param tuple sizes: A tuple of two sizes: the size of the large and the one of the small. :param str abs_thumb_large: Absolute path to the large thumbnail file. :type watermark: PIL.Image :param watermark: (optional) Image to convert into thumbnails. :rtype: bool """ if sizes is None: return False thumb_dir = dirname(abs_thumb_large) if exists(thumb_dir): try: rmtree(thumb_dir) except OSError: return False makedirs(thumb_dir, exist_ok=True) try: image.thumbnail(sizes[0]) if watermark: image.paste( watermark, ( image.size[0] - watermark.size[0] - 5, image.size[1] - watermark.size[1] - 5), watermark) image.save(abs_thumb_large) except (IOError, ZeroDivisionError, FileNotFoundError): # pragma: nocover rmtree(thumb_dir) return False # Save small thumbnail image.thumbnail(sizes[1]) image.save( join( thumb_dir, '{0}.{1}'.format(THUMBNAIL_SMALL, abs_thumb_large[-3:]))) return True
# =============================================================================
[docs] def thumbnail_remove(warehouse_root, directory, filename): """Remove thumbnails of a file and prune empty ancestor directories. :param str warehouse_root: Absolute path of warehouse root. :param str directory: Relative path to the directory containing the file. :param str filename: Name of the file to remove. """ abs_thumb = join(warehouse_root, THUMBNAILS_DIR, directory, filename) if exists(abs_thumb): rmtree(abs_thumb) root = join(warehouse_root, THUMBNAILS_DIR) abs_thumb = normpath(dirname(abs_thumb)) while abs_thumb != root and not tuple(scandir(abs_thumb)): rmtree(abs_thumb) abs_thumb = dirname(abs_thumb)
# =============================================================================
[docs] def thumbnail_move( warehouse1_root, file1, warehouse2_root, file2, copy_only=False): """Move thumbnails of a file and prune empty ancestor directories. :param str warehouse1_root: Absolute path of warehouse 1 root. :param tuple file1: A tuple such as ``(directory1, name1)``. :param str warehouse2_root: Absolute path of warehouse 2 root. :param tuple file2: A tuple such as ``(directory2, name2)``. :param bool copy_only: (default=False) If ``True``, copy instead of moving the file. """ abs_thumb1 = join(warehouse1_root, THUMBNAILS_DIR, file1[0], file1[1]) if exists(abs_thumb1): abs_thumb2 = join(warehouse2_root, THUMBNAILS_DIR, file2[0], file2[1]) if exists(abs_thumb2): rmtree(abs_thumb2) makedirs(dirname(abs_thumb2), exist_ok=True) action = copy_content if copy_only else rename action(abs_thumb1, abs_thumb2) if not copy_only: root = join(warehouse1_root, THUMBNAILS_DIR) abs_thumb1 = normpath(dirname(abs_thumb1)) while abs_thumb1 != root and not tuple(scandir(abs_thumb1)): rmtree(abs_thumb1) abs_thumb1 = dirname(abs_thumb1)
# =============================================================================
[docs] def thumbnail_url(request, pfile, size='normal'): """Return the URL of a thumbnail of a file representing by a paging item dictionary. :type request: pyramid.request.Request :param request: Current request. :param dict pfile: A paging file item. :param str size: (default='normal') Size of the icon: ``'normal'`` or ``'small'``. :rtype: str """ if pfile['thumbnail']: path = join(pfile['directory'], pfile['file_name']) if pfile['file_type'] == 'directory': path = join(path, HERE) url = request.route_path( 'file_thumbnail', warehouse_id=pfile['warehouse_id'], path=join( path, '{0}{1}'.format( THUMBNAIL_SMALL if size == 'small' else THUMBNAIL_LARGE, pfile['thumbnail']))) else: url = mimetype_url( theme_static_prefix(request), size, pfile['file_type']) return url
# =============================================================================
[docs] def html2thumbnails(html_file, thumb_dir, sizes, timeout=None): """Convert a HTML file into thumbnails. :param str html_file: Absolute path to the HTML file. :param str thumb_dir: Absolute path to the directory containing the thumbnail. :param tuple sizes: A tuple such as ``((large_width, large_height), (small_width, small_height))``. :param float timeout: (default=None) If set and the process hasn't finished in that time (in seconds), exits with error. """ makedirs(thumb_dir, exist_ok=True) img_file = join(thumb_dir, f'{splitext(basename(html_file))[0]}.jpg') height = int(sizes[0][1] / sizes[0][0] * THUMBNAIL_MAX_WIDTH) cmd = [ 'nice', WKHTMLTOIMAGE, '--log-level', 'none', '--enable-local-file-access', '--width', str(THUMBNAIL_MAX_WIDTH), '--height', str(height), html_file, img_file ] error = execute(cmd, cwd=dirname(html_file), timeout=timeout)[1] if not exists(img_file): # pragma: nocover return error # Large thumbnail execute( [ 'nice', 'convert', img_file, '-geometry', '{0}x{1}>'.format(*sizes[0]), join(thumb_dir, f'{THUMBNAIL_LARGE}.jpg') ]) # Small thumbnail execute( [ 'nice', 'convert', join(thumb_dir, f'{THUMBNAIL_LARGE}.jpg'), '-geometry', '{0}x{1}>'.format(*sizes[1]), join(thumb_dir, f'{THUMBNAIL_SMALL}.jpg') ]) remove(img_file) return None
# =============================================================================
[docs] def restful_params(build, host=None, login=None, key=None): """Return RESTful parameters: host, login, key. :type build: cioservice.lib.build.Build :param build: Current build object. :param str host: (optional) Default RESTful host. :param str login: (optional) Default RESTful login. :param str key: (optional) Default RESTful key. """ host = build.settings.get('restful.host', host) login = build.settings.get('restful.login', login) key = build.settings.get('restful.key', key) or '-' if not host or not login: build.warning(_('Cannot refresh warehouses: incomplete information.')) return None, None, None host = host if urlparse(host).path else '{0}/'.format(host) return host, login, key
# =============================================================================
[docs] def full_refresh_warehouses( build, warehouse_ids, host=None, login=None, key=None): """Launch a full refresh action for each warehouse. :type build: cioservice.lib.build.Build :param build: Current build object. :param set warehouse_ids: Set of warehouses to refresh. :param str host: (optional) Default RESTful host. :param str login: (optional) Default RESTful login. :param str key: (optional) Default RESTful key. """ host, login, key = restful_params(build, host, login, key) if host is None: return for warehouse_id in warehouse_ids: url = '{0}{1}{2}'.format( host, WAREHOUSE_FULL_REFRESH_PATH, warehouse_id) warning = restful_call(url, login, key, {'in_thread': True})[1] if warning is not None: build.warning(warning)
# =============================================================================
[docs] def refresh_warehouse( build, warehouse_id, files, recursive=False, host=None, login=None, key=None): """Launch a full refresh action for some files. :type build: cioservice.lib.build.Build :param build: Current build object. :param set warehouse_id: ID of the warehouse. :param list files: List of paths of files relative to the warehouse root. :param bool recursive: (default=False) Refresh recursively. :param str host: (optional) Default RESTful host. :param str login: (optional) Default RESTful login. :param str key: (optional) Default RESTful key. """ # pylint: disable = too-many-arguments if not files: return host, login, key = restful_params(build, host, login, key) if host is None: return url = '{0}{1}{2}'.format(host, WAREHOUSE_REFRESH_PATH, warehouse_id) data = {'files': files, 'recursive': recursive, 'in_thread': True} warning = restful_call(url, login, key, data)[1] if warning is not None: build.warning(warning)
# =============================================================================
[docs] def move_inter_warehouses( warehouse1, pfile1, warehouse2, pfile2, copy_only=False): """Move or copy a file between 2 differents workspaces. :type warehouse1: .lib.warehouse.Warehouse :param warehouse1: Warehouse of the source. :param dict pfile1: Dictionary describing the source file. :type warehouse2: .lib.warehouse.Warehouse :param warehouse2: Warehouse of the target. :param dict pfile2: Dictionary describing the target file. :param bool copy_only: (default=False) If ``True``, copy instead of moving the file. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None`` if it succeeds. """ # Prepare files abs_path1, abs_path2, error = file_check_move( warehouse1, (pfile1['directory'], pfile1['file_name']), warehouse2, (join(pfile2['directory'], pfile2['file_name']), pfile1['file_name'])) if abs_path2 is None: return error # Copy file is_dir = isdir(abs_path1) try: if not is_dir: copy(abs_path1, abs_path2) else: copy_content(abs_path1, abs_path2) except OSError: return _( 'File ${f} does not exist!', {'f': join(pfile1['directory'], pfile1['file_name'])}) warehouse2.vcs.add(abs_path2) # Copy metadata abs_path1 = join( warehouse1.root, INFOS_DIR, pfile1['directory'], '{0}.xml'.format(pfile1['file_name'])) if exists(abs_path1): abs_path2 = join( warehouse2.root, INFOS_DIR, pfile2['directory'], pfile2['file_name'], '{0}.xml'.format(pfile1['file_name'])) makedirs(dirname(abs_path2), exist_ok=True) copy(abs_path1, abs_path2) abs_path1 = join( warehouse1.root, INFOS_DIR, pfile1['directory'], pfile1['file_name']) if is_dir and isdir(abs_path1): abs_path2 = join( warehouse2.root, INFOS_DIR, pfile2['directory'], pfile2['file_name'], pfile1['file_name']) if exists(abs_path2): rmtree(abs_path2) copy_content(abs_path1, abs_path2) warehouse2.vcs.add(join(INFOS_DIR, pfile2['directory'])) # Copy thumbnails thumbnail_move( warehouse1.root, (pfile1['directory'], pfile1['file_name']), warehouse2.root, (join(pfile2['directory'], pfile2['file_name']), pfile1['file_name']), copy_only) # Remove from warehouse 1 if not copy_only: warehouse1.vcs.remove(pfile1['directory'], pfile1['file_name']) return None
# =============================================================================
[docs] def infos_load(infos_file, remove_metafields=False, remove_groups=False): """Load an infos XML file. :param str infos_file: Absolute path to the infos XML file. :param bool remove_metafields: if ``True`` metadata fields are removed. :param bool remove_groups: if ``True`` file groups are removed. :rtype: lxml.etree._Element :return: Root element of the infos XML file. """ root_elt = None namespace = RELAXNG_CIOWAREHOUSE['namespace'] if exists(infos_file): root_elt, err = load_xml2( infos_file, relaxng4validation(RELAXNG_CIOWAREHOUSE), parser=etree.XMLParser(remove_blank_text=True)) root_elt = root_elt.getroot() if err is None else None if root_elt is not None and remove_metafields: elt = root_elt.xpath( 'ns0:infos/ns0:metafields', namespaces={'ns0': namespace}) if elt: elt[0].getparent().remove(elt[0]) if root_elt is not None and remove_groups: elt = root_elt.xpath( 'ns0:infos/ns0:groups', namespaces={'ns0': namespace}) if elt: elt[0].getparent().remove(elt[0]) if root_elt is None: root_elt = etree.Element( RELAXNG_CIOWAREHOUSE['root'], version=RELAXNG_CIOWAREHOUSE['version'], nsmap={None: namespace}) etree.SubElement(root_elt, 'infos') return root_elt
# =============================================================================
[docs] def infos_get_metadata(root_elt, meta_id): """Return the value of a metadata in its correct type or ``None``. :type root_elt: lxml.etree.Element :param root_elt: Root of the XML document. :param str meta_id: ID of the metadata to retrieve. :return: Value of metadata or ``None``. """ namespace = RELAXNG_CIOWAREHOUSE['namespace'] field_elt = root_elt.xpath( 'ns0:infos/ns0:metafields/ns0:metafield[@id="{0}"]'.format(meta_id), namespaces={'ns0': namespace}) if not field_elt: return None return convert_value(field_elt[0].get('type'), field_elt[0].text)
# =============================================================================
[docs] def infos_set_metadata(root_elt, metafields): """Update metadata fields. :type root_elt: lxml.etree.Element :param root_elt: Root of the XML document. :param list metafields: List of metadata fields. Each item is a tuple such as ``(id, type, value)``. """ namespace = RELAXNG_CIOWAREHOUSE['namespace'] metafields_elt = root_elt.xpath( 'ns0:infos/ns0:metafields', namespaces={'ns0': namespace}) if not metafields_elt: metafields_elt = etree.SubElement(root_elt[0], 'metafields') else: metafields_elt = metafields_elt[0] for item in metafields: meta_type = { 'list': 'string', 'palette': 'color' }.get(item[1], item[1]) if meta_type not in RELAXNG_METAFIELD_TYPES and item[2] is not None: continue elt = metafields_elt.xpath( 'ns0:metafield[@id="{0}"]'.format(item[0]), namespaces={'ns0': namespace}) if item[2] is None: if elt: elt[0].getparent().remove(elt[0]) continue if not elt: elt = etree.SubElement(metafields_elt, 'metafield') elt.set('id', item[0]) else: elt = elt[0] elt.set('type', meta_type) elt.text = tounicode(value2str(item[2]))
# =============================================================================
[docs] def infos_save(root_elt, infos_file, request=None): """Save an infos XML file. :type root_elt: lxml.etree.Element :param root_elt: Root of the XML document. :param str infos_file: Absolute path to the information file. :type request: pyramid.request.Request :param request: (optional) Current request. :rtype: bool """ # Clean up XML namespace = RELAXNG_CIOWAREHOUSE['namespace'] tree = etree.parse( BytesIO( etree.tostring( root_elt, pretty_print=True, encoding='utf-8', xml_declaration=True))) elt = tree.xpath('ns0:infos/ns0:metafields', namespaces={'ns0': namespace}) if elt and not elt[0].xpath('*'): elt[0].getparent().remove(elt[0]) elt = tree.xpath('ns0:infos/ns0:groups', namespaces={'ns0': namespace}) if elt and not elt[0].xpath('*'): elt[0].getparent().remove(elt[0]) if not tree.xpath('ns0:infos/*', namespaces={'ns0': namespace}): if exists(infos_file): remove(infos_file) return True return False # Validate XML error = validate_xml(tree, relaxng4validation(RELAXNG_CIOWAREHOUSE)) if error is not None: # pragma: nocover if request is not None: log_error( request, 'Informations on {0}: {1}'.format( basename(infos_file), request.localizer.translate(error))) request.session.flash(error, 'alert') else: LOG.error(error) return False # Save XML makedirs(dirname(infos_file), exist_ok=True) try: tree.write( infos_file, pretty_print=True, encoding='utf-8', xml_declaration=True) except IOError: # pragma: nocover return False return True
# =============================================================================
[docs] def build_callback(registry, build_env, result, request=None): """Function called after the build is completed. :param registry: Application registry. :param dict build_env: Build environment. :param dict result: Result of the processing. See: :class:`cioservice.lib.build.Build`. :type request: pyramid.request.Request :param request: (optional) Current request. """ # Something to do? params = build_env['params'] service = registry['services'].get(build_env['job']['service_id']) if service is None \ or not service.need_write_permission(build_env['job']['context']) \ or not result.get('output') \ or not params['settings'].get('output.home.id'): return # Get the warehouse ciowarehouse = registry['modules']['ciowarehouse'] if request is None: with manager: request = namedtuple( 'Request', 'registry session dbsession has_permission')( registry=registry, session={}, dbsession=get_tm_dbsession( registry['dbsession_factory'], manager), has_permission=lambda x: bool( 'caller' in params and 'warehouse.creator' in params[ 'caller']['principals'])) if 'caller' in params: request.session['user'] = params['caller'] warehouse = ciowarehouse.warehouse( request, params['settings']['output.home.id']) else: warehouse = ciowarehouse.warehouse( request, params['settings']['output.home.id']) if warehouse is None: return # Commit generator = translate( build_env['job']['context'] or service.label, params.get('lang')) caller = params.get('caller') or request.session.get('user') warehouse.vcs.pull() warehouse.vcs.add() warehouse.vcs.commit( translate( _('Generated by ${g}', {'g': generator}), params.get('lang')), caller['name'] if caller is not None else 'anonymous', caller['email'] if caller is not None else '') # Refresh the warehouse paths = warehouse.to_refresh( build_env['params'].get('files', []), (result['output'], ) + result.get('to_refresh', ())) warehouse.refresh( request, paths, recursive=True, dbsession=params.get('dbsession'))
# =============================================================================
[docs] def cache_user_renderings(namespace_prefix, region=None): """A decorator to retrieve in the user cache the dictionary renderings. :param str namespace_prefix: Prefix of the cache namespace. :param str region: (optional) Name of region. """ def _decorated(method): """Decoration of the method `method`.""" def _wrapper(class_, request, warehouse, *args, **kwargs): """Use of user cache.""" if not hasattr(class_, 'uid'): raise AttributeError('Class must have a "uid" attribute!') namespace = cache_namespace(namespace_prefix, warehouse.uid) key = 'renderings:{}'.format(class_.uid) renderings = request.registry['cache_user'].get( request, key, namespace) if renderings is not None: return renderings renderings_method = method.__func__ \ if isinstance(method, classmethod) else method renderings = renderings_method( class_, request, warehouse, *args, **kwargs) request.registry['cache_user'].set( request, key, renderings, namespace, region) return renderings return _wrapper return _decorated
# =============================================================================
[docs] def cache_user_seeds(namespace_prefix, region=None): """A decorator to retrieve in the user cache the dictionary of seeds. :param str namespace_prefix: Prefix of the cache namespace. :param str region: (optional) Name of region. """ def _decorated(method): """Decoration of the method `method`.""" def _wrapper(class_, request, *args, **kwargs): """Use of user cache.""" if not hasattr(class_, 'uid'): raise AttributeError('Class must have a "uid" attribute!') namespace = cache_namespace(namespace_prefix, class_.uid) key = 'seeds' seeds = request.registry['cache_user'].get(request, key, namespace) if seeds is not None: return seeds seeds_method = method.__func__ \ if isinstance(method, classmethod) else method seeds = seeds_method(class_, request, *args, **kwargs) request.registry['cache_user'].set( request, key, seeds, namespace, region) return seeds return _wrapper return _decorated