# pylint: disable = too-many-lines
"""Warehouse class."""
from logging import getLogger
from os import stat, sep, makedirs, rmdir, remove, scandir, walk
from os.path import join, exists, abspath, dirname, basename, getmtime
from os.path import isdir, relpath, splitext, normpath, commonpath
from shutil import rmtree
from json import loads
from time import time, sleep
from collections import OrderedDict
from threading import Thread
from configparser import ConfigParser
from pytomlpp import loads as toml_loads
from whoosh.index import exists_in, create_in, open_dir, LockError
from whoosh.fields import ID, STORED, KEYWORD, TEXT, NUMERIC, DATETIME, BOOLEAN
from whoosh.fields import Schema, UnknownFieldError
from whoosh.qparser import QueryParser, MultifieldParser
from whoosh.analysis import CharsetFilter, StemmingAnalyzer
from whoosh.query import Term, Prefix, And
from whoosh.searching import NoTermsException, TimeLimit
from whoosh.support.charset import accent_map
from transaction import manager
from chrysalio.lib.i18n import translate_field
from chrysalio.lib.utils import tounicode, tostr, decrypt
from chrysalio.lib.utils import mimetype_get, convert_value
from chrysalio.lib.log import log_error, log_warning, log_info
from chrysalio.lib.xml import load_xml2
from chrysalio.lib.attachment import attachment_url
from chrysalio.helpers.builder import Builder
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.models import get_tm_dbsession
from cioservice.lib.utils import location_path2abs_path
from cioservice.models.dbjob import DBJob
from ..relaxng import RELAXNG_CIOWAREHOUSE
from ..models.dbindexfield import INDEXFIELD_BUILTIN
from ..models.dbsharing import DBSharingFile
from ..lib.handler import Handler
from .i18n import _, translate
from .utils import INDEX_DIR, THUMBNAILS_DIR, INFOS_DIR, EXCLUDED_FILES
from .utils import LOCKS_DIR, REFRESHED_FILE, TOREFRESH_FILE, REFRESHING_FILE
from .utils import CIOWAREHOUSE_NS, CACHE_REGION_USER
from .utils import HERE, make_file_id, normalize_filename, cache_user_seeds
from .vcs_none import VcsNone
from .vcs_git import VcsGit
LOG = getLogger(__name__)
WHOOSH_RETRIES = 6
# =============================================================================
[docs]
class Warehouse():
"""Class to manage a warehouse.
:param dict registry:
Application registry.
:type dbwarehouse: .models.dbwarehouse.DBWarehouse
:param dbwarehouse:
SQLAlchemy object representing the warehouse
:param dict locations:
Dictionary of locations.
"""
# pylint: disable = too-many-instance-attributes, too-many-public-methods
# -------------------------------------------------------------------------
def __init__(self, registry, dbwarehouse, locations):
    """Constructor method.

    :param dict registry:
        Application registry. It is read for the optional keys
        ``'metafields'``, ``'indexfields'`` and ``'handlers'`` and for
        its ``settings`` attribute.
    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        SQLAlchemy object representing the warehouse.
    :param dict locations:
        Dictionary of locations.
    """
    self.uid = dbwarehouse.warehouse_id
    self.created = time()
    self.root = abspath(join(locations[dbwarehouse.location], self.uid))
    self._i18n_label = loads(dbwarehouse.i18n_label)
    self._i18n_description = dbwarehouse.i18n_description

    # Vcs
    if dbwarehouse.vcs:
        password = decrypt(dbwarehouse.vcs_password, 'warehouse')
        # A password was stored but could not be decrypted: use '-'
        password = '-' if dbwarehouse.vcs_password and not password \
            else password
        self.vcs = VcsGit(
            self.root, dbwarehouse.vcs_url, dbwarehouse.vcs_user, password,
            dbwarehouse.lock_ttl)
    else:
        self.vcs = VcsNone(self.root, dbwarehouse.vcs_url)
    self._normalize = dbwarehouse.normalize
    self.access = dbwarehouse.access

    # Thumbnails: ((large_w, large_h), (small_w, small_h)) or None
    self.thumbnail_sizes = None
    if dbwarehouse.thumbnail_large and \
            dbwarehouse.thumbnail_large != '0x0':
        self.thumbnail_sizes = (
            tuple(int(k) for k in dbwarehouse.thumbnail_large.split('x')),
            tuple(int(k) for k in dbwarehouse.thumbnail_small.split('x')))

    # Max size of a download
    self.download_max_size = dbwarehouse.download_max_size

    # Rendering and seeds directories
    self.rendering_dir = self._directory2abs_path(
        registry, dbwarehouse.rendering_dir)
    self.seeds_dir = self._directory2abs_path(
        registry, dbwarehouse.seeds_dir)

    # Metadata fields
    self.metafields = OrderedDict()
    for dbitem in dbwarehouse.metafields:
        if dbitem.metafield_id in registry['metafields']:
            self.metafields[dbitem.metafield_id] = \
                registry['metafields'][dbitem.metafield_id]
    if not self.metafields and 'metafields' in registry:
        self.metafields = registry['metafields']

    # Index fields
    self.indexfields = registry.get('indexfields', {})

    def display_fields(position):
        """Return the fields to display, ordered by ``position``.

        ``position`` is ``'in_list'`` or ``'in_cards'``. Fields selected
        on the warehouse itself win; otherwise the registry index fields
        having a truthy ``position`` are used.
        """
        items = [
            k for k in sorted(
                dbwarehouse.indexfields,
                key=lambda k: getattr(k, position))
            if getattr(k, position)
        ]
        if items:
            item_ids = [k.indexfield_id for k in items]
        else:
            item_ids = [
                k for k in sorted(
                    self.indexfields,
                    key=lambda k: self.indexfields[k][position])
                if self.indexfields[k][position]
            ]
        return [
            {
                'id': k,
                'label': self.indexfields[k]['label'],
                'type': self.indexfields[k]['field_type'],
                'class': self.indexfields[k]['in_class']
            } for k in item_ids
        ]

    # List fields
    self.listfields = display_fields('in_list')

    # Card fields (ordered by their position in cards, not in lists)
    self.cardfields = display_fields('in_cards')

    # Handlers: registry handlers minus those listed on the warehouse
    self._handlers = registry.get('handlers', ())
    if self._handlers and dbwarehouse.handlers:
        items = [dbitem.handler_id for dbitem in dbwarehouse.handlers]
        self._handlers = tuple(
            k for k in self._handlers if k.uid not in items)

    # Seeds
    self._seeds = {}
    for handler in self._handlers:
        handler.install()
        for seed_id in handler.seeds:
            self._seeds[seed_id] = handler.seed(seed_id)

    # Jobs
    self._job_ids = [k.job_id for k in dbwarehouse.jobs]
    self._jobs = {}

    # Thread
    self._thread = None
    self.lock_ttl = dbwarehouse.lock_ttl

    # Refresh
    self.refresh_period = dbwarehouse.refresh_period

    # Inkscape
    self.inkscape92 = registry.settings.get('inkscape92') == 'true'
