# Source code for ciowarehouse.services.inputs

"""A service to periodically retrieve inputs (file, FTP, email), apply rules
and possibly import new documents."""

from __future__ import annotations
from os.path import exists

from transaction import manager
from sqlalchemy import desc

from chrysalio.models import get_tm_dbsession
from cioservice.lib.service import Service
from ..lib.i18n import _
from ..lib.warehouse import Warehouse
from ..inputs.stream_file import InputStreamFile
from ..inputs.stream_ftp import InputStreamFtp
from ..inputs.stream_email import InputStreamEmail
from ..inputs.rule_basic import InputRuleBasic
from ..models.dbwarehouse import DBWarehouse
from ..models.dbinput import DBInputStream, DBInputRule


# =============================================================================
def includeme(configurator):
    """Include the `inputs` service into the application.

    The :class:`ServiceInputs` class is handed over to
    ``Service.register`` together with the configurator.

    :type configurator: pyramid.config.Configurator
    :param configurator:
        Object used to do configuration declaration within the application.
    """
    service_class = ServiceInputs
    Service.register(configurator, service_class)
# =============================================================================
class ServiceInputs(Service):
    """Class to manage `inputs` service.

    See: :class:`cioservice.lib.service.Service`

    This service loops over input streams. Each stream provides documents.
    Each document is submitted to the input rules in descending order of
    priority: every rule is applied in turn until an *exclusive* rule applies
    to the document, which stops the processing of that document.

    Regardless of its origin, a document is represented by a dictionary with
    the following keys:

    * ``'stream-id'``
    * ``'stream-type'``
    * ``'from'``
    * ``'to'``
    * ``'title'``
    * ``'organization'``
    * ``'language'``
    * ``'datetime'``
    * ``'path'``
    * ``'filename'``
    * ``'dirname'``
    * ``'source'``
    * ``'body'``
    """

    label = _('Inputs service')
    archives: dict = {}
    # Cache of successfully built warehouses, keyed by warehouse ID. It is
    # created per instance in ``__init__``: a class-level dict would be shared
    # by every instance, even instances built on different registries.
    _warehouses: dict

    # -------------------------------------------------------------------------
    def __init__(self, registry):
        """Constructor method.

        Register the ``file``, ``ftp`` and ``email`` input streams and the
        ``basic`` input rule class in the registry, keeping any other stream
        or rule type already registered there.

        :param registry:
            Application registry (presumably a
            :class:`pyramid.registry.Registry`).
        """
        super(ServiceInputs, self).__init__(registry)
        self._warehouses = {}

        if 'input_streams' not in self._registry:
            self._registry['input_streams'] = {}
        streams = self._registry['input_streams']
        # Streams receive the bound method so they can resolve warehouses
        # through this instance's cache.
        streams['file'] = InputStreamFile(self.warehouse)
        streams['ftp'] = InputStreamFtp(self.warehouse)
        streams['email'] = InputStreamEmail(self.warehouse)

        if 'input_rules' not in self._registry:
            self._registry['input_rules'] = {}
        self._registry['input_rules']['basic'] = InputRuleBasic

    # -------------------------------------------------------------------------
    def warehouse(self, build, warehouse_id):
        """Return the warehouse with ID ``warehouse_id`` or ``None``.

        Once built, a warehouse is cached for the lifetime of this service
        instance.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object, used for its database session.
        :param str warehouse_id:
            ID of the warehouse to return.
        :rtype: :class:`.lib.warehouse.Warehouse` or ``None``
        :return:
            ``None`` if ``warehouse_id`` is empty, is not in the database,
            has a location not configured in the ``ciowarehouse`` module, or
            has a root directory that does not exist.
        """
        if not warehouse_id:
            return None
        # Only real warehouses are ever cached, so ``None`` means "not cached"
        cached = self._warehouses.get(warehouse_id)
        if cached is not None:
            return cached

        locations = self._registry['modules']['ciowarehouse'].locations
        dbwarehouse = build.dbsession.query(DBWarehouse).filter_by(
            warehouse_id=warehouse_id).first()
        if dbwarehouse is None or dbwarehouse.location not in locations:
            return None
        warehouse = Warehouse(self._registry, dbwarehouse, locations)
        if not exists(warehouse.root):
            return None
        self._warehouses[warehouse_id] = warehouse
        return warehouse

    # -------------------------------------------------------------------------
    def _run(self, build):
        """Execute the service on the build ``build``.

        See: :meth:`cioservice.lib.service.Service._run`

        The database session of the build is used if it exists. Otherwise a
        transaction-managed session is created from the registry's
        ``dbsession_factory``. If neither is available, an error is reported
        on the build instead of silently doing nothing.
        """
        if build.dbsession is not None:
            self._browse_input_streams(build)
            self.write_traces(build)
        elif self._registry.get('dbsession_factory'):
            with manager:
                build.dbsession = get_tm_dbsession(
                    self._registry['dbsession_factory'], manager)
                self._browse_input_streams(build)
                self.write_traces(build)
        else:
            build.error(_('No database session available.'))

    # -------------------------------------------------------------------------
    def _browse_input_streams(self, build):
        """Load rules and browse all input streams.

        Rules are loaded in descending order of priority and streams in
        ascending order of stream ID. Each document is submitted to the rules
        in order, stopping at the first exclusive rule that applies.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        """
        # Load rules
        rule_classes = self._registry['input_rules']
        rules = []
        for dbrule in build.dbsession.query(DBInputRule).order_by(
                desc('priority')):
            if dbrule.rule_type in rule_classes:
                rules.append(
                    rule_classes[dbrule.rule_type](self.warehouse, dbrule))
            else:
                build.error(
                    _('Unknown type of rule "${r}"', {'r': dbrule.rule_type}))
        if not rules:
            # No rule to apply: streams are not browsed at all
            return

        # Loop over input streams
        streams = self._registry['input_streams']
        for dbstream in build.dbsession.query(DBInputStream).order_by(
                'stream_id'):
            if dbstream.stream_type not in streams:
                build.error(_('Unknown type of stream "${s}".', {
                    's': dbstream.stream_type}))
                continue
            stream = streams[dbstream.stream_type]
            for document in stream.documents(build, dbstream):
                for rule in rules:
                    applied = rule.apply(build, document)
                    # A non-exclusive rule lets lower-priority rules run too
                    if applied and rule.exclusive:
                        break