"""A file handler is a class that creates thumbnails and provides index values
for a particular type of file.
"""
from __future__ import annotations
from os import stat, makedirs
from os.path import join, exists, getmtime, isfile, dirname, basename, splitext
from os.path import normpath, isabs, isdir
from logging import getLogger
from re import compile as re_compile
from time import time
from shutil import rmtree, copy
from chameleon import PageTemplateFile
from pytomlpp import loads as toml_loads
from pyramid.asset import abspath_from_asset_spec
from chrysalio.lib.utils import copy_content_re
from chrysalio.lib.form import Form, get_action
from chrysalio.lib.log import log_error, log_warning
from chrysalio.includes.themes import theme_static_prefix, theme_has_static
from ..lib.i18n import _, translate
from ..lib.utils import CIOWAREHOUSE_NS, CACHE_REGION_USER, CACHE_REGION_GLOBAL
from ..lib.utils import make_file_id, cache_user_renderings
from ..models.dbhandler import DBHandler
# Module-level logger, used when no request is available to log through.
LOG = getLogger(__name__)
# Names matching this pattern (``__pycache__``, ``*.py``, ``*.pyc``) are skipped
# when a handler is installed into an alternative home directory.
EXCLUDED_FILES_RE = re_compile('(__pycache__|\\.pyc?$)')
# Asset specifications of the layout templates for viewing and editing.
LAYOUT_VIEW_PT = 'ciowarehouse:Templates/handler_layout_view.pt'
LAYOUT_EDIT_PT = 'ciowarehouse:Templates/handler_layout_edit.pt'
# Javascript files of Chrysalio (presumably loaded by every rendering;
# not referenced in this part of the file -- TODO confirm usage).
CHRYSALIO_JS = ('/js/js.cookie.js', '/js/jquery.js', '/js/chrysalio.js')
# =============================================================================
[docs]
class Handler(object):
    """Base class for file handlers.

    This object provides thumbnail, index values and rendering for a particular
    type of file.

    ``viewings`` and ``editings`` attributes are lists of dictionaries with
    the following keys:

    * ``'label'``: (required) label of the rendering
    * ``'template'``: (required) Chameleon template to use
    * ``'css'``: list of paths to CSS file
    * ``'js'``: list of path to Javascript file
    * ``'only_groups'``: set of groups of authorized users for editing

    NOTE(review): the code selecting the available renderings also reads a
    ``'name'`` key on each of these dictionaries -- confirm it is required.
    """
    # pylint: disable = too-many-instance-attributes

    # Identifier of the handler (used to look up its database record and to
    # name its alternative home directory).
    uid: str | None = None
    # Label of the handler.
    label: str | None = None
    # File extensions accepted by ``match()``; empty means no filtering.
    extensions: tuple = ()
    # Pattern applied to the base name of the file; compiled by ``__init__``.
    file_regex: str | None = None
    # Pattern applied to the content of the file; compiled by ``__init__``.
    content_regex = None
    # Sequence of ``(source, destination)`` pairs copied by ``install()``.
    imports: tuple = ()
    # Seeds indexed by ID; ``seed()`` reads each value as a 3-item sequence
    # whose last item is a path resolved from the home directory.
    seeds: dict = {}
    # Hard coded renderings for viewing and editing.
    viewings: tuple = ()
    editings: tuple = ()
    # Thumbnailing parameters (not used in the visible part of this class).
    thumbnailing: dict | None = None
    # Panel associated with the handler (not used in the visible part of
    # this class).
    panel = None
    # Default home directory: ``../handlers`` relative to this module.
    _home = normpath(join(dirname(__file__), '..', 'handlers'))
# -------------------------------------------------------------------------
def __init__(self):
    """Constructor method.

    Set the home directory to the class default, reset the development
    flag and the indexers, then compile the ``file_regex`` and
    ``content_regex`` patterns when they are defined.
    """
    self._indexers = None
    self._develop = False
    self.home = self._home

    # Regular expressions: replace each declared pattern by its compiled form
    for attribute in ('file_regex', 'content_regex'):
        pattern = getattr(self, attribute)
        if pattern is not None:
            setattr(self, attribute, re_compile(pattern))
