Source code for ciowarehouse.models.dbwarehouse

# -*- coding: utf-8 -*-
"""SQLAlchemy-powered model definitions for warehouses."""
# pylint: disable = too-many-lines

from os.path import expanduser, abspath
from json import dumps
from collections import OrderedDict
from urllib.parse import urlparse

from sqlalchemy import Column, ForeignKey, String, Enum, Integer, Boolean
from sqlalchemy import PickleType, Text
from sqlalchemy.orm import relationship
from lxml import etree
import colander

from chrysalio.lib.i18n import record_format_i18n
from chrysalio.lib.i18n import schema_i18n_labels, defaults_i18n_labels
from chrysalio.lib.i18n import view_i18n_labels, edit_i18n_labels
from chrysalio.lib.utils import make_id, tostr, encrypt, size_label
from chrysalio.lib.utils import deltatime_label
from chrysalio.lib.xml import i18n_xml_text, db2xml_i18n_labels
from chrysalio.helpers.literal import Literal
from chrysalio.helpers.builder import Builder
from chrysalio.models import ID_LEN, LABEL_LEN, DESCRIPTION_LEN
from chrysalio.models import DBDeclarativeClass
from chrysalio.models.dbbase import DBBaseClass
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from cioservice.models.dbjob import DBJob
from . import URL_LEN, PATH_LEN
from ..relaxng import RELAXNG_CIOWAREHOUSE
from ..lib.i18n import _


WAREHOUSE_NORMALIZE = {'simple': _('simple'), 'strict': _('strict')}
WAREHOUSE_ACCESSES = {
    'free': _('free'), 'readonly': _('read only'),
    'restricted': _('restricted'),
    'restricted-readonly': _('restricted and read only'),
    'closed': _('closed')}
FILE_RIGHTS_LABELS = OrderedDict((
    ('reader', _('reading')), ('writer', _('writing')),
    ('writer-admin', _('administration'))))
META_RIGHTS_LABELS = OrderedDict((
    ('reader', _('reading')), ('writer', _('writing'))))
THUMBNAIL_SIZE_LARGE_DEFAULT = '256x256'
THUMBNAIL_SIZE_SMALL_DEFAULT = '128x96'
THUMBNAIL_SIZE_HINT = _('0X0 = no thumbnail')
LOCK_TTL = 600
LOCK_TTL_HINT = _('in seconds, default: 600 (10 minutes)')
REFRESH_PERIOD = 14400  # 4 hours
REFRESH_PERIOD_HINT = _('in seconds, default: 14400 (4 hours)')
DOWNLOAD_MAX_SIZE = 10485760  # 10 Mio
DOWNLOAD_MAX_SIZE_HINT = _('in bytes (ex. 10485760 = 10Mio, 0 = no limit)')