# -------------------------------------------------------------------------
[docs]
def label(self, request):
    """Return the label of the warehouse in the language of the request.

    The warehouse ID is handed to ``translate_field`` as third argument.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: str
    """
    i18n_label = self._i18n_label
    return translate_field(request, i18n_label, self.uid)
# -------------------------------------------------------------------------
[docs]
def description(self, request):
"""Return a translated description.
:type request: pyramid.request.Request
:param request:
Current request.
:rtype: str
"""
if not self._i18n_description:
return ''
return translate_field(request, self._i18n_description)
# -------------------------------------------------------------------------
[docs]
def directory_path(self, request, list_path):
"""Return an absolute path of the directory pointed by the path
contained in the list ``list_path``. Verify if the file is in the
warehouse.
:type request: pyramid.request.Request
:param request:
Current request.
:param list list_path:
Relative path in a list.
:rtype: :class:`str` or ``None``
"""
fixed = False
directory = abspath(join(self.root, *list_path))
if not isdir(directory):
fixed = True
directory = dirname(directory)
if not isdir(directory):
fixed = True
directory = dirname(directory)
if fixed:
self.full_refresh(request, in_thread=True, force=True)
if directory.startswith(self.root) and exists(directory):
return directory
return None
# -------------------------------------------------------------------------
[docs]
def file_trail(self, request, abs_path):
    """Return the HTML trail to a directory and its path relative to the
    warehouse root.

    Every chunk but the last one is a link to the corresponding
    directory; the last one is a plain ``span``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str abs_path:
        Absolute path to the current directory.
    :rtype: tuple
    :return:
        A tuple such as ``(html_path, rel_path)``.
    """
    # At the root, the trail is only the warehouse ID
    if not abs_path or abs_path == self.root:
        return Builder().span(self.uid), '.'

    html_path = Builder().a(
        self.uid,
        href=request.route_path(
            'browse_directory_root', warehouse_id=self.uid),
        id=make_file_id(self.uid),
        class_='cioTrailChunk')

    rel_path = relpath(abs_path, self.root)
    chunks = rel_path.split(sep)
    for position, chunk in enumerate(chunks[:-1]):
        path = '/'.join(chunks[:position + 1])
        link = Builder().a(
            chunk,
            href=request.route_path(
                'browse_directory', warehouse_id=self.uid,
                path=tostr(path)),
            id=make_file_id(join(self.uid, path)),
            class_='cioTrailChunk')
        html_path += ' / ' + link
    html_path += ' / ' + Builder().span(chunks[-1])

    return html_path, rel_path
# -------------------------------------------------------------------------
[docs]
def file_get(self, request, file_id):
    """Return the dictionary of the file corresponding to the file ID.

    The index is searched first. If nothing is found and the ID is the
    one of the warehouse itself, a dictionary describing the root is
    built.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str file_id:
        ID of the file to search.
    :rtype: :class:`dict` or ``None``
    """
    dirs, files = self.index_search(
        request, ('file_id', ), f'file_id:"{file_id}"', limit=1)
    found = files + dirs
    if found:
        return found[0]

    if file_id != make_file_id(self.uid):
        return None

    # The root of the warehouse is not indexed
    return {
        'warehouse_id': self.uid,
        'directory': '.',
        'file_type': 'root',
        'file_name': '',
        'file_id': file_id,
        'file_size': 0,
        'file_date': getmtime(self.root),
        'score': 1.0,
        'shared': False
    }
# -------------------------------------------------------------------------
[docs]
def file_normalize(self, filename, is_dir=False):
    """Return a normalized file name or ``None`` if the file is in the
    excluded file list.

    Only the last chunk of the path is kept, whether it is separated by
    back slashes or by the separator known by ``basename``.

    :param str filename:
        Name to normalize.
    :param bool is_dir: (default=False)
        ``True`` if the file is a directory.
    :rtype: :class:`str` or ``None``
    """
    # rpartition returns the whole string when there is no back slash
    name = basename(filename.rpartition('\\')[2])
    if name in EXCLUDED_FILES:
        return None
    if not self._normalize:
        return name
    return normalize_filename(name, self._normalize, is_dir)
# -------------------------------------------------------------------------
[docs]
def get_handler(self, abs_path):
"""Retrieve the best file handler for file ``abs_path``.
:param str abs_path:
Absolute path to the file.
:rtype: tuple
:return:
A tuple such as ``(handler, content)`` where ``handler`` is a
:class:`.lib.handler.Handler` or ``None``.
"""
extension = splitext(abs_path)[1]
content = None
for handler in self._handlers:
found, content = handler.match(extension, abs_path, content)
if found:
return handler, content
return None, content
# -------------------------------------------------------------------------
@cache_user_seeds(CIOWAREHOUSE_NS, CACHE_REGION_USER)
def seeds(self, request):
    """Return a dictionary of available seeds.

    Seeds come from two sources: a TOML file named after the warehouse
    ID (lower case first, then as is) in the seeds directory, and the
    seeds provided by the handlers. A seed of the TOML file takes
    precedence over a handler seed of the same name.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: dict
    :return:
        A dictionary whose keys are seed names and values are tuples
        such as ``(icon, label, file)``.
    """
    # Load the TOML file of the warehouse, if any
    custom = None
    if self.seeds_dir:
        toml_file = join(self.seeds_dir, f'{self.uid.lower()}.toml')
        if not exists(toml_file):
            toml_file = join(self.seeds_dir, f'{self.uid}.toml')
        if exists(toml_file):
            with open(toml_file, 'r', encoding='utf8') as hdl:
                custom = toml_loads(hdl.read())

    # From a custom directory
    available = {}
    if custom is not None:
        for seed in custom.get('seeds', ''):
            if 'name' not in seed:
                self.error(
                    _('Name is missing for a dynamic seed'), request)
                continue
            if 'file' not in seed:
                self.error(
                    _('Seed "${n}" is incorrect', {'n': seed['name']}),
                    request)
                continue
            # Label: request locale, else English, else the seed name
            if 'label' in seed:
                label = seed['label'].get(
                    request.locale_name,
                    seed['label'].get('en', seed['name']))
            else:
                label = seed['name']
            available[seed['name']] = (
                seed.get('icon'), label,
                seed['file'].format(here=self.seeds_dir))

    # Hard coded
    theme = theme_static_prefix(request)
    for name, seed in self._seeds.items():
        # Keys of ``available`` are names: compare the name, not the seed
        if name not in available:
            available[name] = (
                f'{theme}{seed[0]}' if seed[0] else None, seed[1], seed[2])

    return available