# -------------------------------------------------------------------------
[docs]
@classmethod
def register(cls, environment, handler_class, **kwargs):
    """Method to register the handler.

    The handler is instantiated with ``kwargs`` and inserted at the head of
    the ``'handlers'`` list of the registry, which is created if missing.

    :type  environment: :class:`pyramid.config.Configurator` or
        :class:`dict`
    :param environment:
        Object used to do configuration declaration within the application
        or a ScriptRegistry to simulate the application registry.
    :param handler_class:
        Handler class.
    :param dict kwargs:
        Keyworded arguments
    :rtype: .lib.handler.Handler
    """
    handler = handler_class(**kwargs)

    # Server mode: the environment is a configurator which carries the
    # registry. Populate/backup/execute mode: the environment is itself the
    # (script) registry.
    registry = environment.registry \
        if hasattr(environment, 'registry') else environment

    if 'handlers' not in registry:
        registry['handlers'] = []
    registry['handlers'].insert(0, handler)
    return handler
# -------------------------------------------------------------------------
[docs]
def initialize(self, dbsession, config):
    """Initialize the handler, reading its possible parameters in database.

    The indexers are loaded only once: when they are already known, the
    database is not queried again.

    :param dbsession:
        SQLAlchemy session.
    :param dict config:
        Dictionary with keys ``'root'`` and ``'develop'`` where ``'root'``
        is the absolute path to the root directory for handler with
        imports.
    """
    self._develop = config.get('develop') == 'true'

    # Handler directory: only handlers with imports get an alternative home
    root = config.get('root')
    if root and self.imports:
        self.home = normpath(join(root, self.uid))

    # Handler parameters
    if self._indexers is not None:
        return
    self._indexers = {}
    dbhandler = dbsession.query(DBHandler).filter_by(
        handler_id=self.uid).first()
    if dbhandler is None:
        return

    # Indexer characteristics: keep only the values which are set
    for dbindexer in dbhandler.indexers:
        self._indexers[dbindexer.indexfield_id] = {
            key: value
            for key, value in (
                ('argument', dbindexer.argument),
                ('limit', dbindexer.limit))
            if value
        }
# -------------------------------------------------------------------------
[docs]
def install(self, force=False):
    """Install a handler in an alternative home directory with its
    imports.

    Nothing is done when the handler uses its default home directory, or
    when the alternative home already exists and ``force`` is false.
    Imports whose destination would fall outside the home directory are
    ignored.

    :param bool force:
        If ``True``, force installation.
    """
    if self._home == self.home:
        return
    if exists(self.home):
        if not force:
            return
        rmtree(self.home)
    makedirs(self.home, exist_ok=True)

    # Copy imports
    # The prefix ends with a path separator so that a sibling directory
    # such as "<home>_other" is not mistaken for a sub-directory of home.
    home_prefix = join(self.home, '')
    for import_ in self.imports:
        directory = normpath(join(self.home, import_[1]))
        if directory != self.home \
                and not directory.startswith(home_prefix):
            continue
        path = import_[0]
        if ':' in path:
            path = abspath_from_asset_spec(path)
        if not isabs(path):
            path = join(self._home, path)
        if isdir(path):
            copy_content_re(
                path, join(self.home, directory), EXCLUDED_FILES_RE)
        elif EXCLUDED_FILES_RE.search(
                basename(path)) is None and exists(path):
            copy(path, join(self.home, directory))

    # Copy original
    copy_content_re(self._home, self.home, EXCLUDED_FILES_RE)
