# Source code for ciowarehouse.inputs.rule_basic
"""Class for basic input rule."""
from os import makedirs
from os.path import join, normpath, relpath, exists, isfile, basename, dirname
from logging import getLogger
from shutil import copy
from re import compile as re_compile, error as re_error, sub as re_sub
from collections import OrderedDict
from functools import reduce
from chrysalio.lib.utils import copy_content
from ..lib.i18n import _, translate
from ..lib.utils import EXCLUDED_FILES, INFOS_DIR
from ..lib.utils import infos_load, infos_save, infos_set_metadata
# Module logger, used to report invalid regular expressions found in rules.
LOG = getLogger(__name__)
# =============================================================================
class InputRuleBasic(object):
    """Class to manage basic input rule.

    :param get_warehouse:
        Function to retrieve a warehouse. It is called as
        ``get_warehouse(build, warehouse_id)``; when it returns ``None`` the
        rule does not apply.
    :type  dbrule: .models.dbinput.DBInputRule
    :param dbrule:
        SqlAlchemy object to build this rule.

    ``self.variables`` is an ordered dictionary. Each key is the name of a
    variable and each value is a tuple such as ``(key, value_regex,
    transform, args)`` where ``args`` is a list of ``[pattern, replacement]``
    pairs used by the ``replace`` transformation.

    ``self.conditions`` is a list of conditions. Each condition is a tuple such
    as ``(key, value_regex)``.
    """

    # -------------------------------------------------------------------------
    def __init__(self, get_warehouse, dbrule):
        """Constructor method."""
        self._get_warehouse = get_warehouse
        self.exclusive = dbrule.exclusive

        # Variables: one with an invalid regular expression is logged and
        # skipped.
        variables = []
        for dbvariable in dbrule.variables:
            try:
                regex = re_compile(dbvariable.pattern) \
                    if dbvariable.pattern else None
            except re_error as error:
                LOG.error(error)
                continue
            variables.append((
                dbvariable.name,
                (dbvariable.key, regex, dbvariable.transform,
                 self._parse_args(dbvariable.args))))
        self.variables = OrderedDict(variables)

        # Conditions: one with an invalid regular expression is logged and
        # skipped.
        self.conditions = []
        for dbcondition in dbrule.conditions:
            try:
                regex = re_compile(dbcondition.pattern) \
                    if dbcondition.pattern else None
            except re_error as error:
                LOG.error(error)
                continue
            self.conditions.append((dbcondition.key, regex))

        self.warehouse_id = dbrule.warehouse_id
        self.path = dbrule.path
        self.flat = dbrule.flat

    # -------------------------------------------------------------------------
    @staticmethod
    def _parse_args(args):
        """Split the arguments of a variable into ``[key, value]`` pairs.

        Arguments are separated by white spaces and each one looks like
        ``key=value``. Only the first ``=`` is a separator, so the value may
        itself contain ``=``. An argument without ``=`` gets an empty value
        instead of producing a truncated pair.

        :param str args:
            Raw arguments, possibly ``None`` or empty.
        :rtype: list
        """
        pairs = []
        for item in (args or '').split():
            key, _sep, value = item.partition('=')
            pairs.append([key, value])
        return pairs

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_inside(root, destination):
        """Tell whether ``destination`` is ``root`` or one of its descendants.

        The comparison is made on whole path components: a sibling directory
        which only shares a prefix with ``root`` (``/data/w2`` for
        ``/data/w``) is outside.

        :param str root:
            Root directory.
        :param str destination:
            Path to check.
        :rtype: bool
        """
        root = normpath(root)
        destination = normpath(destination)
        # ``join(root, '')`` is ``root`` with exactly one trailing separator.
        return destination == root or destination.startswith(join(root, ''))

    # -------------------------------------------------------------------------
    def apply(self, build, document):
        """Check if the document matches with this rule and possibly send the
        document to its destination.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict document:
            Current document.
        :rtype: bool
        :return:
            ``False`` if a condition fails, if the destination cannot be
            computed or if the warehouse is not available. Otherwise, the
            result of :meth:`document2destination`.
        """
        # Values
        values = document.copy()
        for name, variable in self.variables.items():
            key, regex = variable[0], variable[1]
            if key not in values:
                continue
            if regex is None:
                # No pattern: the whole value is used
                values[name] = self._transform(name, values[key])
                continue
            matches = regex.search(values[key])
            if matches and matches.groups():
                # A group which did not participate in the match counts as
                # an empty string
                values[name] = self._transform(
                    name, ''.join(matches.groups('')))

        # Conditions
        for key, regex in self.conditions:
            if key not in values:
                return False
            if regex is None:
                # Without pattern, the value must simply not be empty
                if not values[key]:
                    return False
            elif regex.search(values[key]) is None:
                return False

        # Destination
        try:
            warehouse_id = self.warehouse_id.format(**values)
            path = normpath((self.path or '.').format(**values))
        except KeyError as error:
            build.error(error)
            return False
        warehouse = self._get_warehouse(build, warehouse_id)
        if warehouse is None:
            return False

        return self.document2destination(
            build, values, document, warehouse, path)

    # -------------------------------------------------------------------------
    def document2destination(self, build, values, document, warehouse, path):
        """Copy the document into the destination warehouse.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict values:
            Values of the variables.
        :param dict document:
            Current document.
        :type  warehouse: ciowarehouse.lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param str path:
            Path of the document inside the warehouse.
        :rtype: bool
        :return:
            ``False`` if the document has no source or if its destination is
            outside the warehouse, ``True`` otherwise.
        """
        # Check document
        if not document.get('source'):
            return False
        destination = normpath(join(
            warehouse.root, path,
            '.' if self.flat else dirname(document['path'])))
        if not self._is_inside(warehouse.root, destination):
            build.error(translate(
                _('Directory ${d} is outside warehouse!', {'d': path}),
                lang=build.lang))
            return False

        # Import document
        # NOTE(review): when the source path does not exist, nothing is
        # copied but the import is still committed and reported -- confirm
        # this is intended.
        makedirs(destination, exist_ok=True)
        if isfile(document['source']):
            copy(document['source'], destination)
        elif exists(document['source']):
            copy_content(document['source'], destination, EXCLUDED_FILES)
        warehouse.vcs.add(destination)

        # Update metadata
        self._update_metafields(document, warehouse, path, values)

        # Commit and refresh the warehouse
        self._commit(build, document, warehouse, path, destination)
        return True

    # -------------------------------------------------------------------------
    @classmethod
    def _update_metafields(cls, document, warehouse, path, values):
        """Update metadata fields.

        Only the values whose name is a metadata field of the warehouse are
        saved. If there is none, nothing is written.

        :param dict document:
            Current document.
        :type  warehouse: ciowarehouse.lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param str path:
            Path of the document inside the warehouse.
        :param dict values:
            Values of the variables.
        """
        # Retrieve metadata
        metafields = [
            (name, warehouse.metafields[name]['type'], values[name])
            for name in values if name in warehouse.metafields]
        if not metafields:
            return

        # Load or create infos file
        # NOTE(review): the infos file is located with ``path`` only, even
        # when the document is copied into a sub-directory (rule not flat)
        # -- confirm.
        infos_file = join(
            warehouse.root, INFOS_DIR, path,
            '{0}.xml'.format(basename(document['source'])))
        root_elt = infos_load(infos_file)

        # Update fields
        infos_set_metadata(root_elt, metafields)

        # Save XML
        infos_save(root_elt, infos_file)
        warehouse.vcs.add(join(warehouse.root, INFOS_DIR, path))

    # -------------------------------------------------------------------------
    @classmethod
    def _commit(cls, build, document, warehouse, path, destination):
        """Commit and refresh the warehouse.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict document:
            Current document.
        :type  warehouse: ciowarehouse.lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param str path:
            Path of the document inside the warehouse.
        :param str destination:
            Absolute path of the destination directory.
        """
        # The origin of the document is optional
        sender = document.get('from', '')
        error = warehouse.vcs.commit(
            translate(_('Automatic import')), sender)
        if error:  # pragma: nocover
            build.error(error)
        else:
            # Build messages are written in the language of the build
            build.info(translate(
                _('"${s}" imported from ${f} into ${t}.', {
                    's': basename(document['source']), 'f': sender,
                    't': join(
                        warehouse.uid,
                        relpath(destination, warehouse.root))}),
                lang=build.lang))

        # NOTE(review): the refresh is done whatever the result of the
        # commit -- confirm against the upstream layout.
        warehouse.refresh(
            None, (path,), recursive=True, dbsession=build.dbsession)
        warehouse.unrefreshed()

    # -------------------------------------------------------------------------
    def _transform(self, name, value):
        """Apply the transformation (capitalize, lower, upper or replace) of
        the variable named ``name`` to the ``value``.

        For ``replace``, each ``[pattern, replacement]`` argument of the
        variable is applied in turn as a regular expression substitution.

        :param str name:
            Name of the variable.
        :param str value:
            Value to transform.
        :rtype: str
        """
        transform, args = self.variables[name][2], self.variables[name][3]
        if transform is None:
            return value
        if transform == 'capitalize':
            return value.capitalize()
        if transform == 'upper':
            return value.upper()
        if transform == 'lower':
            return value.lower()
        if transform == 'replace' and args:
            return reduce(
                lambda text, pair: re_sub(pair[0], pair[1], text),
                args, value)
        return value  # pragma: nocover