Source code for ciowarehouse.models.populate

"""Function to import and export database from and into XML files."""

from lxml import etree
from sqlalchemy import desc

from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from chrysalio.models.populate import element2db
from cioservice.models.dbjob import DBJob
from ..relaxng import RELAXNG_CIOWAREHOUSE
from .dbmetafield import DBMetafield
from .dbindexfield import INDEXFIELD_BUILTIN, DBIndexfield
from .dbhandler import DBHandler
from .dbwarehouse import DBWarehouse
from .dbsharing import DBSharing
from .dbinput import DBInputStream, DBInputRule


# =============================================================================
def xml2db(dbsession, root_elt, only=None, error_if_exists=True, modules=None):
    """Load the XML configuration of this module into the database.

    Items are loaded tag by tag in a fixed order: metafields, index fields,
    handlers, warehouses, sharings, input streams and input rules. Between
    steps, identifiers are read back from the database and handed to the
    following loaders through their ``kwargs``.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  root_elt: lxml.etree.Element
    :param root_elt:
        XML element with the namespace of the module.
    :param str only: (optional)
        If not ``None``, only the items of type ``only`` are loaded.
    :param bool error_if_exists: (default=True)
        It returns an error if an item already exists.
    :type  modules: collections.OrderedDict
    :param modules: (optional)
        Dictionary of modules to use to complete the loading. It is not
        used by this function.
    :rtype: list
    :return:
        A list of error messages, without any ``None`` entry.
    """
    # pylint: disable = unused-argument

    def load(tag, dbclass, **kwargs):
        """Run ``element2db`` for one tag and return what it returns."""
        spec = {
            'tag': tag, 'class': dbclass, 'relaxng': RELAXNG_CIOWAREHOUSE}
        # The 'kwargs' entry is only present when there is something to pass.
        if kwargs:
            spec['kwargs'] = kwargs
        return element2db(dbsession, root_elt, only, error_if_exists, spec)

    # Detail fields
    errors = load('metafield', DBMetafield)
    metafield_ids = [
        row[0] for row in dbsession.query(DBMetafield.metafield_id)]

    # Index fields
    errors += load('indexfield', DBIndexfield)
    indexfields = [
        (row[0], row[1]) for row in dbsession.query(
            DBIndexfield.indexfield_id, DBIndexfield.stored)]

    # Handlers: every index field may be referenced
    errors += load(
        'handler', DBHandler,
        indexrefs=[field[0] for field in indexfields])

    # Warehouses: only stored or built-in index fields may be referenced
    user_ids = dict(dbsession.query(DBUser.login, DBUser.user_id))
    group_ids = [row[0] for row in dbsession.query(DBGroup.group_id)]
    job_ids = [row[0] for row in dbsession.query(DBJob.job_id)]
    errors += load(
        'warehouse', DBWarehouse,
        metarefs=metafield_ids,
        indexrefs=[
            field[0] for field in indexfields
            if field[1] or field[0] in INDEXFIELD_BUILTIN],
        users=user_ids,
        groups=group_ids,
        jobs=job_ids)
    warehouse_ids = [
        row[0] for row in dbsession.query(DBWarehouse.warehouse_id)]

    # Sharings
    errors += load('sharing', DBSharing, warehouse_ids=warehouse_ids)

    # Inputs
    errors += load('input-stream', DBInputStream)
    errors += load('input-rule', DBInputRule)

    return [error for error in errors if error is not None]
# =============================================================================
def _append_group(root_elt, tag, dbitems, *args):
    """Append one group of exported items to ``root_elt``.

    When ``dbitems`` is empty, nothing is added. Otherwise an XML comment
    containing ``tag`` centered in a 64-character field padded with dots is
    appended, followed by a ``tag`` sub-element holding the result of
    ``db2xml(*args)`` for each item, in the given order.

    :type  root_elt: lxml.etree.Element
    :param root_elt:
        XML element receiving the comment and the group element.
    :param str tag:
        Name of the group element, also used as text of the comment.
    :param list dbitems:
        Database objects exposing a ``db2xml`` method.
    :param args:
        Positional arguments forwarded to each ``db2xml`` call.
    """
    if not dbitems:
        return
    root_elt.append(etree.Comment('{0:.^64}'.format(tag)))
    group_elt = etree.SubElement(root_elt, tag)
    for dbitem in dbitems:
        group_elt.append(dbitem.db2xml(*args))


# =============================================================================
def db2xml(dbsession, root_elt):
    """Fill ``root_elt`` with the XML configuration of the module.

    Groups are written in this order, each one being skipped when its query
    returns no row: metafields (ordered by ``position``), index fields
    (ordered by ``in_filter``), handlers (no explicit order), warehouses
    (ordered by ``warehouse_id``), sharings (ordered by ``sharing_id``),
    input streams (ordered by ``stream_id``) and input rules (ordered by
    descending ``priority``).

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  root_elt: lxml.etree.Element
    :param root_elt:
        XML element with the namespace of the module.
    """
    # Detail fields
    _append_group(
        root_elt, 'metafields',
        dbsession.query(DBMetafield).order_by('position').all())

    # Index fields
    _append_group(
        root_elt, 'indexfields',
        dbsession.query(DBIndexfield).order_by('in_filter').all())

    # File handlers
    _append_group(
        root_elt, 'handlers',
        dbsession.query(DBHandler).all())

    # Warehouses: their db2xml() is the only one taking the session
    _append_group(
        root_elt, 'warehouses',
        dbsession.query(DBWarehouse).order_by('warehouse_id').all(),
        dbsession)

    # Sharings
    _append_group(
        root_elt, 'sharings',
        dbsession.query(DBSharing).order_by('sharing_id').all())

    # Inputs
    _append_group(
        root_elt, 'input-streams',
        dbsession.query(DBInputStream).order_by('stream_id').all())
    _append_group(
        root_elt, 'input-rules',
        dbsession.query(DBInputRule).order_by(desc('priority')).all())