# -------------------------------------------------------------------------
[docs]
def match(self, extension, filename, content=None, extensions_subset=None):
    """Check whether this file handler matches with the file ``filename``.

    The extension, the file name and finally the content are checked in
    turn. The content is read from the file (in binary mode) only when a
    content pattern exists and no content was given.

    :param str extension:
        File extension.
    :param str filename:
        Absolute path to the file.
    :param str content: (optional)
        Content of the file.
    :param tuple extensions_subset: (optional)
        Subset of extensions (for instance, only Web-compatible
        extensions).
    :rtype: tuple
    :return:
        A tuple such as ``(match, content)`` where ``match`` is ``True``
        if the handler matches.
    """
    # Filter on extension
    allowed = extensions_subset or self.extensions
    if allowed and extension.lower() not in allowed:
        return False, content

    # Filter on file name
    name_re = self.file_regex
    if name_re is not None and name_re.search(basename(filename)) is None:
        return False, content

    # Filter on content
    content_re = self.content_regex
    if content_re is None:
        return True, content
    if not isfile(filename):
        return False, content
    if content is None:
        with open(filename, 'rb') as hdl:
            content = hdl.read()
    found = content_re.search(content)
    return found is not None, content
# -------------------------------------------------------------------------
[docs]
def infos_complete_fields(
        self, warehouse, path, abs_path, whoosh_fields, request=None):
    """Complete the ``whoosh_fields`` dictionary with information found
    in the infos file.

    Only metadata fields which are also index fields, which are not yet
    in ``whoosh_fields`` and whose metadata type is compatible with the
    Whoosh type of the index field are read.

    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str path:
        Relative path of the file.
    :param str abs_path:
        Absolute path of the file (unused here).
    :param dict whoosh_fields:
        Dictionary of Whoosh fields to complete.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by a script.
    """
    # pylint: disable = unused-argument
    # Whoosh types accepted for each metadata type; a metadata type which
    # is not listed here is never rejected
    textual = ('ID', 'TEXT', 'KEYWORD', 'STEMS', 'BOOLEAN')
    numeric = ('NUMERIC', 'BOOLEAN')
    temporal = ('DATETIME',)
    accepted = {
        'string': textual,
        'text': ('TEXT', 'STEMS', 'BOOLEAN'),
        'integer': numeric,
        'decimal': numeric,
        'boolean': ('BOOLEAN',),
        'datetime': temporal,
        'date': temporal,
        'list': textual,
        'palette': textual,
    }

    # Look for metadata fields to process
    missing = set(warehouse.indexfields.keys()) - set(whoosh_fields.keys())
    candidates = missing & set(warehouse.metafields.keys())

    # Check for field compatibility
    meta_ids = set()
    for field_id in candidates:
        metafield = warehouse.metafields[field_id]
        indexfield = warehouse.indexfields[field_id]
        whoosh_types = accepted.get(metafield['type'])
        if whoosh_types is not None \
                and indexfield['whoosh_type'] not in whoosh_types:
            continue
        meta_ids.add(field_id)

    # Complete fields with the values of the infos file
    whoosh_fields.update(
        warehouse.infos_read(path, meta_ids, True, request))
# -------------------------------------------------------------------------
[docs]
@classmethod
def thumbnails_obsolete(cls, abs_file, thumb_dir):
    """Check if thumbnails are obsolete.

    Thumbnails are obsolete when their directory does not exist or when
    the source file was modified after this directory.

    :param str abs_file:
        Absolute path to the source file.
    :param str thumb_dir:
        Absolute path to the directory containing the thumbnail.
    :rtype: bool
    """
    if not exists(thumb_dir):
        return True
    return getmtime(abs_file) > getmtime(thumb_dir)
# -------------------------------------------------------------------------
[docs]
def thumbnails(
        self, warehouse, abs_file, thumb_dir, request=None, registry=None):
    """Create the small and large thumbnails representing the file.

    This base implementation has no body: it creates nothing and returns
    ``None``.

    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str abs_file:
        Absolute path to the source file.
    :param str thumb_dir:
        Absolute path to the directory containing the thumbnail.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by a script.
    :param registry: (optional)
        :class:`pyramid.registry.Registry` or
        :class:`chrysalio.scripts.ScriptRegistry` if called by populate
        script.
    """