# -------------------------------------------------------------------------
[docs]
def jobs(self, request):
    """Return a dictionary of available jobs for this warehouse.

    The application configuration file is parsed once and shared by
    every call to :meth:`job`.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: dict
    """
    ini_file = request.registry.settings['__file__']
    parser = ConfigParser({'here': dirname(ini_file)})
    parser.read(tounicode(ini_file), encoding='utf8')

    # Each call fills self._jobs when the job is available
    for uid in self._job_ids:
        self.job(request, uid, parser)

    return self._jobs
# -------------------------------------------------------------------------
[docs]
def job(self, request, job_id, config=None):
"""Return a dictionary representing the job.
:type request: pyramid.request.Request
:param request:
Current request.
:param str job_id:
Job ID.
:type config: configparser.ConfigParser
:param config: (optional)
Configuration parser based on application configuration file.
:rtype: dict
:return:
A dictionary with keys ``'job_id'``, ``'i18n_label'``,
``'i18n_description'``, ``'icon'``, ``'threaded'``, ``'ttl'``,
``'priority'``, ``'settings'``, ``'service_id'``.
If a problem succeeds, it returns ``None``.
"""
if job_id in self._jobs:
return self._jobs[job_id]
dbjob = request.dbsession.query(DBJob).filter_by(job_id=job_id).first()
if dbjob is None:
return None
service = request.registry['services'].get(dbjob.service) \
if 'services' in request.registry else None
if service is None:
log_error(
request,
request.localizer.translate(
_(
'Service "${s}" is not available.',
{'s': dbjob.service})))
return None
if config is None:
config_file = request.registry.settings['__file__']
config = ConfigParser({'here': dirname(config_file)})
config.read(tounicode(config_file), encoding='utf8')
section = 'Job:{0}'.format(job_id)
settings = dict(config.items(section)) if config.has_section(section) \
else config.defaults()
self._jobs[job_id] = {
'job_id':
job_id,
'i18n_label':
loads(dbjob.i18n_label),
'i18n_description':
dbjob.i18n_description,
'icon':
attachment_url(
request, dbjob.attachments_dir, dbjob.attachments_key,
dbjob.icon),
'service_id':
service.uid,
'context':
dbjob.context,
'access':
dbjob.access,
'threaded':
dbjob.threaded,
'ttl':
dbjob.ttl,
'priority':
dbjob.priority,
'users':
tuple(k.user_id for k in dbjob.users),
'groups': {k.group_id
for k in dbjob.groups},
'settings':
settings
}
return self._jobs[job_id]
# -------------------------------------------------------------------------
[docs]
def full_refresh(
        self,
        request,
        reindex=False,
        recreate_thumbnails=False,
        in_thread=False,
        force=True):
    """Pull, index, create thumbnails and commit changes on the whole
    warehouse.

    When a refresh is already running, a "to refresh" marker file is
    written so that the running refresh is done again, and a message is
    returned. When ``in_thread`` is ``False``, the index is refreshed
    directly but thumbnails are still refreshed in a thread.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param bool reindex: (default=False)
        Remove old index.
    :param bool recreate_thumbnails: (default=False)
        Remove old thumbnails.
    :param bool in_thread: (default=False)
        Launch the refresh in a thread.
    :param bool force: (default=True)
        If ``True``, force refreshing even if the deadline is not reached.
    :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
    :return:
        Error message or ``None``.
    """
    now = time

    # Nothing to do: last refresh is recent enough
    if not force:
        refreshed_file = join(self.root, REFRESHED_FILE)
        if exists(refreshed_file) and \
                getmtime(refreshed_file) + self.refresh_period > now():
            return None

    # Work in progress?
    refreshing_file = join(self.root, REFRESHING_FILE)
    running = self._thread is not None and self._thread.is_alive()
    if not running:
        running = exists(refreshing_file) and \
            getmtime(refreshing_file) + self.refresh_period > now()
    if running:
        torefresh_file = join(self.root, TOREFRESH_FILE)
        makedirs(dirname(torefresh_file), exist_ok=True)
        with open(torefresh_file, 'w', encoding='utf8'):
            pass
        return _('${i}: refresh already in progress!', {'i': self.uid})

    if in_thread:
        # Everything in a thread
        thread_name = f'{self.uid}:full_refresh'
        thread_args = (request, reindex, recreate_thumbnails)
    else:
        # Index directly, thumbnails in a thread
        self._full_refresh(request, reindex, recreate_thumbnails, 'index')
        thread_name = f'{self.uid}:full_refresh_thumbnails'
        thread_args = (request, reindex, recreate_thumbnails, 'thumbnails')
    self._thread = Thread(
        target=self._full_refresh, name=thread_name, args=thread_args)
    self._thread.start()

    return None
# -------------------------------------------------------------------------
def _full_refresh(
        self,
        request,
        reindex,
        recreate_thumbnails,
        only=None,
        again=False):
    """Pull, index, create thumbnails and commit changes, possibly in
    a thread, on the whole warehouse.

    A "refreshing" marker file is written during the operation. It is
    removed even if a step raises an exception, so that a failure does
    not leave the warehouse flagged as being refreshed.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param bool reindex:
        Remove old index.
    :param bool recreate_thumbnails:
        Remove old thumbnails.
    :param str only: (optional)
        If ``'index'``, refresh index only ; if ``'thumbnails'``, refresh
        thumbnails only ; if ``None`` refresh both.
    :param bool again: (default=False)
        If ``True``, this refresh follows another one.
    """
    # Lock
    refreshing_file = join(self.root, REFRESHING_FILE)
    self.refreshed()
    with open(refreshing_file, 'w', encoding='utf8'):
        pass

    try:
        if only is None or only == 'index':
            # Add forgotten files
            self.vcs.pull()
            if self.vcs.is_dirty() and 'user' in request.session:
                self.vcs.add()
                self.vcs.commit(
                    request.localizer.translate(_('Maintenance')),
                    request.session['user']['name'],
                    request.session['user']['email'])

            # Index
            if reindex:
                self.index_erase()
            with manager:
                dbsession = get_tm_dbsession(
                    request.registry['dbsession_factory'], manager)
                self.index_update_all(dbsession, request)
            request.registry['modules']['ciowarehouse'].cache_clear(
                request, self.uid)

        if only is None or only == 'thumbnails':
            # Create thumbnails
            if recreate_thumbnails:
                self.thumbnails_erase()
            self.thumbnails_update_all(request)

        # Do it again: a refresh was asked while this one was running
        torefresh_file = join(self.root, TOREFRESH_FILE)
        if exists(torefresh_file):
            remove(torefresh_file)
            log_info(request, 'warehouse_full_refresh', self.uid, 'again')
            self._full_refresh(request, False, False, again=True)

        self.refreshed()
    finally:
        # Unlock, whatever happened
        if exists(refreshing_file):
            remove(refreshing_file)

    if only is None or only == 'index':
        request.registry['modules']['ciowarehouse'].cache_clear(
            request, self.uid)
    if not again:
        log_info(
            request, 'warehouse_full_refresh', self.uid,
            'reindex={0}'.format(reindex),
            'recreate_thumbnails={0}'.format(recreate_thumbnails),
            'only={0}'.format(only) if only is not None else '')
