Source code for ciowarehouse.models.dbinput
"""SQLAlchemy-powered model definitions for input streams (file, FTP or email)
and input rules."""
from sqlalchemy import Column, ForeignKey, Enum, String, Integer, Boolean, Text
from sqlalchemy.orm import relationship
from lxml import etree
from chrysalio.lib.utils import make_id, tostr, encrypt
from chrysalio.models import ID_LEN, EMAIL_LEN, VALUE_LEN, DBDeclarativeClass
from . import URL_LEN, PATH_LEN
from ..relaxng import RELAXNG_CIOWAREHOUSE
from ..lib.i18n import _
# Translatable labels of the transformations a rule variable may declare.
# The keys, in this order, are the values accepted by the ``transform`` column
# of ``DBInputRuleVariable``.
VARIABLE_TRANSFORM_LABELS = {
    'capitalize': _('Capitalize'),
    'lower': _('Lower'),
    'upper': _('Upper'),
    'replace': _('Replace'),
}

# Archive time to live used when none is given, in hours (30 days).
DEFAULT_ARCHIVE_TTL = 30 * 24
# =============================================================================
class DBInputStream(DBDeclarativeClass):
    """SQLAlchemy-powered input stream class.

    A row stores the connection settings of one input stream (host, path,
    port, SSL flag, user and password) and its archive settings (warehouse
    ID, path and time to live).
    """

    suffix = 'cioinp'
    __tablename__ = 'wrh_input_streams'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # The ID is sized like an email address: see ``record_from_xml``, which
    # documents it as an "identifier or mail address".
    stream_id = Column(String(EMAIL_LEN), primary_key=True)
    stream_type = Column(String(ID_LEN), nullable=False)
    host = Column(String(URL_LEN), nullable=False)
    path = Column(String(PATH_LEN))
    port = Column(Integer)
    ssl = Column(Boolean(name='ssl'), default=False)
    user = Column(String(ID_LEN))
    # Stored as returned by ``encrypt`` (see ``record_format``).
    password = Column(String(128))
    archive_id = Column(String(ID_LEN), nullable=False)
    archive_path = Column(String(255))
    # Same unit as ``DEFAULT_ARCHIVE_TTL`` (hours).
    archive_ttl = Column(Integer)

    # -------------------------------------------------------------------------
    @classmethod
    def xml2db(cls, dbsession, stream_elt, error_if_exists=True, kwargs=None):
        """Load a stream description from a XML element.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        :type  stream_elt: lxml.etree.Element
        :param stream_elt:
            Stream XML element.
        :param bool error_if_exists: (default=True)
            It returns an error if stream already exists.
        :param dict kwargs: (optional)
            Dictionary of keyword arguments (unused here).
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None``.
        """
        # pylint: disable = unused-argument
        # Check if already exists
        stream_id = make_id(stream_elt.get('id'), 'token', EMAIL_LEN)
        dbstream = dbsession.query(cls).filter_by(stream_id=stream_id).first()
        if dbstream is not None:
            if error_if_exists:
                return _('Stream "${s}" already exists.', {'s': stream_id})
            # An existing stream is silently left untouched.
            return None

        # Create stream
        record = cls.record_from_xml(stream_id, stream_elt)
        error = cls.record_format(record)
        if error:
            return error
        dbsession.add(cls(**record))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def record_from_xml(cls, stream_id, stream_elt):
        """Convert a stream XML element into a dictionary.

        Values which are absent from the XML are set to ``None`` (``ssl`` is
        always a boolean), so that :meth:`record_format` can discard them.

        :param str stream_id:
            Stream ID (identifier or mail address).
        :type  stream_elt: lxml.etree.Element
        :param stream_elt:
            Stream XML element.
        :rtype: dict
        """
        namespace = RELAXNG_CIOWAREHOUSE['namespace']

        def text_of(tag):
            """Return the text of the child ``tag`` or ``None``."""
            return stream_elt.findtext('{{{0}}}{1}'.format(namespace, tag))

        # Use explicit ``None`` rather than ``elt is not None and ...``: the
        # latter stores the boolean ``False`` in string/integer fields when
        # the <archive> element is missing.
        archive_elt = stream_elt.find('{{{0}}}archive'.format(namespace))
        if archive_elt is None:
            archive_id = archive_path = archive_ttl = None
        else:
            archive_id = archive_elt.get('warehouse')
            archive_path = archive_elt.text
            archive_ttl = archive_elt.get('ttl')

        return {
            'stream_id': stream_id,
            'stream_type': stream_elt.get('type'),
            'host': text_of('host'),
            'path': text_of('path'),
            'port': text_of('port'),
            'ssl': text_of('ssl') == 'true',
            'user': text_of('user'),
            'password': text_of('password'),
            'archive_id': archive_id,
            'archive_path': archive_path,
            'archive_ttl': archive_ttl}

    # -------------------------------------------------------------------------
    @classmethod
    def record_format(cls, record):
        """Check and possibly correct a record before inserting it in the
        database.

        The dictionary is modified in place: ``None`` values are removed,
        ``port`` and ``archive_ttl`` are converted into integers, ``path`` is
        stripped and the password is encrypted when needed.

        :param dict record:
            Dictionary of values to check.
        :rtype: ``None`` or :class:`pyramid.i18n.TranslationString`
        :return:
            ``None`` or error message.
        """
        for k in [i for i in record if record[i] is None]:
            del record[k]

        # Stream ID
        if not record.get('stream_id'):
            return _('Stream ID is missing.')

        # Stream type
        if not record.get('stream_type'):
            return _('Stream type is missing.')

        # Path
        if record.get('path'):
            record['path'] = record['path'].strip()

        # Port: report a malformed value instead of raising
        if record.get('port'):
            try:
                record['port'] = int(record['port'])
            except (TypeError, ValueError):
                return _('Port must be an integer.')

        # Password: dropped when there is no user; otherwise encrypted unless
        # it ends with '=' and has at least 32 characters (presumably the
        # shape of an already encrypted value -- confirm against ``encrypt``)
        password = record.get('password')
        if password and not record.get('user'):
            del record['password']
        elif password and (
                not tostr(password).endswith('=') or len(password) < 32):
            record['password'] = encrypt(password, 'warehouse')

        # Archive
        if not record.get('archive_id'):
            return _('Archive is missing.')
        if 'archive_ttl' in record:
            try:
                record['archive_ttl'] = int(record['archive_ttl'])
            except (TypeError, ValueError):
                return _('Archive TTL must be an integer.')
        else:
            record['archive_ttl'] = DEFAULT_ARCHIVE_TTL

        return None

    # -------------------------------------------------------------------------
    def db2xml(self, dbsession=None):
        """Serialize an input stream to a XML representation.

        Optional values are written only when they are set, and the ``ttl``
        attribute only when it differs from ``DEFAULT_ARCHIVE_TTL``.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession: (optional)
            SQLAlchemy session (unused here).
        :rtype: lxml.etree.Element
        """
        # pylint: disable = unused-argument
        stream_elt = etree.Element('input-stream')
        stream_elt.set('id', self.stream_id)
        stream_elt.set('type', self.stream_type)
        etree.SubElement(stream_elt, 'host').text = self.host
        if self.path:
            etree.SubElement(stream_elt, 'path').text = self.path
        if self.port:
            etree.SubElement(stream_elt, 'port').text = str(self.port)
        if self.ssl:
            etree.SubElement(stream_elt, 'ssl').text = 'true'
        if self.user:
            etree.SubElement(stream_elt, 'user').text = self.user
        if self.password:
            # The stored value is written as is.
            etree.SubElement(stream_elt, 'password').text = self.password

        elt = etree.SubElement(
            stream_elt, 'archive', warehouse=self.archive_id)
        if self.archive_ttl is not None \
                and self.archive_ttl != DEFAULT_ARCHIVE_TTL:
            elt.set('ttl', str(self.archive_ttl))
        elt.text = self.archive_path or ''

        return stream_elt

    # -------------------------------------------------------------------------
    def attachments2directory(self, attachments, directory):
        """Copy from attachments directory the files corresponding to the
        stream.

        This implementation does nothing and returns ``None``.

        :param str attachments:
            Absolute path to the attachments directory.
        :param str directory:
            The backup directory.
        """
