Source code for ciowarehouse.models.dbinput

"""SQLAlchemy-powered model definitions for input streams (file, FTP or email)
and input rules."""

from sqlalchemy import Column, ForeignKey, Enum, String, Integer, Boolean, Text
from sqlalchemy.orm import relationship
from lxml import etree

from chrysalio.lib.utils import make_id, tostr, encrypt
from chrysalio.models import ID_LEN, EMAIL_LEN, VALUE_LEN, DBDeclarativeClass
from . import URL_LEN, PATH_LEN
from ..relaxng import RELAXNG_CIOWAREHOUSE
from ..lib.i18n import _

# Translatable labels of the transformations a rule variable may use, indexed
# by transformation identifier. The keys are also the values accepted by the
# ``transform`` column of ``DBInputRuleVariable``.
VARIABLE_TRANSFORM_LABELS = {
    'capitalize': _('Capitalize'),
    'lower': _('Lower'),
    'upper': _('Upper'),
    'replace': _('Replace'),
}

# Archive time to live applied when a stream does not define one.
# Expressed in hours: 720 hours = 30 days.
DEFAULT_ARCHIVE_TTL = 720


# =============================================================================
class DBInputStream(DBDeclarativeClass):
    """SQLAlchemy-powered input stream class.

    A row stores the stream identifier and type, the host to reach with its
    optional path, port, SSL flag and credentials, and the archive settings
    (warehouse, path and time to live).
    """

    suffix = 'cioinp'

    __tablename__ = 'wrh_input_streams'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    stream_id = Column(String(EMAIL_LEN), primary_key=True)
    stream_type = Column(String(ID_LEN), nullable=False)
    host = Column(String(URL_LEN), nullable=False)
    path = Column(String(PATH_LEN))
    port = Column(Integer)
    ssl = Column(Boolean(name='ssl'), default=False)
    user = Column(String(ID_LEN))
    password = Column(String(128))
    archive_id = Column(String(ID_LEN), nullable=False)
    archive_path = Column(String(255))
    # Time to live; ``record_format`` falls back on DEFAULT_ARCHIVE_TTL
    archive_ttl = Column(Integer)

    # -------------------------------------------------------------------------
    @classmethod
    def xml2db(cls, dbsession, stream_elt, error_if_exists=True, kwargs=None):
        """Load a stream description from a XML element.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        :type  stream_elt: lxml.etree.Element
        :param stream_elt:
            Stream XML element.
        :param bool error_if_exists: (default=True)
            It returns an error if stream already exists.
        :param dict kwargs: (optional)
            Dictionary of keyword arguments. It is not used here.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None``.
        """
        # pylint: disable = unused-argument
        # Check if already exists
        stream_id = make_id(stream_elt.get('id'), 'token', EMAIL_LEN)
        dbstream = dbsession.query(cls).filter_by(stream_id=stream_id).first()
        if dbstream is not None:
            if error_if_exists:
                return _('Stream "${s}" already exists.', {'s': stream_id})
            return None

        # Create stream
        record = cls.record_from_xml(stream_id, stream_elt)
        error = cls.record_format(record)
        if error:
            return error
        dbsession.add(cls(**record))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def record_from_xml(cls, stream_id, stream_elt):
        """Convert a stream XML element into a dictionary.

        Values which are absent from the XML are ``None`` in the returned
        dictionary, except ``ssl`` which is always a boolean.

        :param str stream_id:
            Stream ID (identifier or mail address).
        :type  stream_elt: lxml.etree.Element
        :param stream_elt:
            Stream XML element.
        :rtype: dict
        """
        namespace = RELAXNG_CIOWAREHOUSE['namespace']
        archive_elt = stream_elt.find('{{{0}}}archive'.format(namespace))
        # Use explicit conditional expressions: ``elt is not None and ...``
        # would yield ``False`` (not ``None``) when there is no archive
        # element and ``record_format`` only strips ``None`` values.
        has_archive = archive_elt is not None
        return {
            'stream_id': stream_id,
            'stream_type': stream_elt.get('type'),
            'host': stream_elt.findtext('{{{0}}}host'.format(namespace)),
            'path': stream_elt.findtext('{{{0}}}path'.format(namespace)),
            'port': stream_elt.findtext('{{{0}}}port'.format(namespace)),
            'ssl': stream_elt.findtext(
                '{{{0}}}ssl'.format(namespace)) == 'true',
            'user': stream_elt.findtext('{{{0}}}user'.format(namespace)),
            'password': stream_elt.findtext(
                '{{{0}}}password'.format(namespace)),
            'archive_id':
                archive_elt.get('warehouse') if has_archive else None,
            'archive_path': archive_elt.text if has_archive else None,
            'archive_ttl': archive_elt.get('ttl') if has_archive else None}

    # -------------------------------------------------------------------------
    @classmethod
    def record_format(cls, record):
        """Check and possibly correct a record before inserting it in the
        database.

        The dictionary is modified in place: ``None`` values are removed,
        the path is stripped, the port and the archive TTL are converted
        into integers and a password which does not look already encrypted
        is encrypted.

        :param dict record:
            Dictionary of values to check.
        :rtype: ``None`` or :class:`pyramid.i18n.TranslationString`
        :return:
            ``None`` or error message.
        """
        # Remove undefined values (the list is built before any deletion)
        for k in [i for i in record if record[i] is None]:
            del record[k]

        # Stream ID
        if not record.get('stream_id'):
            return _('Stream ID is missing.')

        # Stream type
        if not record.get('stream_type'):
            return _('Stream type is missing.')

        # Path
        if record.get('path'):
            record['path'] = record['path'].strip()

        # Port
        if record.get('port'):
            record['port'] = int(record['port'])

        # Password: useless without a user; otherwise it is encrypted
        # unless it ends with '=' and has at least 32 characters
        password = record.get('password')
        if password and not record.get('user'):
            del record['password']
        elif password and (
                not tostr(password).endswith('=') or len(password) < 32):
            record['password'] = encrypt(password, 'warehouse')

        # Archive
        if not record.get('archive_id'):
            return _('Archive is missing.')
        record['archive_ttl'] = int(record['archive_ttl']) \
            if 'archive_ttl' in record else DEFAULT_ARCHIVE_TTL

        return None

    # -------------------------------------------------------------------------
    def db2xml(self, dbsession=None):
        """Serialize a file stream to a XML representation.

        Optional fields are only written when they have a value and the
        archive TTL is only written when it differs from the default one.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession: (optional)
            SQLAlchemy session. It is not used here.
        :rtype: lxml.etree.Element
        """
        # pylint: disable = unused-argument
        stream_elt = etree.Element('input-stream')
        stream_elt.set('id', self.stream_id)
        stream_elt.set('type', self.stream_type)

        etree.SubElement(stream_elt, 'host').text = self.host
        if self.path:
            etree.SubElement(stream_elt, 'path').text = self.path
        if self.port:
            etree.SubElement(stream_elt, 'port').text = str(self.port)
        if self.ssl:
            etree.SubElement(stream_elt, 'ssl').text = 'true'
        if self.user:
            etree.SubElement(stream_elt, 'user').text = self.user
        if self.password:
            etree.SubElement(stream_elt, 'password').text = self.password

        elt = etree.SubElement(
            stream_elt, 'archive', warehouse=self.archive_id)
        if self.archive_ttl is not None \
           and self.archive_ttl != DEFAULT_ARCHIVE_TTL:
            elt.set('ttl', str(self.archive_ttl))
        elt.text = self.archive_path or ''

        return stream_elt

    # -------------------------------------------------------------------------
    def attachments2directory(self, attachments, directory):
        """Copy from attachments directory the file corresponding to the user.

        This implementation has no body: it does nothing and returns
        ``None``.

        :param str attachments:
            Absolute path to the attachments directory.
        :param str directory:
            The backup directory.
        """
