Source code for ciowarehouse.views.file

"""View callables to manage files of a warehouse."""

from os.path import join, exists, normpath, isdir, isabs, basename

from pyramid.view import view_config
from pyramid.response import Response, FileResponse
from pyramid.httpexceptions import HTTPFound, HTTPForbidden

from chrysalio.lib.utils import mimetype_get
from chrysalio.lib.form import get_action
from ..lib.i18n import _
from ..lib.utils import THUMBNAILS_DIR, THUMBNAIL_LARGE, THUMBNAIL_SMALL
from ..lib.utils import MIMETYPES_DIR, HERE, NOT_FOUND
from ..lib.utils import files2response, file2response, move_inter_warehouses


# =============================================================================
class FileView(object):
    """Class to manage files of a warehouse.

    Every public method is registered as a Pyramid view with
    ``view_config``; ``_get_warehouse`` is the shared access check.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request):
        """Constructor method."""
        self._request = request

    # -------------------------------------------------------------------------
    @view_config(route_name='file_view')
    def view(self):
        """Display a file according to its type.

        Depending on the request, the result is a redirection (empty path,
        ``edt!`` action, directory), the "not found" file, a download (no
        handler or handler without view) or the body rendered by the file
        handler.

        :rtype: pyramid.response.Response
        """
        # pylint: disable = too-many-return-statements
        request = self._request
        segments = request.matchdict['path']
        if not segments:
            return HTTPFound(request.route_path(
                'browse_directory_root',
                warehouse_id=request.matchdict['warehouse_id']))

        warehouse, path = self._get_warehouse(join(*segments))
        abs_path = join(warehouse.root, path)
        naked = request.GET.get('class') == 'cioNaked'
        if not exists(abs_path):
            if naked:
                return FileResponse(
                    NOT_FOUND, request=request,
                    content_type=mimetype_get(NOT_FOUND)[0])
            warehouse.full_refresh(request, in_thread=True, force=True)
            return self.download()

        handler, content = warehouse.get_handler(abs_path)
        if handler is None:
            return self.download()

        action = get_action(request)[0]
        if action.startswith('edt!'):
            return HTTPFound(request.route_path(
                'file_edit', warehouse_id=warehouse.uid, path=path,
                _query={'rendering': action[4:]}))

        if isdir(abs_path):
            return HTTPFound(request.route_path(
                'glance_directory', warehouse_id=warehouse.uid, path=path))

        if not naked:
            request.breadcrumbs(
                _('View of ${f}', {'f': segments[-1]}),
                replace=request.route_path(
                    'file_edit', warehouse_id=warehouse.uid, path=path),
                root_chunks=3,
                compare_params=request.GET.get('crumb') == '1')
        body = handler.view(request, warehouse, content)
        if body is None:
            # Undo the crumb pushed above; in naked mode nothing was pushed,
            # so popping would drop a crumb this view does not own.
            if not naked:
                request.breadcrumbs.pop()
            return self.download()
        return Response(body)

    # -------------------------------------------------------------------------
    @view_config(route_name='file_edit')
    def edit(self):
        """Edit a file according to its type.

        :rtype: pyramid.response.Response
        :raise pyramid.httpexceptions.HTTPForbidden:
            If no handler is available for the file.
        """
        request = self._request
        warehouse, path = self._get_warehouse(
            join(*request.matchdict['path']))
        handler, content = warehouse.get_handler(join(warehouse.root, path))
        if handler is None:
            raise HTTPForbidden(comment=_('You cannot edit this file!'))

        view_url = request.route_path(
            'file_view', warehouse_id=warehouse.uid, path=path)
        action = get_action(request)[0]
        if action.startswith('qut!'):
            handler.edit_finalization(request, warehouse, path)
            return HTTPFound(view_url)

        request.breadcrumbs(
            _('Edition of ${f}', {'f': request.matchdict['path'][-1]}),
            replace=view_url, root_chunks=3)
        body = handler.edit(request, warehouse, content)
        if body is None:
            return HTTPFound(view_url)
        return Response(body)

    # -------------------------------------------------------------------------
    @view_config(route_name='file_download')
    @view_config(route_name='file_get')
    def download(self, path=None):
        """Download a file.

        :param str path: (optional)
            Path of the file, relative to the warehouse root or absolute. If
            ``None``, the path is taken from the matched route.
        :rtype: pyramid.response.Response
        :return:
            The file itself for the route ``file_get``, otherwise the result
            of ``files2response``. A missing file is replaced by
            ``NOT_FOUND``.
        """
        if path is None:
            path = join(*self._request.matchdict['path'])
        warehouse, path = self._get_warehouse(path)
        # Absolute paths are kept as is: preview() passes thumbnail paths
        # that are already rooted.
        if not isabs(path):
            path = join(warehouse.root, path)
        if not exists(path):
            path = NOT_FOUND
        if self._request.matched_route.name == 'file_get':
            return FileResponse(
                path, request=self._request,
                content_type=mimetype_get(path)[0])
        return files2response(self._request, (path,))

    # -------------------------------------------------------------------------
    @view_config(route_name='file_thumbnail')
    def thumbnail(self):
        """Download the requested path taken inside the thumbnail
        directory of the warehouse.

        :rtype: pyramid.response.Response
        """
        return self.download(
            join(THUMBNAILS_DIR, *self._request.matchdict['path']))

    # -------------------------------------------------------------------------
    @view_config(route_name='file_preview')
    def preview(self):
        """Return the thumbnail of a file or its mimetype icon.

        If ``?size=small`` is set, it uses the small thumbnail.

        :rtype: pyramid.response.Response
        """
        request = self._request

        def not_found():
            """Return the response for a missing file."""
            return file2response(request, NOT_FOUND, basename(NOT_FOUND))

        path = request.matchdict['path']
        if not path:
            return not_found()
        warehouse, path = self._get_warehouse(join(*path), no_exception=True)
        if warehouse is None:
            return not_found()
        abs_path = join(warehouse.root, path)
        if not exists(abs_path):
            return not_found()

        # Thumbnail
        if request.GET.get('size') == 'small':
            size = THUMBNAIL_SMALL
        else:
            size = THUMBNAIL_LARGE
        thumb_dir = join(warehouse.root, THUMBNAILS_DIR, path)
        if isdir(abs_path):
            thumb_dir = join(thumb_dir, HERE)
        if exists(thumb_dir):
            # JPEG first, PNG otherwise; download() falls back to NOT_FOUND
            # if the PNG does not exist either.
            jpg = join(thumb_dir, '{0}.jpg'.format(size))
            if exists(jpg):
                return self.download(jpg)
            return self.download(join(thumb_dir, '{0}.png'.format(size)))

        # Mimetype
        icon = join(MIMETYPES_DIR, 'normal', '{0}.svg'.format(
            mimetype_get(abs_path)[1]))
        return files2response(request, (icon,))

    # -------------------------------------------------------------------------
    @view_config(route_name='file_move', renderer='json', xhr=True)
    def move(self):
        """Move or copy a file into a directory, possibly of another
        warehouse.

        :rtype: dict
        :return:
            A dictionary with the boolean keys ``success`` and ``reload``.
        """
        request = self._request
        ciowarehouse = request.registry['modules']['ciowarehouse']

        # Source
        warehouse1 = ciowarehouse.warehouse(
            request, request.matchdict['warehouse1_id'])
        if warehouse1 is None:
            return {'success': False, 'reload': False}
        file1 = warehouse1.file_get(request, request.matchdict['file1_id'])
        if file1 is None:
            return {'success': False, 'reload': False}
        # Without write access on the source, a move degrades into a copy.
        copy_only = request.params.get('copy') == 'true' or \
            not ciowarehouse.warehouse_file_writer(request, warehouse1)

        # Destination
        warehouse2 = ciowarehouse.warehouse(
            request, request.matchdict['warehouse2_id'])
        # An unknown destination warehouse is refused like an unauthorized
        # one instead of being dereferenced below.
        if warehouse2 is None or \
           not ciowarehouse.warehouse_file_writer(request, warehouse2):
            request.session.flash(_('Unauthorized writing!'), 'alert')
            return {'success': False, 'reload': True}
        file2 = warehouse2.file_get(request, request.matchdict['file2_id'])
        if file2 is None or file2['file_type'] not in ('directory', 'root'):
            request.session.flash(_('Unauthorized writing!'), 'alert')
            return {'success': False, 'reload': True}

        # Move or copy
        same_warehouse = warehouse1.uid == warehouse2.uid
        if same_warehouse:
            error = warehouse1.vcs.move(
                (file1['directory'], file1['file_name']),
                (join(file2['directory'], file2['file_name']),
                 file1['file_name']),
                copy_only=copy_only)
        else:
            error = move_inter_warehouses(
                warehouse1, file1, warehouse2, file2, copy_only)
        if error:
            request.session.flash(error, 'alert')
            return {'success': False, 'reload': True}

        # Commit and refresh
        translate = request.localizer.translate
        user = request.session['user']
        path1 = normpath(join(file1['directory'], file1['file_name']))
        path2 = normpath(join(
            file2['directory'], file2['file_name'], file1['file_name']))
        if not copy_only:
            if not same_warehouse:
                warehouse1.vcs.commit(
                    translate(_(
                        'Move to ${p}', {'p': join(warehouse2.uid, path2)})),
                    user['name'], user['email'])
            # NOTE(review): nesting rebuilt from a whitespace-collapsed
            # source; the refresh is assumed to apply to every real move,
            # not only to inter-warehouse ones -- confirm.
            warehouse1.refresh(request, [path1])
        origin = {'p': join(warehouse1.uid, path1)}
        if copy_only:
            message = _('Copy from ${p}', origin)
        else:
            message = _('Move from ${p}', origin)
        warehouse2.vcs.commit(
            translate(message), user['name'], user['email'])
        warehouse2.refresh(request, [path2], recursive=True)

        return {
            'success': True,
            'reload': not copy_only or not same_warehouse}

    # -------------------------------------------------------------------------
    @view_config(route_name='file_save', renderer='json', xhr=True)
    def save(self):
        """Save a file.

        :rtype: dict
        :return:
            A dictionary with the keys ``success`` (boolean) and ``message``
            (translated confirmation or the error of the handler).
        """
        request = self._request
        translate = request.localizer.translate
        ciowarehouse = request.registry['modules']['ciowarehouse']
        warehouse, path = self._get_warehouse(
            join(*request.matchdict['path']), no_exception=True)
        if warehouse is None or not ciowarehouse.warehouse_file_writer(
                request, warehouse):
            return {
                'success': False,
                'message': translate(_('Access denied.'))}

        handler, content = warehouse.get_handler(join(warehouse.root, path))
        if handler is None:
            return {
                'success': False,
                'message': translate(_('No file handler.'))}

        error = handler.save(request, warehouse, content, request.POST, True)
        if error is None:
            message = translate(_('Successfully saved.'))
        else:
            message = error
        return {'success': error is None, 'message': message}

    # -------------------------------------------------------------------------
    def _get_warehouse(self, path, no_exception=False):
        """Return a warehouse object. Raise an exception if the access is
        denied.

        The warehouse ID comes from the matched route unless ``path`` has
        the form ``warehouse_id:path``; in that case, only the part of the
        prefix after its last ``/`` is used as ID.

        :param str path:
            Relative path to the file.
        :param bool no_exception:
            If ``True`` return ``None`` in place of raising an exception
            when the warehouse is not available. The check on groups still
            raises.
        :rtype: tuple
        :return:
            A tuple such as ``(warehouse, path)``
        :raise pyramid.httpexceptions.HTTPForbidden:
            If the warehouse is not available or if the user belongs to
            none of the groups the path is restricted to.
        """
        warehouse_id = self._request.matchdict['warehouse_id']
        if ':' in path:
            # NOTE(review): the part after ':' is used verbatim. An absolute
            # or '..' remainder is not rejected here and download() serves
            # absolute paths unchanged -- confirm that no route can supply
            # one.
            warehouse_id, path = path.partition(':')[::2]
            if '/' in warehouse_id:
                warehouse_id = warehouse_id.split('/')[-1]

        warehouse = self._request.registry['modules'][
            'ciowarehouse'].warehouse(self._request, warehouse_id)
        if warehouse is None:
            if no_exception:
                return None, path
            raise HTTPForbidden(comment=_('This file is not accessible!'))

        user_groups = set(self._request.session['user']['groups'])
        only_groups = warehouse.infos_only_groups(path, self._request)[0]
        if only_groups and not user_groups & only_groups:
            raise HTTPForbidden(comment=_('This file is not accessible!'))

        return warehouse, path