Source code for ciowarehouse.inputs.stream_ftp

"""Class for FTP input stream."""

from os import walk
from os.path import join, dirname, exists, relpath
from shutil import move
from datetime import datetime

from chrysalio.lib.utils import decrypt
from chrysalio.lib.ftp import Ftp
from . import EXCLUDED_FILES, InputStream

NEW_DIR = 'New'


# =============================================================================
[docs]class InputStreamFtp(InputStream): """Class to manage FTP input stream. See: :class:`.inputs.InputStream` """ # -------------------------------------------------------------------------
[docs] def documents(self, build, dbstream): """Retrieve documents. See: :meth:`.inputs.InputStream.documents` """ archive, archive_dir = self.archive(build, dbstream) if archive is None: return () self.archive_clean(build, dbstream, archive) # Ftp connection ftp = Ftp(build.error) if not ftp.connect({ 'ftp_host': dbstream.host, 'ftp_path': dbstream.path, 'ftp_port': dbstream.port or 21, 'ftp_ssl': dbstream.ssl, 'ftp_user': dbstream.user, 'ftp_password': decrypt(dbstream.password, 'warehouse'), 'ftp_pasv': True}): return () # Retrieve files new_dir = join(dirname(archive_dir), NEW_DIR) if not ftp.download(new_dir, EXCLUDED_FILES) or not exists(new_dir): return () move(new_dir, archive_dir) dirs, files = ftp.list_directory() if build.settings.get('develop') != 'true': for name in dirs: if not ftp.rmtree(name): # pragma: nocover return () for name in files: if not ftp.delete(name): # pragma: nocover return () # Register documents documents = [] for root, ignored_, files in walk(archive_dir): for name in files: source = join(root, name) documents.append({ 'stream-id': dbstream.stream_id, 'stream-type': dbstream.stream_type, 'from': dbstream.host, 'to': dbstream.stream_id, 'datetime': datetime.now().isoformat(' '), 'filename': name, 'dirname': dirname(relpath(source, archive_dir)), 'path': relpath(source, archive_dir), 'source': source}) # Archive self.archive_files(build, dbstream, archive, None, archive_dir) return documents