Source code for ciowarehouse.inputs
"""A collection of input stream and input rules."""
from os import scandir
from os.path import join, exists, getmtime, normpath, relpath, dirname
from logging import getLogger
from datetime import datetime, timedelta
from time import time
from chrysalio.lib.utils import copy_content
from ..lib.i18n import _, translate
from ..lib.utils import EXCLUDED_FILES
LOG = getLogger(__name__)
ARCHIVE_DATETIME_FORMAT = '%y%m%d-%H%M%S'
ARCHIVE_CLEAN_FLAG = 'clean'
ARCHIVE_CLEAN_PERIOD = 3600*24
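
# NOTE: directory names built with ``ARCHIVE_DATETIME_FORMAT`` sort
# lexicographically in chronological order, which is what lets
# ``InputStream.archive_clean()`` compare entry names against a formatted
# deadline with a plain string comparison.
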
# =============================================================================
class InputStream(object):
    """Base class to manage an input stream.

    :param get_warehouse:
        Function to retrieve a warehouse.
    """

    # -------------------------------------------------------------------------
    def __init__(self, get_warehouse):
        """Constructor method."""
        self._get_warehouse = get_warehouse
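
    # Hypothetical sketch: ``get_warehouse`` is expected to be a callable
    # returning a warehouse object (or ``None``), invoked as
    # ``get_warehouse(build, warehouse_id)`` -- see ``archive()`` below.
    # For instance (``build.warehouses`` is an assumed attribute)::
    #
    #     def get_warehouse(build, warehouse_id):
    #         return build.warehouses.get(warehouse_id)
    #
    #     stream = InputStream(get_warehouse)
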
    # -------------------------------------------------------------------------
    def documents(self, build, dbstream):
        """Retrieve documents.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SQLAlchemy object used to build this input stream.
        :rtype: list
        """
        # pylint: disable = unused-argument
        return ()
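
    # The base implementation collects nothing; concrete subclasses are
    # expected to override ``documents()`` and return the documents gathered
    # from their input source (see the illustrative subclass at the end of
    # this module).
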
    # -------------------------------------------------------------------------
    def archive(self, build, dbstream):
        """Return the warehouse object used for archiving and the absolute
        path to the archive directory.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SQLAlchemy object used to build this input stream.
        :rtype: tuple
        :return:
            A tuple such as ``(archive_warehouse, archive_dir)``.
        """
        warehouse = self._get_warehouse(build, dbstream.archive_id)
        if warehouse is None:
            build.error(_(
                'Unknown warehouse "${w}"', {'w': dbstream.archive_id}))
            return None, None

        archive_dir = normpath(join(
            warehouse.root, dbstream.archive_path or '.',
            datetime.now().strftime(ARCHIVE_DATETIME_FORMAT)))
        return warehouse, archive_dir
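
    # Each call yields a distinct archive directory: the current date and
    # time, formatted with ``ARCHIVE_DATETIME_FORMAT``, is appended to
    # ``dbstream.archive_path`` inside the archive warehouse, so successive
    # runs never overwrite each other and ``archive_clean()`` can later drop
    # old runs by name.
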
    # -------------------------------------------------------------------------
    @classmethod
    def archive_files(cls, build, dbstream, archive, input_dir, archive_dir):
        """Save a copy of ``input_dir`` into ``archive_dir``, add
        ``archive_dir`` to the repository, commit the action and refresh the
        warehouse.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SQLAlchemy object used to build this input stream.
        :type archive: ciowarehouse.lib.warehouse.Warehouse
        :param archive:
            Warehouse object used for archiving.
        :param str input_dir:
            Absolute path to the directory to copy.
        :param str archive_dir:
            Absolute path to the directory for these files in the archives.
        """
        if input_dir:
            copy_content(input_dir, archive_dir, exclude=EXCLUDED_FILES)
        archive.vcs.add(archive_dir)
        error = archive.vcs.commit(
            translate(_('Automatic archiving')), dbstream.stream_id)
        if error:  # pragma: nocover
            build.error(error)

        path = relpath(archive_dir, archive.root)
        build.info(translate(_('Archive "${a}" created.', {'a': path})))
        archive.refresh(
            None, (path,), recursive=True, dbsession=build.dbsession)
        archive.unrefreshed()
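
    # Hypothetical usage sketch: a subclass that has gathered its files in
    # ``input_dir`` would typically chain ``archive()`` and
    # ``archive_files()``::
    #
    #     archive, archive_dir = self.archive(build, dbstream)
    #     if archive is not None:
    #         self.archive_files(
    #             build, dbstream, archive, input_dir, archive_dir)
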
    # -------------------------------------------------------------------------
    @classmethod
    def archive_clean(cls, build, dbstream, archive):
        """Loop over the archived directories and remove the old ones.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SQLAlchemy object used to build this input stream.
        :type archive: ciowarehouse.lib.warehouse.Warehouse
        :param archive:
            Warehouse object used for archiving.
        """
        archive_dir = join(archive.root, dbstream.archive_path or '.')
        clean_flag = join(archive_dir, ARCHIVE_CLEAN_FLAG)
        period = min(ARCHIVE_CLEAN_PERIOD, 3600 * dbstream.archive_ttl) \
            if dbstream.archive_ttl else ARCHIVE_CLEAN_PERIOD
        if not dbstream.archive_ttl or not exists(archive_dir) or (
                exists(clean_flag) and getmtime(clean_flag) > time() - period):
            return

        # Remove every archived directory whose name is older than the deadline
        deadline = (datetime.now() - timedelta(hours=dbstream.archive_ttl))\
            .strftime(ARCHIVE_DATETIME_FORMAT)
        found = False
        for entry in scandir(archive_dir):
            if entry.name not in EXCLUDED_FILES and entry.name < deadline:
                error = archive.vcs.remove(archive_dir, entry.name)
                if error:  # pragma: nocover
                    build.error(error)
                else:
                    build.info(translate(
                        _('Archive "${a}" cleaned.', {'a': entry.name})))
                    found = True

        # Touch the flag file so the next cleaning waits at least one period
        with open(clean_flag, 'w', encoding='utf8'):
            pass
        archive.vcs.add(clean_flag)
        error = archive.vcs.commit(translate(
            _('Automatic archive cleaning up')), dbstream.stream_id)
        if error:  # pragma: nocover
            build.error(error)

        if found:
            archive.refresh(
                None, (relpath(archive_dir, archive.root),), recursive=True,
                dbsession=build.dbsession)
            archive.unrefreshed()
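
    # Cleaning is throttled by the ``clean`` flag file: it runs at most once
    # every ``min(ARCHIVE_CLEAN_PERIOD, 3600 * archive_ttl)`` seconds, i.e. at
    # most once a day for a TTL of 48 hours and at most once an hour for a
    # TTL of 1 hour.
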
    # -------------------------------------------------------------------------
    @classmethod
    def empty_warehouse_directory(cls, build, dbstream, warehouse, directory):
        """Empty a directory.

        :type build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SQLAlchemy object used to build this input stream.
        :type warehouse: ciowarehouse.lib.warehouse.Warehouse
        :param warehouse:
            Object for the warehouse containing the directory.
        :param str directory:
            Absolute path to the directory.
        """
        for entry in scandir(directory):
            if entry.name not in EXCLUDED_FILES:
                warehouse.vcs.remove(dirname(entry.path), entry.name)
        error = warehouse.vcs.commit(translate(
            _('Automatic cleaning up')), dbstream.stream_id)
        if error:  # pragma: nocover
            build.error(error)
        warehouse.unrefreshed()
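

# =============================================================================
# Hypothetical example: a minimal, illustrative subclass showing how the
# methods above are typically combined. The ``FileSystemInputStream`` name and
# the ``dbstream.parameters`` field used as the input directory are
# assumptions made for this sketch, not part of the module's real API.
class FileSystemInputStream(InputStream):
    """Illustrative input stream collecting files from a local directory."""

    # -------------------------------------------------------------------------
    def documents(self, build, dbstream):
        """Collect files from an assumed ``dbstream.parameters`` directory,
        archive them and purge old archives.

        :rtype: list
        """
        input_dir = getattr(dbstream, 'parameters', None)  # assumed field
        if not input_dir or not exists(input_dir):
            return ()

        documents = [
            entry.path for entry in scandir(input_dir)
            if entry.name not in EXCLUDED_FILES]

        # Archive the whole input directory, then drop out-of-date archives.
        archive, archive_dir = self.archive(build, dbstream)
        if archive is not None:
            self.archive_files(
                build, dbstream, archive, input_dir, archive_dir)
            self.archive_clean(build, dbstream, archive)
        return documents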