"""File information panel class."""
from os.path import join, basename
from datetime import datetime
from lxml import etree
from chrysalio.lib.utils import age, size_label
from chrysalio.lib.i18n import translate_field
from chrysalio.lib.panel import Panel
from .i18n import _
from .utils import THUMBNAIL_LARGE, HERE, INFOS_DIR
from .utils import FILE_RIGHTS_INDEX, META_RIGHTS_INDEX
from .utils import mimetype_url, make_file_id
from .utils import infos_load, infos_save, infos_set_metadata
from ..models.dbwarehouse import FILE_RIGHTS_LABELS, META_RIGHTS_LABELS
from ..models.dbsharing import DBSharing, DBSharingFile
# =============================================================================
class FileInfo(Panel):
    """Class to manage file information and file information panel.

    See: :class:`.lib.panel.Panel`
    """

    uid = 'fileinfo'
    label = _('File information')
    icon = '/ciowarehouse/images/panel_fileinfo.png'
    css = ('/ciowarehouse/css/panel_fileinfo.css',)
    javascripts = ('/ciowarehouse/js/panel_fileinfo.js',)
    template = 'ciowarehouse:Templates/panel_fileinfo.pt'
    # Names exposed under these keys (presumably to the template -- see
    # the ``Panel`` base class for how ``constants`` is consumed).
    constants = {
        'HERE': HERE, 'THUMBNAIL_LARGE': THUMBNAIL_LARGE,
        'join': join, 'mimetype_url': mimetype_url, 'age': age,
        'translate_field': translate_field}
    need_form = True

    # -------------------------------------------------------------------------
    def prepare_rendering(self, request, warehouse, pfile, reset=False):
        """Fill the file information panel with informations about ``pfile``
        file.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists. When ``None``, it is looked
            up from ``pfile['warehouse_id']``.
        :param dict pfile:
            Item paging for the current file. When ``None``, the panel is
            closed.
        :param bool reset: (default=False)
            If ``True``, reset values even if the panel already describes
            this file.

        ``self.values(request)`` is a dictionary containing informations on
        the file:

        * Whoosh index fields:
            - ``'warehouse_id'``
            - ``'directory'``
            - ``'file_name'``
            - ``'file_id'``
            - ``'file_type'``
            - ``'file_size'``
            - ``'file_date'``
            - ``'only_groups'``
            - ``'shared'``
            - ``'thumbnail'``
        * ``'file_age'`` (localized)
        * ``'file_size_label'`` (localized)
        * ``'file_rights'``
        * ``'file_rights_label'`` (localized)
        * ``'warehouse_name'`` (localized)
        * ``'meta_rights'``
        * ``'meta_rights_label'`` (localized)
        * ``'metafields'``
        * ``'metadata'``
        * ``'sharing_links'``
        * ``'log'`` (result of ``warehouse.vcs.log()`` for the file)
        * ``'handler_panel'`` (``panel`` attribute of the file handler or
          ``None`` if there is no handler)
        """
        # Check
        if pfile is None:
            self.close(request)
            return
        if not reset and self.file_id(request) == pfile['file_id']:
            # The panel already describes this file: nothing to refresh.
            return
        self.clear_values(request)

        # Warehouse
        ciowarehouse = request.registry['modules']['ciowarehouse']
        if warehouse is None:
            warehouse = ciowarehouse.warehouse(request, pfile['warehouse_id'])
            if warehouse is None:
                self.close(request)
                return

        # Details on file
        translate = request.localizer.translate
        access = ciowarehouse.warehouse_access(request, warehouse)
        index_fields = (
            'warehouse_id', 'directory', 'file_name', 'file_id', 'file_type',
            'file_size', 'file_date', 'only_groups', 'shared', 'thumbnail')
        values = {field: pfile[field] for field in index_fields}
        file_date = datetime.fromtimestamp(values['file_date'])
        is_directory = values['file_type'] == 'directory'
        file_rights = access[FILE_RIGHTS_INDEX]
        values.update({
            'file_age': translate(age(file_date)),
            'file_date': file_date.isoformat(' '),
            'file_size_label': translate(
                size_label(values['file_size'], is_directory)),
            # No size is displayed for a directory.
            'file_size': (
                '' if is_directory else '{0}'.format(values['file_size'])),
            'file_rights': file_rights,
            # Unknown file rights fall back on the 'reader' label.
            'file_rights_label': translate(FILE_RIGHTS_LABELS.get(
                file_rights, FILE_RIGHTS_LABELS['reader'])),
            'warehouse_name': warehouse.label(request)})

        # Metadata
        meta_rights = access[META_RIGHTS_INDEX]
        values.update({
            'meta_rights': meta_rights,
            'meta_rights_label': translate(META_RIGHTS_LABELS[meta_rights]),
            'metafields': warehouse.metafields})
        values['metadata'] = warehouse.infos_read(
            join(pfile['directory'], pfile['file_name']),
            warehouse.metafields.keys(), False, request)
        # The groups read from the information file replace the indexed ones.
        values['only_groups'] = tuple(values['metadata']['only_groups'])

        # Sharing
        sharings = request.dbsession.query(DBSharing).join(
            DBSharingFile).filter(DBSharingFile.file_id == pfile['file_id'])
        values['sharing_links'] = [
            (sharing.sharing_id,
             request.route_url(
                 'sharing_download', sharing_id=sharing.sharing_id),
             sharing.message or '', sharing.password is not None,
             sharing.expiration, len(sharing.files))
            for sharing in sharings]

        # VCS log
        values['log'] = warehouse.vcs.log(
            pfile['directory'], pfile['file_name'])

        # Handler panel
        handler = warehouse.get_handler(join(
            warehouse.root, pfile['directory'], pfile['file_name']))[0]
        values['handler_panel'] = handler.panel if handler else None

        self.set_values(request, values)

    # -------------------------------------------------------------------------
    def action(self, request, warehouse, paging, form, action):
        """Execute actions of the file information panel.

        The first four characters of ``action`` select the operation, the
        remaining characters are its argument.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists.
        :type  paging: chrysalio.lib.paging.Paging
        :param paging:
            Paging containing all the files.
        :type  form: chrysalio.lib.form.Form
        :param form:
            Current form.
        :param str action:
            Current action.
        :rtype: str
        :return:
            Return a possibly new action.
        """
        prefix, argument = action[:4], action[4:]

        # Open panel
        if prefix in ('nfo+', 'shr+'):
            # NOTE(review): the panel is opened even when
            # prepare_rendering() closed it (unknown file or warehouse)
            # -- confirm this is intended.
            self.prepare_rendering(
                request, warehouse,
                paging.get_item('file_id', argument.partition('-')[0]))
            self.open(request)

        # Rename file
        elif prefix == 'ren!':
            if form.validate():
                self._rename_file(
                    request, warehouse, paging, form.values['new_name'])
            else:
                # Invalid form: return the matching 'ren?' action.
                action = 'ren?{0}'.format(argument)

        # Open handler panel
        elif prefix == 'pnl!' and argument in request.registry['panels']:
            handler_panel = request.registry['panels'][argument]
            if handler_panel.prepare_rendering(
                    request, warehouse,
                    paging.get_item('file_id', self.file_id(request))):
                handler_panel.open(request)

        # Update groups
        elif prefix == 'grp!':
            # The validation result is not checked here; only the form
            # values are used.
            form.validate()
            self._update_groups(request, warehouse, paging, form.values)

        # Update metadata
        elif prefix == 'mta!':
            if form.validate():
                self._update_metadata(
                    request, warehouse, paging, form.values)
            else:
                # Invalid form: return the matching 'mta?' action.
                action = 'mta?{0}'.format(argument)

        # Remove sharing
        elif prefix == 'shr~':
            # The argument has the form '<file_id>-<sharing_id>'.
            file_id, _sep, sharing_id = argument.partition('-')
            DBSharing.delete(request, sharing_id, paging)
            self.prepare_rendering(
                request, warehouse, paging.get_item('file_id', file_id),
                True)

        return action

    # -------------------------------------------------------------------------
    def file_id(self, request):
        """Return the current file ID.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: str
        """
        return self.value(request, 'file_id')

    # -------------------------------------------------------------------------
    def current(self, request, pitem):
        """Return ``'current'`` if the item is the current one for the file
        information panel.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :param dict pitem:
            Paging item.
        :rtype: ``'current'`` or ``None``
        """
        return 'current' if self.is_open(request) and \
            pitem['file_id'] == self.file_id(request) else None

    # -------------------------------------------------------------------------
    def _rename_file(self, request, warehouse, paging, new_name):
        """Rename the file currently described by the panel.

        The file is moved inside its directory through the VCS, the change
        is committed, the index is refreshed and the panel values and the
        paging item are updated with the new name and the new file ID.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists.
        :type  paging: chrysalio.lib.paging.Paging
        :param paging:
            Current paging.
        :param str new_name:
            New name for the file. It is normalized by the warehouse and
            only its base name is kept.
        """
        pfile = paging.get_item('file_id', self.file_id(request))
        if pfile is None:
            return

        # Access
        values = self.values(request)
        if (values['file_rights'] or '')[:6] != 'writer':
            request.session.flash(_(
                'You cannot rename a file of "${l}".',
                {'l': warehouse.label(request)}), 'alert')
            return

        # Rename
        # Keep only the base name so that the VCS move, the index refresh
        # and the stored values all refer to the very same name.
        new_name = basename(warehouse.file_normalize(
            new_name, values['file_type'] == 'directory'))
        error = warehouse.vcs.move(
            (values['directory'], values['file_name']),
            (values['directory'], new_name))
        if error is not None:
            request.session.flash(error, 'alert')
            return

        # Commit
        warehouse.vcs.commit(
            request.localizer.translate(_('Renaming')),
            request.session['user']['name'],
            request.session['user']['email'])

        # Refresh warehouse and clear cache
        # Old and new entries are both submitted, followed by the list
        # returned by directory_file_list() for the new path.
        file_list = [
            (values['directory'], values['file_name']),
            (values['directory'], new_name)] + warehouse.directory_file_list(
                join(values['directory'], new_name))
        warehouse.index_update(request.dbsession, file_list, request)
        request.registry['modules']['ciowarehouse'].cache_clear(
            request, warehouse.uid)

        # Update fields
        values['file_id'] = make_file_id(join(
            values['warehouse_id'], values['directory'], new_name))
        values['file_name'] = new_name
        pfile['file_id'] = values['file_id']
        pfile['file_name'] = new_name
        request.session['panels'][self.uid]['values'] = values

    # -------------------------------------------------------------------------
    def _update_groups(self, request, warehouse, paging, new_values):
        """Update authorized groups.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists.
        :type  paging: chrysalio.lib.paging.Paging
        :param paging:
            Current paging.
        :param dict new_values:
            New values. Each key with a true value is recorded as an
            authorized group ID.
        """
        pfile = paging.get_item('file_id', self.file_id(request))
        if pfile is None:
            return

        # Access
        if not request.registry['modules']['ciowarehouse'].warehouse_admin(
                request, warehouse):
            request.session.flash(_(
                'You cannot update authorized groups of a file of "${l}".',
                {'l': warehouse.label(request)}), 'alert')
            return

        # Load or create XML file
        values = self.values(request)
        infos_file = join(
            warehouse.root, INFOS_DIR, values['directory'],
            '{0}.xml'.format(values['file_name']))
        root_elt = infos_load(infos_file, remove_groups=True)

        # Update group list
        groups_elt = etree.SubElement(root_elt[0], 'groups')
        only_groups = []
        for group_id, selected in new_values.items():
            if selected:
                only_groups.append(group_id)
                etree.SubElement(groups_elt, 'group').text = group_id
        values['only_groups'] = tuple(only_groups)
        request.session['panels'][self.uid]['values'] = values

        # Save XML
        if not infos_save(root_elt, infos_file, request):
            return  # pragma: nocover

        self._commit_and_index(
            request, warehouse, values, _('Authorized groups updated'))

    # -------------------------------------------------------------------------
    def _update_metadata(self, request, warehouse, paging, new_values):
        """Update metadata.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists.
        :type  paging: chrysalio.lib.paging.Paging
        :param paging:
            Current paging.
        :param dict new_values:
            New values. Date and time values are normalized in place
            (microseconds and time zone are dropped).
        """
        pfile = paging.get_item('file_id', self.file_id(request))
        if pfile is None:
            return

        # Access
        values = self.values(request)
        if values['meta_rights'] != 'writer':
            request.session.flash(_(
                'You cannot update metadata of a file of "${l}".',
                {'l': warehouse.label(request)}), 'alert')
            return

        # Load or create XML file
        infos_file = join(
            warehouse.root, INFOS_DIR, values['directory'],
            '{0}.xml'.format(values['file_name']))
        root_elt = infos_load(infos_file, remove_metafields=True)

        # Update metadata fields
        metafields = []
        for meta_id, meta in self.value(request, 'metafields').items():
            if new_values.get(meta_id) is not None:
                if isinstance(new_values[meta_id], datetime):
                    new_values[meta_id] = new_values[meta_id].replace(
                        microsecond=0, tzinfo=None)
                metafields.append(
                    (meta_id, meta['type'], new_values[meta_id]))
                values['metadata'][meta_id] = new_values[meta_id]
                pfile[meta_id] = values['metadata'][meta_id]
            else:
                # No value for this field: remove it from the panel values
                # and from the paging item.
                if meta_id in values['metadata']:
                    del values['metadata'][meta_id]
                if meta_id in pfile:
                    del pfile[meta_id]
        infos_set_metadata(root_elt, metafields)
        request.session['panels'][self.uid]['values'] = values

        # Save XML
        if not infos_save(root_elt, infos_file, request):
            return  # pragma: nocover

        self._commit_and_index(
            request, warehouse, values, _('Metadata updated'))

    # -------------------------------------------------------------------------
    @classmethod
    def _commit_and_index(cls, request, warehouse, values, message):
        """Commit the change, update the index and clear the cache.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists.
        :param dict values:
            Panel values. Only ``'directory'`` and ``'file_name'`` are read.
        :param str message:
            Commit message. It is translated before being used.
        """
        # Commit change
        warehouse.vcs.add()
        warehouse.vcs.commit(
            request.localizer.translate(message),
            request.session['user']['name'],
            request.session['user']['email'])

        # Update index and clear cache
        warehouse.index_update(
            request.dbsession, ((values['directory'], values['file_name']),),
            request=request, force=True)
        request.registry['modules']['ciowarehouse'].cache_clear(
            request, warehouse.uid)