# =============================================================================
class DBInputRule(DBDeclarativeClass):
    """SQLAlchemy-powered input rule class.

    A rule has a destination (warehouse ID, path and ``flat`` flag), a
    priority, an ``exclusive`` flag, and two child collections: variables
    (ordered by ``order``) and conditions (ordered by ``condition_id``).
    """

    suffix = 'cioinp'
    __tablename__ = 'wrh_input_rules'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    rule_id = Column(String(ID_LEN), primary_key=True)
    rule_type = Column(String(ID_LEN), nullable=False, default='basic')
    priority = Column(Integer, default=0, index=True)
    exclusive = Column(Boolean(name='exclusive'), default=False)
    warehouse_id = Column(String(2 * ID_LEN), nullable=False)
    path = Column(String(PATH_LEN))
    flat = Column(Boolean(name='flat'), default=False)

    variables = relationship(
        'DBInputRuleVariable', order_by='DBInputRuleVariable.order',
        cascade='all, delete')
    conditions = relationship(
        'DBInputRuleCondition', order_by='DBInputRuleCondition.condition_id',
        cascade='all, delete')

    # -------------------------------------------------------------------------
    @classmethod
    def xml2db(cls, dbsession, rule_elt, error_if_exists=True, kwargs=None):
        """Load an input rule from a XML element.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        :type  rule_elt: lxml.etree.Element
        :param rule_elt:
            Input rule XML element.
        :param bool error_if_exists: (default=True)
            It returns an error if the rule already exists.
        :param dict kwargs: (optional)
            Dictionary of keyword arguments (unused here).
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None``.
        """
        # pylint: disable = unused-argument
        # Check if already exists
        rule_id = make_id(rule_elt.get('id'), 'token', ID_LEN)
        dbrule = dbsession.query(cls).filter_by(rule_id=rule_id).first()
        if dbrule is not None:
            if error_if_exists:
                # The mapping key must match the ``${r}`` placeholder,
                # otherwise the rule ID is not interpolated in the message.
                return _('Rule "${r}" already exists.', {'r': rule_id})
            # An existing rule is silently left untouched.
            return None

        # Create rule
        record = cls.record_from_xml(rule_id, rule_elt)
        error = cls.record_format(record)
        if error:
            return error
        dbrule = cls(**record)
        dbsession.add(dbrule)

        namespace = RELAXNG_CIOWAREHOUSE['namespace']

        # Add variables: ``order`` is the 1-based position in the XML
        # (the flush presumably ensures the rule row exists before its
        # children are attached -- confirm)
        dbsession.flush()
        for number, elt in enumerate(rule_elt.findall(
                '{{{0}}}variable'.format(namespace))):
            dbrule.variables.append(
                DBInputRuleVariable(
                    name=elt.get('name'), order=number + 1, key=elt.get('key'),
                    pattern=elt.text.strip() if elt.text is not None else None,
                    transform=elt.get('transform'), args=elt.get('args')))

        # Add conditions: ``condition_id`` is the 1-based position in the XML
        dbsession.flush()
        for number, elt in enumerate(rule_elt.findall(
                '{{{0}}}condition'.format(namespace))):
            dbrule.conditions.append(
                DBInputRuleCondition(
                    condition_id=number + 1, key=elt.get('key'),
                    pattern=elt.text.strip() if elt.text else None))

        return None

    # -------------------------------------------------------------------------
    @classmethod
    def record_from_xml(cls, rule_id, rule_elt):
        """Convert an input rule XML element into a dictionary.

        The warehouse ID and the ``flat`` flag are read only when the
        ``destination`` element has a non empty path.

        :param str rule_id:
            Rule ID.
        :type  rule_elt: lxml.etree.Element
        :param rule_elt:
            Rule XML element.
        :rtype: dict
        """
        namespace = RELAXNG_CIOWAREHOUSE['namespace']
        destination_elt = rule_elt.find('{{{0}}}destination'.format(namespace))

        # An empty element (<destination/>) has ``text`` set to ``None``:
        # treat it like a missing path instead of failing on ``.strip()``.
        path = None
        if destination_elt is not None and destination_elt.text is not None:
            path = destination_elt.text.strip()

        return {
            'rule_id': rule_id,
            'rule_type': rule_elt.get('type'),
            'priority': int(rule_elt.get('priority', '0')),
            'exclusive': rule_elt.get('exclusive') == 'true',
            'warehouse_id': destination_elt.get('warehouse') if path else None,
            'path': path,
            'flat': destination_elt.get('flat') == 'true' if path else False}

    # -------------------------------------------------------------------------
    @classmethod
    def record_format(cls, record):
        """Check and possibly correct a record before inserting it in the
        database.

        The dictionary is modified in place: ``None`` values are removed and
        a missing rule type is replaced by ``'basic'``.

        :param dict record:
            Dictionary of values to check.
        :rtype: ``None`` or :class:`pyramid.i18n.TranslationString`
        :return:
            ``None`` or error message.
        """
        for k in [i for i in record if record[i] is None]:
            del record[k]

        # Rule ID
        if not record.get('rule_id'):
            return _('Rule ID is missing.')

        # Rule type
        if not record.get('rule_type'):
            record['rule_type'] = 'basic'

        # Destination
        if not record.get('warehouse_id'):
            return _('Rule without warehouse.')

        return None

    # -------------------------------------------------------------------------
    def db2xml(self, dbsession=None):
        """Serialize an input rule to a XML representation.

        Attributes holding their default value (type ``'basic'``, priority
        ``0``, ``exclusive`` and ``flat`` false) are not written.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession: (optional)
            SQLAlchemy session (unused here).
        :rtype: lxml.etree.Element
        """
        # pylint: disable = unused-argument
        rule_elt = etree.Element('input-rule')
        rule_elt.set('id', self.rule_id)
        if self.rule_type != 'basic':
            rule_elt.set('type', self.rule_type)
        if self.priority:
            rule_elt.set('priority', str(self.priority))
        if self.exclusive:
            rule_elt.set('exclusive', 'true')

        # Variables
        for dbvariable in self.variables:
            elt = etree.SubElement(rule_elt, 'variable')
            elt.set('name', dbvariable.name)
            elt.set('key', dbvariable.key)
            if dbvariable.transform:
                elt.set('transform', dbvariable.transform)
            if dbvariable.args:
                elt.set('args', dbvariable.args)
            if dbvariable.pattern is not None:
                elt.text = dbvariable.pattern

        # Conditions
        for dbcondition in self.conditions:
            elt = etree.SubElement(rule_elt, 'condition')
            elt.set('key', dbcondition.key)
            elt.text = dbcondition.pattern

        # Destination
        elt = etree.SubElement(
            rule_elt, 'destination', warehouse=self.warehouse_id)
        elt.text = self.path
        if self.flat:
            elt.set('flat', 'true')

        return rule_elt

    # -------------------------------------------------------------------------
    def attachments2directory(self, attachments, directory):
        """Copy from attachments directory the files corresponding to the
        rule.

        This implementation does nothing and returns ``None``.

        :param str attachments:
            Absolute path to the attachments directory.
        :param str directory:
            The backup directory.
        """