# -------------------------------------------------------------------------
[docs]
def refresh(
self,
request,
files,
recursive=False,
in_thread=False,
dbsession=None,
keep_cache=False):
"""Index and create thumbnails for a list of files.
:type request: pyramid.request.Request
:param request:
Current request or ``None`` if called by a script.
:param list files:
List of paths of files relative to the warehouse root.
:param bool recursive: (default=False)
Refresh recursively.
:param bool in_thread: (default=False)
Launch the refresh in a thread.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession: (optional)
SQLAlchemy session.
:param bool keep_cache: (default=False)
if ``True``, do not clear the cache.
"""
# pylint: disable = too-many-arguments
if not files:
return
if in_thread:
# In a thread
thread = Thread(
target=self._refresh,
name='{0}:refresh'.format(self.uid),
args=(files, recursive, dbsession, request, None, keep_cache))
thread.start()
else:
# Directly
self._refresh(
files, recursive, dbsession, request, 'index', keep_cache)
thread = Thread(
target=self._refresh,
name='{0}:refresh_thumbnails'.format(self.uid),
args=(
files, recursive, dbsession, request, 'thumbnails', True))
thread.start()
# -------------------------------------------------------------------------
def _refresh(self, files, recursive, dbsession, request, only, keep_cache):
"""Index and create thumbnails, possibly in a thread.
:param list files:
List of paths of files relative to the warehouse root.
:param bool recursive:
Refresh recursively.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
:type request: pyramid.request.Request
:param request:
Current request or ``None`` if called by a script.
:param bool only:
If ``'index'``, refresh index only ; if ``'thumbnails'``, refresh
thumbnails only ; if ``None`` refres both.
:param bool keep_cache:
if ``True``, do not clear the cache.
"""
# pylint: disable = too-many-arguments
if dbsession is None and request is not None:
with manager:
dbsession = get_tm_dbsession(
request.registry['dbsession_factory'], manager)
self._refresh_loop(files, recursive, dbsession, request, only)
elif dbsession is not None:
self._refresh_loop(files, recursive, dbsession, request, only)
if request is not None and not keep_cache:
request.registry['modules']['ciowarehouse'].cache_clear(
request, self.uid)
if request is not None:
log_info(
request, 'warehouse_refresh', self.uid,
'recursive={0}'.format(recursive),
'only={0}'.format(only) if only is not None else '',
'files={0}'.format(', '.join(files)))
# -------------------------------------------------------------------------
def _refresh_loop(
self, files, recursive, dbsession, request=None, only=None):
"""Refresh loop.
:param list files:
List of paths of files relative to the warehouse root.
:param bool recursive:
Refresh recursively.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
:type request: pyramid.request.Request
:param request: (optional)
Current request or ``None`` if called by a script.
:param bool only: (optional)
If ``'index'``, refresh index only ; if ``'thumbnails'``, refresh
thumbnails only ; if ``None`` refres both.
"""
for name in tuple(files):
name = normpath(name)
if name.startswith('..'):
name = '.'
file_list = [(dirname(name) or '.', basename(name))] \
if name != '.' else []
if recursive:
file_list += self.directory_file_list(name)
if only is None or only == 'index':
self.index_update(dbsession, file_list, request)
if only is None or only == 'thumbnails':
self.thumbnails_update(file_list, request)
# -------------------------------------------------------------------------
[docs]
def refreshed(self):
    """Record the time of last refresh.

    The record is the modification time of an empty marker file, which
    is (re)written here along with its directory if needed.
    """
    marker = join(self.root, REFRESHED_FILE)
    makedirs(dirname(marker), exist_ok=True)
    # Truncate or create the marker to update its modification time
    open(marker, 'w', encoding='utf8').close()
# -------------------------------------------------------------------------
[docs]
def unrefreshed(self):
    """Remove the record of last refresh, if there is one."""
    marker = join(self.root, REFRESHED_FILE)
    if not exists(marker):
        return
    remove(marker)
# -------------------------------------------------------------------------
[docs]
def to_refresh(self, inputs, paths):
"""Compute an optimized list of relative paths to refresh.
:param list inputs:
List of absolute paths to the input files.
:param str paths:
List of relative paths to the output files.
:rtype: set
"""
if not inputs:
return {k.partition(sep)[0] for k in paths}
root = relpath(
commonpath(inputs) if len(inputs) > 1 else dirname(inputs[0]),
self.root)
path_set = set()
for path in paths:
chunks = relpath(path, root).split(sep)
depth = [k for k in chunks if k == '..']
index = len(depth)
index = index if index < len(chunks) else len(chunks) - 1
path_set.add(normpath(join(root, sep.join(depth), chunks[index])))
return path_set
# -------------------------------------------------------------------------
[docs]
def lock(self, abs_file=None, relock=False):
    """Lock a file or the whole warehouse.

    The lock is an empty file in the lock directory of the warehouse.
    For a directory or for the whole warehouse, the lock file is placed
    inside the corresponding lock directory.

    :param str abs_file: (optional)
        Absolute path to the source file.
    :param bool relock:
        If ``True`` update the date/time of the lock.
    :rtype: bool
    :return:
        ``False`` if a lock which has not expired already exists and
        ``relock`` is ``False``, ``True`` otherwise.
    """
    if abs_file:
        lock_file = join(
            self.root, LOCKS_DIR, relpath(abs_file, self.root))
    else:
        lock_file = join(self.root, LOCKS_DIR)
    if abs_file is None or isdir(abs_file):
        lock_file = join(lock_file, HERE)

    # Already locked and still valid
    if not relock and exists(lock_file) \
            and getmtime(lock_file) + self.lock_ttl > time():
        return False

    try:
        makedirs(dirname(lock_file), exist_ok=True)
        with open(lock_file, 'w', encoding='utf8'):
            pass
    except OSError:
        # NOTE(review): a failure to write the lock is reported as a
        # success -- confirm this best-effort behaviour is intended
        pass
    return True
