# Source code for ciowarehouse.inputs.stream_email

"""Class for email input stream."""

from sys import version_info
from os import makedirs
from os.path import join, exists, basename
from imaplib import IMAP4, IMAP4_SSL
from email import message_from_string
from email.utils import parsedate_tz, mktime_tz
from email.header import decode_header
from email.errors import HeaderParseError
from mimetypes import guess_extension
from datetime import datetime
from socket import gaierror

from chrysalio.lib.utils import decrypt
from ..lib.i18n import _, translate
from ..lib.utils import normalize_filename
from . import InputStream

# strftime pattern of the compact timestamp (2-digit year, month, day, hour,
# minute, second) embedded in the identifiers of archived messages.
MAIL_DATETIME_FORMAT = '%y%m%d' '%H%M%S'


# =============================================================================
class InputStreamEmail(InputStream):
    """Class to manage email input stream.

    Messages are read from an IMAP mailbox and archived on disk (attachments
    and body). Unless the ``develop`` setting is ``'true'``, they are then
    deleted from the server.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build, dbstream):
        """Retrieve documents.

        See: :meth:`.inputs.InputStream.documents`

        :param build: Current build. Its ``error()`` method receives
            connection and mailbox errors. Its ``settings`` are read for the
            ``develop`` flag.
        :param dbstream: Stream record providing ``host``, ``port``, ``ssl``,
            ``user``, ``password``, ``path``, ``stream_id`` and
            ``stream_type``.
        :rtype: list or tuple
        :return: A list of document dictionaries (an empty tuple on early
            exit).
        """
        # Prepare archive
        archive, archive_dir = self.archive(build, dbstream)
        if archive is None:
            return ()
        self.archive_clean(build, dbstream, archive)

        # Connect to the IMAP server
        imap = self._imap_connect(build, dbstream)
        if imap is None:
            return ()

        # Retrieve messages; the connection is released even if this raises
        try:
            documents = self._imap_retrieve(
                build, dbstream, imap, archive_dir)
        finally:
            self._imap_logout(imap)

        # Archive
        if documents:
            self.archive_files(build, dbstream, archive, None, archive_dir)
        return documents

    # -------------------------------------------------------------------------
    @classmethod
    def _imap_connect(cls, build, dbstream):
        """Open (and authenticate) the IMAP connection of a stream.

        :param build: Current build, used to report errors.
        :param dbstream: Stream record with the connection parameters.
        :rtype: imaplib.IMAP4 or None
        :return: The connection, or ``None`` if it failed (error reported).
        """
        try:
            if dbstream.ssl:
                imap = IMAP4_SSL(dbstream.host, dbstream.port or 993)
            else:
                imap = IMAP4(dbstream.host, dbstream.port or 143)
        except (gaierror, IMAP4.error, OSError) as error:
            build.error(error)
            return None
        if dbstream.user:
            try:
                imap.login(
                    dbstream.user, decrypt(dbstream.password, 'warehouse'))
            except (gaierror, IMAP4.error, OSError) as error:
                build.error(error)
                # Do not leak the socket of a connection we cannot use
                cls._imap_logout(imap)
                return None
        return imap

    # -------------------------------------------------------------------------
    @staticmethod
    def _imap_logout(imap):
        """Log out from the IMAP server, tolerating a broken connection.

        :type imap: imaplib.IMAP4
        :param imap: Connection to release.
        """
        try:
            imap.logout()
        except (IMAP4.error, OSError):
            # Best-effort cleanup: the connection may already be dead. Close
            # the socket without masking an error raised earlier.
            try:
                imap.shutdown()
            except OSError:
                pass

    # -------------------------------------------------------------------------
    def _imap_retrieve(self, build, dbstream, imap, archive_dir):
        """Archive every message of the stream mailbox.

        :param build: Current build.
        :param dbstream: Stream record.
        :type imap: imaplib.IMAP4
        :param imap: Open connection.
        :param str archive_dir: Absolute path to the archive directory.
        :rtype: list or tuple
        :return: Documents found, or an empty tuple if there is nothing to
            process.
        """
        # Select mailbox
        mailbox = dbstream.path or 'INBOX'
        status, nums = imap.select(mailbox)
        if status != 'OK':
            build.error(translate(_('${i}: unable to check mailbox "${m}"', {
                'i': dbstream.stream_id, 'm': mailbox})))
            return ()
        if not int(nums[0]):
            return ()

        # Retrieve message numbers
        status, nums = imap.search(None, 'ALL')
        if status != 'OK':  # pragma: nocover
            return ()

        # Loop over messages
        purge = build.settings.get('develop') != 'true'
        documents = []
        for message_num in nums[0].split():
            message = self._fetch_message(imap, message_num)
            if message is None:
                # Unreadable message: leave it on the server for a later run
                continue
            document = self._archive_message(
                message, int(message_num), archive_dir)
            if purge:
                imap.store(message_num, '+FLAGS', '\\Deleted')
            if document:
                document['stream-id'] = dbstream.stream_id
                document['stream-type'] = dbstream.stream_type
                documents.append(document)
        imap.expunge()
        imap.close()
        return documents

    # -------------------------------------------------------------------------
    @staticmethod
    def _fetch_message(imap, message_num):
        """Fetch and parse one message.

        :type imap: imaplib.IMAP4
        :param imap: Open connection with a selected mailbox.
        :param message_num: Message number as returned by ``search``.
        :rtype: email.message.Message or None
        :return: The parsed message, or ``None`` if the server did not
            return it.
        """
        status, fetched = imap.fetch(message_num, '(RFC822)')
        if status != 'OK' or not fetched or not isinstance(fetched[0], tuple):
            return None
        data = fetched[0][1]
        if version_info > (3, 0) and isinstance(data, bytes):
            # Parse the raw bytes directly. Decoding them as UTF-8 first
            # fails on other 8-bit charsets. It also corrupts non-ASCII 8bit
            # payloads returned by get_payload(decode=True).
            from email import message_from_bytes
            return message_from_bytes(data)
        return message_from_string(data)

    # -------------------------------------------------------------------------
    @classmethod
    def _archive_message(cls, message, message_num, archive_dir):
        """Archive a message.

        Parts with a ``Content-Disposition`` header are saved as attachments
        in a sub-directory named after the message identifier. The body (the
        first ``text/html`` part, otherwise the first other part, saved as
        ``.txt``) is written in that sub-directory if it exists, else directly
        in ``archive_dir``.

        :type message: email.message.EmailMessage
        :param message: Current message
        :param int message_num: Message number.
        :param str archive_dir: Absolute path to the directory for these files
            in the archives.
        :rtype: dict
        :return: Document metadata. It always has ``dirname``. It has
            ``filename``, ``path`` and, when a body exists, ``source`` and
            ``body``.
        """
        # Metadata
        date = cls._message_date(message)
        subject = cls._decode_value(message['Subject'])
        # A message without Subject gets an empty prefix (assumes
        # normalize_filename accepts an empty string).
        message_id = '{0}-{1}{2:0>3}'.format(
            normalize_filename(subject or '', mode='strict')[:32],
            date.strftime(MAIL_DATETIME_FORMAT), message_num)
        document = {
            'from': cls._decode_value(message['From']),
            'to': cls._decode_value(message['To']),
            'title': subject,
            'organization': cls._decode_value(message['Organization']),
            'language': message['Content-Language'],
            'datetime': date.isoformat(' '),
        }

        # Attachments
        bodies = []
        document['dirname'] = '.'
        for part_num, part in enumerate(message.walk()):
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                bodies.append(part)
                continue
            if document['dirname'] == '.':
                document['dirname'] = message_id
                makedirs(join(archive_dir, document['dirname']), exist_ok=True)
            # The announced file name is untrusted input: keep only its last
            # path component so it cannot escape the archive directory.
            document['filename'] = cls._safe_filename(
                cls._decode_value(part.get_filename())) \
                or '{0}-{1:0>3}{2}'.format(
                    message_id, part_num,
                    guess_extension(part.get_content_type()) or '.bin')
            document['path'] = join(document['dirname'], document['filename'])
            with open(join(archive_dir, document['path']), 'wb') as hdl:
                hdl.write(cls._part_payload(part))

        # Body: first HTML part, otherwise the first body part as plain text
        if bodies and not exists(archive_dir):
            makedirs(archive_dir, exist_ok=True)  # pragma: nocover
        body, extension = next(
            ((part, '.html') for part in bodies
             if part.get_content_type() == 'text/html'),
            (bodies[0], '.txt') if bodies else (None, None))
        if body is not None:
            payload = cls._part_payload(body)
            document['filename'] = '{0}{1}'.format(message_id, extension)
            document['path'] = join(document['dirname'], document['filename'])
            with open(join(archive_dir, document['path']), 'wb') as hdl:
                hdl.write(payload)
            document['source'] = join(archive_dir, document['path']) \
                if document['dirname'] == '.' \
                else join(archive_dir, document['dirname'])
            document['body'] = payload
        return document

    # -------------------------------------------------------------------------
    @staticmethod
    def _safe_filename(filename):
        """Reduce an untrusted attachment name to a bare file name.

        :param str filename: Name announced by the message (may be ``None``).
        :rtype: str or None
        :return: The last path component, or ``None`` if nothing usable
            remains.
        """
        if not filename:
            return None
        filename = basename(
            filename.replace('\x00', '').replace('\\', '/')).strip()
        if filename in ('', '.', '..'):
            return None
        return filename

    # -------------------------------------------------------------------------
    @staticmethod
    def _part_payload(part):
        """Return the decoded content of a MIME part as bytes.

        :type part: email.message.Message
        :param part: Part to extract.
        :rtype: bytes
        """
        payload = part.get_payload(decode=True)
        if payload is None:
            # Container parts (e.g. an attached message/rfc822) have no
            # decodable payload: serialize their sub-messages instead.
            payload = b''.join(
                sub.as_bytes() for sub in part.get_payload()) \
                if part.is_multipart() else b''
        return payload

    # -------------------------------------------------------------------------
    @classmethod
    def _message_date(cls, message):
        """Return the date of the message.

        :type message: email.message.EmailMessage
        :param message: Current message
        :rtype: datetime.datetime
        :return: The ``Date`` header as local time, or the current time if
            the header is missing, unparsable or out of range.
        """
        header = message['Date']
        if header:
            try:
                parsed = parsedate_tz(str(header))
                if parsed:
                    return datetime.fromtimestamp(mktime_tz(parsed))
            except (TypeError, ValueError, IndexError, OverflowError,
                    OSError):
                pass
        return datetime.now()

    # -------------------------------------------------------------------------
    @classmethod
    def _decode_value(cls, encoded):
        """A decoded value.

        All the RFC 2047 fragments of the header are decoded and
        concatenated. Unknown charsets and raw 8-bit bytes fall back to
        UTF-8 with replacement characters.

        :param str encoded: Encoded value.
        :rtype: str
        """
        if not encoded:
            return None
        try:
            fragments = decode_header(encoded)
        except HeaderParseError:
            fragments = [(str(encoded), None)]
        chunks = []
        for fragment, charset in fragments:
            if isinstance(fragment, bytes):
                fragment = cls._to_text(fragment, charset)
            elif any(u'\udc80' <= char <= u'\udcff' for char in fragment):
                # Raw 8-bit header bytes kept as surrogates by the parser
                fragment = cls._to_text(
                    fragment.encode('utf-8', 'surrogateescape'), 'utf-8')
            chunks.append(fragment)
        return ''.join(chunks)

    # -------------------------------------------------------------------------
    @staticmethod
    def _to_text(raw, charset):
        """Decode header bytes, falling back to UTF-8 with replacement.

        :param bytes raw: Bytes to decode.
        :param str charset: Declared charset (``None`` means ASCII).
        :rtype: str
        """
        try:
            return raw.decode(charset or 'ascii')
        except (LookupError, UnicodeDecodeError):
            return raw.decode('utf-8', 'replace')