# -*- coding: utf-8 -*-
"""Warehouse file class."""
from os import makedirs
from os.path import join, exists, normpath, splitext
from shutil import copy
from webob.compat import cgi_FieldStorage as FieldStorage
from chrysalio.includes.clipboard import CLIPBOARD_LABEL_MAX_ITEMS
from chrysalio.helpers.literal import Literal
from ..lib.utils import THUMBNAILS_DIR, THUMBNAIL_LARGE, HERE, thumbnail_url
from ..lib.utils import move_inter_warehouses
from ..lib.i18n import _
WFILE_CLIPBOARD_DOMAIN = 'wfile'
# =============================================================================
[docs]
class WFile(object):
"""Class to operate on files in a warehouse or in a result of search.
:type request: pyramid.request.Request
:param request:
Current request.
"""
# -------------------------------------------------------------------------
def __init__(self, request):
"""Constructor method."""
self._request = request
# -------------------------------------------------------------------------
[docs]
def detect_thumbnails(self, file_list):
"""Complete each element of the file list with thumbnail information.
:param list file_list:
List of files to complete.
"""
for pfile in file_list:
if 'thumbnail' in pfile:
continue
root = self._request.registry['modules']['ciowarehouse']\
.warehouse_root(self._request, pfile['warehouse_id'])
abs_thumb = join(
root, THUMBNAILS_DIR, pfile['directory'], pfile['file_name'],
HERE if pfile['file_type'] == 'directory' else '.',
'{0}.jpg'.format(THUMBNAIL_LARGE)).encode('utf8')
if exists(abs_thumb):
pfile['thumbnail'] = '.jpg'
continue
abs_thumb = join(
root, THUMBNAILS_DIR, pfile['directory'], pfile['file_name'],
HERE if pfile['file_type'] == 'directory' else '.',
'{0}.png'.format(THUMBNAIL_LARGE)).encode('utf8')
if exists(abs_thumb):
pfile['thumbnail'] = '.png'
else:
pfile['thumbnail'] = None
# -------------------------------------------------------------------------
[docs]
def remove(self, paging, file_ids):
"""Remove files.
:type paging: chrysalio.lib.paging.Paging
:param paging:
Paging containing all the files.
:param list file_ids:
List of file IDs.
"""
# Find files to remove
file_dict = {}
for pfile in paging:
if pfile['file_id'] in file_ids:
if pfile['warehouse_id'] not in file_dict:
file_dict[pfile['warehouse_id']] = [pfile]
else:
file_dict[pfile['warehouse_id']].append(pfile)
# Browse warehouses and remove files
ciowarehouse = self._request.registry['modules']['ciowarehouse']
for warehouse_id, pfiles in file_dict.items():
warehouse = ciowarehouse.warehouse(self._request, warehouse_id)
if warehouse is None:
continue
if not ciowarehouse.warehouse_file_writer(
self._request, warehouse):
self._request.session.flash(_(
'You cannot remove a file from "${l}".',
{'l': warehouse.label(self._request)}), 'alert')
return
warehouse.vcs.pull()
files = []
for pfile in pfiles:
error = warehouse.vcs.remove(
pfile['directory'], pfile['file_name'])
if error is not None: # pragma: nocover
self._request.session.flash(error, 'alert')
continue
files.append((pfile['directory'], pfile['file_name']))
paging.remove(pfile)
warehouse.vcs.commit(
self._request.localizer.translate(_('Deletion of the file')),
self._request.session['user']['name'],
self._request.session['user']['email'])
warehouse.index_update(
self._request.dbsession, files, self._request)
ciowarehouse.cache_clear(self._request, warehouse.uid)
# -------------------------------------------------------------------------
[docs]
def clipboard_copy(self, paging, file_ids, cut=False):
"""Copy the files to the clipboard.
:type paging: chrysalio.lib.paging.Paging
:param paging:
Paging containing all the files.
:param list file_ids:
List of file IDs.
:param bool cut: (default=False)
``True`` for a `cut` operation.
"""
if 'panels' not in self._request.registry or \
'clipboard' not in self._request.registry['panels']:
return
data = []
label = ''
html_label = ''
num = 0
for pfile in paging:
if pfile['file_id'] not in file_ids:
continue
data.append((
pfile['warehouse_id'], pfile['directory'],
pfile['file_name']))
if num < CLIPBOARD_LABEL_MAX_ITEMS:
if label:
label += ', '
label += normpath(join(
pfile['warehouse_id'], pfile['directory'],
pfile['file_name']))
view = join(pfile['directory'], pfile['file_name'])
view = self._request.route_path(
'browse_directory', warehouse_id=pfile['warehouse_id'],
path=view) if pfile['file_type'] == 'directory' else \
self._request.route_path(
'file_view', warehouse_id=pfile['warehouse_id'],
path=view)
html_label += '<p><a href="{view}">'\
'<img src="{icon}" alt="{name}" class="cioIcon"></a>'\
' <a href="{view}">{path}</a></p>'.format(
view=view,
icon=thumbnail_url(self._request, pfile, 'small'),
name=pfile['file_name'],
path=normpath(join(
pfile['warehouse_id'], pfile['directory'],
pfile['file_name'])))
elif '…' not in label:
label += '…'
html_label += '…'
num += 1
html_label = Literal(html_label)
self._request.registry['panels']['clipboard'].push(
self._request, WFILE_CLIPBOARD_DOMAIN, cut, tuple(data),
html_label)
self._request.session.flash(_('${l} in the clipboard.', {'l': label}))
# -------------------------------------------------------------------------
[docs]
def clipboard_paste(self, warehouse, directory):
"""Paste the files from the clipboard to the directroy.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param str directory:
Current directory.
"""
if warehouse is None or 'panels' not in self._request.registry \
or 'clipboard' not in self._request.registry['panels']:
return
if not warehouse.lock(join(warehouse.root, directory)):
self._request.session.flash(_('The directory is locked!'), 'alert')
return
# Sort by warehouse
warehouses = {}
entries = self._request.registry['panels']['clipboard'].selection(
self._request, (WFILE_CLIPBOARD_DOMAIN,))
for entry in entries:
for data in entry[2]:
if data[0] not in warehouses:
warehouses[data[0]] = []
warehouses[data[0]].append(data[1:] + (entry[1],))
# Copy
ciowarehouse = self._request.registry['modules']['ciowarehouse']
files = []
for warehouse_id, data in warehouses.items():
warehouse1 = ciowarehouse.warehouse(self._request, warehouse_id)
if warehouse1 is None:
continue
files += self._clipboard_paste_from(
warehouse1, data, warehouse, directory)
# Commit and refresh
warehouse.unlock(join(warehouse.root, directory))
self._commit_and_refresh(warehouse, files, _('Copy from clipboard'))
if files and not self._request.session.peek_flash('alert'):
self._request.session.flash(_('Paste successfully complete!'))
# -------------------------------------------------------------------------
[docs]
def make_directory(self, warehouse, directory):
"""Make a directory.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param str directory:
Current directory.
"""
filename = warehouse.file_normalize(
self._request.POST.get('directory'), True)
if not filename or exists(
join(warehouse.root, directory, filename)):
return
if splitext(filename)[1] == '.xml':
self._request.session.flash(
_('You cannot use .xml as directory extension.'), 'alert')
return
makedirs(join(warehouse.root, directory, filename), exist_ok=True)
warehouse.index_update(
self._request.dbsession, [(directory, filename)],
self._request)
self._request.registry['modules']['ciowarehouse'].cache_clear(
self._request, warehouse.uid)
# -------------------------------------------------------------------------
[docs]
def new_file(self, warehouse, directory, seed_id):
"""Create a new file.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param str directory:
Current directory.
:param str seed_id:
ID of a seed to create the file.
"""
filename = warehouse.file_normalize(
self._request.POST.get('filename'), True)
seed = warehouse.seeds(self._request).get(seed_id)
if not filename or not seed or not exists(seed[2]):
return
if splitext(filename)[1] != splitext(seed[2])[1]:
filename = '{0}{1}'.format(filename, splitext(seed[2])[1])
abs_path = join(warehouse.root, directory, filename)
if exists(abs_path):
self._request.session.flash(
_('File ${f} already exists!', {'f': filename}), 'alert')
return
copy(seed[2], abs_path)
warehouse.vcs.add(abs_path)
self._commit_and_refresh(
warehouse, ((directory, filename),), _('New file'))
# -------------------------------------------------------------------------
[docs]
def upload_all(self, warehouse, directory):
"""Import several files in the current directory.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param str directory:
Current directory.
"""
# Lock warehouse
if not warehouse.lock(join(warehouse.root, directory)):
self._request.session.flash(_('The directory is locked!'), 'alert')
return
ciowarehouse = self._request.registry['modules']['ciowarehouse']
if not ciowarehouse.warehouse_file_writer(self._request, warehouse):
self._request.session.flash(
_('You cannot import files here.'), 'alert')
warehouse.unlock(join(warehouse.root, directory))
return
# Retrieve files
files = []
for field_storage in self._request.POST.getall('file'):
if isinstance(field_storage, FieldStorage):
files.append(self._upload(warehouse, directory, field_storage))
files = [(directory, k) for k in files if k is not None]
if not files:
warehouse.unlock(join(warehouse.root, directory))
return
# Add, index, commit and refresh
warehouse.vcs.pull()
for item in files:
warehouse.vcs.add(join(*item))
warehouse.unlock(join(warehouse.root, directory))
self._commit_and_refresh(warehouse, files, _('Upload'))
if not self._request.session.peek_flash('alert'):
self._request.session.flash(
_('Upload successfully complete!'), 'refresh')
# -------------------------------------------------------------------------
@classmethod
def _upload(cls, warehouse, directory, field_storage):
"""Import one file in the current directory.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param str directory:
Current directory.
:type field_storage: cgi.FieldStorage
:param field_storage:
Object containing the file coming from the Web.
:rtype: str
:return:
Name of imported file or ``None``.
"""
filename = warehouse.file_normalize(field_storage.filename)
if filename is None:
return None
abs_path = join(warehouse.root, directory, filename).encode('utf8')
with open(abs_path, 'wb') as hdl:
hdl.write(field_storage.file.read())
field_storage.file.close()
return filename
# -------------------------------------------------------------------------
def _clipboard_paste_from(self, warehouse1, data, warehouse2, directory2):
"""Copy/cut from a source .
:type warehouse1: .lib.warehouse.Warehouse
:param warehouse1:
Source warehouse object.
:param dict data:
Description of data to transfer.
:type warehouse2: .lib.warehouse.Warehouse
:param warehouse2:
Target warehouse object.
:param str directory2:
Target directory2.
:rtype: list
:return:
A list of tuples of copied files like ``(directory, file_name)``.
"""
# Move or copy
files1 = []
files2 = []
warehouse1.vcs.pull()
if warehouse1.uid != warehouse2.uid:
warehouse2.vcs.pull()
for item in data:
abs_file1 = join(warehouse1.root, item[0], item[1])
if not exists(abs_file1):
continue
abs_file2 = join(warehouse2.root, directory2, item[1])
file2 = item[1] if abs_file1 != abs_file2 else '{0}_2{1}'.format(
*splitext(item[1]))
if exists(join(warehouse2.root, directory2, file2)):
warehouse2.vcs.remove(directory2, file2)
if warehouse1.uid == warehouse2.uid:
warehouse1.vcs.move(
(item[0], item[1]), (directory2, file2),
copy_only=not item[2])
else:
move_inter_warehouses(
warehouse1, {'directory': item[0], 'file_name': item[1]},
warehouse2, {'directory': directory2, 'file_name': '.'},
not item[2])
if item[2]:
files1.append((item[0], item[1]))
files2.append((directory2, file2))
# Commit and refresh
if files1:
self._commit_and_refresh(
warehouse1, files1, _('Move to ${p}', {
'p': normpath(join(warehouse2.uid, directory2))}))
return files2
# -------------------------------------------------------------------------
def _commit_and_refresh(self, warehouse, files, message, in_thread=False):
"""Update VCS and index for files ``files``.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param list files:
List of tuple such as ``(directory, file_name)``.
:type message: pyramid.i18n.TranslationString
:param message:
Commit message.
:param bool in_thread: (default=False)
Launch the refresh in a thread.
"""
# VCS
error = warehouse.vcs.commit(
self._request.localizer.translate(message),
self._request.session['user']['name'],
self._request.session['user']['email'])
if error: # pragma: nocover
self._request.session.flash(error, 'alert')
# Refresh
warehouse.refresh(
self._request, [join(k[0], k[1]) for k in files], recursive=True,
in_thread=in_thread, dbsession=self._request.dbsession)
self._request.registry['modules']['ciowarehouse'].cache_clear(
self._request, warehouse.uid)