Source code for ciowarehouse.inputs.stream_file
"""Class for file input stream."""
from os import walk
from os.path import join, relpath, dirname
from datetime import datetime
from ..lib.utils import EXCLUDED_FILES
from . import InputStream
# =============================================================================
class InputStreamFile(InputStream):
    """Class to manage file input stream.

    Documents are collected by walking a directory located under the root
    of a warehouse.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build, dbstream):
        """Retrieve documents.

        Walk the input directory ``<warehouse.root>/<dbstream.path>`` (the
        warehouse root itself when ``dbstream.path`` is empty) and describe
        every file found, skipping any file or directory whose name is in
        ``EXCLUDED_FILES``.

        :param build:
            Current build object. It is passed to the helper methods and its
            ``settings`` mapping is read for the ``'develop'`` key.
        :param dbstream:
            Stream description. The attributes ``host``, ``path``,
            ``stream_id`` and ``stream_type`` are read here.
        :return:
            A list of dictionaries, one per file, with the keys
            ``stream-id``, ``stream-type``, ``from``, ``to``, ``datetime``,
            ``filename``, ``dirname``, ``path`` and ``source``. An empty
            tuple is returned when the warehouse or the archive is not
            available.

        See: :meth:`.inputs.InputStream.documents`
        """
        warehouse = self._get_warehouse(build, dbstream.host)
        if warehouse is None:
            return ()
        archive, archive_dir = self.archive(build, dbstream)
        if archive is None:
            return ()
        self.archive_clean(build, dbstream, archive)

        # Loop over documents
        input_dir = join(warehouse.root, dbstream.path or '.')
        documents = []
        for path, dirs, files in walk(input_dir):
            # Prune excluded directories with a slice assignment: ``walk``
            # (top-down) only honours in-place changes of ``dirs``, and
            # calling ``dirs.remove()`` while iterating over ``dirs`` skips
            # the entry that follows each removed one.
            dirs[:] = [name for name in dirs if name not in EXCLUDED_FILES]
            for name in files:
                if name in EXCLUDED_FILES:
                    continue
                source = join(path, name)
                # NOTE(review): 'dirname' is computed relative to
                # ``archive_dir`` whereas 'path' is relative to
                # ``input_dir`` -- confirm this asymmetry is intended.
                documents.append({
                    'stream-id': dbstream.stream_id,
                    'stream-type': dbstream.stream_type,
                    'from': dbstream.host,
                    'to': dbstream.stream_id,
                    'datetime': datetime.now().isoformat(' '),
                    'filename': name,
                    'dirname': dirname(relpath(source, archive_dir)),
                    'path': relpath(source, input_dir),
                    'source': source})

        # Archive
        if documents:
            self.archive_files(
                build, dbstream, archive, input_dir, archive_dir)

        # Clean up input directory (skipped when the 'develop' setting is
        # the string 'true')
        if build.settings.get('develop') != 'true':
            self.empty_warehouse_directory(
                build, dbstream, warehouse, input_dir)

        return documents