# -------------------------------------------------------------------------
[docs]
def unlock(self, abs_file=None):
    """Unlock a file or the whole warehouse.

    The lock file is removed, then each lock directory left empty is
    removed too, going up until a non empty directory or the warehouse
    root is reached.

    :param str abs_file: (optional)
        Absolute path to a file or a directory.
    """
    if abs_file:
        lock_file = join(
            self.root, LOCKS_DIR, relpath(abs_file, self.root))
    else:
        lock_file = join(self.root, LOCKS_DIR)
    if abs_file is None or isdir(abs_file):
        lock_file = join(lock_file, HERE)

    # Remove the lock itself
    if exists(lock_file):
        try:
            remove(lock_file)
        except OSError:  # pragma: nocover
            pass

    # Remove empty lock directories
    lock_dir = dirname(lock_file)
    if not exists(lock_dir):
        return
    while lock_dir != self.root:
        try:
            if tuple(scandir(lock_dir)):
                break
            rmdir(lock_dir)
        except OSError:  # pragma: nocover
            break
        lock_dir = dirname(lock_dir)
# -------------------------------------------------------------------------
[docs]
def unlock_all(self):
    """Remove the whole lock directory of the warehouse if it exists."""
    locks = join(self.root, LOCKS_DIR)
    if not exists(locks):
        return
    rmtree(locks)
# -------------------------------------------------------------------------
[docs]
def thumbnails_erase(self):
    """Remove the whole thumbnail directory of the warehouse if it
    exists."""
    thumbnails = join(self.root, THUMBNAILS_DIR)
    if not exists(thumbnails):
        return
    rmtree(thumbnails)
# -------------------------------------------------------------------------
[docs]
def thumbnails_update_all(self, request=None, registry=None):
"""Update or create small and large thumbnails for the entire
warehouse.
:type request: pyramid.request.Request
:param request: (optional)
Current request or ``None`` if called by populate script.
:param registry: (optional)
class:`pyramid.registry.Registry` or
class:`chrysalio.scripts.ScriptRegistry` if called by populate
script.
"""
if not self.thumbnail_sizes:
return
thumbnails_dir = join(self.root, THUMBNAILS_DIR)
done = set()
# Clean up obsolete thumbnails
if exists(thumbnails_dir):
self._thumbnails_cleanup(thumbnails_dir, done)
# Create new thumbnails
if registry is None and request is not None:
registry = request.registry
for path, dirs, files in walk(self.root):
for name in tuple(dirs):
if name in EXCLUDED_FILES:
dirs.remove(name)
continue
abs_file = join(path, name)
if abs_file in done:
continue
handler = self.get_handler(abs_file)[0]
if handler:
thumb_dir = join(
thumbnails_dir, relpath(abs_file, self.root), HERE)
handler.thumbnails(
self, abs_file, thumb_dir, request, registry)
for name in files:
abs_file = join(path, name)
if name in EXCLUDED_FILES or abs_file in done:
continue
handler = self.get_handler(abs_file)[0]
if handler:
thumb_dir = join(
thumbnails_dir, relpath(abs_file, self.root))
handler.thumbnails(
self, abs_file, thumb_dir, request, registry)
# -------------------------------------------------------------------------
def _thumbnails_cleanup(self, thumbnails_dir, done):
    """Clean up obsolete thumbnails.

    The thumbnail directory is walked. Each of its sub-directories,
    except those named ``HERE``, is mapped to the path having the same
    relative location under the warehouse root. Thumbnails of a missing
    file or declared obsolete by the handler are removed. Other files
    are added to ``done``.

    :param str thumbnails_dir:
        Absolute path to the thumbnail directory.
    :param set done:
        Set of up to date files, filled in place with absolute paths.
    """
    for path, dirs, ignored_ in walk(thumbnails_dir):
        # Iterate over a copy: ``dirs`` is modified inside the loop
        for name in tuple(dirs):
            if name == HERE:
                continue
            thumb_dir = join(path, name)
            abs_file = join(self.root, relpath(thumb_dir, thumbnails_dir))
            # The source no longer exists: drop its thumbnails and do not
            # walk into the removed directory
            if not exists(abs_file):
                rmtree(thumb_dir)
                dirs.remove(name)
                continue
            # Thumbnails of a directory are in its HERE sub-directory
            if isdir(abs_file):
                thumb_dir = join(thumb_dir, HERE)
            # Fall back on a base Handler when no handler matches
            handler = self.get_handler(abs_file)[0] or Handler()
            if handler.thumbnails_obsolete(abs_file, thumb_dir):
                if exists(thumb_dir):
                    rmtree(thumb_dir)
            else:
                done.add(abs_file)
        # Remove the current directory if it is now empty
        if not tuple(scandir(path)):
            rmdir(path)
# -------------------------------------------------------------------------
[docs]
def thumbnails_update(self, files, request=None, registry=None):
    """Update or create small and large thumbnails for each file if it is
    possible.

    Files which do not exist, which have no handler or whose thumbnails
    are not obsolete are skipped.

    :param list files:
        List of tuples such as ``(directory, file_name)``.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by a script.
    :param registry: (optional)
        :class:`pyramid.registry.Registry` or
        :class:`chrysalio.scripts.ScriptRegistry` if called by populate
        script.
    """
    if registry is None and request is not None:
        registry = request.registry
    thumbnails_dir = join(self.root, THUMBNAILS_DIR)

    for directory, file_name in ((k[0], k[1]) for k in files):
        abs_file = join(self.root, directory, file_name)
        if not exists(abs_file):
            continue
        handler = self.get_handler(abs_file)[0]
        if not handler:
            continue

        # Thumbnails of a directory are in its HERE sub-directory
        last_chunk = HERE if isdir(abs_file) else ''
        thumb_dir = join(
            thumbnails_dir, relpath(abs_file, self.root), last_chunk)
        if not handler.thumbnails_obsolete(abs_file, thumb_dir):
            continue
        handler.thumbnails(self, abs_file, thumb_dir, request, registry)
