# Source code for ciowarehouse.lib.wfile

# -*- coding: utf-8 -*-
"""Warehouse file class."""

from html import escape
from os import makedirs
from os.path import join, exists, normpath, splitext
from shutil import copy

from webob.compat import cgi_FieldStorage as FieldStorage
from chrysalio.includes.clipboard import CLIPBOARD_LABEL_MAX_ITEMS
from chrysalio.helpers.literal import Literal

from ..lib.utils import THUMBNAILS_DIR, THUMBNAIL_LARGE, HERE, thumbnail_url
from ..lib.utils import move_inter_warehouses
from ..lib.i18n import _


WFILE_CLIPBOARD_DOMAIN = 'wfile'


# =============================================================================
class WFile(object):
    """Class to operate on files in a warehouse or in a result of search.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request):
        """Constructor method."""
        self._request = request

    # -------------------------------------------------------------------------
    def detect_thumbnails(self, file_list):
        """Complete each element of the file list with thumbnail information.

        Each entry without a ``'thumbnail'`` key receives one whose value is
        ``'.jpg'``, ``'.png'`` or ``None`` depending on which large thumbnail
        file exists on disk. Entries that already have the key are skipped.

        :param list file_list:
            List of files to complete.
        """
        for pfile in file_list:
            if 'thumbnail' in pfile:
                continue
            root = self._request.registry['modules']['ciowarehouse']\
                .warehouse_root(self._request, pfile['warehouse_id'])
            thumb_dir = join(
                root, THUMBNAILS_DIR, pfile['directory'], pfile['file_name'],
                HERE if pfile['file_type'] == 'directory' else '.')
            # JPEG is probed first, PNG second; the first match wins.
            for ext in ('.jpg', '.png'):
                abs_thumb = join(
                    thumb_dir,
                    '{0}{1}'.format(THUMBNAIL_LARGE, ext)).encode('utf8')
                if exists(abs_thumb):
                    pfile['thumbnail'] = ext
                    break
            else:
                pfile['thumbnail'] = None

    # -------------------------------------------------------------------------
    def remove(self, paging, file_ids):
        """Remove files.

        The selected files are grouped by warehouse. For each warehouse the
        files are removed from the VCS, the removal is committed and the index
        and cache are refreshed. Processing stops at the first warehouse the
        user is not allowed to write in.

        :type  paging: chrysalio.lib.paging.Paging
        :param paging:
            Paging containing all the files.
        :param list file_ids:
            List of file IDs.
        """
        # Find files to remove
        file_dict = {}
        for pfile in paging:
            if pfile['file_id'] in file_ids:
                file_dict.setdefault(pfile['warehouse_id'], []).append(pfile)

        # Browse warehouses and remove files
        ciowarehouse = self._request.registry['modules']['ciowarehouse']
        for warehouse_id, pfiles in file_dict.items():
            warehouse = ciowarehouse.warehouse(self._request, warehouse_id)
            if warehouse is None:
                continue
            if not ciowarehouse.warehouse_file_writer(
                    self._request, warehouse):
                self._request.session.flash(_(
                    'You cannot remove a file from "${l}".',
                    {'l': warehouse.label(self._request)}), 'alert')
                return

            warehouse.vcs.pull()
            files = []
            for pfile in pfiles:
                error = warehouse.vcs.remove(
                    pfile['directory'], pfile['file_name'])
                if error is not None:  # pragma: nocover
                    self._request.session.flash(error, 'alert')
                    continue
                files.append((pfile['directory'], pfile['file_name']))
                paging.remove(pfile)

            # Report a failed commit the same way _commit_and_refresh() does
            # instead of silently dropping the returned error.
            error = warehouse.vcs.commit(
                self._request.localizer.translate(_('Deletion of the file')),
                self._request.session['user']['name'],
                self._request.session['user']['email'])
            if error:  # pragma: nocover
                self._request.session.flash(error, 'alert')
            warehouse.index_update(
                self._request.dbsession, files, self._request)
            ciowarehouse.cache_clear(self._request, warehouse.uid)

    # -------------------------------------------------------------------------
    def clipboard_copy(self, paging, file_ids, cut=False):
        """Copy the files to the clipboard.

        Does nothing when no clipboard panel is registered. At most
        ``CLIPBOARD_LABEL_MAX_ITEMS`` files are spelled out in the labels;
        an ellipsis stands for the remaining ones.

        :type  paging: chrysalio.lib.paging.Paging
        :param paging:
            Paging containing all the files.
        :param list file_ids:
            List of file IDs.
        :param bool cut: (default=False)
            ``True`` for a `cut` operation.
        """
        registry = self._request.registry
        if 'panels' not in registry or 'clipboard' not in registry['panels']:
            return

        data = []
        labels = []
        html_parts = []
        overflow = False
        for pfile in paging:
            if pfile['file_id'] not in file_ids:
                continue
            data.append((
                pfile['warehouse_id'], pfile['directory'],
                pfile['file_name']))
            if len(data) > CLIPBOARD_LABEL_MAX_ITEMS:
                overflow = True
                continue

            path = normpath(join(
                pfile['warehouse_id'], pfile['directory'],
                pfile['file_name']))
            labels.append(path)
            route = 'browse_directory' \
                if pfile['file_type'] == 'directory' else 'file_view'
            view = self._request.route_path(
                route, warehouse_id=pfile['warehouse_id'],
                path=join(pfile['directory'], pfile['file_name']))
            # The fragment is wrapped in Literal below, i.e. emitted without
            # further escaping, so text derived from file names must be
            # escaped here.
            html_parts.append(
                '<p><a href="{view}">'
                '<img src="{icon}" alt="{name}" class="cioIcon"></a>'
                ' <a href="{view}">{path}</a></p>'.format(
                    view=view,
                    icon=thumbnail_url(self._request, pfile, 'small'),
                    name=escape(pfile['file_name'], quote=True),
                    path=escape(path, quote=True)))

        label = ', '.join(labels)
        html_label = ''.join(html_parts)
        if overflow and '…' not in label:
            label += '…'
            html_label += '…'

        registry['panels']['clipboard'].push(
            self._request, WFILE_CLIPBOARD_DOMAIN, cut, tuple(data),
            Literal(html_label))
        self._request.session.flash(_('${l} in the clipboard.', {'l': label}))

    # -------------------------------------------------------------------------
    def clipboard_paste(self, warehouse, directory):
        """Paste the files from the clipboard to the directory.

        The target directory is locked while files are transferred and is
        always unlocked afterwards, even if the transfer raises.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :param str directory:
            Current directory.
        """
        if warehouse is None or 'panels' not in self._request.registry \
           or 'clipboard' not in self._request.registry['panels']:
            return
        lock_path = join(warehouse.root, directory)
        if not warehouse.lock(lock_path):
            self._request.session.flash(_('The directory is locked!'), 'alert')
            return

        files = []
        try:
            # Sort by warehouse; each item becomes
            # (directory, file_name, entry[1]).
            warehouses = {}
            entries = self._request.registry['panels']['clipboard'].selection(
                self._request, (WFILE_CLIPBOARD_DOMAIN,))
            for entry in entries:
                for data in entry[2]:
                    warehouses.setdefault(data[0], []).append(
                        data[1:] + (entry[1],))

            # Copy
            ciowarehouse = self._request.registry['modules']['ciowarehouse']
            for warehouse_id, data in warehouses.items():
                warehouse1 = ciowarehouse.warehouse(
                    self._request, warehouse_id)
                if warehouse1 is None:
                    continue
                files += self._clipboard_paste_from(
                    warehouse1, data, warehouse, directory)
        finally:
            warehouse.unlock(lock_path)

        # Commit and refresh
        self._commit_and_refresh(warehouse, files, _('Copy from clipboard'))
        if files and not self._request.session.peek_flash('alert'):
            self._request.session.flash(_('Paste successfully complete!'))

    # -------------------------------------------------------------------------
    def make_directory(self, warehouse, directory):
        """Make a directory.

        The name is read from the ``directory`` POST field. Nothing happens
        when the normalized name is empty or already exists; a name ending
        with ``.xml`` is refused with an alert.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :param str directory:
            Current directory.
        """
        filename = warehouse.file_normalize(
            self._request.POST.get('directory'), True)
        if not filename:
            return
        abs_path = join(warehouse.root, directory, filename)
        if exists(abs_path):
            return
        if splitext(filename)[1] == '.xml':
            self._request.session.flash(
                _('You cannot use .xml as directory extension.'), 'alert')
            return

        makedirs(abs_path, exist_ok=True)
        warehouse.index_update(
            self._request.dbsession, [(directory, filename)], self._request)
        self._request.registry['modules']['ciowarehouse'].cache_clear(
            self._request, warehouse.uid)

    # -------------------------------------------------------------------------
    def new_file(self, warehouse, directory, seed_id):
        """Create a new file.

        The name is read from the ``filename`` POST field and receives the
        extension of the seed file when it does not already have it. An
        existing file is never overwritten.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :param str directory:
            Current directory.
        :param str seed_id:
            ID of a seed to create the file.
        """
        filename = warehouse.file_normalize(
            self._request.POST.get('filename'), True)
        seed = warehouse.seeds(self._request).get(seed_id)
        if not filename or not seed or not exists(seed[2]):
            return

        seed_ext = splitext(seed[2])[1]
        if splitext(filename)[1] != seed_ext:
            filename = '{0}{1}'.format(filename, seed_ext)
        abs_path = join(warehouse.root, directory, filename)
        if exists(abs_path):
            self._request.session.flash(
                _('File ${f} already exists!', {'f': filename}), 'alert')
            return

        copy(seed[2], abs_path)
        warehouse.vcs.add(abs_path)
        self._commit_and_refresh(
            warehouse, ((directory, filename),), _('New file'))

    # -------------------------------------------------------------------------
    def upload_all(self, warehouse, directory):
        """Import several files in the current directory.

        The directory is locked while the files are written and added to the
        VCS and is always unlocked afterwards, even if an error is raised.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :param str directory:
            Current directory.
        """
        # Lock warehouse
        lock_path = join(warehouse.root, directory)
        if not warehouse.lock(lock_path):
            self._request.session.flash(_('The directory is locked!'), 'alert')
            return

        try:
            ciowarehouse = self._request.registry['modules']['ciowarehouse']
            if not ciowarehouse.warehouse_file_writer(
                    self._request, warehouse):
                self._request.session.flash(
                    _('You cannot import files here.'), 'alert')
                return

            # Retrieve files
            names = [
                self._upload(warehouse, directory, field_storage)
                for field_storage in self._request.POST.getall('file')
                if isinstance(field_storage, FieldStorage)]
            files = [(directory, k) for k in names if k is not None]
            if not files:
                return

            # Add to the VCS
            warehouse.vcs.pull()
            for item in files:
                warehouse.vcs.add(join(*item))
        finally:
            warehouse.unlock(lock_path)

        # Index, commit and refresh
        self._commit_and_refresh(warehouse, files, _('Upload'))
        if not self._request.session.peek_flash('alert'):
            self._request.session.flash(
                _('Upload successfully complete!'), 'refresh')

    # -------------------------------------------------------------------------
    @classmethod
    def _upload(cls, warehouse, directory, field_storage):
        """Import one file in the current directory.

        The uploaded stream is copied in chunks and is closed whether or not
        the copy succeeds.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :param str directory:
            Current directory.
        :type  field_storage: cgi.FieldStorage
        :param field_storage:
            Object containing the file coming from the Web.
        :rtype: str
        :return:
            Name of imported file or ``None``.
        """
        filename = warehouse.file_normalize(field_storage.filename)
        if filename is None:
            return None

        abs_path = join(warehouse.root, directory, filename).encode('utf8')
        try:
            with open(abs_path, 'wb') as hdl:
                for chunk in iter(
                        lambda: field_storage.file.read(65536), b''):
                    hdl.write(chunk)
        finally:
            field_storage.file.close()
        return filename

    # -------------------------------------------------------------------------
    def _clipboard_paste_from(self, warehouse1, data, warehouse2, directory2):
        """Copy/cut from a source warehouse to a target directory.

        :type  warehouse1: .lib.warehouse.Warehouse
        :param warehouse1:
            Source warehouse object.
        :param list data:
            Items to transfer; each one is indexed as
            ``(directory, file_name, cut)`` where a true ``cut`` means the
            file is moved instead of copied.
        :type  warehouse2: .lib.warehouse.Warehouse
        :param warehouse2:
            Target warehouse object.
        :param str directory2:
            Target directory.
        :rtype: list
        :return:
            A list of tuples of copied files like ``(directory, file_name)``.
        """
        # Move or copy
        same_warehouse = warehouse1.uid == warehouse2.uid
        files1 = []
        files2 = []
        warehouse1.vcs.pull()
        if not same_warehouse:
            warehouse2.vcs.pull()
        for item in data:
            directory1, name, cut = item[0], item[1], item[2]
            abs_file1 = join(warehouse1.root, directory1, name)
            if not exists(abs_file1):
                continue
            abs_file2 = join(warehouse2.root, directory2, name)
            # Pasting a file onto itself: use a "_2" suffixed name instead.
            file2 = name if abs_file1 != abs_file2 \
                else '{0}_2{1}'.format(*splitext(name))
            if exists(join(warehouse2.root, directory2, file2)):
                warehouse2.vcs.remove(directory2, file2)
            if same_warehouse:
                warehouse1.vcs.move(
                    (directory1, name), (directory2, file2),
                    copy_only=not cut)
            else:
                move_inter_warehouses(
                    warehouse1, {'directory': directory1, 'file_name': name},
                    warehouse2, {'directory': directory2, 'file_name': '.'},
                    not cut)
            if cut:
                files1.append((directory1, name))
            files2.append((directory2, file2))

        # Commit and refresh the source warehouse for the files moved away
        if files1:
            self._commit_and_refresh(
                warehouse1, files1, _('Move to ${p}', {
                    'p': normpath(join(warehouse2.uid, directory2))}))

        return files2

    # -------------------------------------------------------------------------
    def _commit_and_refresh(self, warehouse, files, message, in_thread=False):
        """Update VCS and index for files ``files``.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :param list files:
            List of tuple such as ``(directory, file_name)``.
        :type  message: pyramid.i18n.TranslationString
        :param message:
            Commit message.
        :param bool in_thread: (default=False)
            Launch the refresh in a thread.
        """
        # VCS
        error = warehouse.vcs.commit(
            self._request.localizer.translate(message),
            self._request.session['user']['name'],
            self._request.session['user']['email'])
        if error:  # pragma: nocover
            self._request.session.flash(error, 'alert')

        # Refresh
        warehouse.refresh(
            self._request, [join(k[0], k[1]) for k in files],
            recursive=True, in_thread=in_thread,
            dbsession=self._request.dbsession)
        self._request.registry['modules']['ciowarehouse'].cache_clear(
            self._request, warehouse.uid)