# Source code for ciowarehouse.services.inputs
"""A service to periodically retrieve inputs (file, FTP, email), apply rules
and possibly import new documents."""
from __future__ import annotations
from os.path import exists
from transaction import manager
from sqlalchemy import desc
from chrysalio.models import get_tm_dbsession
from cioservice.lib.service import Service
from ..lib.i18n import _
from ..lib.warehouse import Warehouse
from ..inputs.stream_file import InputStreamFile
from ..inputs.stream_ftp import InputStreamFtp
from ..inputs.stream_email import InputStreamEmail
from ..inputs.rule_basic import InputRuleBasic
from ..models.dbwarehouse import DBWarehouse
from ..models.dbinput import DBInputStream, DBInputRule
# =============================================================================
def includeme(configurator):
    """Function to include `inputs` service.

    Registers :class:`ServiceInputs` by calling ``Service.register`` with
    the given configurator.

    :type configurator: pyramid.config.Configurator
    :param configurator:
        Object used to do configuration declaration within the application.
    """
    Service.register(configurator, ServiceInputs)
# =============================================================================
class ServiceInputs(Service):
    """Class to manage `inputs` service.

    See: :class:`cioservice.lib.service.Service`

    This service loops over input streams. Each stream provides documents
    which are submitted to the input rules, ordered by descending priority.
    Rules are applied one after the other until a rule both applies to the
    document and is flagged as exclusive.

    Regardless of its origin, a document is represented by a dictionary with
    the following keys:

    * ``'stream-id'``
    * ``'stream-type'``
    * ``'from'``
    * ``'to'``
    * ``'title'``
    * ``'organization'``
    * ``'language'``
    * ``'datetime'``
    * ``'path'``
    * ``'filename'``
    * ``'dirname'``
    * ``'source'``
    * ``'body'``
    """

    label = _('Inputs service')
    # Not used by the code of this class -- presumably filled by streams or
    # rules; TODO confirm against their implementation.
    archives: dict = {}
    # Cache of Warehouse objects keyed by warehouse ID. It is defined at
    # class level and only mutated in place, so it is shared by all
    # instances of this service.
    _warehouses: dict = {}

    # -------------------------------------------------------------------------
    def __init__(self, registry):
        """Constructor method.

        Make sure the registry holds the ``'input_streams'`` and
        ``'input_rules'`` dictionaries, then register the built-in stream
        types (``'file'``, ``'ftp'``, ``'email'``) and the built-in rule
        type (``'basic'``).

        :param registry:
            Application registry, forwarded to the parent constructor.
        """
        super().__init__(registry)

        # NOTE(review): built-in types are registered even when the
        # dictionary already exists, so that entries added by another
        # component are kept -- confirm against the upstream source.
        if 'input_streams' not in self._registry:
            self._registry['input_streams'] = {}
        # Streams are instantiated once and receive the bound ``warehouse``
        # method as a callback to resolve a warehouse ID.
        self._registry['input_streams']['file'] = InputStreamFile(
            self.warehouse)
        self._registry['input_streams']['ftp'] = InputStreamFtp(
            self.warehouse)
        self._registry['input_streams']['email'] = InputStreamEmail(
            self.warehouse)

        if 'input_rules' not in self._registry:
            self._registry['input_rules'] = {}
        # Rules are stored as classes: one instance per database rule is
        # created at each run.
        self._registry['input_rules']['basic'] = InputRuleBasic

    # -------------------------------------------------------------------------
    def warehouse(self, build, warehouse_id):
        """Return the warehouse with ID ``warehouse_id`` or ``None``.

        The warehouse is looked up in the cache first. Otherwise, it is
        loaded from the database, checked and cached. ``None`` is returned
        when the ID is empty, when the warehouse is not in the database,
        when its location is unknown or when its root directory does not
        exist.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param str warehouse_id:
            ID of the warehouse to return.
        :rtype: :class:`.lib.warehouse.Warehouse` or ``None``
        """
        if not warehouse_id:
            return None
        if warehouse_id in self._warehouses:
            return self._warehouses[warehouse_id]

        locations = self._registry['modules']['ciowarehouse'].locations
        dbwarehouse = build.dbsession.query(DBWarehouse).filter_by(
            warehouse_id=warehouse_id).first()
        if dbwarehouse is None or dbwarehouse.location not in locations:
            return None

        warehouse = Warehouse(self._registry, dbwarehouse, locations)
        # Only warehouses with an existing root directory are cached, so a
        # missing one is checked again at the next call.
        if not exists(warehouse.root):
            return None
        self._warehouses[warehouse_id] = warehouse
        return warehouse

    # -------------------------------------------------------------------------
    def _run(self, build):
        """Execute the service on the build ``build``.

        See: :meth:`cioservice.lib.service.Service._run`

        The DB session of the build is used if there is one. Otherwise, if
        the registry provides a ``'dbsession_factory'``, a session bound to
        the transaction manager is created for the build. If neither is
        available, nothing is done.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        """
        # Find the DB session
        if build.dbsession is not None:
            self._browse_input_streams(build)
            self.write_traces(build)
        elif self._registry.get('dbsession_factory'):
            # The whole browsing takes place inside the transaction.
            with manager:
                build.dbsession = get_tm_dbsession(
                    self._registry['dbsession_factory'], manager)
                self._browse_input_streams(build)
                self.write_traces(build)

    # -------------------------------------------------------------------------
    def _browse_input_streams(self, build):
        """Load rules and browse all input streams.

        Rules of unknown type and streams of unknown type are reported as
        build errors and skipped. If no rule can be loaded, the streams are
        not browsed at all.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        """
        # Load rules
        rules = []
        for dbrule in build.dbsession.query(DBInputRule).order_by(
                desc('priority')):
            if dbrule.rule_type in self._registry['input_rules']:
                rules.append(self._registry['input_rules'][dbrule.rule_type](
                    self.warehouse, dbrule))
            else:
                build.error(
                    _('Unknown type of rule "${r}"', {'r': dbrule.rule_type}))
        if not rules:
            return

        # Loop over input streams
        for dbstream in build.dbsession.query(DBInputStream).order_by(
                'stream_id'):
            if dbstream.stream_type not in self._registry['input_streams']:
                build.error(_('Unknown type of stream "${s}".', {
                    's': dbstream.stream_type}))
                continue
            for document in self._registry['input_streams'][
                    dbstream.stream_type].documents(build, dbstream):
                for rule in rules:
                    applied = rule.apply(build, document)
                    # A non-exclusive rule lets the following rules process
                    # the same document.
                    if applied and rule.exclusive:
                        break