# -------------------------------------------------------------------------
[docs]
def index_search(self, request, fieldnames, wquery, limit=None):
    """Search in the warehouse according to a Whoosh query.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param list fieldnames:
        List of fields to search.
    :param str wquery:
        A query in the Whoosh default query language.
    :param int limit: (optional)
        Maximum number of results.
    :rtype: tuple
    :return:
        A tuple such as ``(dirs, files)`` where ``dirs`` and ``files`` are
        lists of dictionaries. Each dictionary has the keys ``score``,
        ``warehouse_id`` and possibly ``file_id`` plus the keys
        representing the fields of the index, except ``time``. For a
        directory, ``file_size`` is its number of entries when it can be
        read.
    """
    # Open the index
    index = self._index_open_or_create()
    if index is None:
        self.warning(
            _('${i}: indexing in progress...', {'i': self.uid}), request)
        return [], []

    # Query
    dirs, files = [], []
    with index.searcher() as searcher:
        parser = QueryParser(fieldnames[0], index.schema) \
            if len(fieldnames) == 1 \
            else MultifieldParser(fieldnames, index.schema)
        try:
            hits = searcher.search(parser.parse(wquery), limit=limit)
        except (NoTermsException, TimeLimit) as error:  # pragma: nocover
            log_error(request, error)
            index.close()
            return dirs, files

        for hit in hits:
            result = {k: hit[k] for k in hit if k != 'time'}
            result['score'] = round(hit.score, 2)
            result['warehouse_id'] = self.uid
            # Comma separated list of groups -> set
            groups = result['only_groups']
            result['only_groups'] = set(groups.split(',')) \
                if groups else set()

            if result['file_type'] != 'directory':
                files.append(result)
                continue

            # For a directory, the size is its number of entries
            try:
                entries = tuple(
                    scandir(
                        join(
                            self.root, result['directory'],
                            result['file_name'])))
                result['file_size'] = len(entries)
            except (OSError, UnicodeEncodeError):
                pass
            dirs.append(result)

    index.close()
    return dirs, files
# -------------------------------------------------------------------------
[docs]
def index_erase(self):
    """Remove the whole index directory of the warehouse if it exists."""
    index = join(self.root, INDEX_DIR)
    if not exists(index):
        return
    rmtree(index)
# -------------------------------------------------------------------------
[docs]
def index_update_all(self, dbsession, request=None):
    """Update the search index for the whole warehouse.

    Obsolete documents are removed from the index, then every file and
    directory which is not excluded and not already up to date is
    added. Finally the index is committed and optimized.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by populate script.
    """
    # Open index
    index = self._index_open_or_create()
    if index is None:
        self.warning(
            _('${i}: indexing in progress...', {'i': self.uid}), request)
        return
    writer = self._index_writer(index, request)
    if writer is None:
        return

    # Clean up obsolete index
    up_to_date = set()
    self._index_all_cleanup(writer, up_to_date)

    # Loop over the files
    for current, dirs, files in walk(self.root):
        # Prune in place so that walk() skips excluded directories
        dirs[:] = [k for k in dirs if k not in EXCLUDED_FILES]
        names = dirs + [k for k in files if k not in EXCLUDED_FILES]
        for name in names:
            abs_file = join(current, name)
            path = relpath(abs_file, self.root)
            if path in up_to_date:
                continue
            self._index_add_file(
                dbsession, writer, path, abs_file, request)

    try:
        writer.commit()
    except UnicodeDecodeError as error:  # pragma: nocover
        self.error(_('${i}: ${e}', {'i': self.uid, 'e': error}), request)
    index.optimize()
    index.close()
# -------------------------------------------------------------------------
def _index_all_cleanup(self, writer, done):
"""Remove obsolete files from the search index.
:type writer: whoosh.writing.IndexWriter
:param writer:
Writer on current index.
:param set done:
Set of up to date files.
"""
for docnum, fields in writer.reader().iter_docs():
abs_file = join(
self.root, fields['directory'], fields['file_name'])
if not exists(abs_file):
writer.delete_document(docnum)
continue
if self.index_is_obsolete(fields, abs_file):
writer.delete_document(docnum)
else:
done.add(
join(
fields['directory'] if fields['directory'] != '.' else
'', fields['file_name']))
# -------------------------------------------------------------------------
[docs]
def index_update(self, dbsession, files, request=None, force=False):
    """Update the search index for a list of files.

    For each item, the matching index document (if any) is deleted when
    the file has disappeared, when ``force`` is set or when the document
    is obsolete. Items whose document is up to date are skipped. The
    remaining items are then (re)indexed.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :param list files:
        List of tuples such as ``(directory, file_name)``.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by a script.
    :param bool force: (default=False)
        Force indexation.
    """
    # pylint: disable = too-many-branches
    # Open index
    index = self._index_open_or_create()
    if index is None:
        self.warning(
            _('${i}: indexing in progress...', {'i': self.uid}), request)
        return
    writer = self._index_writer(index, request)
    if writer is None:
        # Nothing can be written without a writer: release the index
        index.close()
        return

    try:
        # Clean up obsolete index
        # Work on a copy: items which must not be indexed are removed
        files = list(files)
        with index.searcher() as searcher:
            for item in tuple(files):
                abs_file = join(self.root, item[0], item[1])
                file_exists = exists(abs_file)
                results = searcher.search(  # yapf: disable
                    And([Term('directory', item[0]),
                         Term('file_name', item[1])]),
                    limit=1)
                if not results:
                    # Not indexed yet: index it only if it exists
                    if not file_exists:
                        files.remove(item)
                    continue
                if results[0]['file_type'] == 'directory':
                    self._index_cleanup_directory(
                        writer, searcher, item, files)
                if not file_exists:
                    files.remove(item)
                    writer.delete_document(results[0].docnum)
                    continue
                if force:
                    writer.delete_document(results[0].docnum)
                    continue
                if self.index_is_obsolete(results[0], abs_file):
                    writer.delete_document(results[0].docnum)
                else:
                    # Up to date: nothing to do for this item
                    files.remove(item)

        # Add files to index
        root = normpath(self.root)
        # Compare whole path components: a bare prefix test would also
        # accept a sibling of the root such as ``<root>-other``
        root_prefix = root.rstrip(sep) + sep
        for item in files:
            abs_file = normpath(join(self.root, item[0], item[1]))
            if abs_file.startswith(root_prefix) and abs_file != root:
                self._index_add_file(
                    dbsession, writer, join(item[0], item[1]), abs_file,
                    request)
    except Exception:
        # Discard pending changes and unlock the index (Whoosh
        # ``IndexWriter.cancel``) before letting the error propagate
        writer.cancel()
        index.close()
        raise

    try:
        writer.commit()
    except UnicodeDecodeError as error:  # pragma: nocover
        self.error(_('${i}: ${e}', {'i': self.uid, 'e': error}), request)
    index.close()
