Source code for ciowarehouse.lib.handler

"""A file handler is a class to make thumbnail and to provide index values for
a particular type of file.
"""

from __future__ import annotations
from os import stat, makedirs
from os.path import join, exists, getmtime, isfile, dirname, basename, splitext
from os.path import normpath, isabs, isdir
from logging import getLogger
from re import compile as re_compile
from time import time
from shutil import rmtree, copy

from chameleon import PageTemplateFile
from pytomlpp import loads as toml_loads

from pyramid.asset import abspath_from_asset_spec

from chrysalio.lib.utils import copy_content_re
from chrysalio.lib.form import Form, get_action
from chrysalio.lib.log import log_error, log_warning
from chrysalio.includes.themes import theme_static_prefix, theme_has_static
from ..lib.i18n import _, translate
from ..lib.utils import CIOWAREHOUSE_NS, CACHE_REGION_USER, CACHE_REGION_GLOBAL
from ..lib.utils import make_file_id, cache_user_renderings
from ..models.dbhandler import DBHandler

LOG = getLogger(__name__)
EXCLUDED_FILES_RE = re_compile('(__pycache__|\\.pyc?$)')
LAYOUT_VIEW_PT = 'ciowarehouse:Templates/handler_layout_view.pt'
LAYOUT_EDIT_PT = 'ciowarehouse:Templates/handler_layout_edit.pt'
CHRYSALIO_JS = ('/js/js.cookie.js', '/js/jquery.js', '/js/chrysalio.js')