# =============================================================================
class DBInputRule(DBDeclarativeClass):
    """SQLAlchemy-powered input rule class.

    A rule stores its type, priority and exclusivity, the destination
    (warehouse, path and ``flat`` flag) and owns an ordered list of
    variables and a list of conditions.
    """

    suffix = 'cioinp'

    __tablename__ = 'wrh_input_rules'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    rule_id = Column(String(ID_LEN), primary_key=True)
    rule_type = Column(String(ID_LEN), nullable=False, default='basic')
    priority = Column(Integer, default=0, index=True)
    exclusive = Column(Boolean(name='exclusive'), default=False)
    warehouse_id = Column(String(2 * ID_LEN), nullable=False)
    path = Column(String(PATH_LEN))
    flat = Column(Boolean(name='flat'), default=False)

    variables = relationship(
        'DBInputRuleVariable', order_by='DBInputRuleVariable.order',
        cascade='all, delete')
    conditions = relationship(
        'DBInputRuleCondition', order_by='DBInputRuleCondition.condition_id',
        cascade='all, delete')

    # -------------------------------------------------------------------------
    @classmethod
    def xml2db(cls, dbsession, rule_elt, error_if_exists=True, kwargs=None):
        """Load a user rule from a XML element.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        :type  rule_elt: lxml.etree.Element
        :param rule_elt:
            User rule XML element.
        :param bool error_if_exists: (default=True)
            It returns an error if user rule already exists.
        :param dict kwargs: (optional)
            Dictionary of keyword arguments. It is not used here.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None``.
        """
        # pylint: disable = unused-argument
        # Check if already exists
        rule_id = make_id(rule_elt.get('id'), 'token', ID_LEN)
        dbrule = dbsession.query(cls).filter_by(rule_id=rule_id).first()
        if dbrule is not None:
            if error_if_exists:
                # The mapping key must match the ${r} placeholder
                return _('Rule "${r}" already exists.', {'r': rule_id})
            return None

        # Create rule
        record = cls.record_from_xml(rule_id, rule_elt)
        error = cls.record_format(record)
        if error:
            return error
        dbrule = cls(**record)
        dbsession.add(dbrule)

        namespace = RELAXNG_CIOWAREHOUSE['namespace']

        # Add variables, numbered from 1 in document order
        dbsession.flush()
        variable_elts = rule_elt.findall('{{{0}}}variable'.format(namespace))
        for number, elt in enumerate(variable_elts, 1):
            dbrule.variables.append(
                DBInputRuleVariable(
                    name=elt.get('name'), order=number, key=elt.get('key'),
                    pattern=elt.text.strip()
                    if elt.text is not None else None,
                    transform=elt.get('transform'), args=elt.get('args')))

        # Add conditions, numbered from 1 in document order
        dbsession.flush()
        condition_elts = rule_elt.findall('{{{0}}}condition'.format(namespace))
        for number, elt in enumerate(condition_elts, 1):
            dbrule.conditions.append(
                DBInputRuleCondition(
                    condition_id=number, key=elt.get('key'),
                    pattern=elt.text.strip() if elt.text else None))

        return None

    # -------------------------------------------------------------------------
    @classmethod
    def record_from_xml(cls, rule_id, rule_elt):
        """Convert a user rule XML element into a dictionary.

        The warehouse and the ``flat`` flag are only read when the
        destination element has a non empty path.

        :param str rule_id:
            Rule ID.
        :type  rule_elt: lxml.etree.Element
        :param rule_elt:
            Rule XML element.
        :rtype: dict
        """
        namespace = RELAXNG_CIOWAREHOUSE['namespace']
        destination_elt = rule_elt.find('{{{0}}}destination'.format(namespace))
        # An empty destination element has a ``None`` text: check it before
        # stripping to avoid an AttributeError
        path = destination_elt.text.strip() \
            if destination_elt is not None and destination_elt.text else None
        return {
            'rule_id': rule_id,
            'rule_type': rule_elt.get('type'),
            'priority': int(rule_elt.get('priority', '0')),
            'exclusive': rule_elt.get('exclusive') == 'true',
            'warehouse_id': destination_elt.get('warehouse') if path else None,
            'path': path,
            'flat': destination_elt.get('flat') == 'true' if path else False}

    # -------------------------------------------------------------------------
    @classmethod
    def record_format(cls, record):
        """Check and possibly correct a record before inserting it in the
        database.

        The dictionary is modified in place: ``None`` values are removed and
        a missing rule type is replaced by ``'basic'``.

        :param dict record:
            Dictionary of values to check.
        :rtype: ``None`` or :class:`pyramid.i18n.TranslationString`
        :return:
            ``None`` or error message.
        """
        # Remove undefined values (the list is built before any deletion)
        for k in [i for i in record if record[i] is None]:
            del record[k]

        # Rule ID
        if not record.get('rule_id'):
            return _('Rule ID is missing.')

        # Rule type
        if not record.get('rule_type'):
            record['rule_type'] = 'basic'

        # Destination
        if not record.get('warehouse_id'):
            return _('Rule without warehouse.')

        return None

    # -------------------------------------------------------------------------
    def db2xml(self, dbsession=None):
        """Serialize a user rule to a XML representation.

        Attributes holding their default value (type ``'basic'``, null
        priority, not exclusive, not flat) are not written.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession: (optional)
            SQLAlchemy session. It is not used here.
        :rtype: lxml.etree.Element
        """
        # pylint: disable = unused-argument
        rule_elt = etree.Element('input-rule')
        rule_elt.set('id', self.rule_id)
        if self.rule_type != 'basic':
            rule_elt.set('type', self.rule_type)
        if self.priority:
            rule_elt.set('priority', str(self.priority))
        if self.exclusive:
            rule_elt.set('exclusive', 'true')

        # Variables
        for dbvariable in self.variables:
            elt = etree.SubElement(rule_elt, 'variable')
            elt.set('name', dbvariable.name)
            elt.set('key', dbvariable.key)
            if dbvariable.transform:
                elt.set('transform', dbvariable.transform)
            if dbvariable.args:
                elt.set('args', dbvariable.args)
            if dbvariable.pattern is not None:
                elt.text = dbvariable.pattern

        # Conditions
        for dbcondition in self.conditions:
            elt = etree.SubElement(rule_elt, 'condition')
            elt.set('key', dbcondition.key)
            elt.text = dbcondition.pattern

        # Destination
        elt = etree.SubElement(
            rule_elt, 'destination', warehouse=self.warehouse_id)
        elt.text = self.path
        if self.flat:
            elt.set('flat', 'true')

        return rule_elt

    # -------------------------------------------------------------------------
    def attachments2directory(self, attachments, directory):
        """Copy from attachments directory the file corresponding to the user.

        This implementation has no body: it does nothing and returns
        ``None``.

        :param str attachments:
            Absolute path to the attachments directory.
        :param str directory:
            The backup directory.
        """
