Source code for ciowarehouse.lib.fileinfo

"""File information panel class."""

from os.path import join, basename
from datetime import datetime

from lxml import etree

from chrysalio.lib.utils import age, size_label
from chrysalio.lib.i18n import translate_field
from chrysalio.lib.panel import Panel
from .i18n import _
from .utils import THUMBNAIL_LARGE, HERE, INFOS_DIR
from .utils import FILE_RIGHTS_INDEX, META_RIGHTS_INDEX
from .utils import mimetype_url, make_file_id
from .utils import infos_load, infos_save, infos_set_metadata
from ..models.dbwarehouse import FILE_RIGHTS_LABELS, META_RIGHTS_LABELS
from ..models.dbsharing import DBSharing, DBSharingFile


# =============================================================================
[docs]class FileInfo(Panel): """Class to manage file information and file information panel. See: :class:`.lib.panel.Panel` """ uid = 'fileinfo' label = _('File information') icon = '/ciowarehouse/images/panel_fileinfo.png' css = ('/ciowarehouse/css/panel_fileinfo.css',) javascripts = ('/ciowarehouse/js/panel_fileinfo.js',) template = 'ciowarehouse:Templates/panel_fileinfo.pt' constants = { 'HERE': HERE, 'THUMBNAIL_LARGE': THUMBNAIL_LARGE, 'join': join, 'mimetype_url': mimetype_url, 'age': age, 'translate_field': translate_field} need_form = True # -------------------------------------------------------------------------
[docs] def prepare_rendering(self, request, warehouse, pfile, reset=False): """Fill the file information panel with informations about ``pfile`` file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object if exists. :param dict pfile: Item paging for the current file. :param bool reset: (default=False) If ``True``, reset values. ``self.values(request)`` is a dictionary containing informations on the file: * Whoosh index fields: - ``'warehouse_id'`` - ``'directory'`` - ``'file_name'`` - ``'file_id'`` - ``'file_type'`` - ``'file_size'`` - ``'file_date'`` - ``'only_groups'`` - ``'shared'`` - ``'thumbnail'`` * ``'file_age'`` (localized) * ``'file_size_label'`` (localized) * ``'file_rights'`` * ``'file_rights_label'`` (localized) * ``'warehouse_name'`` (localized) * ``'meta_rights'`` * ``'meta_rights_label'`` (localized) * ``'metafields'`` * ``'metadata'`` * ``'sharing_links'`` """ # Check if pfile is None: self.close(request) return if not reset and self.file_id(request) == pfile['file_id']: return self.clear_values(request) # Warehouse if warehouse is None: warehouse = request.registry['modules']['ciowarehouse']\ .warehouse(request, pfile['warehouse_id']) if warehouse is None: self.close(request) return # Details on file translate = request.localizer.translate access = request.registry['modules']['ciowarehouse']\ .warehouse_access(request, warehouse) values = { 'warehouse_id': pfile['warehouse_id'], 'directory': pfile['directory'], 'file_name': pfile['file_name'], 'file_id': pfile['file_id'], 'file_type': pfile['file_type'], 'file_size': pfile['file_size'], 'file_date': pfile['file_date'], 'only_groups': pfile['only_groups'], 'shared': pfile['shared'], 'thumbnail': pfile['thumbnail']} values.update({ 'file_age': translate(age( datetime.fromtimestamp(values['file_date']))), 'file_date': datetime.fromtimestamp( values['file_date']).isoformat(' '), 'file_size_label': translate(size_label( values['file_size'], values['file_type'] == 'directory')), 'file_size': '{0}'.format(values['file_size']) if values[ 'file_type'] != 'directory' else '', 'file_rights': access[FILE_RIGHTS_INDEX], 'file_rights_label': translate(FILE_RIGHTS_LABELS.get( access[FILE_RIGHTS_INDEX], FILE_RIGHTS_LABELS['reader'])), 'warehouse_name': warehouse.label(request)}) # Metadata values.update({ 'meta_rights': access[META_RIGHTS_INDEX], 'meta_rights_label': translate(META_RIGHTS_LABELS[ access[META_RIGHTS_INDEX]]), 'metafields': warehouse.metafields}) values['metadata'] = warehouse.infos_read( join(pfile['directory'], pfile['file_name']), warehouse.metafields.keys(), False, request) values['only_groups'] = tuple(values['metadata']['only_groups']) # Sharing values['sharing_links'] = [ (k.sharing_id, request.route_url('sharing_download', sharing_id=k.sharing_id), k.message or '', k.password is not None, k.expiration, len(k.files)) for k in request.dbsession.query(DBSharing).join( DBSharingFile).filter( DBSharingFile.file_id == pfile['file_id'])] # VCS log values['log'] = warehouse.vcs.log( pfile['directory'], pfile['file_name']) # Handler panel handler = warehouse.get_handler(join( warehouse.root, pfile['directory'], pfile['file_name']))[0] values['handler_panel'] = handler.panel if handler else None self.set_values(request, values)
# -------------------------------------------------------------------------
[docs] def action(self, request, warehouse, paging, form, action): """Execute actions of the file information panel. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object if exists. :type paging: chrysalio.lib.paging.Paging :param paging: Paging containing all the files. :type form: chrysalio.lib.form.Form :param form: Current form. :param str action: Current action. :rtype: str :return: Return a possibly new action. """ # Open panel if action[:4] in ('nfo+', 'shr+'): self.prepare_rendering( request, warehouse, paging.get_item('file_id', action[4:].partition('-')[0])) self.open(request) # Rename file elif action[:4] == 'ren!': if form.validate(): self._rename_file( request, warehouse, paging, form.values['new_name']) else: action = 'ren?{0}'.format(action[4:]) # Open handler panel elif action[:4] == 'pnl!' and action[4:] in request.registry['panels']: handler_panel = request.registry['panels'][action[4:]] if handler_panel.prepare_rendering( request, warehouse, paging.get_item('file_id', self.file_id(request))): handler_panel.open(request) # Update groups elif action[:4] == 'grp!': form.validate() self._update_groups(request, warehouse, paging, form.values) # Update metadata elif action[:4] == 'mta!': if form.validate(): self._update_metadata(request, warehouse, paging, form.values) else: action = 'mta?{0}'.format(action[4:]) # Remove sharing elif action[:4] == 'shr~': DBSharing.delete(request, action[4:].partition('-')[2], paging) self.prepare_rendering( request, warehouse, paging.get_item('file_id', action[4:].partition('-')[0]), True) return action
# -------------------------------------------------------------------------
[docs] def file_id(self, request): """Return the current file ID. :type request: pyramid.request.Request :param request: Current request. :rtype: str """ return self.value(request, 'file_id')
# -------------------------------------------------------------------------
[docs] def current(self, request, pitem): """Return ``'current'`` if the item is the current one for the file information panel. :type request: pyramid.request.Request :param request: Current request. :param dict pitem: Paging item. :rtype: ``'current'`` or ``None`` """ return 'current' if self.is_open(request) and \ pitem['file_id'] == self.file_id(request) else None
# ------------------------------------------------------------------------- def _rename_file(self, request, warehouse, paging, new_name): """Rename a file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object if exists. :type paging: chrysalio.lib.paging.Paging :param paging: Current paging. :param str file_id: Digest representation of the file. """ pfile = paging.get_item('file_id', self.file_id(request)) if pfile is None: return # Access values = self.values(request) if (values['file_rights'] or '')[:6] != 'writer': request.session.flash(_( 'You cannot rename a file of "${l}".', {'l': warehouse.label(request)}), 'alert') return # Rename new_name = warehouse.file_normalize( new_name, values['file_type'] == 'directory') error = warehouse.vcs.move( (values['directory'], values['file_name']), (values['directory'], basename(new_name))) if error is not None: request.session.flash(error, 'alert') return # Commit warehouse.vcs.commit( request.localizer.translate(_('Renaming')), request.session['user']['name'], request.session['user']['email']) # Refresh warehouse and clear cache file_list = [ (values['directory'], values['file_name']), (values['directory'], new_name)] + warehouse.directory_file_list( join(values['directory'], new_name)) warehouse.index_update(request.dbsession, file_list, request) request.registry['modules']['ciowarehouse'].cache_clear( request, warehouse.uid) # Update fields values['file_id'] = make_file_id(join( values['warehouse_id'], values['directory'], new_name)) values['file_name'] = new_name pfile['file_id'] = values['file_id'] pfile['file_name'] = new_name request.session['panels'][self.uid]['values'] = values # ------------------------------------------------------------------------- def _update_groups(self, request, warehouse, paging, new_values): """Update authorized groups. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object if exists. :type paging: chrysalio.lib.paging.Paging :param paging: Current paging. :param dict new_values: New values. """ pfile = paging.get_item('file_id', self.file_id(request)) if pfile is None: return # Access if not request.registry['modules']['ciowarehouse'].warehouse_admin( request, warehouse): request.session.flash(_( 'You cannot update authorized groups of a file of "${l}".', {'l': warehouse.label(request)}), 'alert') return # Load or create XML file values = self.values(request) infos_file = join( warehouse.root, INFOS_DIR, values['directory'], '{0}.xml'.format(values['file_name'])) root_elt = infos_load(infos_file, remove_groups=True) # Update group list groups_elt = etree.SubElement(root_elt[0], 'groups') only_groups = [] for group_id in new_values: if new_values[group_id]: only_groups.append(group_id) etree.SubElement(groups_elt, 'group').text = group_id values['only_groups'] = tuple(only_groups) request.session['panels'][self.uid]['values'] = values # Save XML if not infos_save(root_elt, infos_file, request): return # pragma: nocover self._commit_and_index( request, warehouse, values, _('Authorized groups updated')) # ------------------------------------------------------------------------- def _update_metadata(self, request, warehouse, paging, new_values): """Update metadata. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object if exists. :type paging: chrysalio.lib.paging.Paging :param paging: Current paging. :param dict new_values: New values. """ pfile = paging.get_item('file_id', self.file_id(request)) if pfile is None: return # Access values = self.values(request) if values['meta_rights'] != 'writer': request.session.flash(_( 'You cannot update metadata of a file of "${l}".', {'l': warehouse.label(request)}), 'alert') return # Load or create XML file infos_file = join( warehouse.root, INFOS_DIR, values['directory'], '{0}.xml'.format(values['file_name'])) root_elt = infos_load(infos_file, remove_metafields=True) # Update metadata fields metafields = [] for meta_id, meta in self.value(request, 'metafields').items(): if new_values.get(meta_id) is not None: if isinstance(new_values[meta_id], datetime): new_values[meta_id] = new_values[meta_id].replace( microsecond=0, tzinfo=None) metafields.append((meta_id, meta['type'], new_values[meta_id])) values['metadata'][meta_id] = new_values[meta_id] pfile[meta_id] = values['metadata'][meta_id] else: if meta_id in values['metadata']: del values['metadata'][meta_id] if meta_id in pfile: del pfile[meta_id] infos_set_metadata(root_elt, metafields) request.session['panels'][self.uid]['values'] = values # Save XML if not infos_save(root_elt, infos_file, request): return # pragma: nocover self._commit_and_index( request, warehouse, values, _('Metadata updated')) # ------------------------------------------------------------------------- @classmethod def _commit_and_index(cls, request, warehouse, values, message): """Update commit the change and update the index. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object if exists. :param dict values: Panel values. :param str message: Commit message """ # Commit change warehouse.vcs.add() warehouse.vcs.commit( request.localizer.translate(message), request.session['user']['name'], request.session['user']['email']) # Update index and clear cache warehouse.index_update( request.dbsession, ((values['directory'], values['file_name']),), request=request, force=True) request.registry['modules']['ciowarehouse'].cache_clear( request, warehouse.uid)