# -------------------------------------------------------------------------
[docs]
def view(self, request, warehouse, content=None, ts_factory=None):
    """Return a string containing HTML to display the file.

    This base implementation has no body and therefore returns ``None``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param content: (optional)
        Content of the file.
    :param ts_factory: (optional)
        Translation String Factory function.
    :rtype: :class:`str` or ``None``
    """
# -------------------------------------------------------------------------
[docs]
def can_edit(self):
    """Tell whether this handler can produce an editor.

    :rtype: bool
    :return:
        ``True`` if at least one editing rendering is declared.
    """
    return True if self.editings else False
# -------------------------------------------------------------------------
[docs]
def edit(self, request, warehouse, content=None, ts_factory=None):
    """Return a string containing HTML to edit the file.

    This base implementation has no body and therefore returns ``None``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param content: (optional)
        Content of the file.
    :param ts_factory: (optional)
        Translation String Factory function.
    :rtype: :class:`str` or ``None``
    """
# -------------------------------------------------------------------------
[docs]
def save(self, request, warehouse, original, values, go_on):
    """Save the file.

    This base implementation saves nothing: it ignores its arguments and
    always returns the error message ``'Not implemented!'``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: ciowarehouse.lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  original: :class:`str` or :class:`lxml.etree._ElementTree`
    :param original:
        Initial content of the file.
    :param dict values:
        Modified values.
    :param bool go_on:
        ``True`` if the modification continues after saving.
    :rtype: :class:`str` or ``None``
    :return:
        An error message or ``None``.
    """
    # pylint: disable = unused-argument
    return 'Not implemented!'
# -------------------------------------------------------------------------
[docs]
def edit_finalization(self, request, warehouse, path, message=None):
    """Commit changes, unlock files and refresh warehouse.

    A commit is made only if the version control system of the warehouse
    is dirty. In every case the file is unlocked, the warehouse is
    refreshed for this path and the paging entry is updated.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str path:
        Relative path to the file.
    :param str message: (optional)
        Commit message. By default, a message naming the current editing
        mode is used.
    :return:
        Nothing is returned.
    """
    if warehouse.vcs.is_dirty():
        if message is None:
            # Default message built from the label of the editing mode
            editing = self.current_rendering(request, warehouse, 'editing')
            mode = translate(editing['label'], request=request)
            message = _('Online editing in "${m}" mode', {'m': mode})
        warehouse.vcs.add(path)
        text = translate(message, request=request)
        user = request.session['user']
        warehouse.vcs.commit(text, user['name'], user['email'])

    self.file_unlock(request, warehouse, path)
    warehouse.refresh(
        request, (path, ), dbsession=request.dbsession, keep_cache=True)
    self._update_paging(request, warehouse, path)
# -------------------------------------------------------------------------
[docs]
@classmethod
def file_lock(cls, request, warehouse, path=None):
    """Lock a file for the purpose of editing it.

    The session keeps, for each file being edited, the time at which its
    lock expires. A lock still valid in the session is simply renewed.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str path: (optional)
        Relative path of the file inside the warehouse. By default, the
        path is taken from the route of the request.
    :rtype: bool
    :return:
        ``True`` if the lock is renewed, otherwise the result of the lock
        request made to the warehouse.
    """
    session = request.session
    if 'ciowarehouse' not in session:
        session['ciowarehouse'] = {}
    if 'editing' not in session['ciowarehouse']:
        session['ciowarehouse']['editing'] = {}
    editing = session['ciowarehouse']['editing']

    path = path or join(*request.matchdict['path'])
    file_id = normpath(join(warehouse.uid, path))
    abs_path = join(warehouse.root, path)

    # The user already owns a lock which has not expired: renew it
    if file_id in editing and editing[file_id] > time():
        warehouse.lock(abs_path, relock=True)
        editing[file_id] = time() + warehouse.lock_ttl
        return True

    # Otherwise, try to get a new lock
    locked = warehouse.lock(abs_path)
    if locked:
        editing[file_id] = time() + warehouse.lock_ttl
    elif file_id in editing:
        del editing[file_id]
    return locked
