# Source code for ciowarehouse.inputs.stream_ftp
"""Class for FTP input stream."""
from os import walk
from os.path import join, dirname, exists, relpath
from shutil import move
from datetime import datetime
from chrysalio.lib.utils import decrypt
from chrysalio.lib.ftp import Ftp
from . import EXCLUDED_FILES, InputStream
# Name of the temporary directory, created next to the archive directory, into
# which FTP files are downloaded before being moved to the archive directory.
NEW_DIR = 'New'
# =============================================================================
class InputStreamFtp(InputStream):
    """Class to manage FTP input stream.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build, dbstream):
        """Retrieve documents from the FTP server described by ``dbstream``.

        The remote files are downloaded into a ``New`` directory located next
        to the archive directory, moved to the archive directory, removed from
        the server (unless the ``develop`` setting is ``'true'``) and finally
        described one by one in the returned list.

        See: :meth:`.inputs.InputStream.documents`

        :param build:
            Current build object. This method reads ``build.error`` (handed
            to the FTP helper) and ``build.settings`` (``develop`` key).
        :param dbstream:
            Stream description. This method reads its ``host``, ``path``,
            ``port``, ``ssl``, ``user``, ``password``, ``stream_id`` and
            ``stream_type`` attributes.
        :return:
            A list of dictionaries, one per retrieved file, with the keys
            ``stream-id``, ``stream-type``, ``from``, ``to``, ``datetime``,
            ``filename``, ``dirname``, ``path`` and ``source``. An empty
            tuple is returned when the archive is unavailable or when an FTP
            operation fails.
        """
        archive, archive_dir = self.archive(build, dbstream)
        if archive is None:
            return ()
        self.archive_clean(build, dbstream, archive)

        # Ftp connection
        # NOTE(review): the connection is never explicitly closed in this
        # method -- confirm that ``Ftp`` releases it on its own.
        ftp = Ftp(build.error)
        if not ftp.connect({
                'ftp_host': dbstream.host, 'ftp_path': dbstream.path,
                'ftp_port': dbstream.port or 21, 'ftp_ssl': dbstream.ssl,
                'ftp_user': dbstream.user,
                'ftp_password': decrypt(dbstream.password, 'warehouse'),
                'ftp_pasv': True}):
            return ()

        # Retrieve files
        new_dir = join(dirname(archive_dir), NEW_DIR)
        if not ftp.download(new_dir, EXCLUDED_FILES) or not exists(new_dir):
            return ()
        # NOTE(review): if ``archive_dir`` already exists as a directory,
        # ``shutil.move`` places ``New`` inside it instead of renaming it --
        # confirm that ``archive()`` returns a path that is not created yet.
        move(new_dir, archive_dir)

        # Purge the remote directory, except in development mode
        remote_dirs, remote_files = ftp.list_directory()
        if build.settings.get('develop') != 'true':
            # NOTE(review): on a purge failure the files already moved to
            # ``archive_dir`` are neither registered nor archived -- confirm
            # this is the intended behaviour.
            for name in remote_dirs:
                if not ftp.rmtree(name):  # pragma: nocover
                    return ()
            for name in remote_files:
                if not ftp.delete(name):  # pragma: nocover
                    return ()

        # Register documents
        documents = []
        for root, ignored_, files in walk(archive_dir):
            for name in files:
                source = join(root, name)
                # Path of the file relative to the archive directory
                path = relpath(source, archive_dir)
                documents.append({
                    'stream-id': dbstream.stream_id,
                    'stream-type': dbstream.stream_type,
                    'from': dbstream.host,
                    'to': dbstream.stream_id,
                    'datetime': datetime.now().isoformat(' '),
                    'filename': name,
                    'dirname': dirname(path),
                    'path': path,
                    'source': source})

        # Archive
        self.archive_files(build, dbstream, archive, None, archive_dir)
        return documents