# =============================================================================
[docs] class Handler(object): """Base class for file handlers. This object provides thumbnail, index values and rendering for a particular type of file. ``viewings`` and ``editings`` attributes are lists of dictionaries with the following keys: * ``'label'``: (required) label of the rendering * ``'template'``: (required) Chameleon template to use * ``'css'``: list of paths to CSS file * ``'js'``: list of path to Javascript file * ``'only_groups'``: set of groups of authorized users for editing """ # pylint: disable = too-many-instance-attributes uid: str | None = None label: str | None = None extensions: tuple = () file_regex: str | None = None content_regex = None imports: tuple = () seeds: dict = {} viewings: tuple = () editings: tuple = () thumbnailing: dict | None = None panel = None _home = normpath(join(dirname(__file__), '..', 'handlers')) # ------------------------------------------------------------------------- def __init__(self): """Constructor method.""" self.home = self._home self._develop = False self._indexers = None # Regular expressions if self.file_regex is not None: self.file_regex = re_compile(self.file_regex) if self.content_regex is not None: self.content_regex = re_compile(self.content_regex) # -------------------------------------------------------------------------
[docs] @classmethod def register(cls, environment, handler_class, **kwargs): """Method to register the handler. :type environment: :class:`pyramid.config.Configurator` or :class:`dict` :param environment: Object used to do configuration declaration within the application or a ScriptRegistry to simulate the application registry. :param handler_class: Handler class. :param dict kwargs: Keyworded arguments :rtype: .lib.handler.Handler """ # Server mode (environment == configurator) handler = handler_class(**kwargs) if hasattr(environment, 'registry'): if 'handlers' not in environment.registry: environment.registry['handlers'] = [] environment.registry['handlers'].insert(0, handler) # Populate/backup/execute mode (environment == ScriptRegistry) else: if 'handlers' not in environment: environment['handlers'] = [] environment['handlers'].insert(0, handler) return handler
# -------------------------------------------------------------------------
[docs] def initialize(self, dbsession, config): """Initialize the handler reading its possibly parameters in database. :param dbsession: SQLAlchemy session. :param dict config: Dictionary with keys ``'root'`` and ``'develop'`` where ``'root'`` is the absolute path to the root directory for handler with imports. """ self._develop = config.get('develop') == 'true' # Handler directory if config.get('root') and self.imports: self.home = normpath(join(config['root'], self.uid)) # Handler parameters if self._indexers is not None: return self._indexers = {} dbhandler = dbsession.query(DBHandler).filter_by( handler_id=self.uid).first() if dbhandler is None: return # Indexer characteristics for dbindexer in dbhandler.indexers: indexer = {} if dbindexer.argument: indexer['argument'] = dbindexer.argument if dbindexer.limit: indexer['limit'] = dbindexer.limit self._indexers[dbindexer.indexfield_id] = indexer
# -------------------------------------------------------------------------
[docs] def install(self, force=False): """Install a handler in an alternative home directory with its imports. :param bool force: If ``True``, force installation. """ if self._home == self.home: return if exists(self.home): if not force: return rmtree(self.home) makedirs(self.home, exist_ok=True) # Copy imports for import_ in self.imports: directory = normpath(join(self.home, import_[1])) if not directory.startswith(self.home): continue path = import_[0] if ':' in path: path = abspath_from_asset_spec(path) if not isabs(path): path = join(self._home, path) if isdir(path): copy_content_re( path, join(self.home, directory), EXCLUDED_FILES_RE) elif EXCLUDED_FILES_RE.search( basename(path)) is None and exists(path): copy(path, join(self.home, directory)) # Copy original copy_content_re(self._home, self.home, EXCLUDED_FILES_RE)
# -------------------------------------------------------------------------
[docs] def match(self, extension, filename, content=None, extensions_subset=None): """Check whether this file handler matches with the file ``filename``. :param str extension: File extension. :param str filename: Absolute path to the file. :param str content: (optional) Content of the file. :param tuple extensions_subset: (optional) Subset of extensions (for instance, only Web-compatible extensions). :rtype: tuple :return: A tuple such as ``(match, content)`` where ``match`` is ``True`` if the handler matches. """ extensions = extensions_subset or self.extensions if extensions and extension.lower() not in extensions: return False, content if self.file_regex is not None and self.file_regex.search( basename(filename)) is None: return False, content if self.content_regex is None: return True, content if not isfile(filename): return False, content if content is None: with open(filename, 'rb') as hdl: content = hdl.read() return self.content_regex.search(content) is not None, content
# -------------------------------------------------------------------------
[docs] def infos_complete_fields( self, warehouse, path, abs_path, whoosh_fields, request=None): """Complete the ``whoosh_fields`` dictionary with information found in the infos file. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str path: Relative path of the file. :param str abs_path: Absolute path of the file. :param dict whoosh_fields: Dictionary of Whoosh fields to complete. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. """ # pylint: disable = unused-argument # Look for metadata fields to process meta_ids = \ (set(warehouse.indexfields.keys()) - set(whoosh_fields.keys())) \ & set(warehouse.metafields.keys()) # Check for field compatibility for field_id in tuple(meta_ids): metafield = warehouse.metafields[field_id] indexfield = warehouse.indexfields[field_id] not_compatible = \ (metafield['type'] == 'string' and indexfield['whoosh_type'] not in ( 'ID', 'TEXT', 'KEYWORD', 'STEMS', 'BOOLEAN')) or \ (metafield['type'] == 'text' and indexfield['whoosh_type'] not in ( 'TEXT', 'STEMS', 'BOOLEAN')) or \ (metafield['type'] in ('integer', 'decimal') and indexfield['whoosh_type'] not in ('NUMERIC', 'BOOLEAN')) or \ (metafield['type'] == 'boolean' and indexfield['whoosh_type'] != 'BOOLEAN') or \ (metafield['type'] in ('datetime', 'date') and indexfield['whoosh_type'] != 'DATETIME') or \ (metafield['type'] in ('list', 'palette') and indexfield['whoosh_type'] not in ( 'ID', 'TEXT', 'KEYWORD', 'STEMS', 'BOOLEAN')) if not_compatible: meta_ids.remove(field_id) # Complete fields with whoosh_fields.update( warehouse.infos_read(path, meta_ids, True, request))
# -------------------------------------------------------------------------
[docs] @classmethod def thumbnails_obsolete(cls, abs_file, thumb_dir): """Check if thumbnails are obsolete. :param str abs_file: Absolute path to the source file. :param str thumb_dir: Absolute path to the directory containing the thumbnail. :rtype: bool """ return not exists(thumb_dir) \ or getmtime(abs_file) > getmtime(thumb_dir)
# -------------------------------------------------------------------------
[docs] def thumbnails( self, warehouse, abs_file, thumb_dir, request=None, registry=None): """Create the small and large thumbnails representing the file. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str abs_file: Absolute path to the source file. :param str thumb_dir: Absolute path to the directory containing the thumbnail. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. :param registry: (optional) class:`pyramid.registry.Registry` or class:`chrysalio.scripts.ScriptRegistry` if called by populate script. """
# -------------------------------------------------------------------------
[docs] def view(self, request, warehouse, content=None, ts_factory=None): """Return a string containing HTML to display the file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param content: (optional) Content of the file. :param ts_factory: (optional) Translation String Factory fucntion. :rtype: :class:`str` or ``None`` """
# -------------------------------------------------------------------------
[docs] def can_edit(self): """Return ``True`` if it can produce an editor. :rtype: bool """ return bool(self.editings)
# -------------------------------------------------------------------------
[docs] def edit(self, request, warehouse, content=None, ts_factory=None): """Return a string containing HTML to edit the file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param content: (optional) Content of the file. :param ts_factory: (optional) Translation String Factory fucntion. :rtype: :class:`str` or ``None`` """
# -------------------------------------------------------------------------
[docs] def save(self, request, warehouse, original, values, go_on): """Save the file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: ciowarehouse.lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type original: :class:`str` or :class:`lxml.etree._ElementTree` :param original: Initial content of the file. :param dict values: Modified values. :param bool go_on: ``True`` if the modification continues after saving. :rtype: :class:`str` or ``None`` :return: An error message or ``None``. """ # pylint: disable = unused-argument return 'Not implemented!'
# -------------------------------------------------------------------------
[docs] def edit_finalization(self, request, warehouse, path, message=None): """Commit changes, unlock files and refresh warehouse. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str path: Relative path to the file. :param str message: (optional) Commit meesage. :rtype: bool """ if warehouse.vcs.is_dirty(): if message is None: editing = self.current_rendering(request, warehouse, 'editing') message = _( 'Online editing in "${m}" mode', {'m': translate(editing['label'], request=request)}) warehouse.vcs.add(path) warehouse.vcs.commit( translate(message, request=request), request.session['user']['name'], request.session['user']['email']) self.file_unlock(request, warehouse, path) warehouse.refresh( request, (path, ), dbsession=request.dbsession, keep_cache=True) self._update_paging(request, warehouse, path)
# -------------------------------------------------------------------------
[docs] @classmethod def file_lock(cls, request, warehouse, path=None): """Lock a file for the purpose of editing it. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str path: (optional) Relative path of the file inside the warehouse. :rtype: bool """ if 'ciowarehouse' not in request.session: request.session['ciowarehouse'] = {} if 'editing' not in request.session['ciowarehouse']: request.session['ciowarehouse']['editing'] = {} path = path or join(*request.matchdict['path']) file_id = normpath(join(warehouse.uid, path)) if file_id in request.session['ciowarehouse']['editing'] and \ request.session['ciowarehouse']['editing'][file_id] > time(): warehouse.lock(join(warehouse.root, path), relock=True) request.session['ciowarehouse']['editing'][file_id] = \ time() + warehouse.lock_ttl return True locked = warehouse.lock(join(warehouse.root, path)) if locked: request.session['ciowarehouse']['editing'][file_id] = \ time() + warehouse.lock_ttl elif file_id in request.session['ciowarehouse']['editing']: del request.session['ciowarehouse']['editing'][file_id] return locked
# -------------------------------------------------------------------------
[docs] @classmethod def file_unlock(cls, request, warehouse, path=None): """Unlock a file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str path: (optional) Relative path of the file inside the warehouse. """ path = path or join(*request.matchdict['path']) warehouse.unlock(join(warehouse.root, path)) file_id = normpath(join(warehouse.uid, path)) if 'ciowarehouse' in request.session and \ 'editing' in request.session['ciowarehouse'] and \ file_id in request.session['ciowarehouse']['editing']: del request.session['ciowarehouse']['editing'][file_id]
# -------------------------------------------------------------------------
[docs] def abspath_from_home(self, path): """Return an absolute path. :param str path: Absolute path or relative path to the home directory of the handler. :rtype: str """ if ':' in path: path = abspath_from_asset_spec(path) return path if isabs(path) else join(self.home, path)
# -------------------------------------------------------------------------
[docs] @classmethod def themed_urls(cls, request, rendering, name): """Return a list of URLs possibly completed with theme prefix. :type request: pyramid.request.Request :param request: Current request. :param dict rendering: Dictionary defining the rendering. :param str name: Name of the bunch of URLs ('css', 'js') :rtype: list """ url_list = [] theme = theme_static_prefix(request) for url in rendering.get(name) or '': url_list.append( url if url.startswith('http') or url.startswith('/file') else '{0}{1}'.format(theme, url)) return url_list
# -------------------------------------------------------------------------
[docs] def seed(self, seed_id): """Return a tuple like ``(icon_url, label, absfile)``. :param str seed_id: ID of the seed :rtype: :class:`dict` or ``None`` """ return self.seeds[seed_id][:2] + ( self.abspath_from_home(self.seeds[seed_id][2]),) \ if seed_id in self.seeds else None
# -------------------------------------------------------------------------
[docs] def current_rendering(self, request, warehouse, rendering): """Find the current rendering. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str rendering: ('viewing' or 'editing') Type of rendering. :rtype: dict """ renderings = self._available_renderings(request, warehouse) renderings = renderings.get('editings') if rendering == 'editing' \ else renderings.get('viewings') if not renderings: return None if 'handlers' not in request.session: request.session['handlers'] = {} if self.uid not in request.session['handlers']: request.session['handlers'][self.uid] = {} index = int(request.params['rendering']) \ if 'rendering' in request.params \ and request.params['rendering'].isdigit() \ else request.session['handlers'][self.uid].get(rendering, 0) index = index if index < len(renderings) else 0 request.session['handlers'][self.uid][rendering] = index return renderings[index]
# ------------------------------------------------------------------------- def _chameleon_render( self, request, warehouse, rendering, ts_factory, values): """Execute a Chameleon render and return the result. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param dict rendering: Dictionary defining the rendering. :param ts_factory: Translation String Factory fucntion. :param dict values: Dictionary of values for template variables. :rtype: :class:`str` or ``None`` """ my_domain = rendering['template'].partition(':')[0] \ if ':' in rendering['template'] else 'ciowarehouse' def _translate( msgid, domain=my_domain, mapping=None, context=None, target_language=None): """Translation for Chameleon.""" # pylint: disable = unused-argument return request.localizer.translate( msgid, domain=domain or my_domain, mapping=mapping) path = join(*request.matchdict['path']) file_id = make_file_id(join(warehouse.uid, path)) can_edit = self.can_edit() and \ request.registry['modules']['ciowarehouse'].warehouse_file_writer( request, warehouse) values.update({ # yapf: disable 'request': request, '_': ts_factory, 'local_translate': request.localizer.translate, 'global_class': ' {0}'.format( request.GET['class']) if request.GET.get('class') else '', 'local_class': ' {0}'.format( values['class']) if values.get('class') else '', 'action': get_action(request)[0], 'title': request.registry['settings']['title'], 'theme': theme_static_prefix(request), 'theme_has': theme_has_static, 'handler': self, 'rendering': rendering, 'warehouse_id': warehouse.uid, 'directory': dirname(join(*request.matchdict['path'])) or '.', 'filename': request.matchdict['path'][-1], 'fullpath': join(*request.matchdict['path']), 'route_view': request.route_path( 'file_view', warehouse_id=warehouse.uid, path=path), 'route_download': request.route_path( 'file_download', warehouse_id=warehouse.uid, path=path), 'route_close': self._route_close(request, file_id)}) values['route_previous'], values['route_next'] = self._routes_around( request, warehouse, file_id) values.update(self._available_renderings(request, warehouse)) values['can_edit'] = bool(can_edit and values['editings']) if 'form' not in values: values['form'] = Form(request) if 'rendering_num' not in values: values['rendering_num'] = 0 if 'content' not in values: values['content'] = '' try: return PageTemplateFile( abspath_from_asset_spec(rendering['template']), translate=_translate).render(**values) except (SyntaxError, UnboundLocalError, AssertionError, TypeError, NameError, AttributeError, KeyError) as error: # pragma: nocover self._log_error(error, request) return None # ------------------------------------------------------------------------- @classmethod def _route_close(cls, request, file_id): """Return the URL to close the view and return to the previous page. :type request: pyramid.request.Request :param request: Current request. :param str file_id: ID of the current file. :rtype: str """ return '{0}#{1}_'.format(request.breadcrumbs.back_path(), file_id) # ------------------------------------------------------------------------- def _routes_around( self, request, warehouse, file_id, extensions_subset=None): """Return a tuple of URL to go to the previous and the next file according to the file type. :type request: pyramid.request.Request :param request: Current request. :param str file_id: ID of the current file. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param tuple extensions_subset: (optional) Subset of extensions. :rtype: tuple """ def _item_match(item): """Return an URL to view the item if it matches.""" path = join(item['directory'], item['file_name']) if self.match(splitext(item['file_name'])[1], join( ciowarehouse.warehouse_root(request, item['warehouse_id']), path), extensions_subset=extensions_subset)[0]: return request.route_path( 'file_view', warehouse_id=item['warehouse_id'], path=path) return None if 'ciowarehouse' not in request.session \ or 'current_cache' not in request.session['ciowarehouse']: return None, None # Retrieve file list files = request.registry['cache_global'].get( request.session['ciowarehouse']['current_cache'][0], request.session['ciowarehouse']['current_cache'][1], CACHE_REGION_GLOBAL) if not files: return None, None # Possibly, filter by groups ciowarehouse = request.registry['modules']['ciowarehouse'] if not ciowarehouse.warehouse_admin(request, warehouse): user_groups = set(request.session['user']['groups']) files = [ k for k in files if not k['only_groups'] or (user_groups & k['only_groups']) ] # Find the current file in the list index = -1 for k, item in enumerate(files): if item['file_id'] == file_id: index = k break if index == -1: return None, None # Find the previous one route_previous = None for k in range(index - 1, -1, -1): route_previous = _item_match(files[k]) if route_previous is not None: break # Find the next one route_next = None for k in range(index + 1, len(files)): route_next = _item_match(files[k]) if route_next is not None: break return route_previous, route_next # ------------------------------------------------------------------------- @classmethod def _update_paging(cls, request, warehouse, path): """If exists, update the paging entry of the path ``path``. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str path: Relative path of the file inside the warehouse. """ if 'ciowarehouse' not in request.session \ or 'current_cache' not in request.session['ciowarehouse']: return paging = request.registry['cache_global'].get( request.session['ciowarehouse']['current_cache'][0], request.session['ciowarehouse']['current_cache'][1], CACHE_REGION_GLOBAL) if not paging: return # Find the current file in the paging index = -1 for k, item in enumerate(paging): if normpath(join(item['directory'], item['file_name'])) == path: index = k break if index == -1: return # Update the paging abs_path = join(warehouse.root, path) stat_info = stat(abs_path) paging[index]['file_size'] = 0 \ if paging[index]['file_type'] == 'directory' else stat_info.st_size paging[index]['file_date'] = int(stat_info.st_mtime) handler = warehouse.get_handler(abs_path)[0] if handler is not None: fields = {} handler.infos_complete_fields( warehouse, path, abs_path, fields, request) for item in set(paging[index].keys()) & set(fields.keys()): paging[index][item] = fields[item] request.registry['cache_global'].set( request.session['ciowarehouse']['current_cache'][0], paging, request.session['ciowarehouse']['current_cache'][1], CACHE_REGION_GLOBAL) # ------------------------------------------------------------------------- def _remaining_fields(self, warehouse, whoosh_fields): """Return the set of index fields to complete. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param dict whoosh_fields: Dictionary of Whoosh fields to complete. :rtype: set """ if not self._indexers: return None return \ (set(warehouse.indexfields.keys()) - set(whoosh_fields.keys())) \ & set(self._indexers.keys()) # ------------------------------------------------------------------------- @cache_user_renderings(CIOWAREHOUSE_NS, CACHE_REGION_USER) def _available_renderings(self, request, warehouse): """Return a dictionary of available renderings. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param war/ehouse: Object describing the warehouse containing the file. :rtype: dictionary """ available = {} user_groups = set(request.session['user']['groups']) toml = None if warehouse.rendering_dir: toml = join(warehouse.rendering_dir, '{}.toml'.format(self.uid)) if exists(toml): with open(toml, 'r', encoding='utf8') as hdl: toml = toml_loads(hdl.read()) else: toml = None for renderings in ('viewings', 'editings'): available[renderings] = [] done = set() # From a custom directory if toml is not None: for rendering in toml.get(renderings, ''): rendering = self._check_rendering( request, user_groups, toml.get(f'default_{renderings}'), rendering) if rendering is not None: available[renderings].append(rendering) done.add(rendering['name']) # Hard coded rendering_list = self.editings \ if renderings == 'editings' else self.viewings available[renderings] += [ k for k in rendering_list if k['name'] not in done and ( k.get('only_groups') is None or k['only_groups'] & user_groups) ] return self._fix_renderings(request, available) # ------------------------------------------------------------------------- def _check_rendering(self, request, user_groups, defaults, rendering): """Check and complete a rendering. :type request: pyramid.request.Request :param request: Current request. :param set user_groups: Groups the user belongs to. :param dict defaults: Default values. :param dict rendering: Rendering to process. :rtype: dictionary """ if rendering.get('only_groups') and \ not set(rendering['only_groups']) & user_groups: return None if 'name' not in rendering: self._log_error( _('Name is missing for a dynaming rendering'), request) return None # Default values checked = dict(defaults) if defaults else {} checked.update(rendering) # Handler UID checked['handler_uid'] = checked.get('handler_uid', self.uid) # Label checked['label'] = checked['label'].get( request.locale_name, checked['label'].get('en', checked['name'])) \ if 'label' in checked else checked['name'] # Check if 'template' not in checked or 'css' not in checked: self._log_error( _('Rendering "${n}" is incorrect', {'n': checked['name']}), request) return None return checked # ------------------------------------------------------------------------- def _fix_renderings(self, request, available): """Possibly Fix URLs. :type request: pyramid.request.Request :param request: Current request. :param dict available: Available renderings. :rtype: dictionary """ # pylint: disable = unused-argument return available # ------------------------------------------------------------------------- @classmethod def _log_error(cls, error, request=None): """Log an error message. :param str error: Error message. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. """ if request is None: LOG.error(translate(error)) else: log_error(request, translate(error, lang='en')) # ------------------------------------------------------------------------- @classmethod def _log_warning(cls, warning, request=None): """Log an warning message. :param str warning: Warning message. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. """ if request is None: LOG.warning(translate(warning)) else: log_warning(request, translate(warning, lang='en'))