# -------------------------------------------------------------------------
def _index_cleanup_directory(self, writer, searcher, item, files):
    """Delete from the index the vanished content of a directory.

    Every document located in the directory, or below it, is deleted
    unless it belongs to ``files`` or its file still exists on disk.

    :type  writer: whoosh.writing.IndexWriter
    :param writer:
        Writer on current index.
    :type  searcher: whoosh.searching.Searcher
    :param searcher:
        Whoosh object to search inside the index.
    :param tuple item:
        An item such as ``(directory, file_name)`` representing the
        directory to process.
    :param list files:
        List of tuples such as ``(directory, file_name)`` representing
        files being processed.
    """
    # pylint: disable = unsupported-binary-operation
    directory = normpath(join(item[0], item[1]))
    # Direct children of the directory or anything deeper
    query = Term('directory', directory) \
        | Prefix('directory', f'{directory}{sep}')
    for hit in searcher.search(query, limit=None):
        entry = (hit['directory'], hit['file_name'])
        if entry in files:
            # Handled by the caller
            continue
        if exists(join(self.root, entry[0], entry[1])):
            continue
        writer.delete_document(hit.docnum)
# -------------------------------------------------------------------------
def _index_add_file(self, dbsession, writer, path, abs_path, request=None):
    """Add a document describing a file or a directory to the index.

    Built-in fields are computed here; the handler of the file (or a
    default ``Handler``) then completes them before the document is
    written. A rejected document is logged, not raised.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  writer: whoosh.writing.IndexWriter
    :param writer:
        Writer on current index.
    :param str path:
        Relative path of the file.
    :param str abs_path:
        Absolute path of the file.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by a script.
    """
    # Basic fields
    path = tounicode(path)
    abs_path = tounicode(abs_path)
    stat_info = stat(abs_path)
    file_id = make_file_id(join(self.uid, path))
    sharing = dbsession.query(
        DBSharingFile.sharing_id).filter_by(file_id=file_id).first()
    is_dir = isdir(abs_path)
    indexed_at = time()
    if is_dir:
        file_type, file_size = 'directory', 0
    else:
        file_type = tounicode(mimetype_get(abs_path)[1])
        file_size = stat_info.st_size
    whoosh_fields = {
        'time': indexed_at,
        'directory': dirname(path) or '.',
        'file_name': basename(path),
        'file_id': tounicode(file_id),
        'file_type': file_type,
        'file_size': file_size,
        'file_date': int(stat_info.st_mtime),
        'shared': bool(sharing)
    }

    # Extra fields
    handler = self.get_handler(abs_path)[0]
    if not handler:
        handler = Handler()
    handler.infos_complete_fields(
        self, path, abs_path, whoosh_fields, request)

    # Write information
    try:
        writer.add_document(**whoosh_fields)
    except (ValueError, UnknownFieldError) as error:  # pragma: nocover
        self.error(_('${i}: ${e}', {'i': self.uid, 'e': error}), request)
# -------------------------------------------------------------------------
[docs]
def index_is_obsolete(self, fields, abs_file):
    """Tell whether an index record is older than its sources.

    The record is obsolete when the file itself, or its infos XML file
    if there is one, has been modified after ``fields['time']``.

    :param fields:
        Fields of index record.
    :param str abs_file:
        Absolute path to the source file.
    :rtype: bool
    """
    infos_file = join(
        self.root, INFOS_DIR, fields['directory'],
        f"{fields['file_name']}.xml")
    if getmtime(abs_file) > fields['time']:
        return True
    if not exists(infos_file):
        return False
    return getmtime(infos_file) > fields['time']
# -------------------------------------------------------------------------
def _index_open_or_create(self):
    """Open the index of this warehouse, creating it when missing.

    Opening is attempted at most ``WHOOSH_RETRIES`` times, with a one
    second pause between two attempts.

    :rtype: :class:`whoosh.index.FileIndex` or ``None``
    :return:
        The index or ``None`` if it could be neither created nor opened.
    """
    # Create
    index_dir = join(self.root, INDEX_DIR)
    if not (exists(index_dir) and exists_in(index_dir, indexname=self.uid)):
        return self._index_create(index_dir)

    # Open
    attempts = 0
    while attempts < WHOOSH_RETRIES:
        if attempts:
            sleep(1)
        attempts += 1
        try:
            index = open_dir(index_dir, indexname=self.uid)
        except (AttributeError, LockError):  # pragma: nocover
            continue
        if index is not None:
            return index
    return None
# -------------------------------------------------------------------------
def _index_create(self, index_dir):
    """Create the index of this warehouse.

    The schema is made of the built-in fields plus one field per entry
    of ``self.indexfields`` which is not listed in
    ``INDEXFIELD_BUILTIN``.

    :param str index_dir:
        Absolute path to the index directory.
    :rtype: :class:`whoosh.index.FileIndex` or ``None``
    :return:
        The new index or ``None`` if its creation failed.
    """
    def plain_text(stored):
        """Field used for any Whoosh type without a dedicated builder."""
        return TEXT(stored=stored)

    # Whoosh type -> field builder
    builders = {
        'ID': lambda stored: ID(stored=stored),
        'NUMERIC': lambda stored: NUMERIC(stored=stored),
        'BOOLEAN': lambda stored: BOOLEAN(stored=stored),
        'DATETIME': lambda stored: DATETIME(stored=stored),
        'KEYWORD': lambda stored: KEYWORD(
            stored=stored, scorable=True, commas=True),
        'STEMS': lambda stored: TEXT(
            analyzer=StemmingAnalyzer() | CharsetFilter(accent_map),
            stored=stored),
    }

    # Create schema
    schema = Schema(
        time=STORED,
        directory=ID(stored=True),
        file_name=ID(stored=True),
        file_id=ID(stored=True),
        file_type=ID(stored=True),
        file_size=NUMERIC(stored=True, bits=64),
        file_date=NUMERIC(stored=True),
        only_groups=KEYWORD(stored=True, scorable=True, commas=True),
        shared=BOOLEAN(stored=True))
    known = set(INDEXFIELD_BUILTIN)
    for field_id in self.indexfields:
        if field_id in known:
            continue
        settings = self.indexfields[field_id]
        make_field = builders.get(settings['whoosh_type'], plain_text)
        schema.add(field_id, make_field(settings['stored']))
        known.add(field_id)

    # Create index
    try:
        makedirs(index_dir, exist_ok=True)
        index = create_in(index_dir, schema, indexname=self.uid)
    except (IOError, AttributeError, LockError):  # pragma: nocover
        index = None
    return index
