"""View callables to manage files of a warehouse."""
from os.path import join, exists, normpath, isdir, isabs, basename
from pyramid.view import view_config
from pyramid.response import Response, FileResponse
from pyramid.httpexceptions import HTTPFound, HTTPForbidden
from chrysalio.lib.utils import mimetype_get
from chrysalio.lib.form import get_action
from ..lib.i18n import _
from ..lib.utils import THUMBNAILS_DIR, THUMBNAIL_LARGE, THUMBNAIL_SMALL
from ..lib.utils import MIMETYPES_DIR, HERE, NOT_FOUND
from ..lib.utils import files2response, file2response, move_inter_warehouses
# =============================================================================
[docs]class FileView(object):
"""Class to manage files of a warehouse.
:type request: pyramid.request.Request
:param request:
Current request.
"""
# -------------------------------------------------------------------------
def __init__(self, request):
"""Constructor method."""
self._request = request
# -------------------------------------------------------------------------
[docs] @view_config(route_name='file_view')
def view(self):
"""Display a file according to its type."""
# pylint: disable = too-many-return-statements
if not self._request.matchdict['path']:
return HTTPFound(self._request.route_path(
'browse_directory_root',
warehouse_id=self._request.matchdict['warehouse_id']))
warehouse, path = self._get_warehouse(
join(*self._request.matchdict['path']))
abs_path = join(warehouse.root, path)
if not exists(abs_path):
if self._request.GET.get('class') == 'cioNaked':
return FileResponse(
NOT_FOUND, request=self._request,
content_type=mimetype_get(NOT_FOUND)[0])
warehouse.full_refresh(self._request, in_thread=True, force=True)
return self.download()
handler, content = warehouse.get_handler(abs_path)
if handler is None:
return self.download()
action = get_action(self._request)[0]
if action[:4] == 'edt!':
return HTTPFound(self._request.route_path(
'file_edit', warehouse_id=warehouse.uid, path=path,
_query={'rendering': action[4:]}))
if isdir(join(warehouse.root, path)):
return HTTPFound(self._request.route_path(
'glance_directory', warehouse_id=warehouse.uid, path=path))
if self._request.GET.get('class') != 'cioNaked':
self._request.breadcrumbs(
_('View of ${f}', {'f': self._request.matchdict['path'][-1]}),
replace=self._request.route_path(
'file_edit', warehouse_id=warehouse.uid, path=path),
root_chunks=3,
compare_params=self._request.GET.get('crumb') == '1')
body = handler.view(self._request, warehouse, content)
if body is None:
self._request.breadcrumbs.pop()
return self.download()
return Response(body)
# -------------------------------------------------------------------------
[docs] @view_config(route_name='file_edit')
def edit(self):
"""Edit a file according to its type."""
warehouse, path = self._get_warehouse(
join(*self._request.matchdict['path']))
handler, content = warehouse.get_handler(join(warehouse.root, path))
if handler is None:
raise HTTPForbidden(comment=_('You cannot edit this file!'))
action = get_action(self._request)[0]
if action[:4] == 'qut!':
handler.edit_finalization(self._request, warehouse, path)
return HTTPFound(self._request.route_path(
'file_view', warehouse_id=warehouse.uid, path=path))
self._request.breadcrumbs(
_('Edition of ${f}', {'f': self._request.matchdict['path'][-1]}),
replace=self._request.route_path(
'file_view', warehouse_id=warehouse.uid, path=path),
root_chunks=3)
body = handler.edit(self._request, warehouse, content)
if body is None:
return HTTPFound(self._request.route_path(
'file_view', warehouse_id=warehouse.uid, path=path))
return Response(body)
# -------------------------------------------------------------------------
[docs] @view_config(route_name='file_download')
@view_config(route_name='file_get')
def download(self, path=None):
"""Download a file."""
if path is None:
path = join(*self._request.matchdict['path'])
warehouse, path = self._get_warehouse(path)
if not isabs(path):
path = join(warehouse.root, path)
if not exists(path):
path = NOT_FOUND
if self._request.matched_route.name == 'file_get':
return FileResponse(
path, request=self._request,
content_type=mimetype_get(path)[0])
return files2response(self._request, (path,))
# -------------------------------------------------------------------------
[docs] @view_config(route_name='file_thumbnail')
def thumbnail(self):
"""Return the route to the directory of the thumbnail."""
return self.download(
join(*((THUMBNAILS_DIR,) + self._request.matchdict['path'])))
# -------------------------------------------------------------------------
[docs] @view_config(route_name='file_preview')
def preview(self):
"""Return the thumbnail of a file or its mimetype icon. If
``?size=small`` is set, it uses the small thumbnail.
"""
path = self._request.matchdict['path']
if not path:
return file2response(self._request, NOT_FOUND, basename(NOT_FOUND))
warehouse, path = self._get_warehouse(join(*path), no_exception=True)
if warehouse is None:
return file2response(self._request, NOT_FOUND, basename(NOT_FOUND))
abs_path = join(warehouse.root, path)
if not exists(abs_path):
return file2response(self._request, NOT_FOUND, basename(NOT_FOUND))
# Thumbnail
size = THUMBNAIL_SMALL \
if self._request.GET.get('size') == 'small' else THUMBNAIL_LARGE
thumb_dir = join(warehouse.root, THUMBNAILS_DIR, path)
if isdir(abs_path):
thumb_dir = join(thumb_dir, HERE)
if exists(thumb_dir):
if exists(join(thumb_dir, '{0}.jpg'.format(size))):
return self.download(
join(thumb_dir, '{0}.jpg'.format(size)))
return self.download(
join(thumb_dir, '{0}.png'.format(size)))
# Mimetype
abs_path = join(MIMETYPES_DIR, 'normal', '{0}.svg'.format(
mimetype_get(abs_path)[1]))
return files2response(self._request, (abs_path,))
# -------------------------------------------------------------------------
[docs] @view_config(route_name='file_move', renderer='json', xhr=True)
def move(self):
"""Move a file."""
# Source
ciowarehouse = self._request.registry['modules'][
'ciowarehouse']
warehouse1 = ciowarehouse.warehouse(
self._request, self._request.matchdict['warehouse1_id'])
if warehouse1 is None:
return {'success': False, 'reload': False}
file1 = warehouse1.file_get(
self._request, self._request.matchdict['file1_id'])
if file1 is None:
return {'success': False, 'reload': False}
copy_only = self._request.params.get('copy') == 'true' or \
not ciowarehouse.warehouse_file_writer(self._request, warehouse1)
# Destination
warehouse2 = ciowarehouse.warehouse(
self._request, self._request.matchdict['warehouse2_id'])
if not ciowarehouse.warehouse_file_writer(self._request, warehouse2):
self._request.session.flash(_('Unauthorized writing!'), 'alert')
return {'success': False, 'reload': True}
file2 = warehouse2.file_get(
self._request, self._request.matchdict['file2_id'])
if file2 is None or file2['file_type'] not in ('directory', 'root'):
self._request.session.flash(_('Unauthorized writing!'), 'alert')
return {'success': False, 'reload': True}
# Move or copy
if warehouse1.uid == warehouse2.uid:
error = warehouse1.vcs.move(
(file1['directory'], file1['file_name']),
(join(file2['directory'], file2['file_name']),
file1['file_name']), copy_only=copy_only)
else:
error = move_inter_warehouses(
warehouse1, file1, warehouse2, file2, copy_only)
if error:
self._request.session.flash(error, 'alert')
return {'success': False, 'reload': True}
# Commit and refresh
path1 = normpath(join(file1['directory'], file1['file_name']))
path2 = normpath(join(
file2['directory'], file2['file_name'], file1['file_name']))
if not copy_only:
if warehouse1.uid != warehouse2.uid:
warehouse1.vcs.commit(
self._request.localizer.translate(
_('Move to ${p}', {'p': join(warehouse2.uid, path2)})),
self._request.session['user']['name'],
self._request.session['user']['email'])
warehouse1.refresh(self._request, [path1])
warehouse2.vcs.commit(
self._request.localizer.translate(
_('Copy from ${p}', {'p': join(warehouse1.uid, path1)})
if copy_only else
_('Move from ${p}', {'p': join(warehouse1.uid, path1)})),
self._request.session['user']['name'],
self._request.session['user']['email'])
warehouse2.refresh(self._request, [path2], recursive=True)
return {'success': True,
'reload': not copy_only or warehouse1.uid != warehouse2.uid}
# -------------------------------------------------------------------------
[docs] @view_config(route_name='file_save', renderer='json', xhr=True)
def save(self):
"""Save a file."""
translate = self._request.localizer.translate
ciowarehouse = self._request.registry['modules']['ciowarehouse']
warehouse, path = self._get_warehouse(
join(*self._request.matchdict['path']), no_exception=True)
if warehouse is None or not ciowarehouse.warehouse_file_writer(
self._request, warehouse):
return {
'success': False, 'message': translate(_('Access denied.'))}
handler, content = warehouse.get_handler(join(warehouse.root, path))
if handler is None:
return {
'success': False, 'message': translate(_('No file handler.'))}
error = handler.save(
self._request, warehouse, content, self._request.POST, True)
return {
'success': error is None, 'message': translate(
_('Successfully saved.')) if error is None else error}
# -------------------------------------------------------------------------
def _get_warehouse(self, path, no_exception=False):
"""Return a warehouse object. Raise an exception if the access is
denied.
:param str path:
Relative path to the file.
:param bool no_exception:
If ``True`` return ``None`` in place of raising an exception.
:rtype: tuple
:return:
A tuple such as ``(warehouse, path)``
"""
warehouse_id = self._request.matchdict['warehouse_id']
if ':' in path:
warehouse_id, path = path.partition(':')[::2]
if '/' in warehouse_id:
warehouse_id = warehouse_id.split('/')[-1]
warehouse = self._request.registry['modules'][
'ciowarehouse'].warehouse(self._request, warehouse_id)
if warehouse is None and no_exception:
return None, path
if warehouse is None:
raise HTTPForbidden(comment=_('This file is not accessible!'))
user_groups = set(self._request.session['user']['groups'])
only_groups = warehouse.infos_only_groups(path, self._request)[0]
if only_groups and not user_groups & only_groups:
raise HTTPForbidden(comment=_('This file is not accessible!'))
return warehouse, path