"""SQLAlchemy-powered model definitions for shared files."""
from os.path import join, normpath, dirname, basename
from datetime import datetime, date
from hmac import compare_digest
from time import time

from bcrypt import hashpw, gensalt
from lxml import etree
from sqlalchemy import Column, ForeignKey, String, Date, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.schema import Index
from chrysalio.lib.utils import make_id
from chrysalio.models import DBDeclarativeClass, ID_LEN
from chrysalio.models.dbbase import DBBaseClass

from ..relaxng import RELAXNG_CIOWAREHOUSE
from ..lib.i18n import _
from ..lib.utils import make_file_id
# Minimum delay, in seconds, between two purges of expired sharings
# (see ``DBSharing.purge_expired``).
SHARING_CHECK_PERIOD = 86400
# =============================================================================
class DBSharing(DBDeclarativeClass, DBBaseClass):
    """SQLAlchemy-powered sharing class.

    A sharing is identified by a token (``sharing_id``) and groups one or
    several files (:class:`DBSharingFile`), with an optional message, an
    optional bcrypt-hashed password and an optional expiration date.
    """

    # NOTE(review): presumably the file suffix used for XML backups of this
    # class -- confirm against ``DBBaseClass``.
    suffix = 'cioshr'

    __tablename__ = 'wrh_sharings'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # Do not warn when the number of rows actually deleted differs from the
    # number expected by the ORM.
    __mapper_args__ = {'confirm_deleted_rows': False}

    sharing_id = Column(String(48), primary_key=True)
    message = Column(String(255))
    # Hashed password, see set_password()
    password = Column(String(64))
    expiration = Column(Date)
    creation = Column(DateTime, default=datetime.now)

    files = relationship('DBSharingFile', cascade='all, delete')

    # -------------------------------------------------------------------------
    def set_password(self, password):
        """Set the password, possibly hashing it.

        :param str password:
            Password to set. If it does not begin with ``$``, we use bcrypt
            algorithm before setting. An empty value leaves the current
            password untouched.
        """
        if not password:
            return
        if not password.startswith('$'):
            self.password = hashpw(
                password.encode('utf8'), gensalt()).decode('utf8')
        else:
            # Already hashed: store as is
            self.password = password

    # -------------------------------------------------------------------------
    def check_password(self, password):
        """Check the validity of the given password.

        :param str password:
            Clear password to check.
        :rtype: bool
        :return:
            ``True`` if the sharing has no password or if the given password
            matches the stored hash.
        """
        # A sharing without password accepts everything
        if not self.password:
            return True
        if not password:
            return False

        expected = self.password.encode('utf8')
        try:
            computed = hashpw(password.encode('utf8'), expected)
        except ValueError:
            # The stored value is not a valid bcrypt hash: nothing can match
            return False
        # Constant-time comparison to avoid leaking information by timing
        return compare_digest(computed, expected)

    # -------------------------------------------------------------------------
    @classmethod
    def xml2db(cls, dbsession, sharing_elt, error_if_exists=True, kwargs=None):
        """Load a shared file from a XML element.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        :type  sharing_elt: lxml.etree.Element
        :param sharing_elt:
            Sharing XML element.
        :param bool error_if_exists: (default=True)
            It returns an error if sharing already exists.
        :param dict kwargs: (optional)
            Dictionary of keyword arguments. Only the files whose warehouse
            is listed in ``kwargs['warehouse_ids']`` are loaded.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None``.

        The sharing is added to the session only if at least one of its
        files has been accepted.
        """
        # Check if already exists
        sharing_id = make_id(sharing_elt.get('id'), 'token', 48)
        dbsharing = dbsession.query(cls).filter_by(
            sharing_id=sharing_id).first()
        if dbsharing is not None:
            if error_if_exists:
                return _('Sharing "${i}" already exists.', {'i': sharing_id})
            return None

        # Create sharing
        record = cls.record_from_xml(sharing_id, sharing_elt)
        cls.record_format(record)
        dbsharing = cls(**record)

        # Load files
        namespace = RELAXNG_CIOWAREHOUSE['namespace']
        warehouse_ids = kwargs.get('warehouse_ids', ()) if kwargs else ()
        done = set()
        for elt in sharing_elt.xpath(
                'ns0:file', namespaces={'ns0': namespace}):
            warehouse_id = elt.get('warehouse')
            if warehouse_id not in warehouse_ids:
                continue

            # ``text`` is None for an empty element
            path = (elt.text or '').strip()
            if not path:
                continue

            # The normalized path must stay inside the warehouse: compare
            # with "<warehouse_id><separator>" and not with a bare string
            # prefix, otherwise "../wh2/x" would be accepted for "wh".
            wpath = normpath(join(warehouse_id, path))
            inside = wpath == warehouse_id or \
                wpath.startswith(join(warehouse_id, ''))
            if wpath in done or not inside:
                continue

            done.add(wpath)
            dbsharing.files.append(DBSharingFile(
                file_id=make_file_id(wpath),
                warehouse_id=warehouse_id,
                directory=dirname(path) or '.',
                file_name=basename(path)))

        if done:
            dbsession.add(dbsharing)
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def record_from_xml(cls, sharing_id, sharing_elt):
        """Convert a sharing XML element into a dictionary.

        :param str sharing_id:
            Sharing ID.
        :type  sharing_elt: lxml.etree.Element
        :param sharing_elt:
            Sharing XML element.
        :rtype: dict
        :return:
            Dictionary with keys ``sharing_id``, ``creation``, ``message``,
            ``password`` and ``expiration``. Values are raw strings (or
            ``None`` when absent) read from the XML.
        """
        namespace = RELAXNG_CIOWAREHOUSE['namespace']
        return {
            'sharing_id': sharing_id,
            'creation': sharing_elt.get('created'),
            'message': sharing_elt.findtext(
                '{{{0}}}message'.format(namespace)),
            'password': sharing_elt.findtext(
                '{{{0}}}password'.format(namespace)),
            'expiration': sharing_elt.findtext(
                '{{{0}}}expiration'.format(namespace))}

    # -------------------------------------------------------------------------
    def db2xml(self, dbsession=None):
        """Serialize a sharing to a XML representation.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession: (optional)
            SQLAlchemy session. It is not used.
        :rtype: lxml.etree.Element or ``None``
        :return:
            The ``<sharing>`` element or ``None`` if the sharing has no file.
        """
        # pylint: disable = unused-argument
        if not self.files:
            return None

        sharing_elt = etree.Element('sharing')
        sharing_elt.set('id', self.sharing_id)
        # ISO date without the microseconds
        sharing_elt.set('created', self.creation.isoformat().partition('.')[0])
        if self.message:
            etree.SubElement(sharing_elt, 'message').text = self.message
        if self.password:
            etree.SubElement(sharing_elt, 'password').text = self.password
        if self.expiration:
            etree.SubElement(sharing_elt, 'expiration').text = \
                self.expiration.isoformat()

        for dbfile in self.files:
            etree.SubElement(
                sharing_elt, 'file', warehouse=dbfile.warehouse_id).text = \
                normpath(join(dbfile.directory, dbfile.file_name))

        return sharing_elt

    # -------------------------------------------------------------------------
    @classmethod
    def delete(cls, request, sharing_id, paging=None):
        """Delete a sharing.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :param str sharing_id:
            ID of the sharing to delete.
        :type  paging: chrysalio.lib.paging.Paging
        :param paging: (optional)
            Paging containing displayed files.

        Files which are not referenced by any other sharing lose their
        ``shared`` flag in the paging and their warehouse index is updated.
        """
        dbsharing = request.dbsession.query(cls).filter_by(
            sharing_id=sharing_id).first()
        if dbsharing is None:
            return

        # Find files which lose their shared flag, grouped by warehouse
        files = {}
        file_ids = set()
        for dbfile in dbsharing.files:
            # Still referenced by another sharing
            if request.dbsession.query(DBSharingFile).filter_by(
                    file_id=dbfile.file_id).count() > 1:
                continue
            file_ids.add(dbfile.file_id)
            files.setdefault(dbfile.warehouse_id, []).append(
                (dbfile.directory, dbfile.file_name))

        request.dbsession.delete(dbsharing)

        # Remove shared flag from paging
        for pitem in paging or ():
            if pitem['file_id'] in file_ids:
                pitem['shared'] = False

        # Update index
        for warehouse_id, file_location in files.items():
            warehouse = request.registry['modules']['ciowarehouse']\
                .warehouse(request, warehouse_id)
            if warehouse is not None:
                warehouse.index_update(
                    request.dbsession, file_location, request)

    # -------------------------------------------------------------------------
    @classmethod
    def purge_expired(cls, request, dbsession):
        """Purge expired sharings.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.

        The purge is done at most once per ``SHARING_CHECK_PERIOD`` seconds:
        the time of the next check is kept in ``request.registry``.
        """
        if 'sharings' in request.registry and \
           request.registry['sharings'] > time():
            return
        request.registry['sharings'] = time() + SHARING_CHECK_PERIOD

        # Read every expired ID before deleting anything so that we do not
        # modify the table while still iterating over the query.
        today = date.today()
        expired_ids = [
            row[0] for row in dbsession.query(cls.sharing_id).filter(
                cls.expiration < today)]

        # NOTE(review): delete() works with ``request.dbsession``; presumably
        # it is the same session as ``dbsession`` -- confirm against callers.
        for sharing_id in expired_ids:
            cls.delete(request, sharing_id)
# =============================================================================
class DBSharingFile(DBDeclarativeClass):
    """Class to link a sharing token with its files (one-to-many).

    Each row describes one file of one sharing: the warehouse it belongs to
    and its location (``directory`` and ``file_name``).
    """
    # pylint: disable = too-few-public-methods

    __tablename__ = 'wrh_sharings_files'
    # NOTE(review): the first positional argument of ``Index`` is the index
    # *name*, so this declares an index named ``warehouse_id`` on the single
    # column ``file_id`` -- confirm a composite index on
    # (warehouse_id, file_id) was not intended.
    __table_args__ = (
        Index('warehouse_id', 'file_id'), {'mysql_engine': 'InnoDB'})

    # Composite primary key: (sharing_id, file_id). Rows are removed by the
    # database when the parent sharing is deleted (ON DELETE CASCADE).
    sharing_id = Column(
        String(48), ForeignKey('wrh_sharings.sharing_id', ondelete='CASCADE'),
        primary_key=True)
    file_id = Column(String(48), primary_key=True)
    warehouse_id = Column(String(ID_LEN))
    directory = Column(String(255))
    file_name = Column(String(128))