Source code for ciowarehouse.lib.warehouse

# pylint: disable = too-many-lines
"""Warehouse class."""

from logging import getLogger
from os import stat, sep, makedirs, rmdir, remove, scandir, walk
from os.path import join, exists, abspath, dirname, basename, getmtime
from os.path import isdir, relpath, splitext, normpath, commonpath
from shutil import rmtree
from json import loads
from time import time, sleep
from collections import OrderedDict
from threading import Thread
from configparser import ConfigParser

from pytomlpp import loads as toml_loads
from whoosh.index import exists_in, create_in, open_dir, LockError
from whoosh.fields import ID, STORED, KEYWORD, TEXT, NUMERIC, DATETIME, BOOLEAN
from whoosh.fields import Schema, UnknownFieldError
from whoosh.qparser import QueryParser, MultifieldParser
from whoosh.analysis import CharsetFilter, StemmingAnalyzer
from whoosh.query import Term, Prefix, And
from whoosh.searching import NoTermsException, TimeLimit
from whoosh.support.charset import accent_map
from transaction import manager

from chrysalio.lib.i18n import translate_field
from chrysalio.lib.utils import tounicode, tostr, decrypt
from chrysalio.lib.utils import mimetype_get, convert_value
from chrysalio.lib.log import log_error, log_warning, log_info
from chrysalio.lib.xml import load_xml2
from chrysalio.lib.attachment import attachment_url
from chrysalio.helpers.builder import Builder
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.models import get_tm_dbsession
from cioservice.lib.utils import location_path2abs_path
from cioservice.models.dbjob import DBJob
from ..relaxng import RELAXNG_CIOWAREHOUSE
from ..models.dbindexfield import INDEXFIELD_BUILTIN
from ..models.dbsharing import DBSharingFile
from ..lib.handler import Handler
from .i18n import _, translate
from .utils import INDEX_DIR, THUMBNAILS_DIR, INFOS_DIR, EXCLUDED_FILES
from .utils import LOCKS_DIR, REFRESHED_FILE, TOREFRESH_FILE, REFRESHING_FILE
from .utils import CIOWAREHOUSE_NS, CACHE_REGION_USER
from .utils import HERE, make_file_id, normalize_filename, cache_user_seeds
from .vcs_none import VcsNone
from .vcs_git import VcsGit

LOG = getLogger(__name__)
WHOOSH_RETRIES = 6