# =============================================================================
class DBInputRuleVariable(DBDeclarativeClass):
    """SQLAlchemy-powered class for a variable of an input rule.

    A row is identified by the pair (``rule_id``, ``name``). The foreign key
    on ``wrh_input_rules.rule_id`` is declared with ``ON DELETE CASCADE``.
    """
    # pylint: disable = too-few-public-methods

    __tablename__ = 'wrh_input_rules_variables'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # Owning rule
    rule_id = Column(
        String(ID_LEN),
        ForeignKey('wrh_input_rules.rule_id', ondelete='CASCADE'),
        primary_key=True)
    name = Column(String(ID_LEN), primary_key=True)
    # Sort key of the ``DBInputRule.variables`` relationship
    order = Column(Integer, default=0)
    key = Column(String(ID_LEN), nullable=False)
    pattern = Column(String(VALUE_LEN))
    # One of the keys of ``VARIABLE_TRANSFORM_LABELS`` (iterating over a
    # dictionary yields its keys)
    transform = Column(Enum(*VARIABLE_TRANSFORM_LABELS, name='rule_transform'))
    args = Column(Text)
# =============================================================================
class DBInputRuleCondition(DBDeclarativeClass):
    """SQLAlchemy-powered class for a condition of an input rule.

    A row is identified by the pair (``rule_id``, ``condition_id``). The
    foreign key on ``wrh_input_rules.rule_id`` is declared with
    ``ON DELETE CASCADE``.
    """
    # pylint: disable = too-few-public-methods

    __tablename__ = 'wrh_input_rules_conditions'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # Owning rule
    rule_id = Column(
        String(ID_LEN),
        ForeignKey('wrh_input_rules.rule_id', ondelete='CASCADE'),
        primary_key=True)
    # Sort key of the ``DBInputRule.conditions`` relationship
    condition_id = Column(Integer, primary_key=True)
    key = Column(String(VALUE_LEN), nullable=False)
    pattern = Column(String(VALUE_LEN))