# -------------------------------------------------------------------------
[docs]
@classmethod
def file_unlock(cls, request, warehouse, path=None):
    """Unlock a file.

    The lock is released in the warehouse and the corresponding entry, if
    any, is removed from the files being edited in the session.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str path: (optional)
        Relative path of the file inside the warehouse. By default, the
        path is taken from the route of the request.
    """
    path = path or join(*request.matchdict['path'])
    warehouse.unlock(join(warehouse.root, path))
    file_id = normpath(join(warehouse.uid, path))

    # Forget the file in the session
    session = request.session
    if 'ciowarehouse' not in session:
        return
    namespace = session['ciowarehouse']
    if 'editing' in namespace and file_id in namespace['editing']:
        del namespace['editing'][file_id]
# -------------------------------------------------------------------------
[docs]
def abspath_from_home(self, path):
    """Return an absolute path.

    A path containing a colon is first resolved as an asset
    specification.

    :param str path:
        Absolute path or relative path to the home directory of the
        handler.
    :rtype: str
    """
    if ':' in path:
        path = abspath_from_asset_spec(path)
    if isabs(path):
        return path
    return join(self.home, path)
# -------------------------------------------------------------------------
[docs]
@classmethod
def themed_urls(cls, request, rendering, name):
    """Return a list of URLs possibly completed with theme prefix.

    URLs starting with ``http`` or ``/file`` are kept as they are; the
    static prefix of the theme is put in front of the others.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param dict rendering:
        Dictionary defining the rendering.
    :param str name:
        Name of the bunch of URLs ('css', 'js')
    :rtype: list
    """
    theme = theme_static_prefix(request)
    return [
        url if url.startswith(('http', '/file')) else f'{theme}{url}'
        for url in rendering.get(name) or ''
    ]
# -------------------------------------------------------------------------
[docs]
def seed(self, seed_id):
    """Return a tuple like ``(icon_url, label, absfile)``.

    The two first items come from the seed definition and the last one is
    the third item of the definition resolved from the home directory.

    :param str seed_id:
        ID of the seed
    :rtype: :class:`tuple` or ``None``
    :return:
        The tuple describing the seed or ``None`` if the seed is unknown.
    """
    if seed_id not in self.seeds:
        return None
    head = self.seeds[seed_id][:2]
    absfile = self.abspath_from_home(self.seeds[seed_id][2])
    return head + (absfile,)
# -------------------------------------------------------------------------
[docs]
def current_rendering(self, request, warehouse, rendering):
    """Find the current rendering.

    The index of the rendering comes from the ``rendering`` parameter of
    the request when it is a valid number, otherwise from the value
    memorized in the session for this handler (0 by default). An index
    out of range falls back to 0. The selected index is memorized in the
    session.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str rendering: ('viewing' or 'editing')
        Type of rendering.
    :rtype: :class:`dict` or ``None``
    :return:
        The selected rendering or ``None`` if no rendering of this type
        is available.
    """
    renderings = self._available_renderings(request, warehouse)
    renderings = renderings.get('editings') if rendering == 'editing' \
        else renderings.get('viewings')
    if not renderings:
        return None

    if 'handlers' not in request.session:
        request.session['handlers'] = {}
    if self.uid not in request.session['handlers']:
        request.session['handlers'][self.uid] = {}
    memory = request.session['handlers'][self.uid]

    # The parameter comes from the user: ``isdecimal()`` is used because
    # ``isdigit()`` accepts characters (e.g. superscript digits) which
    # ``int()`` rejects, and ``int()`` may still refuse an oversized number.
    index = None
    if 'rendering' in request.params \
            and request.params['rendering'].isdecimal():
        try:
            index = int(request.params['rendering'])
        except ValueError:
            index = None
    if index is None:
        index = memory.get(rendering, 0)

    index = index if index < len(renderings) else 0
    memory[rendering] = index
    return renderings[index]