# =============================================================================
[docs] class DBWarehouse(DBDeclarativeClass, DBBaseClass): """SQLAlchemy-powered warehouse class.""" suffix = 'ciowrh' attachments_dir = 'Warehouses' _settings_tabs = ( _('Information'), _('File Handlers'), _('Actions'), _('Authorized users'), _('Authorized groups')) __tablename__ = 'wrh_warehouses' __table_args__ = {'mysql_engine': 'InnoDB'} warehouse_id = Column(String(ID_LEN), primary_key=True) i18n_label = Column(Text(), nullable=False) i18n_description = Column(PickleType(1)) attachments_key = Column(String(ID_LEN + 20)) picture = Column(String(ID_LEN + 4)) location = Column(String(ID_LEN), nullable=False) vcs = Column(Boolean(name='vcs'), default=False) vcs_url = Column(String(URL_LEN)) vcs_user = Column(String(ID_LEN)) vcs_password = Column(String(128)) normalize = Column( Enum(*WAREHOUSE_NORMALIZE.keys(), name='normalize_enum')) access = Column( Enum(*WAREHOUSE_ACCESSES.keys(), name='wrh_access_enum'), nullable=False) visible_url = Column(String(URL_LEN)) thumbnail_large = Column(String(10)) thumbnail_small = Column(String(10)) download_max_size = Column(Integer, default=DOWNLOAD_MAX_SIZE) lock_ttl = Column(Integer, default=LOCK_TTL) refresh_period = Column(Integer, default=REFRESH_PERIOD) rendering_dir = Column(String(PATH_LEN)) seeds_dir = Column(String(PATH_LEN)) metafields = relationship( 'DBMetafield', secondary='wrh_warehouses_metafields', order_by='DBWarehouseMetafield.position') indexfields = relationship('DBWarehouseIndexfield', cascade='all, delete') handlers = relationship('DBWarehouseHandler', cascade='all, delete') jobs = relationship( DBJob, secondary='wrh_warehouses_jobs', order_by='desc(DBJob.priority)') users = relationship('DBWarehouseUser', cascade='all, delete') groups = relationship('DBWarehouseGroup', cascade='all, delete') # -------------------------------------------------------------------------
[docs] @classmethod def xml2db( cls, dbsession, warehouse_elt, error_if_exists=True, kwargs=None): """Load a warehouse from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: Warehouse XML element. :param bool error_if_exists: (default=True) It returns an error if user warehouse already exists. :param dict kwargs: (optional) Dictionary of keyword arguments. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # Check if already exists warehouse_id = make_id(warehouse_elt.get('id'), 'standard', ID_LEN) dbwarehouse = dbsession.query(cls).filter_by( warehouse_id=warehouse_id).first() if dbwarehouse is not None: if error_if_exists: return _( 'Warehouse "${w}" already exists.', {'w': warehouse_id}) return None # Create warehouse record = cls.record_from_xml(warehouse_id, warehouse_elt) error = cls.record_format(record) if error: return error dbwarehouse = cls(**record) dbsession.add(dbwarehouse) # Fill extra tables return dbwarehouse.xml2db_extra(dbsession, warehouse_elt, kwargs)
# -------------------------------------------------------------------------
[docs] def xml2db_extra(self, dbsession, warehouse_elt, kwargs): """Load extra information on a warehouse from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: User XML element. :param dict kwargs: Dictionary of keyword arguments with the key ``'profiles'``. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ dbsession.flush() warehouse_id = self.warehouse_id # Add metadata fields namespace = RELAXNG_CIOWAREHOUSE['namespace'] kwargs = {} if kwargs is None else kwargs refs = kwargs.get('metarefs', ()) done = set() for elt in warehouse_elt.xpath( 'ns0:metafields/ns0:metaref', namespaces={'ns0': namespace}): ref = elt.text if ref not in done and ref in refs: done.add(ref) dbsession.add(DBWarehouseMetafield( warehouse_id=warehouse_id, metafield_id=ref, position=len(done))) # Add index fields refs = kwargs.get('indexrefs', ()) done = set() for elt in warehouse_elt.xpath( 'ns0:indexfields/ns0:indexref', namespaces={'ns0': namespace}): ref = elt.text if ref not in done and ref in refs: done.add(ref) dbsession.add(DBWarehouseIndexfield( warehouse_id=warehouse_id, indexfield_id=ref, in_list=int(elt.get('in-list', '0') or None), in_cards=int(elt.get('in-cards', '0') or None))) # Add handlers done = set() for elt in warehouse_elt.xpath( 'ns0:handlers/ns0:ban', namespaces={'ns0': namespace}): item_id = elt.text.strip() if item_id not in done: done.add(item_id) dbsession.add(DBWarehouseHandler( warehouse_id=warehouse_id, handler_id=item_id)) # Add jobs if kwargs.get('jobs'): done = set() for elt in warehouse_elt.xpath( 'ns0:jobs/ns0:jobref', namespaces={'ns0': namespace}): item_id = elt.text.strip() if item_id not in done and item_id in kwargs['jobs']: done.add(item_id) dbsession.add(DBWarehouseJob( warehouse_id=warehouse_id, job_id=item_id)) # Add users and groups self.xml2db_extra_users_and_groups(dbsession, warehouse_elt, kwargs)
# -------------------------------------------------------------------------
[docs] def xml2db_extra_users_and_groups(self, dbsession, warehouse_elt, kwargs): """Load users and groups from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: User XML element. :param dict kwargs: Dictionary of keyword arguments with the key ``'profiles'``. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ warehouse_id = self.warehouse_id namespace = RELAXNG_CIOWAREHOUSE['namespace'] # Add users refs = kwargs.get('users', {}) done = set() for elt in warehouse_elt.xpath( 'ns0:users/ns0:user', namespaces={'ns0': namespace}): item_id = refs.get(elt.text) if item_id is not None and item_id not in done and ( elt.get('file') or elt.get('meta') or elt.get('favorite') == 'true'): done.add(item_id) file_rights = elt.get('file') or ( 'reader' if elt.get('meta') == 'writer' else None) meta_rights = 'writer' if file_rights == 'writer-admin' else \ elt.get('meta', 'reader' if file_rights else None) dbsession.add(DBWarehouseUser( warehouse_id=warehouse_id, user_id=item_id, file_rights=file_rights, meta_rights=meta_rights, favorite=elt.get('favorite') == 'true' or None)) # Add groups refs = kwargs.get('groups', ()) done = set() for elt in warehouse_elt.xpath( 'ns0:groups/ns0:group', namespaces={'ns0': namespace}): item_id = elt.text if item_id and item_id in refs and item_id not in done: done.add(item_id) is_admin = elt.get('file') == 'writer-admin' dbsession.add(DBWarehouseGroup( warehouse_id=warehouse_id, group_id=item_id, file_rights=elt.get('file', 'reader'), meta_rights='writer' if is_admin else elt.get( 'meta', 'reader')))
# -------------------------------------------------------------------------
[docs] @classmethod def record_from_xml(cls, warehouse_id, warehouse_elt): """Convert a warehouse XML element into a dictionary. :param str warehouse_id: Warehouse ID. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: Warehouse XML element. :rtype: dict """ namespace = RELAXNG_CIOWAREHOUSE['namespace'] attachments_elt = warehouse_elt.find( '{{{0}}}attachments'.format(namespace)) vcs_elt = warehouse_elt.find('{{{0}}}vcs'.format(namespace)) thumb_elt = warehouse_elt.find('{{{0}}}thumbnails'.format(namespace)) return { 'warehouse_id': warehouse_id, 'i18n_label': dumps(i18n_xml_text( warehouse_elt, 'ns0:label', {'ns0': namespace})), 'i18n_description': i18n_xml_text( warehouse_elt, 'ns0:description', {'ns0': namespace}), 'attachments_key': attachments_elt is not None and attachments_elt.get( 'key') or None, 'picture': attachments_elt is not None and attachments_elt.findtext( '{{{0}}}picture'.format(namespace)) or None, 'location': warehouse_elt.findtext( '{{{0}}}location'.format(namespace)), 'vcs': vcs_elt is not None, 'vcs_url': (vcs_elt is not None and vcs_elt.findtext( '{{{0}}}url'.format(namespace))) or warehouse_elt.findtext( '{{{0}}}no-vcs'.format(namespace)), 'vcs_user': vcs_elt is not None and vcs_elt.findtext( '{{{0}}}user'.format(namespace)), 'vcs_password': vcs_elt is not None and vcs_elt.findtext( '{{{0}}}password'.format(namespace)), 'normalize': warehouse_elt.findtext( '{{{0}}}normalize'.format(namespace)), 'access': warehouse_elt.findtext( '{{{0}}}access'.format(namespace)), 'visible_url': warehouse_elt.findtext( '{{{0}}}visible-url'.format(namespace)), 'thumbnail_large': thumb_elt is not None and thumb_elt.get( 'large') or THUMBNAIL_SIZE_LARGE_DEFAULT, 'thumbnail_small': thumb_elt is not None and thumb_elt.get( 'small') or THUMBNAIL_SIZE_SMALL_DEFAULT, 'download_max_size': int( warehouse_elt.findtext( '{{{0}}}download-max-size'.format(namespace)) or DOWNLOAD_MAX_SIZE), 'lock_ttl': int(warehouse_elt.findtext( '{{{0}}}lock-ttl'.format(namespace)) or LOCK_TTL), 'refresh_period': int(warehouse_elt.findtext( '{{{0}}}refresh-period'.format(namespace)) or REFRESH_PERIOD), 'rendering_dir': warehouse_elt.findtext( '{{{0}}}rendering-dir'.format(namespace)), 'seeds_dir': warehouse_elt.findtext( '{{{0}}}seeds-dir'.format(namespace))}
# -------------------------------------------------------------------------
[docs] @classmethod def record_format(cls, record): """Check and possibly correct a record before inserting it in the database. :param dict record: Dictionary of values to check. :rtype: ``None`` or :class:`pyramid.i18n.TranslationString` :return: ``None`` or error message. """ for k in [i for i in record if record[i] is None]: del record[k] # Warehouse ID if not record.get('warehouse_id'): return _('Warehouse without ID.') record['warehouse_id'] = make_id( record['warehouse_id'], 'standard', ID_LEN) # Labels and descriptions if not record_format_i18n(record): return _('Warehouse without label.') if not record.get('i18n_description'): record['i18n_description'] = {} # Location if not record.get('location'): return _('Warehouse without location.') # VCS if record.get('vcs'): password = record.get('vcs_password') if password and not record.get('vcs_user'): del record['vcs_password'] elif password and ( not tostr(password).endswith('=') or len(password) < 32): record['vcs_password'] = encrypt(password, 'warehouse') if record.get('vcs_url') and not urlparse(record['vcs_url']).scheme: record['vcs_url'] = abspath(expanduser(record['vcs_url'])) # Access if not record.get('access'): return _('Warehouse without access rights.') # Thumbnails record['thumbnail_large'] = record.get( 'thumbnail_large') or THUMBNAIL_SIZE_LARGE_DEFAULT record['thumbnail_small'] = record.get( 'thumbnail_small') or THUMBNAIL_SIZE_SMALL_DEFAULT return None
# -------------------------------------------------------------------------
[docs] def db2xml(self, dbsession): """Serialize a warehouse to a XML representation. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :rtype: lxml.etree.Element """ warehouse_elt = etree.Element('warehouse') warehouse_elt.set('id', self.warehouse_id) # Labels and descriptions db2xml_i18n_labels(self, warehouse_elt, 6) # Attachment if self.attachments_key: elt = etree.SubElement( warehouse_elt, 'attachments', key=self.attachments_key) if self.picture: etree.SubElement(elt, 'picture').text = self.picture # Files self._db2xml_files(warehouse_elt) # Thumbnails if self.thumbnail_large != THUMBNAIL_SIZE_LARGE_DEFAULT \ or self.thumbnail_small != THUMBNAIL_SIZE_SMALL_DEFAULT: etree.SubElement( warehouse_elt, 'thumbnails', large=self.thumbnail_large or THUMBNAIL_SIZE_LARGE_DEFAULT, small=self.thumbnail_small or THUMBNAIL_SIZE_SMALL_DEFAULT) # Max size for a download if self.download_max_size is not None: etree.SubElement(warehouse_elt, 'download-max-size').text = \ str(self.download_max_size) # Lock TTL if self.lock_ttl != LOCK_TTL: etree.SubElement(warehouse_elt, 'lock-ttl').text = \ str(self.lock_ttl) # Refresh period if self.refresh_period != REFRESH_PERIOD: etree.SubElement(warehouse_elt, 'refresh-period').text = str( self.refresh_period) # Rendering directory if self.rendering_dir: etree.SubElement(warehouse_elt, 'rendering-dir').text = \ self.rendering_dir # Seeds directory if self.seeds_dir: etree.SubElement(warehouse_elt, 'seeds-dir').text = \ self.seeds_dir # Metadata and index fields self._db2xml_fields(warehouse_elt) # Handlers self._db2xml_handlers(warehouse_elt) # Jobs self._db2xml_jobs(warehouse_elt) # Users and groups self._db2xml_users_and_groups(dbsession, warehouse_elt) return warehouse_elt
# ------------------------------------------------------------------------- def _db2xml_files(self, warehouse_elt): """Add file information to the warehouse serialization. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: XML warehouse to complete. """ # Vcs etree.SubElement(warehouse_elt, 'location').text = self.location if self.vcs: elt = etree.SubElement(warehouse_elt, 'vcs') if self.vcs_url: etree.SubElement(elt, 'url').text = self.vcs_url if self.vcs_user: etree.SubElement(elt, 'user').text = self.vcs_user if self.vcs_password: etree.SubElement(elt, 'password').text = \ self.vcs_password elif self.vcs_url: etree.SubElement(warehouse_elt, 'no-vcs').text = self.vcs_url # File name normalization if self.normalize: etree.SubElement(warehouse_elt, 'normalize').text = self.normalize # File access etree.SubElement(warehouse_elt, 'access').text = self.access # Visible URL if self.visible_url: etree.SubElement(warehouse_elt, 'visible-url').text = \ self.visible_url # ------------------------------------------------------------------------- def _db2xml_fields(self, warehouse_elt): """Add fields to the warehouse serialization. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: XML warehouse to complete. """ # Metadata fields if self.metafields: elt = etree.SubElement(warehouse_elt, 'metafields') for dbitem in self.metafields: etree.SubElement(elt, 'metaref').text = dbitem.metafield_id # Index fields if self.indexfields: elt = etree.SubElement(warehouse_elt, 'indexfields') for dbitem in self.indexfields: subelt = etree.SubElement(elt, 'indexref') subelt.text = dbitem.indexfield_id if dbitem.in_list: subelt.set('in-list', str(dbitem.in_list)) if dbitem.in_cards: subelt.set('in-cards', str(dbitem.in_cards)) # ------------------------------------------------------------------------- def _db2xml_handlers(self, warehouse_elt): """Add handlers to the warehouse serialization. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: XML warehouse to complete. """ if not self.handlers: return handlers_elt = etree.SubElement(warehouse_elt, 'handlers') for dbhandler in self.handlers: etree.SubElement(handlers_elt, 'ban').text = dbhandler.handler_id # ------------------------------------------------------------------------- def _db2xml_jobs(self, warehouse_elt): """Add jobs to the warehouse serialization. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: XML warehouse to complete. """ if not self.jobs: return jobs_elt = etree.SubElement(warehouse_elt, 'jobs') for dbjob in self.jobs: etree.SubElement(jobs_elt, 'jobref').text = dbjob.job_id # ------------------------------------------------------------------------- def _db2xml_users_and_groups(self, dbsession, warehouse_elt): """Add users and groups to the warehouse serialization. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type warehouse_elt: lxml.etree.Element :param warehouse_elt: XML warehouse to complete. """ if self.users: users = {} users_elt = etree.SubElement(warehouse_elt, 'users') for dbuser in self.users: if dbuser.user_id not in users: users[dbuser.user_id] = dbsession.query( DBUser.login).filter_by( user_id=dbuser.user_id).first()[0] subelt = etree.SubElement(users_elt, 'user') subelt.text = users[dbuser.user_id] if dbuser.file_rights: subelt.set('file', str(dbuser.file_rights)) if dbuser.meta_rights == 'writer': subelt.set('meta', str(dbuser.meta_rights)) if dbuser.favorite: subelt.set('favorite', 'true') if self.groups: groups_elt = etree.SubElement(warehouse_elt, 'groups') for dbgroup in self.groups: subelt = etree.SubElement(groups_elt, 'group') subelt.text = dbgroup.group_id subelt.set('file', str(dbgroup.file_rights)) if dbgroup.meta_rights != 'reader' and \ dbgroup.file_rights != 'writer-admin': subelt.set('meta', str(dbgroup.meta_rights)) # -------------------------------------------------------------------------
[docs] def tab4view(self, request, tab_index, form, user_filter, user_paging): """Generate the tab content of a warehouse. :type request: pyramid.request.Request :param request: Current request. :param int index: Index of the tab. :type form: .lib.form.Form :param form: Current form object. :type user_filter: chrysalio.lib.filter.Filter :param user_filter: Filter for users. :type user_paging: chrysalio.lib.paging.Paging :param user_paging: Paging for warehouse users. :rtype: chrysalio.helpers.literal.Literal """ if tab_index == 0: return self._tab4view_information(request, form) if tab_index == 1: return self._tab4view_handlers(request) if tab_index == 2: return self._tab4view_jobs(request) if tab_index == 3: return self._tab4view_users( request, form, user_filter, user_paging) if tab_index == 4: return self._tab4view_groups(request) return ''
# ------------------------------------------------------------------------- def _tab4view_information(self, request, form): """Generate the information tab. :type request: pyramid.request.Request :param request: Current request. :type form: .lib.form.Form :param form: Current form object. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate html = form.grid_item( translate(_('Identifier:')), self.warehouse_id, clear=True) html += view_i18n_labels(request, form, self) html += form.grid_item( translate(_('Location:')), self.location, clear=True) html += form.grid_item( translate(_('Versioning:')), translate(_('yes')) if self.vcs else '', clear=True) html += form.grid_item( translate(_('VCS Remote URL:')), self.vcs_url if self.vcs else '', clear=True) html += form.grid_item( translate(_('VCS User:')), self.vcs_user if self.vcs else '', clear=True) html += form.grid_item( translate(_('Normalization:')), translate(WAREHOUSE_NORMALIZE.get( self.normalize)) if self.normalize else None, clear=True) html += form.grid_item( translate(_('Access:')), translate(WAREHOUSE_ACCESSES.get(self.access)), clear=True) html += form.grid_item( translate(_('Visible URL:')), self.visible_url, clear=True) html += form.grid_item( translate(_('Large thumbnail size:')), self.thumbnail_large, clear=True) html += form.grid_item( translate(_('Small thumbnail size:')), self.thumbnail_small, clear=True) if self.download_max_size > 0: html += form.grid_item( translate(_('Maximum size of download:')), size_label(self.download_max_size), clear=True) if self.lock_ttl != LOCK_TTL: html += form.grid_item( translate(_('Lock TTL:')), deltatime_label(self.lock_ttl), clear=True) if self.refresh_period != REFRESH_PERIOD: html += form.grid_item( translate(_('Refreshment every:')), deltatime_label(self.refresh_period), clear=True) if self.rendering_dir: html += form.grid_item( translate(_('Rendering directory:')), self.rendering_dir, clear=True) if self.seeds_dir: html += form.grid_item( translate(_('Seeds directory:')), self.seeds_dir, clear=True) return html # ------------------------------------------------------------------------- def _tab4view_handlers(self, request): """Generate the handlers tab. :type request: pyramid.request.Request :param request: Current request. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate all_handlers = {k.uid: k for k in request.registry.get('handlers', '')} if not all_handlers or not self.handlers: return translate(_('All file handlers.')) handlers = [] for dbhandler in self.handlers: if dbhandler.handler_id in all_handlers: handlers.append(all_handlers[dbhandler.handler_id]) html = Builder().div(translate(_('All file handlers EXCEPT:'))) for handler in handlers: html += Builder().div('✔ {0}'.format(translate(handler.label))) return html # ------------------------------------------------------------------------- def _tab4view_jobs(self, request): """Generate the jobs tab. :type request: pyramid.request.Request :param request: Current request. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate if not request.registry.get('services') or not self.jobs: return translate(_('No action.')) html = '<table>\n<thead>\n'\ '<tr><th>{label}</th><th>{description}</th></tr>\n'\ '</thead>\n<tbody>\n'.format( label=translate(_('Label')), description=translate(_('Description'))) for dbjob in self.jobs: html += \ '<tr><td><a href="{ref}"><strong>{label}</strong></a></td>'\ '<td>{description}</td></tr>\n'.format( ref=request.route_path( 'job_view', job_id=dbjob.job_id), label=dbjob.label(request), description=dbjob.description(request)) html += '</tbody>\n</table>\n' return Literal(html) # ------------------------------------------------------------------------- def _tab4view_users(self, request, form, user_filter, user_paging): """Generate the users tab. :type request: pyramid.request.Request :param request: Current request. :type form: .lib.form.Form :param form: Current form object. :type user_filter: chrysalio.lib.filter.Filter :param user_filter: Filter for users. :type user_paging: chrysalio.lib.paging.Paging :param user_paging: Paging for warehouse users. """ translate = request.localizer.translate if self.access == 'free': return translate(_('Access to this warehouse is not restricted.')) if self.access == 'closed': return translate(_('This warehouse is closed.')) if not self.users: return translate(_('No user is authorized.')) html = DBUser.paging_filter(request, form, user_filter, user_paging) html += self._user_thead(request, user_paging) users = { k.user_id: (k.file_rights, k.meta_rights, k.favorite) for k in self.users} for dbuser in user_paging: html += \ '<tr><th><a href="{user_view}">{login}</a></th>'\ '<td class="cioOptional">{fname}</td><td>{lname}</td>'\ '<td class="cioSelect">{file_rights}</td>'\ '<td class="cioSelect">{meta_rights}</td>'\ '<td class="cioBoolean">{favorite}</td></tr>\n'.format( user_view=request.route_path( 'user_view', user_id=dbuser.user_id), login=dbuser.login, fname=dbuser.first_name or '', lname=dbuser.last_name, file_rights=translate( FILE_RIGHTS_LABELS[users[dbuser.user_id][0]] if users[dbuser.user_id][0] else ''), meta_rights=translate( META_RIGHTS_LABELS[users[dbuser.user_id][1]] if users[dbuser.user_id][1] else ''), favorite='✔' if users[dbuser.user_id][2] else '') html += '</tbody>\n</table>\n' return Literal(html) # ------------------------------------------------------------------------- def _tab4view_groups(self, request): """Generate the users tab. :type request: pyramid.request.Request :param request: Current request. """ translate = request.localizer.translate if self.access == 'free': return translate(_('Access to this warehouse is not restricted.')) if self.access == 'closed': return translate(_('This warehouse is closed.')) if not self.groups: return translate(_('No group is authorized.')) dbgroups = request.dbsession.query(DBGroup).filter( DBGroup.group_id.in_([k.group_id for k in self.groups])).order_by( 'group_id') groups = { k.group_id: (k.file_rights, k.meta_rights) for k in self.groups} html = '<table>\n<thead>\n'\ '<tr><th>{label}</th><th>{description}</th>'\ '<th class="cioSelect">{file_rights}</th>'\ '<th class="cioSelect">{meta_rights}</th></tr>\n'\ '</thead>\n<tbody>\n'.format( label=translate(_('Label')), description=translate(_('Description')), file_rights=translate(_('File rights')), meta_rights=translate(_('Metadata rights'))) for dbgroup in dbgroups: html += \ '<tr><th><a href="{group_view}">{label}</a></th>'\ '<td>{description}</td>'\ '<td class="cioSelect">{file_rights}</td>'\ '<td class="cioSelect">{meta_rights}</td></tr>\n'.format( group_view=request.route_path( 'group_view', group_id=dbgroup.group_id), label=dbgroup.label(request), description=dbgroup.description(request), file_rights=translate( FILE_RIGHTS_LABELS[groups[dbgroup.group_id][0]]), meta_rights=translate( META_RIGHTS_LABELS[groups[dbgroup.group_id][1]])) html += '</tbody>\n</table>\n' return Literal(html) # -------------------------------------------------------------------------
[docs] @classmethod def settings_schema( cls, request, defaults, groups, jobs, dbwarehouse=None): """Return a Colander schema to edit a warehouse. :type request: pyramid.request.Request :param request: Current request. :param dict defaults: Default values for the form set by the user paging object. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :param dict jobs: A dictionary such as ``{job_id: (label, description),...}``. :type dbwarehouse: DBWarehouse :param dbwarehouse: (optional) Current user warehouse SqlAlchemy object. :rtype: tuple :return: A tuple such as ``(schema, defaults)``. """ # Schema schema = colander.SchemaNode(colander.Mapping()) if dbwarehouse is None: schema.add(colander.SchemaNode( colander.String(), name='warehouse_id', validator=colander.Length(min=2, max=ID_LEN))) locations = { k: v for k, v in request.registry['modules'][ 'ciowarehouse'].locations.items() if k[0] != '/'} schema_i18n_labels(request, schema, LABEL_LEN, DESCRIPTION_LEN) if dbwarehouse is None: schema.add(colander.SchemaNode( colander.String(), name='location', validator=colander.OneOf(locations))) schema.add(colander.SchemaNode( colander.Boolean(), name='vcs', missing=False)) schema.add(colander.SchemaNode( colander.String(), name='vcs_url', validator=colander.Length(max=URL_LEN), missing=None)) schema.add(colander.SchemaNode( colander.String(), name='vcs_user', validator=colander.Length(max=ID_LEN), missing=None)) schema.add(colander.SchemaNode( colander.String(), name='vcs_password', validator=colander.Length(max=64), missing=None)) schema.add(colander.SchemaNode( colander.String(), name='normalize', validator=colander.OneOf(WAREHOUSE_NORMALIZE.keys()), missing=None)) schema.add(colander.SchemaNode( colander.String(), name='access', validator=colander.OneOf(WAREHOUSE_ACCESSES.keys()))) schema.add(colander.SchemaNode( colander.String(), name='visible_url', validator=colander.Length(max=URL_LEN), missing=None)) schema.add(colander.SchemaNode( colander.String(), name='thumbnail_large', validator=colander.Regex(r'^\d{1,4}x\d{1,4}$'), missing=None)) schema.add(colander.SchemaNode( colander.String(), name='thumbnail_small', validator=colander.Regex(r'^\d{1,3}x\d{1,3}$'), missing=None)) schema.add(colander.SchemaNode( colander.Integer(), name='download_max_size', validator=colander.Range(min=0), missing=DOWNLOAD_MAX_SIZE)) schema.add(colander.SchemaNode( colander.Integer(), name='lock_ttl', validator=colander.Range(min=0), missing=LOCK_TTL)) schema.add(colander.SchemaNode( colander.Integer(), name='refresh_period', validator=colander.Range(min=0), missing=REFRESH_PERIOD)) schema.add(colander.SchemaNode( colander.String(), name='rendering_dir', validator=colander.Regex( r'^(/?[a-zA-Z0-9_\-]+:)?[a-zA-Z0-9_.\-/]+$'), missing=None)) schema.add(colander.SchemaNode( colander.String(), name='seeds_dir', validator=colander.Regex( r'^(/?[a-zA-Z0-9_\-]+:)?[a-zA-Z0-9_.\-/]+$'), missing=None)) # File handlers for handler in request.registry.get('handlers', ()): schema.add(colander.SchemaNode( colander.Boolean(), name='hdl:{0}'.format(handler.uid), missing=False)) # Groups for group_id in groups: schema.add(colander.SchemaNode( colander.Boolean(), name='grp:{0}'.format(group_id), missing=False)) # Jobs for uid in jobs: schema.add(colander.SchemaNode( colander.Boolean(), name='job:{0}'.format(uid), missing=False)) # Defaults if dbwarehouse is None: defaults.update({ 'vcs': True, 'access': 'free', 'thumbnail_large': THUMBNAIL_SIZE_LARGE_DEFAULT, 'thumbnail_small': THUMBNAIL_SIZE_SMALL_DEFAULT, 'download_max_size': DOWNLOAD_MAX_SIZE, 'lock_ttl': LOCK_TTL, 'refresh_period': REFRESH_PERIOD}) else: defaults.update(defaults_i18n_labels(dbwarehouse)) for dbitem in dbwarehouse.users: defaults['usr:{0}'.format(dbitem.user_id)] = \ dbitem.file_rights is not None defaults['fil:{0}'.format(dbitem.user_id)] = \ dbitem.file_rights defaults['mta:{0}'.format(dbitem.user_id)] = \ dbitem.meta_rights defaults['fav:{0}'.format(dbitem.user_id)] = \ dbitem.favorite for dbitem in dbwarehouse.groups: defaults['grp:{0}'.format(dbitem.group_id)] = True defaults['gfil:{0}'.format(dbitem.group_id)] = \ dbitem.file_rights defaults['gmta:{0}'.format(dbitem.group_id)] = \ dbitem.meta_rights for dbitem in dbwarehouse.handlers: defaults['hdl:{0}'.format(dbitem.handler_id)] = True for dbitem in dbwarehouse.jobs: defaults['job:{0}'.format(dbitem.job_id)] = True return schema, defaults
# -------------------------------------------------------------------------
[docs] @classmethod def tab4edit(cls, request, tab_index, form, user_filter, user_paging, groups, jobs, dbwarehouse=None): """Generate the tab content of user warehouse for edition. :type request: pyramid.request.Request :param request: Current request. :param int tab_index: Index of the tab. :type form: chrysalio.lib.form.Form :param form: Current form object. :type user_filter: chrysalio.lib.filter.Filter :param user_filter: Filter for users. :type user_paging: chrysalio.lib.paging.Paging :param user_paging: Paging for all users. :param dict jobs: A dictionary such as ``{job_id: (label, description),...}``. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :type dbwarehouse: DBWarehouse :param dbwarehouse: (optional) Current user warehouse SqlAlchemy object. :rtype: chrysalio.helpers.literal.Literal """ # pylint: disable = too-many-arguments if tab_index == 0: return cls._tab4edit_information(request, form, dbwarehouse) if tab_index == 1: return cls._tab4edit_handlers(request, form) if tab_index == 2: return cls._tab4edit_jobs(request, form, jobs) if tab_index == 3: return cls._tab4edit_users( request, form, user_filter, user_paging, dbwarehouse) if tab_index == 4: return cls._tab4edit_groups(request, form, groups, dbwarehouse) return ''
# ------------------------------------------------------------------------- @classmethod def _tab4edit_information(cls, request, form, dbwarehouse): """Generate the information tab for edition. :type request: pyramid.request.Request :param request: Current request. :type form: .lib.form.Form :param form: Current form object. :type dbwarehouse: DBWarehouse :param dbwarehouse: Current user warehouse SqlAlchemy object. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate if dbwarehouse is None: html = form.grid_text( 'warehouse_id', translate(_('Identifier:')), required=True, maxlength=ID_LEN, clear=True) else: html = form.grid_item( translate(_('Identifier:')), dbwarehouse.warehouse_id, clear=True) html += edit_i18n_labels(request, form, LABEL_LEN, DESCRIPTION_LEN) if dbwarehouse is None: locations = { k: v for k, v in request.registry['modules'][ 'ciowarehouse'].locations.items() if k[0] != '/'} html += form.grid_select( 'location', translate(_('Location:')), [('', ' ')] + list(locations.keys()), required=True, clear=True) html += form.grid_custom_checkbox( 'vcs', translate(_('Versioning:')), clear=True) html += form.grid_text( 'vcs_url', translate(_('VCS Remote URL:')), maxlength=URL_LEN, clear=True) else: html += form.grid_item( translate(_('Location:')), dbwarehouse.location, clear=True) html += form.grid_item( translate(_('Versioning:')), translate(_('yes')) if dbwarehouse.vcs else '', clear=True) html += form.grid_item( translate(_('VCS Remote URL:')), dbwarehouse.vcs_url if dbwarehouse.vcs else '', clear=True) if dbwarehouse is None or dbwarehouse.vcs: html += form.grid_text( 'vcs_user', translate(_('VCS User:')), maxlength=ID_LEN, clear=True) html += form.grid_password( 'vcs_password', translate(_('VCS Password:')), maxlength=64, clear=True) html += form.grid_select( 'normalize', translate(_('Normalization:')), [('', ' ')] + list(WAREHOUSE_NORMALIZE.items()), clear=True) html += form.grid_select( 'access', translate(_('Access:')), WAREHOUSE_ACCESSES.items(), required=True, clear=True) html += form.grid_text( 'visible_url', translate(_('Visible URL:')), maxlength=URL_LEN, clear=True) html += form.grid_text( 'thumbnail_large', translate(_('Large thumbnail size:')), placeholder=translate( _('ex.: ${s}', {'s': THUMBNAIL_SIZE_LARGE_DEFAULT})), maxlength=10, hint=translate(THUMBNAIL_SIZE_HINT), clear=True) html += form.grid_text( 'thumbnail_small', translate(_('Small thumbnail size:')), placeholder=translate( _('ex.: ${s}', {'s': THUMBNAIL_SIZE_SMALL_DEFAULT})), maxlength=10, clear=True) html += form.grid_text( 'download_max_size', translate(_('Maximum size of download:')), maxlength=10, hint=translate(DOWNLOAD_MAX_SIZE_HINT), clear=True) html += form.grid_text( 'lock_ttl', translate(_('Lock TTL:')), maxlength=10, hint=translate(LOCK_TTL_HINT), clear=True) html += form.grid_text( 'refresh_period', translate(_('Refreshment every:')), maxlength=10, hint=translate(REFRESH_PERIOD_HINT), clear=True) html += form.grid_text( 'rendering_dir', translate(_('Rendering directory:')), maxlength=PATH_LEN, clear=True) html += form.grid_text( 'seeds_dir', translate(_('Seeds directory:')), maxlength=PATH_LEN, clear=True) return html # ------------------------------------------------------------------------- @classmethod def _tab4edit_handlers(cls, request, form): """Generate the jobs tab for edition. :type request: pyramid.request.Request :param request: Current request. :type form: .lib.form.Form :param form: Current form object. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate if not request.registry.get('handlers'): return translate(_('No file handler.')) html = Builder().div(translate(_('All file handlers EXCEPT:'))) for handler in request.registry['handlers']: html += Builder().div( form.custom_checkbox('hdl:{0}'.format(handler.uid)) + Literal(' <label for="hdl{0}">{1}</label>'.format( handler.uid, translate(handler.label)))) return html # ------------------------------------------------------------------------- @classmethod def _tab4edit_jobs(cls, request, form, jobs): """Generate the jobs tab for edition. :type request: pyramid.request.Request :param request: Current request. :type form: .lib.form.Form :param form: Current form object. :param dict jobs: A dictionary such as ``{job_id: (label, description),...}``. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate if not request.registry.get('services') or not jobs: return translate(_('No available action.')) html = '<table>\n<thead>\n'\ '<tr><th></th><th>{label}</th>'\ '<th>{description}</th></tr>\n</thead>\n<tbody>\n'.format( label=translate(_('Label')), description=translate(_('Description'))) for job_id in jobs: html += \ '<tr><td>{selected}</td>'\ '<td><label for="{id}"><strong>{label}</strong></label></td>'\ '<td>{description}</td></tr>\n'.format( selected=form.custom_checkbox('job:{0}'.format(job_id)), id='job{0}'.format(job_id), label=jobs[job_id][0], description=jobs[job_id][1]) html += '</tbody>\n</table>\n' return Literal(html) # ------------------------------------------------------------------------- @classmethod def _tab4edit_users( cls, request, form, user_filter, user_paging, dbwarehouse): """Generate the user tab for edition. :type request: pyramid.request.Request :param request: Current request. :type form: chrysalio.lib.form.Form :param form: Current form object. :type user_filter: chrysalio.lib.filter.Filter :param user_filter: Filter for users. :type user_paging: chrysalio.lib.paging.Paging :param user_paging: Paging for all users. :type dbwarehouse: DBWarehouse :param dbwarehouse: Current user warehouse SqlAlchemy object. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate if dbwarehouse and dbwarehouse.access not in ( 'restricted', 'restricted-readonly'): return translate(_('Access to this warehouse is not restricted.')) html = DBUser.paging_filter(request, form, user_filter, user_paging) html += cls._user_thead(request, user_paging).replace( '<tr>', '<tr><th class="cioCheckbox" id="check_all"></th>') for dbuser in user_paging: html += \ '<tr><td class="cioCheckbox cioSelect">{check}{visible}</td>'\ '<td>{login}</td>'\ '<td class="cioOptional">{fname}</td><td>{lname}</td>'\ '<td class="cioSelect">{file_rights}</td>'\ '<td class="cioSelect">{meta_rights}</td>'\ '<td class="cioBoolean">{favorite}</td></tr>\n'.format( check=form.custom_checkbox( 'usr:{0}'.format(dbuser.user_id)), visible=form.hidden( 'set:{0}'.format(dbuser.user_id), '1'), login=dbuser.login, fname=dbuser.first_name or '', lname=dbuser.last_name, file_rights=form.select( 'fil:{0}'.format(dbuser.user_id), None, [('', ' ')] + list(FILE_RIGHTS_LABELS.items())), meta_rights=form.select( 'mta:{0}'.format(dbuser.user_id), None, [('', ' ')] + list(META_RIGHTS_LABELS.items())), favorite=form.custom_checkbox( 'fav:{0}'.format(dbuser.user_id))) html += '</tbody>\n</table>\n' return Literal(html) # ------------------------------------------------------------------------- @classmethod def _tab4edit_groups(cls, request, form, groups, dbwarehouse): """Generate the group tab for edition. :type request: pyramid.request.Request :param request: Current request. :type form: .lib.form.Form :param form: Current form object. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :type dbwarehouse: DBWarehouse :param dbwarehouse: Current user warehouse SqlAlchemy object. :rtype: chrysalio.helpers.literal.Literal """ translate = request.localizer.translate if dbwarehouse and dbwarehouse.access not in ( 'restricted', 'restricted-readonly'): return translate(_('Access to this warehouse is not restricted.')) html = '<table>\n<thead>\n'\ '<tr><th></th><th>{label}</th>'\ '<th>{description}</th>'\ '<th class="cioSelect">{file_rights}</th>'\ '<th class="cioSelect">{meta_rights}</th></tr>\n'\ '</thead>\n<tbody>\n'.format( label=translate(_('Label')), description=translate(_('Description')), file_rights=translate(_('File rights')), meta_rights=translate(_('Metadata rights'))) for group_id in groups: html += \ '<tr><td>{selected}</td>'\ '<th><label for="{id}">{label}</label></th>'\ '<td>{description}</td>'\ '<td class="cioSelect">{file_rights}</td>'\ '<td class="cioSelect">{meta_rights}</td></tr>\n'.format( selected=form.custom_checkbox('grp:{0}'.format(group_id)), id='grp{0}'.format(group_id), label=groups[group_id][0], description=groups[group_id][1], file_rights=form.select( 'gfil:{0}'.format(group_id), None, [('', ' ')] + list(FILE_RIGHTS_LABELS.items())), meta_rights=form.select( 'gmta:{0}'.format(group_id), None, [('', ' ')] + list(META_RIGHTS_LABELS.items()))) html += '</tbody>\n</table>\n' return Literal(html) # ------------------------------------------------------------------------- @classmethod def _user_thead(cls, request, user_paging): """Table header for warehouse users. :type request: pyramid.request.Request :param request: Current request. :type user_paging: chrysalio.lib.paging.Paging :param user_paging: Paging for users. :rtype: str """ translate = request.localizer.translate return \ '<table class="cioPagingList">\n<thead><tr>'\ '<th>{login}</th>'\ '<th class="cioOptional">{fname}</th><th>{lname}</th>'\ '<th class="cioSelect">{file_rights}</th>'\ '<th class="cioSelect">{meta_rights}</th>'\ '<th class="cioBoolean">{favorite}</th>'\ '</tr></thead>\n<tbody>\n'.format( login=user_paging.sortable_column( translate(_('Login')), 'login'), fname=user_paging.sortable_column( translate(_('First name')), 'first_name'), lname=user_paging.sortable_column( translate(_('Last name')), 'last_name'), file_rights=translate(_('File rights')), meta_rights=translate(_('Metadata rights')), favorite=translate(_('Favorite')))
# =============================================================================
[docs] class DBWarehouseMetafield(DBDeclarativeClass): """Class to link warehouses with their metadata fields (many-to-many).""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh_warehouses_metafields' __table_args__ = {'mysql_engine': 'InnoDB'} warehouse_id = Column( String(ID_LEN), ForeignKey('wrh_warehouses.warehouse_id', ondelete='CASCADE'), primary_key=True) metafield_id = Column( String(ID_LEN), ForeignKey('wrh_metafields.metafield_id', ondelete='CASCADE'), primary_key=True) position = Column(Integer, default=0)
# =============================================================================
[docs] class DBWarehouseIndexfield(DBDeclarativeClass): """Class to link warehouses with their list index fields (many-to-many).""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh_warehouses_indexfields' __table_args__ = {'mysql_engine': 'InnoDB'} warehouse_id = Column( String(ID_LEN), ForeignKey( 'wrh_warehouses.warehouse_id', ondelete='CASCADE'), primary_key=True, nullable=False) indexfield_id = Column( String(ID_LEN), ForeignKey('wrh_indexfields.indexfield_id', ondelete='CASCADE'), primary_key=True) in_list = Column(Integer) in_cards = Column(Integer)
# =============================================================================
[docs] class DBWarehouseHandler(DBDeclarativeClass): """Class to link warehouses with their handler.""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh_warehouses_handlers' __table_args__ = {'mysql_engine': 'InnoDB'} warehouse_id = Column( String(ID_LEN), ForeignKey( 'wrh_warehouses.warehouse_id', ondelete='CASCADE'), primary_key=True, nullable=False) handler_id = Column(String(ID_LEN), primary_key=True)
# =============================================================================
[docs] class DBWarehouseJob(DBDeclarativeClass): """Class to link warehouses with their jobs (one-to-many).""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh_warehouses_jobs' __table_args__ = {'mysql_engine': 'InnoDB'} warehouse_id = Column( String(ID_LEN), ForeignKey('wrh_warehouses.warehouse_id', ondelete='CASCADE'), primary_key=True) job_id = Column( String(ID_LEN), ForeignKey('srv_jobs.job_id', ondelete='CASCADE'), primary_key=True)
# =============================================================================
[docs] class DBWarehouseUser(DBDeclarativeClass): """Class to link warehouses with their authorized users (many-to-many).""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh_warehouses_users' __table_args__ = {'mysql_engine': 'InnoDB'} __mapper_args__ = {'confirm_deleted_rows': False} warehouse_id = Column( String(ID_LEN), ForeignKey('wrh_warehouses.warehouse_id', ondelete='CASCADE'), primary_key=True) user_id = Column( Integer, ForeignKey('users.user_id', ondelete='CASCADE'), primary_key=True) file_rights = Column( Enum(*FILE_RIGHTS_LABELS.keys(), name='file_rights_enum')) meta_rights = Column( Enum(*META_RIGHTS_LABELS.keys(), name='meta_rights_enum')) favorite = Column(Boolean(name='favorite'), default=False)
# =============================================================================
[docs] class DBWarehouseGroup(DBDeclarativeClass): """Class to link warehouses with their authorized groups (many-to-many).""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh_warehouses_groups' __table_args__ = {'mysql_engine': 'InnoDB'} __mapper_args__ = {'confirm_deleted_rows': False} warehouse_id = Column( String(ID_LEN), ForeignKey('wrh_warehouses.warehouse_id', ondelete='CASCADE'), primary_key=True) group_id = Column( String(ID_LEN), ForeignKey('groups.group_id', ondelete='CASCADE'), primary_key=True) file_rights = Column( Enum(*FILE_RIGHTS_LABELS.keys(), name='file_rights_enum'), default='reader') meta_rights = Column( Enum(*META_RIGHTS_LABELS.keys(), name='meta_rights_enum'), default='reader')