Source code for ciowarehouse.inputs.rule_basic

"""Class for basic input rule."""

from os import makedirs
from os.path import join, normpath, relpath, exists, isfile, basename, dirname
from logging import getLogger
from shutil import copy
from re import compile as re_compile, error as re_error, sub as re_sub
from collections import OrderedDict
from functools import reduce

from chrysalio.lib.utils import copy_content
from ..lib.i18n import _, translate
from ..lib.utils import EXCLUDED_FILES, INFOS_DIR
from ..lib.utils import infos_load, infos_save, infos_set_metadata


LOG = getLogger(__name__)


# =============================================================================
class InputRuleBasic(object):
    """Class to manage basic input rule.

    :param get_warehouse:
        Function to retrieve a warehouse. It is called as
        ``get_warehouse(build, warehouse_id)`` and a ``None`` result makes
        the rule fail.
    :type  dbrule: .models.dbinput.DBInputRule
    :param dbrule:
        SqlAlchemy object to build this rule.

    ``self.variables`` is an ordered dictionary. Each value is a tuple such
    as ``(key, value_regex, transform, args)`` where ``args`` is a list of
    ``(pattern, replacement)`` pairs used by the ``replace`` transformation.

    ``self.conditions`` is a list of conditions. Each condition is a tuple
    such as ``(key, value_regex)``.
    """

    # -------------------------------------------------------------------------
    def __init__(self, get_warehouse, dbrule):
        """Constructor method."""
        self._get_warehouse = get_warehouse
        self.exclusive = dbrule.exclusive

        # A variable whose pattern does not compile is logged and skipped.
        self.variables = OrderedDict()
        for dbvariable in dbrule.variables:
            try:
                regex = re_compile(dbvariable.pattern) \
                    if dbvariable.pattern else None
            except re_error as error:
                LOG.error(error)
                continue
            self.variables[dbvariable.name] = (
                dbvariable.key, regex, dbvariable.transform,
                self._parse_args(dbvariable.args))

        # A condition whose pattern does not compile is logged and skipped.
        self.conditions = []
        for dbcondition in dbrule.conditions:
            try:
                regex = re_compile(dbcondition.pattern) \
                    if dbcondition.pattern else None
            except re_error as error:
                LOG.error(error)
                continue
            self.conditions.append((dbcondition.key, regex))

        self.warehouse_id = dbrule.warehouse_id
        self.path = dbrule.path
        self.flat = dbrule.flat

    # -------------------------------------------------------------------------
    @staticmethod
    def _parse_args(args):
        """Convert a string of white space separated ``pattern=replacement``
        items into a list of ``(pattern, replacement)`` pairs.

        Only the first ``=`` separates the pattern from its replacement, so
        a replacement may itself contain ``=``. An item without ``=`` gets an
        empty replacement.

        :param str args:
            Arguments as a string or ``None``.
        :rtype: list
        """
        return [
            (pattern, replacement)
            for pattern, _sep, replacement in (
                item.partition('=') for item in (args or '').split())]

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_inside(root, destination):
        """Tell whether ``destination`` is ``root`` itself or lies under it.

        The comparison is made on path components and not on a string
        prefix: ``/srv/wh_other`` is not inside ``/srv/wh``.

        :param str root:
            Reference directory.
        :param str destination:
            Path to check.
        :rtype: bool
        """
        try:
            relative = relpath(destination, root)
        except ValueError:
            # relpath() cannot relate the two paths (e.g. different drives).
            return False
        # ``join('..', '')`` is ``..`` followed by the path separator.
        return relative != '..' and not relative.startswith(join('..', ''))

    # -------------------------------------------------------------------------
    def apply(self, build, document):
        """Check if the document matches with this rule and possibly send the
        document to its destination.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict document:
            Current document.
        :rtype: bool
        :return:
            ``False`` if a condition is not fulfilled, if the destination
            cannot be computed or if the warehouse is not found. Otherwise,
            the result of :meth:`document2destination`.
        """
        # Values
        # Variables are computed in order: a variable can use a previous one.
        values = document.copy()
        for name, variable in self.variables.items():
            key, regex = variable[0], variable[1]
            if key not in values:
                continue
            # NOTE(review): nesting rebuilt from flattened text -- a variable
            # without pattern takes the whole value; a pattern that does not
            # match or has no group leaves the variable undefined. Confirm.
            if regex is None:
                values[name] = self._transform(name, values[key])
                continue
            matches = regex.search(values[key])
            # groups('') replaces a non participating group by an empty
            # string so that the concatenation cannot fail on ``None``.
            groups = matches.groups('') if matches else ()
            if groups:
                values[name] = self._transform(name, ''.join(groups))

        # Conditions
        for key, regex in self.conditions:
            if key not in values:
                return False
            if regex is None:
                # Without pattern, the value must only be non empty.
                if not values[key]:
                    return False
            elif regex.search(values[key]) is None:
                return False

        # Destination
        try:
            warehouse_id = self.warehouse_id.format(**values)
            path = normpath((self.path or '.').format(**values))
        except (KeyError, IndexError, ValueError) as error:
            # Unknown variable, positional field or malformed template.
            build.error(error)
            return False
        warehouse = self._get_warehouse(build, warehouse_id)
        if warehouse is None:
            return False

        return self.document2destination(
            build, values, document, warehouse, path)

    # -------------------------------------------------------------------------
    def document2destination(self, build, values, document, warehouse, path):
        """Copy the document into the destination warehouse.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict values:
            Values of the variables.
        :param dict document:
            Current document.
        :type  warehouse: ciowarehouse.lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param str path:
            Path of the document inside the warehouse.
        :rtype: bool
        :return:
            ``False`` if the document has no source or if the destination is
            outside the warehouse, ``True`` otherwise.
        """
        # Check document
        if not document.get('source'):
            return False
        destination = normpath(join(
            warehouse.root, path,
            '.' if self.flat else dirname(document['path'])))
        if not self._is_inside(warehouse.root, destination):
            build.error(translate(
                _('Directory ${d} is outside warehouse!', {'d': path}),
                lang=build.lang))
            return False

        # Import document
        makedirs(destination, exist_ok=True)
        if isfile(document['source']):
            copy(document['source'], destination)
        elif exists(document['source']):
            copy_content(document['source'], destination, EXCLUDED_FILES)
        warehouse.vcs.add(destination)

        # Update metadata
        self._update_metafields(document, warehouse, path, values)

        # Commit and refresh the warehouse
        self._commit(build, document, warehouse, path, destination)

        return True

    # -------------------------------------------------------------------------
    @classmethod
    def _update_metafields(cls, document, warehouse, path, values):
        """Update metadata fields.

        Nothing is written if no value matches a metadata field of the
        warehouse.

        :param dict document:
            Current document.
        :type  warehouse: ciowarehouse.lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param str path:
            Path of the document inside the warehouse.
        :param dict values:
            Values of the variables.
        """
        # Retrieve metadata
        metafields = [
            (name, warehouse.metafields[name]['type'], values[name])
            for name in values if name in warehouse.metafields]
        if not metafields:
            return

        # Load or create infos file
        infos_file = join(
            warehouse.root, INFOS_DIR, path,
            '{0}.xml'.format(basename(document['source'])))
        root_elt = infos_load(infos_file)

        # Update fields
        infos_set_metadata(root_elt, metafields)

        # Save XML
        infos_save(root_elt, infos_file)
        warehouse.vcs.add(join(warehouse.root, INFOS_DIR, path))

    # -------------------------------------------------------------------------
    @classmethod
    def _commit(cls, build, document, warehouse, path, destination):
        """Commit and refresh the warehouse.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict document:
            Current document.
        :type  warehouse: ciowarehouse.lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param str path:
            Path of the document inside the warehouse.
        :param str destination:
            Absolute path of the destination directory.
        """
        # The origin is optional: use the same fallback for the commit and
        # for the message.
        origin = document.get('from', '')
        error = warehouse.vcs.commit(
            translate(_('Automatic import')), origin)
        if error:  # pragma: nocover
            build.error(error)
        else:
            build.info(translate(_('"${s}" imported from ${f} into ${t}.', {
                's': basename(document['source']),
                'f': origin,
                't': join(
                    warehouse.uid, relpath(destination, warehouse.root))})))

        # NOTE(review): the refresh is done whatever the commit result is --
        # confirm this matches the intended behaviour.
        warehouse.refresh(
            None, (path,), recursive=True, dbsession=build.dbsession)
        warehouse.unrefreshed()

    # -------------------------------------------------------------------------
    def _transform(self, name, value):
        """Apply the transformation (capitalize, lower, upper or replace) of
        the variable named ``name`` to the ``value``.

        :param str name:
            Name of the variable.
        :param str value:
            Value to transform.
        :rtype: str
        :return:
            The transformed value or the value itself if the variable has no
            transformation.
        """
        transform = self.variables[name][2]
        if transform is None:
            return value
        if transform == 'capitalize':
            return value.capitalize()
        if transform == 'upper':
            return value.upper()
        if transform == 'lower':
            return value.lower()
        if transform == 'replace' and self.variables[name][3]:
            # Substitutions are applied one after the other, in order.
            return reduce(
                lambda text, pair: re_sub(pair[0], pair[1], text),
                self.variables[name][3], value)
        return value  # pragma: nocover