# -------------------------------------------------------------------------
def _chameleon_render(
        self, request, warehouse, rendering, ts_factory, values):
    """Execute a Chameleon render and return the result.

    The ``values`` dictionary is completed in place with the variables
    shared by the templates (request, routes, renderings, form...) before
    rendering the template of ``rendering``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param dict rendering:
        Dictionary defining the rendering.
    :param ts_factory:
        Translation String Factory function.
    :param dict values:
        Dictionary of values for template variables.
    :rtype: :class:`str` or ``None``
    :return:
        The result of the rendering or ``None`` if an error was logged.
    """
    template = rendering['template']
    # The translation domain is the package part of the asset specification
    my_domain = template.partition(':')[0] \
        if ':' in template else 'ciowarehouse'

    def _translate(
            msgid, domain=my_domain, mapping=None, context=None,
            target_language=None):
        """Translation for Chameleon."""
        # pylint: disable = unused-argument
        return request.localizer.translate(
            msgid, domain=domain or my_domain, mapping=mapping)

    segments = request.matchdict['path']
    path = join(*segments)
    file_id = make_file_id(join(warehouse.uid, path))
    can_edit = self.can_edit() and \
        request.registry['modules']['ciowarehouse'].warehouse_file_writer(
            request, warehouse)

    # Extra CSS classes, each one prefixed by a space
    global_class = ''
    if request.GET.get('class'):
        global_class = ' {0}'.format(request.GET['class'])
    local_class = ''
    if values.get('class'):
        local_class = ' {0}'.format(values['class'])

    values.update({  # yapf: disable
        'request': request,
        '_': ts_factory,
        'local_translate': request.localizer.translate,
        'global_class': global_class,
        'local_class': local_class,
        'action': get_action(request)[0],
        'title': request.registry['settings']['title'],
        'theme': theme_static_prefix(request),
        'theme_has': theme_has_static,
        'handler': self,
        'rendering': rendering,
        'warehouse_id': warehouse.uid,
        'directory': dirname(path) or '.',
        'filename': segments[-1],
        'fullpath': path,
        'route_view': request.route_path(
            'file_view', warehouse_id=warehouse.uid, path=path),
        'route_download': request.route_path(
            'file_download', warehouse_id=warehouse.uid, path=path),
        'route_close': self._route_close(request, file_id)})
    values['route_previous'], values['route_next'] = self._routes_around(
        request, warehouse, file_id)
    values.update(self._available_renderings(request, warehouse))
    values['can_edit'] = bool(can_edit and values['editings'])

    # Default values for what the caller did not provide
    if 'form' not in values:
        values['form'] = Form(request)
    for key, default in (('rendering_num', 0), ('content', '')):
        if key not in values:
            values[key] = default

    try:
        return PageTemplateFile(
            abspath_from_asset_spec(template),
            translate=_translate).render(**values)
    except (SyntaxError, UnboundLocalError, AssertionError, TypeError,
            NameError, AttributeError,
            KeyError) as error:  # pragma: nocover
        self._log_error(error, request)
    return None
# -------------------------------------------------------------------------
@classmethod
def _route_close(cls, request, file_id):
    """Return the URL to close the view and return to the previous page.

    The URL is the back path of the breadcrumbs followed by an anchor made
    of the file ID and an underscore.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str file_id:
        ID of the current file.
    :rtype: str
    """
    back_path = request.breadcrumbs.back_path()
    return f'{back_path}#{file_id}_'