# -------------------------------------------------------------------------
def _index_writer(self, index, request):
    """Retrieve a writer on an index, logging the error on failure.

    :param index:
        Index object providing a ``writer()`` method.
    :type  request: pyramid.request.Request
    :param request:
        Current request or ``None`` if called by a script.
    :rtype: :class:`whoosh.index.IndexWriter` or ``None``
    """
    try:
        return index.writer()
    except (OSError, AttributeError) as error:
        message = _('${i}: ${e}', {'i': self.uid, 'e': error})
    except LockError:
        # NOTE(review): the trailing "}" looks like a typo; this string is
        # a translation message ID, so fixing it also means updating the
        # catalogs
        message = _('${i}: Lock error}', {'i': self.uid})
    self.error(message, request)
    return None
# -------------------------------------------------------------------------
[docs]
def infos_read(self, path, meta_ids, for_index, request=None):
    """Return a dictionary with authorized groups and metadata found in the
    infos XML file.

    The ``only_groups`` key is always present. A metadata field is only
    returned when it is found in the infos file and known in
    ``self.metafields``.

    :param str path:
        Relative path of the file.
    :param set meta_ids:
        Set of IDS of metadata fields to retrieve.
    :param bool for_index:
        If ``True``, values are prepared for the index: groups are joined
        with commas, ``BOOLEAN`` fields are converted to booleans and
        ``DATETIME`` fields to ISO 8601 strings.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by a script.
    :rtype: dict
    """
    # Authorized groups
    fields = {}
    fields['only_groups'], tree = self.infos_only_groups(path, request)
    if for_index:
        fields['only_groups'] = ','.join(fields['only_groups'])
    if tree is None:
        return fields

    # Metadata
    namespaces = {'ns0': RELAXNG_CIOWAREHOUSE['namespace']}
    for field_id in meta_ids:
        elt = tree.xpath(
            'ns0:infos/ns0:metafields/ns0:metafield[@id="{0}"]'.format(
                field_id),
            namespaces=namespaces)
        if not elt or field_id not in self.metafields:
            continue
        # The text of an empty element is ``None``: treat it as an empty
        # string instead of failing on ``None.strip()``
        fields[field_id] = convert_value(
            self.metafields[field_id]['type'],
            tounicode((elt[0].text or '').strip()))
        if not for_index:
            continue
        whoosh_type = self.indexfields[field_id]['whoosh_type']
        if whoosh_type == 'BOOLEAN':
            fields[field_id] = bool(fields[field_id]) and \
                fields[field_id] not in ('false', 'False', '0')
        elif whoosh_type == 'DATETIME':
            fields[field_id] = fields[field_id].isoformat()

    return fields
# -------------------------------------------------------------------------
[docs]
def infos_only_groups(self, path, request=None):
    """Read the authorized groups from the infos XML file of a file.

    The set of groups is empty when the infos file does not exist, when
    it cannot be loaded (the error is logged) or when it lists no group.

    :param str path:
        Relative path of the file.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by a script.
    :rtype: tuple
    :return:
        A tuple such as ``(group_set, tree)`` where ``tree`` is the
        loaded infos XML or ``None`` if it is not available.
    """
    # Open infos file
    infos_file = join(self.root, INFOS_DIR, f'{path}.xml')
    if not exists(infos_file):
        return set(), None
    tree, err = load_xml2(infos_file)
    if err is not None:
        self.error(err, request)
        return set(), None

    # Extract groups
    namespaces = {'ns0': RELAXNG_CIOWAREHOUSE['namespace']}
    groups = tree.xpath(
        'ns0:infos/ns0:groups/ns0:group', namespaces=namespaces)
    return {elt.text for elt in groups}, tree
# -------------------------------------------------------------------------
[docs]
def directory_file_list(self, directory):
    """List recursively the content of a directory of the warehouse.

    Entries whose name is in ``EXCLUDED_FILES`` are skipped and excluded
    directories are not browsed. For each directory level, sub-directories
    come before files.

    :param str directory:
        Relative path of the directory to browse.
    :rtype: list
    :return:
        A list of tuples such as ``(directory, file_name)`` relative to
        the root of the warehouse, empty if the directory does not exist.
    """
    abs_dir = join(self.root, directory)
    if not isdir(abs_dir):
        return []

    def as_item(parent, name):
        """Convert an entry into a ``(directory, file_name)`` tuple."""
        rel_path = relpath(join(parent, name), self.root)
        return (dirname(rel_path) or '.', basename(rel_path))

    file_list = []
    for parent, dirs, filenames in walk(abs_dir):
        # Prune in place so that walk() skips excluded directories
        kept_dirs = [name for name in dirs if name not in EXCLUDED_FILES]
        dirs[:] = kept_dirs
        file_list.extend(as_item(parent, name) for name in kept_dirs)
        file_list.extend(
            as_item(parent, name) for name in filenames
            if name not in EXCLUDED_FILES)
    return file_list
# -------------------------------------------------------------------------
[docs]
@classmethod
def error(cls, message, request=None):
    """Log an error message.

    With a request, the message is translated into English and sent to
    ``log_error``; otherwise it goes to the module logger.

    :param str message:
        Error message.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by populate script.
    """
    if request is not None:
        log_error(request, translate(message, lang='en'))
        return
    LOG.error(translate(message))
# -------------------------------------------------------------------------
[docs]
@classmethod
def warning(cls, message, request=None):
    """Log a warning message.

    With a request, the message is translated into English and sent to
    ``log_warning``; otherwise it goes to the module logger.

    :param str message:
        Warning message.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by populate script.
    """
    if request is not None:
        log_warning(request, translate(message, lang='en'))
        return
    LOG.warning(translate(message))
# -------------------------------------------------------------------------
def _directory2abs_path(self, registry, directory):
    """Return an absolute path to the directory if exists.

    The directory is first resolved through the locations of the
    ``ciowarehouse`` module. Failing that, it is taken as a path relative
    to the root of the warehouse and must be an existing directory
    located inside this root.

    :param dict registry:
        Application registry.
    :param str directory:
        Relative path to a local directory or reference to a location or
        another directory.
    :rtype: str
    :return:
        Absolute path or ``None`` if ``directory`` is empty, does not
        exist or is outside the warehouse.
    """
    if not directory:
        return None

    abs_path = location_path2abs_path(
        registry['modules']['ciowarehouse'].locations, directory)
    if abs_path is not None:
        return abs_path

    root = normpath(self.root)
    abs_path = normpath(join(self.root, directory))
    # Compare whole path components: a bare prefix test would also accept
    # a sibling of the root such as ``<root>-other``
    inside = abs_path == root \
        or abs_path.startswith(root.rstrip(sep) + sep)
    if not inside or not isdir(abs_path):
        return None
    return abs_path