Source code for ciowarehouse.inputs.stream_file

"""Class for file input stream."""

from os import walk
from os.path import join, relpath, dirname
from datetime import datetime

from ..lib.utils import EXCLUDED_FILES
from . import InputStream


# =============================================================================
class InputStreamFile(InputStream):
    """Class to manage file input stream.

    Documents are the regular files found under a directory of a warehouse.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build, dbstream):
        """Retrieve documents.

        Walk the input directory of the stream, describe every file which is
        not excluded, archive the files found and, unless the build runs in
        development mode, empty the input directory.

        See: :meth:`.inputs.InputStream.documents`

        :param build:
            Current build object; it is handed to the helpers of the base
            class and its ``settings`` mapping is read for the ``'develop'``
            key.
        :param dbstream:
            Stream description providing ``host``, ``path``, ``stream_id``
            and ``stream_type``.
        :return:
            A list of dictionaries, one per document, or an empty tuple if
            the warehouse or the archive is not available.
        """
        warehouse = self._get_warehouse(build, dbstream.host)
        if warehouse is None:
            return ()
        archive, archive_dir = self.archive(build, dbstream)
        if archive is None:
            return ()
        self.archive_clean(build, dbstream, archive)

        # Loop over documents
        input_dir = join(warehouse.root, dbstream.path or '.')
        documents = []
        for path, dirs, files in walk(input_dir):
            # Prune excluded directories in place so walk() does not descend
            # into them. A slice assignment is used rather than calling
            # remove() while iterating, which would skip the entry following
            # each removed one.
            dirs[:] = [name for name in dirs if name not in EXCLUDED_FILES]
            for name in files:
                if name in EXCLUDED_FILES:
                    continue
                source = join(path, name)
                # NOTE(review): 'dirname' is relative to the archive directory
                # whereas 'path' is relative to the input directory -- confirm
                # this asymmetry is intended.
                documents.append({
                    'stream-id': dbstream.stream_id,
                    'stream-type': dbstream.stream_type,
                    'from': dbstream.host,
                    'to': dbstream.stream_id,
                    'datetime': datetime.now().isoformat(' '),
                    'filename': name,
                    'dirname': dirname(relpath(source, archive_dir)),
                    'path': relpath(source, input_dir),
                    'source': source})

        # Archive
        if documents:
            self.archive_files(
                build, dbstream, archive, input_dir, archive_dir)

        # Clean up input directory
        if build.settings.get('develop') != 'true':
            self.empty_warehouse_directory(
                build, dbstream, warehouse, input_dir)

        return documents