# -------------------------------------------------------------------------
def _routes_around(
        self, request, warehouse, file_id, extensions_subset=None):
    """Return a tuple of URL to go to the previous and the next file
    according to the file type.

    The files are those of the current cache named in the session. Each
    URL is ``None`` when no file handled by this handler exists in that
    direction.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str file_id:
        ID of the current file.
    :param tuple extensions_subset: (optional)
        Subset of extensions.
    :rtype: tuple
    """
    session = request.session
    if 'ciowarehouse' not in session \
            or 'current_cache' not in session['ciowarehouse']:
        return None, None

    # Retrieve file list
    current_cache = session['ciowarehouse']['current_cache']
    files = request.registry['cache_global'].get(
        current_cache[0], current_cache[1], CACHE_REGION_GLOBAL)
    if not files:
        return None, None

    # Possibly, filter by groups
    ciowarehouse = request.registry['modules']['ciowarehouse']
    if not ciowarehouse.warehouse_admin(request, warehouse):
        user_groups = set(request.session['user']['groups'])
        files = [
            k for k in files
            if not k['only_groups'] or (user_groups & k['only_groups'])
        ]

    # Find the current file in the list
    position = next(
        (k for k, item in enumerate(files) if item['file_id'] == file_id),
        -1)
    if position == -1:
        return None, None

    def _item_route(item):
        """Return an URL to view the item if it matches."""
        path = join(item['directory'], item['file_name'])
        matched = self.match(
            splitext(item['file_name'])[1],
            join(
                ciowarehouse.warehouse_root(request, item['warehouse_id']),
                path),
            extensions_subset=extensions_subset)[0]
        if not matched:
            return None
        return request.route_path(
            'file_view', warehouse_id=item['warehouse_id'], path=path)

    def _first_route(indexes):
        """Return the URL of the first matching file among ``indexes``."""
        for k in indexes:
            route = _item_route(files[k])
            if route is not None:
                return route
        return None

    # Walk backwards for the previous one, forwards for the next one
    route_previous = _first_route(range(position - 1, -1, -1))
    route_next = _first_route(range(position + 1, len(files)))
    return route_previous, route_next
# -------------------------------------------------------------------------
@classmethod
def _update_paging(cls, request, warehouse, path):
    """If exists, update the paging entry of the path ``path``.

    The entry is looked for in the current cache named in the session. Its
    size, its date and the fields provided by the handler of the file are
    refreshed, then the paging is stored back in the cache.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str path:
        Relative path of the file inside the warehouse.
    """
    session = request.session
    if 'ciowarehouse' not in session \
            or 'current_cache' not in session['ciowarehouse']:
        return

    current_cache = session['ciowarehouse']['current_cache']
    cache = request.registry['cache_global']
    paging = cache.get(
        current_cache[0], current_cache[1], CACHE_REGION_GLOBAL)
    if not paging:
        return

    # Find the current file in the paging
    entry = None
    for candidate in paging:
        candidate_path = normpath(
            join(candidate['directory'], candidate['file_name']))
        if candidate_path == path:
            entry = candidate
            break
    if entry is None:
        return

    # Update the paging
    abs_path = join(warehouse.root, path)
    stat_info = stat(abs_path)
    if entry['file_type'] == 'directory':
        entry['file_size'] = 0
    else:
        entry['file_size'] = stat_info.st_size
    entry['file_date'] = int(stat_info.st_mtime)

    # Only the fields already present in the entry are refreshed
    handler = warehouse.get_handler(abs_path)[0]
    if handler is not None:
        fields = {}
        handler.infos_complete_fields(
            warehouse, path, abs_path, fields, request)
        for field_id in set(entry.keys()) & set(fields.keys()):
            entry[field_id] = fields[field_id]

    cache.set(
        current_cache[0], paging, current_cache[1], CACHE_REGION_GLOBAL)
# -------------------------------------------------------------------------
def _remaining_fields(self, warehouse, whoosh_fields):
    """Return the set of index fields to complete.

    These are the index fields of the warehouse which have an indexer in
    this handler and which are not yet in ``whoosh_fields``.

    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param dict whoosh_fields:
        Dictionary of Whoosh fields to complete.
    :rtype: :class:`set` or ``None``
    :return:
        The set of field IDs or ``None`` if the handler has no indexer.
    """
    if not self._indexers:
        return None
    missing = set(warehouse.indexfields.keys()) - set(whoosh_fields.keys())
    return missing & set(self._indexers.keys())