# =============================================================================
class DBInputRuleVariable(DBDeclarativeClass):
    """SQLAlchemy-powered class for a variable of an input rule.

    A variable is identified by its rule and its name. It stores an order,
    a key, an optional pattern and an optional transformation with its
    arguments.
    """
    # pylint: disable = too-few-public-methods

    __tablename__ = 'wrh_input_rules_variables'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # Composite primary key: owning rule + variable name
    rule_id = Column(
        String(ID_LEN),
        ForeignKey('wrh_input_rules.rule_id', ondelete='CASCADE'),
        primary_key=True)
    name = Column(String(ID_LEN), primary_key=True)

    # Rank used to sort the variables of a rule
    order = Column(Integer, default=0)
    key = Column(String(ID_LEN), nullable=False)
    pattern = Column(String(VALUE_LEN))

    # Allowed values are the identifiers of VARIABLE_TRANSFORM_LABELS
    transform = Column(
        Enum(*VARIABLE_TRANSFORM_LABELS, name='rule_transform'))
    args = Column(Text)
# =============================================================================
class DBInputRuleCondition(DBDeclarativeClass):
    """SQLAlchemy-powered class for a condition of an input rule.

    A condition is identified by its rule and a number. It stores a key and
    an optional pattern.
    """
    # pylint: disable = too-few-public-methods

    __tablename__ = 'wrh_input_rules_conditions'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # Composite primary key: owning rule + condition number
    rule_id = Column(
        String(ID_LEN),
        ForeignKey('wrh_input_rules.rule_id', ondelete='CASCADE'),
        primary_key=True)
    condition_id = Column(Integer, primary_key=True)

    key = Column(String(VALUE_LEN), nullable=False)
    pattern = Column(String(VALUE_LEN))