Source code for ciowarehouse.models.dbsharing

"""SQLAlchemy-powered model definitions for shared files."""

from os.path import join, normpath, dirname, basename
from datetime import datetime, date
from time import time
from bcrypt import hashpw, gensalt

from lxml import etree
from sqlalchemy import Column, ForeignKey, String, Date, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.schema import Index

from chrysalio.lib.utils import make_id
from chrysalio.models import DBDeclarativeClass, ID_LEN
from chrysalio.models.dbbase import DBBaseClass
from ..relaxng import RELAXNG_CIOWAREHOUSE
from ..lib.i18n import _
from ..lib.utils import make_file_id


SHARING_CHECK_PERIOD = 86400


# =============================================================================
[docs]class DBSharing(DBDeclarativeClass, DBBaseClass): """SQLAlchemy-powered sharing class.""" suffix = 'cioshr' __tablename__ = 'wrh_sharings' __table_args__ = {'mysql_engine': 'InnoDB'} __mapper_args__ = {'confirm_deleted_rows': False} sharing_id = Column(String(48), primary_key=True) message = Column(String(255)) password = Column(String(64)) expiration = Column(Date) creation = Column(DateTime, default=datetime.now) files = relationship('DBSharingFile', cascade='all, delete') # -------------------------------------------------------------------------
[docs] def set_password(self, password): """Set the password, possibly hashing it. :param str password: Password to set. If it does not begin with ``$``, we use bcrypt algorithm before setting. """ if not password: return if not password.startswith('$'): self.password = hashpw( password.encode('utf8'), gensalt()).decode('utf8') else: self.password = password
# -------------------------------------------------------------------------
[docs] def check_password(self, password): """Check the validy of the given password. :param str password: Clear password to check. :rtype: bool """ if password and self.password is not None: expected = self.password.encode('utf8') return expected == hashpw(password.encode('utf8'), expected) return not bool(self.password)
# -------------------------------------------------------------------------
[docs] @classmethod def xml2db(cls, dbsession, sharing_elt, error_if_exists=True, kwargs=None): """Load a shared file from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type sharing_elt: lxml.etree.Element :param sharing_elt: Sharing XML element. :param bool error_if_exists: (default=True) It returns an error if sharing already exists. :param dict kwargs: (optional) Dictionary of keyword arguments. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # Check if already exists sharing_id = make_id(sharing_elt.get('id'), 'token', 48) dbsharing = dbsession.query(cls).filter_by( sharing_id=sharing_id).first() if dbsharing is not None: if error_if_exists: return _('Sharing "${i}" already exists.', {'i': sharing_id}) return None # Create sharing record = cls.record_from_xml(sharing_id, sharing_elt) cls.record_format(record) dbsharing = cls(**record) # Load files found = False namespace = RELAXNG_CIOWAREHOUSE['namespace'] warehouse_ids = kwargs.get('warehouse_ids', ()) if kwargs else () done = set() for elt in sharing_elt.xpath( 'ns0:file', namespaces={'ns0': namespace}): warehouse_id = elt.get('warehouse') if warehouse_id not in warehouse_ids: continue path = elt.text.strip() wpath = normpath(join(warehouse_id, path)) if wpath not in done and wpath.startswith(warehouse_id): done.add(wpath) dbsharing.files.append(DBSharingFile( file_id=make_file_id(wpath), warehouse_id=warehouse_id, directory=dirname(path) or '.', file_name=basename(path))) found = True if found: dbsession.add(dbsharing) return None
# -------------------------------------------------------------------------
[docs] @classmethod def record_from_xml(cls, sharing_id, sharing_elt): """Convert a sharing XML element into a dictionary. :param str sharing_id: Sharing ID. :type sharing_elt: lxml.etree.Element :param sharing_elt: Sharing XML element. :rtype: dict """ namespace = RELAXNG_CIOWAREHOUSE['namespace'] return { 'sharing_id': sharing_id, 'creation': sharing_elt.get('created'), 'message': sharing_elt.findtext( '{{{0}}}message'.format(namespace)), 'password': sharing_elt.findtext( '{{{0}}}password'.format(namespace)), 'expiration': sharing_elt.findtext( '{{{0}}}expiration'.format(namespace))}
# -------------------------------------------------------------------------
[docs] @classmethod def record_format(cls, record): """Check and possibly correct a record before inserting it in the database. :param dict record: Dictionary of values to check. :rtype: ``None`` or :class:`pyramid.i18n.TranslationString` :return: ``None`` or error message. """ for k in [i for i in record if record[i] is None]: del record[k] # Detail field ID if not record.get('sharing_id'): return _('Sharing without ID.') # Password password = record.get('password') if password and not password.startswith('$'): record['password'] = hashpw( password.encode('utf8'), gensalt()).decode('utf8') # Creation if 'creation' in record and \ not isinstance(record['creation'], datetime): record['creation'] = datetime.strptime( record['creation'], '%Y-%m-%dT%H:%M:%S') # Expiration if 'expiration' in record and \ not isinstance(record['expiration'], date): record['expiration'] = datetime.strptime( record['expiration'], '%Y-%m-%d').date() return None
# -------------------------------------------------------------------------
[docs] def db2xml(self, dbsession=None): """Serialize a sharing to a XML representation. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: (optional) SQLAlchemy session. :rtype: lxml.etree.Element """ # pylint: disable = unused-argument if not self.files: return None sharing_elt = etree.Element('sharing') sharing_elt.set('id', self.sharing_id) sharing_elt.set('created', self.creation.isoformat().partition('.')[0]) if self.message: etree.SubElement(sharing_elt, 'message').text = self.message if self.password: etree.SubElement(sharing_elt, 'password').text = self.password if self.expiration: etree.SubElement(sharing_elt, 'expiration').text = \ self.expiration.isoformat() for dbfile in self.files: etree.SubElement( sharing_elt, 'file', warehouse=dbfile.warehouse_id).text = \ normpath(join(dbfile.directory, dbfile.file_name)) return sharing_elt
# -------------------------------------------------------------------------
[docs] @classmethod def delete(cls, request, sharing_id, paging=None): """Delete a sharing. :type request: pyramid.request.Request :param request: Current request. :param str sharing_id: ID of the sharing to delete. :type paging: chrysalio.lib.paging.Paging :param paging: (optional) Paging containing displayed files. """ dbsharing = request.dbsession.query(cls).filter_by( sharing_id=sharing_id).first() if dbsharing is None: return # Find files which lose their shared flag files = {} file_ids = [] for dbfile in dbsharing.files: if request.dbsession.query(DBSharingFile).filter_by( file_id=dbfile.file_id).count() > 1: continue file_ids.append(dbfile.file_id) if dbfile.warehouse_id not in files: files[dbfile.warehouse_id] = [ (dbfile.directory, dbfile.file_name)] else: files[dbfile.warehouse_id].append( (dbfile.directory, dbfile.file_name)) request.dbsession.delete(dbsharing) # Remove shared flag from paging for pitem in paging or (): if pitem['file_id'] in file_ids: pitem['shared'] = False # Update index for warehouse_id, file_location in files.items(): warehouse = request.registry['modules']['ciowarehouse']\ .warehouse(request, warehouse_id) if warehouse is not None: warehouse.index_update( request.dbsession, file_location, request)
# -------------------------------------------------------------------------
[docs] @classmethod def purge_expired(cls, request, dbsession): """Purge expired sharings. :type request: pyramid.request.Request :param request: Current request. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. """ if 'sharings' in request.registry and \ request.registry['sharings'] > time(): return request.registry['sharings'] = time() + SHARING_CHECK_PERIOD today = date.today() for sharing_id in dbsession.query(DBSharing.sharing_id).filter( cls.expiration < today): cls.delete(request, sharing_id[0])
# =============================================================================
[docs]class DBSharingFile(DBDeclarativeClass): """Class to link sharing token with their files (one-to-many).""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh_sharings_files' __table_args__ = ( Index('warehouse_id', 'file_id'), {'mysql_engine': 'InnoDB'}) sharing_id = Column( String(48), ForeignKey('wrh_sharings.sharing_id', ondelete='CASCADE'), primary_key=True) file_id = Column(String(48), primary_key=True) warehouse_id = Column(String(ID_LEN)) directory = Column(String(255)) file_name = Column(String(128))