# -------------------------------------------------------------------------
@cache_user_renderings(CIOWAREHOUSE_NS, CACHE_REGION_USER)
def _available_renderings(self, request, warehouse):
    """Return a dictionary of available renderings.

    For each type of rendering (``'viewings'`` and ``'editings'``), the
    renderings of the custom TOML file of the warehouse come first, then
    the hard coded ones whose name is not already used. Renderings
    reserved to groups the user does not belong to are left out.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :rtype: dictionary
    """
    user_groups = set(request.session['user']['groups'])

    # Load the custom renderings of this handler, if any
    custom = None
    if warehouse.rendering_dir:
        toml_file = join(warehouse.rendering_dir, f'{self.uid}.toml')
        if exists(toml_file):
            with open(toml_file, 'r', encoding='utf8') as hdl:
                custom = toml_loads(hdl.read())

    available = {}
    for renderings in ('viewings', 'editings'):
        selected = []
        done = set()

        # From a custom directory
        if custom is not None:
            for rendering in custom.get(renderings, ''):
                rendering = self._check_rendering(
                    request, user_groups,
                    custom.get(f'default_{renderings}'), rendering)
                if rendering is None:
                    continue
                selected.append(rendering)
                done.add(rendering['name'])

        # Hard coded
        hard_coded = self.editings \
            if renderings == 'editings' else self.viewings
        for rendering in hard_coded:
            if rendering['name'] in done:
                continue
            if rendering.get('only_groups') is None \
                    or rendering['only_groups'] & user_groups:
                selected.append(rendering)

        available[renderings] = selected

    return self._fix_renderings(request, available)
# -------------------------------------------------------------------------
def _check_rendering(self, request, user_groups, defaults, rendering):
    """Check and complete a rendering.

    The rendering is merged over the default values, then completed with
    the UID of the handler and a label in the language of the request
    (falling back to English, then to the name of the rendering).

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param set user_groups:
        Groups the user belongs to.
    :param dict defaults:
        Default values.
    :param dict rendering:
        Rendering to process.
    :rtype: dictionary
    :return:
        The completed rendering or ``None`` if it is not authorized for
        the user or if it is incorrect.
    """
    # Authorization
    only_groups = rendering.get('only_groups')
    if only_groups and not set(rendering['only_groups']) & user_groups:
        return None

    if 'name' not in rendering:
        self._log_error(
            _('Name is missing for a dynaming rendering'), request)
        return None

    # Default values
    checked = dict(defaults) if defaults else {}
    checked.update(rendering)

    # Handler UID
    checked.setdefault('handler_uid', self.uid)

    # Label
    if 'label' in checked:
        labels = checked['label']
        fallback = labels.get('en', checked['name'])
        checked['label'] = labels.get(request.locale_name, fallback)
    else:
        checked['label'] = checked['name']

    # Check
    if 'template' in checked and 'css' in checked:
        return checked
    self._log_error(
        _('Rendering "${n}" is incorrect', {'n': checked['name']}),
        request)
    return None
# -------------------------------------------------------------------------
def _fix_renderings(self, request, available):
    """Possibly fix URLs.

    This base implementation changes nothing: it returns ``available`` as
    it is.

    :type  request: pyramid.request.Request
    :param request:
        Current request (unused here).
    :param dict available:
        Available renderings.
    :rtype: dictionary
    """
    # pylint: disable = unused-argument
    return available
# -------------------------------------------------------------------------
@classmethod
def _log_error(cls, error, request=None):
    """Log an error message.

    With a request, the message is translated into English and logged
    through the request; without one, it goes to the module logger.

    :param str error:
        Error message.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by populate script.
    """
    if request is not None:
        log_error(request, translate(error, lang='en'))
        return
    LOG.error(translate(error))
# -------------------------------------------------------------------------
@classmethod
def _log_warning(cls, warning, request=None):
    """Log a warning message.

    With a request, the message is translated into English and logged
    through the request; without one, it goes to the module logger.

    :param str warning:
        Warning message.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by populate script.
    """
    if request is not None:
        log_warning(request, translate(warning, lang='en'))
        return
    LOG.warning(translate(warning))