# =============================================================================
[docs] class Warehouse(): """Class to manage a warehouse. :param dict registry: Application registry. :type dbwarehouse: .models.dbwarehouse.DBWarehouse :param dbwarehouse: SQLAlchemy object representing the warehouse :param dict locations: Dictionary of locations. """ # pylint: disable = too-many-instance-attributes, too-many-public-methods # ------------------------------------------------------------------------- def __init__(self, registry, dbwarehouse, locations): """Constructor method.""" self.uid = dbwarehouse.warehouse_id self.created = time() self.root = abspath(join(locations[dbwarehouse.location], self.uid)) self._i18n_label = loads(dbwarehouse.i18n_label) self._i18n_description = dbwarehouse.i18n_description # Vcs if dbwarehouse.vcs: password = decrypt(dbwarehouse.vcs_password, 'warehouse') password = '-' if dbwarehouse.vcs_password and not password \ else password self.vcs = VcsGit( self.root, dbwarehouse.vcs_url, dbwarehouse.vcs_user, password, dbwarehouse.lock_ttl) else: self.vcs = VcsNone(self.root, dbwarehouse.vcs_url) self._normalize = dbwarehouse.normalize self.access = dbwarehouse.access # Thumbnails self.thumbnail_sizes = None if dbwarehouse.thumbnail_large and \ dbwarehouse.thumbnail_large != '0x0': self.thumbnail_sizes = ( tuple(int(k) for k in dbwarehouse.thumbnail_large.split('x')), tuple(int(k) for k in dbwarehouse.thumbnail_small.split('x'))) # Max size of a download self.download_max_size = dbwarehouse.download_max_size # Rendering and seeds directories self.rendering_dir = self._directory2abs_path( registry, dbwarehouse.rendering_dir) self.seeds_dir = self._directory2abs_path( registry, dbwarehouse.seeds_dir) # Metadata fields self.metafields = OrderedDict() for dbitem in dbwarehouse.metafields: if dbitem.metafield_id in registry['metafields']: self.metafields[dbitem.metafield_id] = \ registry['metafields'][dbitem.metafield_id] if not self.metafields and 'metafields' in registry: self.metafields = registry['metafields'] # Index fields self.indexfields = registry.get('indexfields', {}) # List fields items = [ k for k in sorted(dbwarehouse.indexfields, key=lambda k: k.in_list) if k.in_list ] item_ids = [k.indexfield_id for k in items] if items else \ [k for k in sorted(self.indexfields, key=lambda k: self.indexfields[k]['in_list']) if self.indexfields[k]['in_list']] self.listfields = [ { 'id': k, 'label': self.indexfields[k]['label'], 'type': self.indexfields[k]['field_type'], 'class': self.indexfields[k]['in_class'] } for k in item_ids ] # Card fields items = [ k for k in sorted(dbwarehouse.indexfields, key=lambda k: k.in_list) if k.in_cards ] item_ids = [k.indexfield_id for k in items] if items else \ [k for k in sorted(self.indexfields, key=lambda k: self.indexfields[k]['in_cards']) if self.indexfields[k]['in_cards']] self.cardfields = [ { 'id': k, 'label': self.indexfields[k]['label'], 'type': self.indexfields[k]['field_type'], 'class': self.indexfields[k]['in_class'] } for k in item_ids ] # Handlers self._handlers = registry.get('handlers', ()) if self._handlers and dbwarehouse.handlers: items = [dbitem.handler_id for dbitem in dbwarehouse.handlers] self._handlers = tuple( k for k in self._handlers if k.uid not in items) # Seeds self._seeds = {} for handler in self._handlers: handler.install() for seed_id in handler.seeds: self._seeds[seed_id] = handler.seed(seed_id) # Jobs self._job_ids = [k.job_id for k in dbwarehouse.jobs] self._jobs = {} # Thread self._thread = None self.lock_ttl = dbwarehouse.lock_ttl # Refresh self.refresh_period = dbwarehouse.refresh_period # Inkscape self.inkscape92 = registry.settings.get('inkscape92') == 'true' # -------------------------------------------------------------------------
[docs] def label(self, request): """Return a translated label. :type request: pyramid.request.Request :param request: Current request. :rtype: str """ return translate_field(request, self._i18n_label, self.uid)
# -------------------------------------------------------------------------
[docs] def description(self, request): """Return a translated description. :type request: pyramid.request.Request :param request: Current request. :rtype: str """ if not self._i18n_description: return '' return translate_field(request, self._i18n_description)
# -------------------------------------------------------------------------
[docs] def directory_path(self, request, list_path): """Return an absolute path of the directory pointed by the path contained in the list ``list_path``. Verify if the file is in the warehouse. :type request: pyramid.request.Request :param request: Current request. :param list list_path: Relative path in a list. :rtype: :class:`str` or ``None`` """ fixed = False directory = abspath(join(self.root, *list_path)) if not isdir(directory): fixed = True directory = dirname(directory) if not isdir(directory): fixed = True directory = dirname(directory) if fixed: self.full_refresh(request, in_thread=True, force=True) if directory.startswith(self.root) and exists(directory): return directory return None
# -------------------------------------------------------------------------
[docs] def file_trail(self, request, abs_path): """Return a relative path in the warehouse and a its HTML representation. :type request: pyramid.request.Request :param request: Current request. :param str abs_path: Absolute path to the current directory. :rtype: tuple :return: A tuple such as ``(html_path, rel_path)``. """ if not abs_path or abs_path == self.root: return Builder().span(self.uid), '.' html_path = Builder().a( self.uid, href=request.route_path( 'browse_directory_root', warehouse_id=self.uid), id=make_file_id(self.uid), class_='cioTrailChunk') rel_path = relpath(abs_path, self.root) list_path = rel_path.split(sep) for index in range(len(list_path) - 1): path = '/'.join(list_path[0:index + 1]) html_path += ' / ' + Builder().a( list_path[index], href=request.route_path( 'browse_directory', warehouse_id=self.uid, path=tostr(path)), id=make_file_id(join(self.uid, path)), class_='cioTrailChunk') html_path += ' / ' + Builder().span(list_path[-1]) return html_path, rel_path
# -------------------------------------------------------------------------
[docs] def file_get(self, request, file_id): """Return the dictionary of a file corresponding to the file ID. :type request: pyramid.request.Request :param request: Current request. :param str file_id: ID of the file to search. :rtype: class:`dict` or ``None`` """ dirs, files = self.index_search( request, ('file_id', ), 'file_id:"{0}"'.format(file_id), limit=1) files += dirs if not files and file_id == make_file_id(self.uid): files = [ { 'warehouse_id': self.uid, 'directory': '.', 'file_type': 'root', 'file_name': '', 'file_id': file_id, 'file_size': 0, 'file_date': getmtime(self.root), 'score': 1.0, 'shared': False } ] return files[0] if files else None
# -------------------------------------------------------------------------
[docs] def file_normalize(self, filename, is_dir=False): """Return a normalized file name or ``None`` if the file is in the excluded file list. :param str fielname: Name to normalize. :param bool is_dir: (default=False) ``True`` if the file is a directory. :rtype: str """ if '\\' in filename: filename = filename.split('\\')[-1] filename = basename(filename) if filename in EXCLUDED_FILES: return None if self._normalize: filename = normalize_filename(filename, self._normalize, is_dir) return filename
# -------------------------------------------------------------------------
[docs] def get_handler(self, abs_path): """Retrieve the best file handler for file ``abs_path``. :param str abs_path: Absolute path to the file. :rtype: tuple :return: A tuple such as ``(handler, content)`` where ``handler`` is a :class:`.lib.handler.Handler` or ``None``. """ extension = splitext(abs_path)[1] content = None for handler in self._handlers: found, content = handler.match(extension, abs_path, content) if found: return handler, content return None, content
# ------------------------------------------------------------------------- @cache_user_seeds(CIOWAREHOUSE_NS, CACHE_REGION_USER) def seeds(self, request): """Return a dictionary of available seeds. :type request: pyramid.request.Request :param request: Current request. :rtype: dictionary """ toml = None if self.seeds_dir: toml = join(self.seeds_dir, f'{self.uid.lower()}.toml') if not exists(toml): toml = join(self.seeds_dir, f'{self.uid}.toml') if exists(toml): with open(toml, 'r', encoding='utf8') as hdl: toml = toml_loads(hdl.read()) else: toml = None # From a custom directory available = {} if toml is not None: for seed in toml.get('seeds', ''): if 'name' not in seed: self.error( _('Name is missing for a dynamic seed'), request) continue if 'file' not in seed: self.error( _('Seed "${n}" is incorrect', {'n': seed['name']}), request) continue available[seed['name']] = ( seed.get('icon'), seed['label'].get( request.locale_name, seed['label'].get( 'en', seed['name'])) if 'label' in seed else seed['name'], seed['file'].format(here=self.seeds_dir)) # Hard coded theme = theme_static_prefix(request) for name, seed in self._seeds.items(): if seed not in available: available[name] = ( f'{theme}{seed[0]}' if seed[0] else None, seed[1], seed[2]) return available # -------------------------------------------------------------------------
[docs] def jobs(self, request): """Return a dictionary of available jobs for this warehouse. :type request: pyramid.request.Request :param request: Current request. :rtype: dict """ config_file = request.registry.settings['__file__'] config = ConfigParser({'here': dirname(config_file)}) config.read(tounicode(config_file), encoding='utf8') for job_id in self._job_ids: self.job(request, job_id, config) return self._jobs
# -------------------------------------------------------------------------
[docs] def job(self, request, job_id, config=None): """Return a dictionary representing the job. :type request: pyramid.request.Request :param request: Current request. :param str job_id: Job ID. :type config: configparser.ConfigParser :param config: (optional) Configuration parser based on application configuration file. :rtype: dict :return: A dictionary with keys ``'job_id'``, ``'i18n_label'``, ``'i18n_description'``, ``'icon'``, ``'threaded'``, ``'ttl'``, ``'priority'``, ``'settings'``, ``'service_id'``. If a problem succeeds, it returns ``None``. """ if job_id in self._jobs: return self._jobs[job_id] dbjob = request.dbsession.query(DBJob).filter_by(job_id=job_id).first() if dbjob is None: return None service = request.registry['services'].get(dbjob.service) \ if 'services' in request.registry else None if service is None: log_error( request, request.localizer.translate( _( 'Service "${s}" is not available.', {'s': dbjob.service}))) return None if config is None: config_file = request.registry.settings['__file__'] config = ConfigParser({'here': dirname(config_file)}) config.read(tounicode(config_file), encoding='utf8') section = 'Job:{0}'.format(job_id) settings = dict(config.items(section)) if config.has_section(section) \ else config.defaults() self._jobs[job_id] = { 'job_id': job_id, 'i18n_label': loads(dbjob.i18n_label), 'i18n_description': dbjob.i18n_description, 'icon': attachment_url( request, dbjob.attachments_dir, dbjob.attachments_key, dbjob.icon), 'service_id': service.uid, 'context': dbjob.context, 'access': dbjob.access, 'threaded': dbjob.threaded, 'ttl': dbjob.ttl, 'priority': dbjob.priority, 'users': tuple(k.user_id for k in dbjob.users), 'groups': {k.group_id for k in dbjob.groups}, 'settings': settings } return self._jobs[job_id]
# -------------------------------------------------------------------------
[docs] def full_refresh( self, request, reindex=False, recreate_thumbnails=False, in_thread=False, force=True): """Pull, index, create thumbnails and commit changes on the whole warehouse. :type request: pyramid.request.Request :param request: Current request. :param bool reindex: (default=False) Remove old index. :param bool recreate_thumbnails: (default=False) Remove old thumbnails. :param bool in_thread: (default=False) Launch the refresh in a thread. :param bool force: (default=True) If ``True``, force refreshing even if the deadline is not reached. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # Nothing to do refreshed_file = join(self.root, REFRESHED_FILE) if not force and exists(refreshed_file) and \ getmtime(refreshed_file) + self.refresh_period > time(): return None # Work in progress? refreshing_file = join(self.root, REFRESHING_FILE) if (self._thread is not None and self._thread.is_alive()) or ( exists(refreshing_file) and getmtime(refreshing_file) + self.refresh_period > time()): torefresh_file = join(self.root, TOREFRESH_FILE) makedirs(dirname(torefresh_file), exist_ok=True) with open(torefresh_file, 'w', encoding='utf8'): pass return _('${i}: refresh already in progress!', {'i': self.uid}) if in_thread: # In a thread self._thread = Thread( target=self._full_refresh, name='{0}:full_refresh'.format(self.uid), args=(request, reindex, recreate_thumbnails)) self._thread.start() else: # Directly self._full_refresh(request, reindex, recreate_thumbnails, 'index') self._thread = Thread( target=self._full_refresh, name='{0}:full_refresh_thumbnails'.format(self.uid), args=(request, reindex, recreate_thumbnails, 'thumbnails')) self._thread.start() return None
# ------------------------------------------------------------------------- def _full_refresh( self, request, reindex, recreate_thumbnails, only=None, again=False): """Pull, index, create thumbnails and commit changes, possibly in a thread, on the whole warehouse. :type request: pyramid.request.Request :param request: Current request. :param bool reindex: Remove old index. :param bool recreate_thumbnails: Remove old thumbnails. :param bool only: (optional) If ``'index'``, refresh index only ; if ``'thumbnails'``, refresh thumbnails only ; if ``None`` refres both. :param bool again: (default=False) If ``True``, this refresh follows another one. """ # Lock refreshing_file = join(self.root, REFRESHING_FILE) self.refreshed() with open(refreshing_file, 'w', encoding='utf8'): pass if only is None or only == 'index': # Add forgotten files self.vcs.pull() if self.vcs.is_dirty() and 'user' in request.session: self.vcs.add() self.vcs.commit( request.localizer.translate(_('Maintenance')), request.session['user']['name'], request.session['user']['email']) # Index if reindex: self.index_erase() with manager: dbsession = get_tm_dbsession( request.registry['dbsession_factory'], manager) self.index_update_all(dbsession, request) request.registry['modules']['ciowarehouse'].cache_clear( request, self.uid) if only is None or only == 'thumbnails': # Create thumbnails if recreate_thumbnails: self.thumbnails_erase() self.thumbnails_update_all(request) # Do it again torefresh_file = join(self.root, TOREFRESH_FILE) if exists(torefresh_file): remove(torefresh_file) log_info(request, 'warehouse_full_refresh', self.uid, 'again') self._full_refresh(request, False, False, again=True) # Unlock self.refreshed() if exists(refreshing_file): remove(refreshing_file) if only is None or only == 'index': request.registry['modules']['ciowarehouse'].cache_clear( request, self.uid) if not again: log_info( request, 'warehouse_full_refresh', self.uid, 'reindex={0}'.format(reindex), 'recreate_thumbnails={0}'.format(recreate_thumbnails), 'only={0}'.format(only) if only is not None else '') # -------------------------------------------------------------------------
[docs] def refresh( self, request, files, recursive=False, in_thread=False, dbsession=None, keep_cache=False): """Index and create thumbnails for a list of files. :type request: pyramid.request.Request :param request: Current request or ``None`` if called by a script. :param list files: List of paths of files relative to the warehouse root. :param bool recursive: (default=False) Refresh recursively. :param bool in_thread: (default=False) Launch the refresh in a thread. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: (optional) SQLAlchemy session. :param bool keep_cache: (default=False) if ``True``, do not clear the cache. """ # pylint: disable = too-many-arguments if not files: return if in_thread: # In a thread thread = Thread( target=self._refresh, name='{0}:refresh'.format(self.uid), args=(files, recursive, dbsession, request, None, keep_cache)) thread.start() else: # Directly self._refresh( files, recursive, dbsession, request, 'index', keep_cache) thread = Thread( target=self._refresh, name='{0}:refresh_thumbnails'.format(self.uid), args=( files, recursive, dbsession, request, 'thumbnails', True)) thread.start()
# ------------------------------------------------------------------------- def _refresh(self, files, recursive, dbsession, request, only, keep_cache): """Index and create thumbnails, possibly in a thread. :param list files: List of paths of files relative to the warehouse root. :param bool recursive: Refresh recursively. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type request: pyramid.request.Request :param request: Current request or ``None`` if called by a script. :param bool only: If ``'index'``, refresh index only ; if ``'thumbnails'``, refresh thumbnails only ; if ``None`` refres both. :param bool keep_cache: if ``True``, do not clear the cache. """ # pylint: disable = too-many-arguments if dbsession is None and request is not None: with manager: dbsession = get_tm_dbsession( request.registry['dbsession_factory'], manager) self._refresh_loop(files, recursive, dbsession, request, only) elif dbsession is not None: self._refresh_loop(files, recursive, dbsession, request, only) if request is not None and not keep_cache: request.registry['modules']['ciowarehouse'].cache_clear( request, self.uid) if request is not None: log_info( request, 'warehouse_refresh', self.uid, 'recursive={0}'.format(recursive), 'only={0}'.format(only) if only is not None else '', 'files={0}'.format(', '.join(files))) # ------------------------------------------------------------------------- def _refresh_loop( self, files, recursive, dbsession, request=None, only=None): """Refresh loop. :param list files: List of paths of files relative to the warehouse root. :param bool recursive: Refresh recursively. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. :param bool only: (optional) If ``'index'``, refresh index only ; if ``'thumbnails'``, refresh thumbnails only ; if ``None`` refres both. """ for name in tuple(files): name = normpath(name) if name.startswith('..'): name = '.' file_list = [(dirname(name) or '.', basename(name))] \ if name != '.' else [] if recursive: file_list += self.directory_file_list(name) if only is None or only == 'index': self.index_update(dbsession, file_list, request) if only is None or only == 'thumbnails': self.thumbnails_update(file_list, request) # -------------------------------------------------------------------------
[docs] def refreshed(self): """Record the time of last refresh.""" refreshed_file = join(self.root, REFRESHED_FILE) makedirs(dirname(refreshed_file), exist_ok=True) with open(refreshed_file, 'w', encoding='utf8'): pass
# -------------------------------------------------------------------------
[docs] def unrefreshed(self): """Remove the record of last refresh.""" refreshed_file = join(self.root, REFRESHED_FILE) if exists(refreshed_file): remove(refreshed_file)
# -------------------------------------------------------------------------
[docs] def to_refresh(self, inputs, paths): """Compute an optimized list of relative paths to refresh. :param list inputs: List of absolute paths to the input files. :param str paths: List of relative paths to the output files. :rtype: set """ if not inputs: return {k.partition(sep)[0] for k in paths} root = relpath( commonpath(inputs) if len(inputs) > 1 else dirname(inputs[0]), self.root) path_set = set() for path in paths: chunks = relpath(path, root).split(sep) depth = [k for k in chunks if k == '..'] index = len(depth) index = index if index < len(chunks) else len(chunks) - 1 path_set.add(normpath(join(root, sep.join(depth), chunks[index]))) return path_set
# -------------------------------------------------------------------------
[docs] def lock(self, abs_file=None, relock=False): """Lock a file or the whole warehouse. :param str abs_file: (optional) Absolute path to the source file. :param bool relock: If ``True`` update the date/time of the lock. :rtype: bool """ lock_file = \ join(self.root, LOCKS_DIR, relpath(abs_file, self.root)) \ if abs_file else join(self.root, LOCKS_DIR) if abs_file is None or isdir(abs_file): lock_file = join(lock_file, HERE) if not relock and exists(lock_file) and \ getmtime(lock_file) + self.lock_ttl > time(): return False try: makedirs(dirname(lock_file), exist_ok=True) with open(lock_file, 'w', encoding='utf8'): pass except (OSError, FileNotFoundError): return True return True
# -------------------------------------------------------------------------
[docs] def unlock(self, abs_file=None): """Unlock a file or the whole warehouse. :param str abs_file: (optional) Relative path to a file or a directory. """ lock_file = \ join(self.root, LOCKS_DIR, relpath(abs_file, self.root)) \ if abs_file else join(self.root, LOCKS_DIR) if abs_file is None or isdir(abs_file): lock_file = join(lock_file, HERE) if exists(lock_file): try: remove(lock_file) except OSError: # pragma: nocover pass lock_file = dirname(lock_file) if not exists(lock_file): return while lock_file != self.root: try: if tuple(scandir(lock_file)): break rmdir(lock_file) except OSError: # pragma: nocover break lock_file = dirname(lock_file)
# -------------------------------------------------------------------------
[docs] def unlock_all(self): """Remove lock directory.""" lock_dir = join(self.root, LOCKS_DIR) if exists(lock_dir): rmtree(lock_dir)
# -------------------------------------------------------------------------
[docs] def thumbnails_erase(self): """Remove the thumbnail directory if exists.""" thumbnails_dir = join(self.root, THUMBNAILS_DIR) if exists(thumbnails_dir): rmtree(thumbnails_dir)
# -------------------------------------------------------------------------
[docs] def thumbnails_update_all(self, request=None, registry=None): """Update or create small and large thumbnails for the entire warehouse. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. :param registry: (optional) class:`pyramid.registry.Registry` or class:`chrysalio.scripts.ScriptRegistry` if called by populate script. """ if not self.thumbnail_sizes: return thumbnails_dir = join(self.root, THUMBNAILS_DIR) done = set() # Clean up obsolete thumbnails if exists(thumbnails_dir): self._thumbnails_cleanup(thumbnails_dir, done) # Create new thumbnails if registry is None and request is not None: registry = request.registry for path, dirs, files in walk(self.root): for name in tuple(dirs): if name in EXCLUDED_FILES: dirs.remove(name) continue abs_file = join(path, name) if abs_file in done: continue handler = self.get_handler(abs_file)[0] if handler: thumb_dir = join( thumbnails_dir, relpath(abs_file, self.root), HERE) handler.thumbnails( self, abs_file, thumb_dir, request, registry) for name in files: abs_file = join(path, name) if name in EXCLUDED_FILES or abs_file in done: continue handler = self.get_handler(abs_file)[0] if handler: thumb_dir = join( thumbnails_dir, relpath(abs_file, self.root)) handler.thumbnails( self, abs_file, thumb_dir, request, registry)
# ------------------------------------------------------------------------- def _thumbnails_cleanup(self, thumbnails_dir, done): """Clean up obsolete thumbnail. :param str thumbnails_dir: Absolute path to cache directory. :param set done: Set of up to date file. """ for path, dirs, ignored_ in walk(thumbnails_dir): for name in tuple(dirs): if name == HERE: continue thumb_dir = join(path, name) abs_file = join(self.root, relpath(thumb_dir, thumbnails_dir)) if not exists(abs_file): rmtree(thumb_dir) dirs.remove(name) continue if isdir(abs_file): thumb_dir = join(thumb_dir, HERE) handler = self.get_handler(abs_file)[0] or Handler() if handler.thumbnails_obsolete(abs_file, thumb_dir): if exists(thumb_dir): rmtree(thumb_dir) else: done.add(abs_file) if not tuple(scandir(path)): rmdir(path) # -------------------------------------------------------------------------
[docs] def thumbnails_update(self, files, request=None, registry=None): """Update or create small and large thumbnails for each file if it is possible. :param list files: List of tuples such as ``(directory, file_name)``. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. :param registry: (optional) class:`pyramid.registry.Registry` or class:`chrysalio.scripts.ScriptRegistry` if called by populate script. """ if registry is None and request is not None: registry = request.registry thumbnails_dir = join(self.root, THUMBNAILS_DIR) for k in files: abs_file = join(self.root, k[0], k[1]) if not exists(abs_file): continue handler = self.get_handler(abs_file)[0] if not handler: continue thumb_dir = join( thumbnails_dir, relpath(abs_file, self.root), HERE if isdir(abs_file) else '') if handler.thumbnails_obsolete(abs_file, thumb_dir): handler.thumbnails( self, abs_file, thumb_dir, request, registry)
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
[docs] def index_erase(self): """Remove the index directory if exists.""" index_dir = join(self.root, INDEX_DIR) if exists(index_dir): rmtree(index_dir)
# -------------------------------------------------------------------------
[docs] def index_update_all(self, dbsession, request=None): """Update the search index. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. """ # Open index index = self._index_open_or_create() if index is None: self.warning( _('${i}: indexing in progress...', {'i': self.uid}), request) return writer = self._index_writer(index, request) if writer is None: return # Clean up obsolete index done = set() self._index_all_cleanup(writer, done) # Loop over the files for root, dirs, files in walk(self.root): for name in tuple(dirs): if name in EXCLUDED_FILES: dirs.remove(name) continue abs_file = join(root, name) path = relpath(abs_file, self.root) if path not in done: self._index_add_file( dbsession, writer, path, abs_file, request) for name in files: if name in EXCLUDED_FILES: continue abs_file = join(root, name) path = relpath(abs_file, self.root) if path not in done: self._index_add_file( dbsession, writer, path, abs_file, request) try: writer.commit() except UnicodeDecodeError as error: # pragma: nocover self.error(_('${i}: ${e}', {'i': self.uid, 'e': error}), request) index.optimize() index.close()
# ------------------------------------------------------------------------- def _index_all_cleanup(self, writer, done): """Remove obsolete files from the search index. :type writer: whoosh.writing.IndexWriter :param writer: Writer on current index. :param set done: Set of up to date files. """ for docnum, fields in writer.reader().iter_docs(): abs_file = join( self.root, fields['directory'], fields['file_name']) if not exists(abs_file): writer.delete_document(docnum) continue if self.index_is_obsolete(fields, abs_file): writer.delete_document(docnum) else: done.add( join( fields['directory'] if fields['directory'] != '.' else '', fields['file_name'])) # -------------------------------------------------------------------------
[docs] def index_update(self, dbsession, files, request=None, force=False): """Update the search index. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :param list files: List of tuples such as ``(directory, file_name)``. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. :param bool force: (default=False) Force indexation. """ # pylint: disable = too-many-branches # Open index index = self._index_open_or_create() if index is None: self.warning( _('${i}: indexing in progress...', {'i': self.uid}), request) return writer = self._index_writer(index, request) if writer is None: return # Clean up obsolete index files = list(files) with index.searcher() as searcher: for item in tuple(files): abs_file = join(self.root, item[0], item[1]) file_exists = exists(abs_file) results = searcher.search( # yapf: disable And([Term('directory', item[0]), Term('file_name', item[1])]), limit=1) if not results: if not file_exists: files.remove(item) continue if results[0]['file_type'] == 'directory': self._index_cleanup_directory( writer, searcher, item, files) if not file_exists: files.remove(item) writer.delete_document(results[0].docnum) continue if force: writer.delete_document(results[0].docnum) continue if self.index_is_obsolete(results[0], abs_file): writer.delete_document(results[0].docnum) else: files.remove(item) # Add files to index for item in files: abs_file = normpath(join(self.root, item[0], item[1])) if abs_file.startswith(self.root) and abs_file != self.root: self._index_add_file( dbsession, writer, join(item[0], item[1]), abs_file, request) try: writer.commit() except UnicodeDecodeError as error: # pragma: nocover self.error(_('${i}: ${e}', {'i': self.uid, 'e': error}), request) index.close()
# ------------------------------------------------------------------------- def _index_cleanup_directory(self, writer, searcher, item, files): """Remove obsolete files of a directory from the search index. :type writer: whoosh.searching.Searcher :param writer: Whoosh object to search inside the index. :type searcher: whoosh.searching.Searcher :param searcher: Whoosh object to search inside the index. :param tuple item: An item such as ``(directory, file_name)`` representing the directory to process. :param list files: List of tuples such as ``(directory, file_name)`` representing files being processed. """ # pylint: disable = unsupported-binary-operation path = normpath(join(item[0], item[1])) for hit in searcher.search(Term('directory', path) | Prefix('directory', f'{path}{sep}'), limit=None): if (hit['directory'], hit['file_name']) not in files \ and not exists(join( self.root, hit['directory'], hit['file_name'])): writer.delete_document(hit.docnum) # ------------------------------------------------------------------------- def _index_add_file(self, dbsession, writer, path, abs_path, request=None): """Add a file to the index. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type writer: whoosh.writing.IndexWriter :param writer: Writer on current index. :param str path: Relative path of the file. :param str abs_path: Absolute path of the file. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. """ # Basic fields path = tounicode(path) abs_path = tounicode(abs_path) stat_info = stat(abs_path) file_id = make_file_id(join(self.uid, path)) shared = bool( dbsession.query( DBSharingFile.sharing_id).filter_by(file_id=file_id).first()) is_dir = isdir(abs_path) whoosh_fields = { # yapf: disable 'time': time(), 'directory': dirname(path) or '.', 'file_name': basename(path), 'file_id': tounicode(file_id), 'file_type': 'directory' if is_dir else tounicode( mimetype_get(abs_path)[1]), 'file_size': 0 if is_dir else stat_info.st_size, 'file_date': int(stat_info.st_mtime), 'shared': shared } # Extra fields handler = self.get_handler(abs_path)[0] or Handler() handler.infos_complete_fields( self, path, abs_path, whoosh_fields, request) # Write information try: writer.add_document(**whoosh_fields) except (ValueError, UnknownFieldError) as error: # pragma: nocover self.error(_('${i}: ${e}', {'i': self.uid, 'e': error}), request) # -------------------------------------------------------------------------
[docs] def index_is_obsolete(self, fields, abs_file): """Check if the Whoosh index is obsolete. :param fields: Fields of index record. :param str abs_file: Absolute path to the source file. :rtype: bool """ infos_file = join( self.root, INFOS_DIR, fields['directory'], '{0}.xml'.format(fields['file_name'])) return getmtime(abs_file) > fields['time'] \ or (exists(infos_file) and getmtime(infos_file) > fields['time'])
# ------------------------------------------------------------------------- def _index_open_or_create(self): """Open an index structure (possibly create it). :rtype: :class:`whoosh.index.FileIndex` or ``None`` """ # Create index_dir = join(self.root, INDEX_DIR) if not exists(index_dir) or \ not exists_in(index_dir, indexname=self.uid): return self._index_create(index_dir) # Open counter = 0 index = None while index is None and counter < WHOOSH_RETRIES: if counter: sleep(1) try: index = open_dir(index_dir, indexname=self.uid) except (AttributeError, LockError): # pragma: nocover index = None counter += 1 return index # ------------------------------------------------------------------------- def _index_create(self, index_dir): """Create an index structure. :param str index_dir: Absolute path to the index directory. :rtype: :class:`whoosh.index.FileIndex` or ``None`` """ # Create schema schema = Schema( time=STORED, directory=ID(stored=True), file_name=ID(stored=True), file_id=ID(stored=True), file_type=ID(stored=True), file_size=NUMERIC(stored=True, bits=64), file_date=NUMERIC(stored=True), only_groups=KEYWORD(stored=True, scorable=True, commas=True), shared=BOOLEAN(stored=True)) done = set(INDEXFIELD_BUILTIN) for indexfield_id in self.indexfields: if indexfield_id in done: continue indexfield = self.indexfields[indexfield_id] if indexfield['whoosh_type'] == 'ID': schema.add(indexfield_id, ID(stored=indexfield['stored'])) elif indexfield['whoosh_type'] == 'NUMERIC': schema.add(indexfield_id, NUMERIC(stored=indexfield['stored'])) elif indexfield['whoosh_type'] == 'BOOLEAN': schema.add(indexfield_id, BOOLEAN(stored=indexfield['stored'])) elif indexfield['whoosh_type'] == 'DATETIME': schema.add( indexfield_id, DATETIME(stored=indexfield['stored'])) elif indexfield['whoosh_type'] == 'KEYWORD': schema.add( indexfield_id, KEYWORD( stored=indexfield['stored'], scorable=True, commas=True)) elif indexfield['whoosh_type'] == 'STEMS': schema.add( indexfield_id, TEXT( analyzer=StemmingAnalyzer() | CharsetFilter(accent_map), stored=indexfield['stored'])) else: schema.add(indexfield_id, TEXT(stored=indexfield['stored'])) done.add(indexfield_id) # Create index try: makedirs(index_dir, exist_ok=True) return create_in(index_dir, schema, indexname=self.uid) except (IOError, AttributeError, LockError): # pragma: nocover return None # ------------------------------------------------------------------------- def _index_writer(self, index, request): """Retrieve an index writer. :param index: Absolute path to the index directory. :rtype: :class:`whoosh.index.IndexWriter` or ``None`` """ try: writer = index.writer() except (OSError, AttributeError) as error: self.error(_('${i}: ${e}', {'i': self.uid, 'e': error}), request) return None except LockError: self.error(_('${i}: Lock error}', {'i': self.uid}), request) return None return writer # -------------------------------------------------------------------------
[docs] def infos_read(self, path, meta_ids, for_index, request=None): """Return a dictionary with authorized groups and metadata found in the infos XML file. :param str path: Relative path of the file. :param set meta_ids: Set of IDS of metadata fields to retrieve. :param bool for_index: If ``True``, convert boolean fields. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. :rtype: dict """ # Authorized groups fields = {} fields['only_groups'], tree = self.infos_only_groups(path, request) if for_index: fields['only_groups'] = ','.join(fields['only_groups']) if tree is None: return fields # Metadata for field_id in meta_ids: elt = tree.xpath( 'ns0:infos/ns0:metafields/ns0:metafield[@id="{0}"]'.format( field_id), namespaces={'ns0': RELAXNG_CIOWAREHOUSE['namespace']}) if elt and field_id in self.metafields: fields[field_id] = convert_value( self.metafields[field_id]['type'], tounicode(elt[0].text.strip())) if for_index: if self.indexfields[field_id]['whoosh_type'] == 'BOOLEAN': fields[field_id] = bool(fields[field_id]) and \ fields[field_id] not in ('false', 'False', '0') if self.indexfields[field_id]['whoosh_type'] == 'DATETIME': fields[field_id] = fields[field_id].isoformat() return fields
# -------------------------------------------------------------------------
[docs] def infos_only_groups(self, path, request=None): """Return a set of authorized groups or an empty set if all groups are authorized. :param str path: Relative path of the file. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by a script. :rtype: tuple :return: A tuple such as ``(group_set, tree)``. """ # Open infos file infos_file = join(self.root, INFOS_DIR, '{0}.xml'.format(path)) if not exists(infos_file): return set(), None tree, err = load_xml2(infos_file) if err is not None: self.error(err, request) return set(), None # Extract groups only_groups = set() for elt in tree.xpath('ns0:infos/ns0:groups/ns0:group', namespaces={'ns0': RELAXNG_CIOWAREHOUSE['namespace']}): only_groups.add(elt.text) return only_groups, tree
# -------------------------------------------------------------------------
[docs] def directory_file_list(self, directory): """Return the list of files of a directory of the warehouse. :param str directory: Relative path of the directory to browse. :rtype: list """ abs_dir = join(self.root, directory) if not isdir(abs_dir): return [] file_list = [] for path, dirs, filenames in walk(abs_dir): for filename in tuple(dirs): if filename in EXCLUDED_FILES: dirs.remove(filename) continue filename = relpath(join(path, filename), self.root) file_list.append( (dirname(filename) or '.', basename(filename))) for filename in filenames: if filename in EXCLUDED_FILES: continue filename = relpath(join(path, filename), self.root) file_list.append( (dirname(filename) or '.', basename(filename))) return file_list
# -------------------------------------------------------------------------
[docs] @classmethod def error(cls, message, request=None): """Log an error message. :param str message: Error message. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. """ if request is None: LOG.error(translate(message)) else: log_error(request, translate(message, lang='en'))
# -------------------------------------------------------------------------
[docs] @classmethod def warning(cls, message, request=None): """Log an warning message. :param str message: Warning message. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. """ if request is None: LOG.warning(translate(message)) else: log_warning(request, translate(message, lang='en'))
# ------------------------------------------------------------------------- def _directory2abs_path(self, registry, directory): """Return an absolute path to the directory if exists. :param dict registry: Application registry. :param str directory: Relative path to a local directory or reference to a location or another directory. :rtype: str """ if not directory: return None abs_path = location_path2abs_path( registry['modules']['ciowarehouse'].locations, directory) if abs_path is not None: return abs_path abs_path = normpath(join(self.root, directory)) if abs_path[:len(self.root)] != self.root or not isdir(abs_path): return None return abs_path