# Source code for ciowarehouse.inputs.stream_email
"""Class for email input stream."""
from sys import version_info
from os import makedirs
from os.path import join, exists
from imaplib import IMAP4, IMAP4_SSL
from email import message_from_bytes, message_from_string
from email.errors import HeaderParseError
from email.utils import parsedate_tz, mktime_tz
from email.header import decode_header
from mimetypes import guess_extension
from datetime import datetime
from socket import gaierror

from chrysalio.lib.utils import decrypt

from ..lib.i18n import _, translate
from ..lib.utils import normalize_filename
from . import InputStream
MAIL_DATETIME_FORMAT = '%y%m%d%H%M%S'
# =============================================================================
class InputStreamEmail(InputStream):
    """Class to manage email input stream.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build, dbstream):
        """Retrieve documents.

        See: :meth:`.inputs.InputStream.documents`

        Every message of the mailbox is fetched, written into the archive
        directory and, unless the ``develop`` setting is ``'true'``, flagged
        as deleted and expunged.

        :param build:
            Current build object; used here for ``error()`` and ``settings``.
        :param dbstream:
            Stream description; ``host``, ``port``, ``ssl``, ``user``,
            ``password``, ``path``, ``stream_id`` and ``stream_type`` are
            read.
        :return:
            An empty tuple when nothing could be retrieved, otherwise a list
            of document dictionaries.
        """
        # Prepare archive
        archive, archive_dir = self.archive(build, dbstream)
        if archive is None:
            return ()
        self.archive_clean(build, dbstream, archive)

        # Connect to the IMAP server
        imap = None
        try:
            imap = IMAP4_SSL(dbstream.host, dbstream.port or 993) \
                if dbstream.ssl else IMAP4(dbstream.host, dbstream.port or 143)
            if dbstream.user:
                imap.login(
                    dbstream.user, decrypt(dbstream.password, 'warehouse'))
        except (gaierror, IMAP4.error, OSError) as error:
            build.error(error)
            # The socket may already be open when the login is refused.
            if imap is not None:
                self._logout(imap)
            return ()

        documents = []
        keep_messages = build.settings.get('develop') == 'true'
        try:
            # Select mailbox
            mailbox = dbstream.path or 'INBOX'
            status, nums = imap.select(mailbox)
            if status != 'OK':
                build.error(translate(_(
                    '${i}: unable to check mailbox "${m}"', {
                        'i': dbstream.stream_id, 'm': mailbox})))
                return ()
            if not int(nums[0]):
                return ()

            # Retrieve message numbers
            status, nums = imap.search(None, 'ALL')
            if status != 'OK':  # pragma: nocover
                return ()

            # Loop over messages
            for message_num in (nums[0] or b'').split():
                message = self._fetch_message(imap, message_num)
                if message is None:
                    continue
                document = self._archive_message(
                    message, int(message_num), archive_dir)
                if not keep_messages:
                    imap.store(message_num, '+FLAGS', '\\Deleted')
                if document:
                    document['stream-id'] = dbstream.stream_id
                    document['stream-type'] = dbstream.stream_type
                    documents.append(document)

            # Only reached when every message has been processed: on error
            # the flagged messages are deliberately not expunged.
            imap.expunge()
            imap.close()
        finally:
            self._logout(imap)

        # Archive
        if documents:
            self.archive_files(build, dbstream, archive, None, archive_dir)
        return documents

    # -------------------------------------------------------------------------
    @classmethod
    def _logout(cls, imap):
        """Log out from the IMAP server, ignoring connection errors.

        :type imap: imaplib.IMAP4
        :param imap:
            Connection to shut down.
        """
        try:
            imap.logout()
        except (IMAP4.error, OSError):
            # Best effort: the connection is being discarded anyway and a
            # failure here must not hide the result or a previous exception.
            pass

    # -------------------------------------------------------------------------
    @classmethod
    def _fetch_message(cls, imap, message_num):
        """Fetch and parse one message.

        :type imap: imaplib.IMAP4
        :param imap:
            Connection with a selected mailbox.
        :param message_num:
            Message number as returned by the IMAP search.
        :rtype: :class:`email.message.Message` or ``None``
        :return:
            The parsed message or ``None`` if the server returned no content
            for this number.
        """
        status, fetched = imap.fetch(message_num, '(RFC822)')
        if status != 'OK' or not fetched \
                or not isinstance(fetched[0], tuple):
            return None
        data = fetched[0][1]
        # Parse the raw bytes directly: decoding the whole message as UTF-8
        # fails or corrupts content for any other 8-bit encoding.
        if isinstance(data, bytes):
            return message_from_bytes(data)
        return message_from_string(data)

    # -------------------------------------------------------------------------
    @classmethod
    def _archive_message(cls, message, message_num, archive_dir):
        """Archive a message.

        Attachments (parts with a ``Content-Disposition`` header) are written
        in a directory named after the message; the first HTML body, or
        failing that the first body part, is written as ``.html`` or
        ``.txt``.

        :type  message: email.message.EmailMessage
        :param message:
            Current message
        :param int message_num:
            Message number.
        :param str archive_dir:
            Absolute path to the directory for these files in the archives.
        :rtype: dict
        :return:
            Metadata of the message (``from``, ``to``, ``title``, ...) and,
            when something has been written, ``filename``, ``path``,
            ``source`` and ``body``.
        """
        # Metadata
        date = cls._message_date(message)
        subject = cls._decode_value(message['Subject'])
        message_id = '{0}-{1}{2:0>3}'.format(
            normalize_filename(subject, mode='strict')[:32],
            date.strftime(MAIL_DATETIME_FORMAT), message_num)
        document = {
            'from': cls._decode_value(message['From']),
            'to': cls._decode_value(message['To']),
            'title': subject,
            'organization': cls._decode_value(message['Organization']),
            'language': message['Content-Language'],
            'datetime': date.isoformat(' '),
        }

        # Attachment
        bodies = []
        document['dirname'] = '.'
        for part_num, part in enumerate(message.walk()):
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                bodies.append(part)
                continue
            if document['dirname'] == '.':
                document['dirname'] = message_id
                makedirs(join(archive_dir, document['dirname']), exist_ok=True)
            # The file name comes from the sender: never trust its path.
            document['filename'] = cls._safe_filename(part.get_filename())
            if not document['filename']:
                document['filename'] = '{0}-{1:0>3}{2}'.format(
                    message_id, part_num, guess_extension(
                        part.get_content_type()) or '.bin')
            document['path'] = join(document['dirname'], document['filename'])
            with open(join(archive_dir, document['path']), 'wb') as hdl:
                # get_payload(decode=True) is None for container parts
                # (e.g. message/rfc822).
                hdl.write(part.get_payload(decode=True) or b'')

        # Body
        if not bodies:
            return document
        if not exists(archive_dir):
            makedirs(archive_dir, exist_ok=True)  # pragma: nocover
        for part in bodies:
            if part.get_content_type() == 'text/html':
                cls._archive_body(
                    document, part, '.html', message_id, archive_dir)
                break
        if 'source' not in document:
            cls._archive_body(
                document, bodies[0], '.txt', message_id, archive_dir)
        return document

    # -------------------------------------------------------------------------
    @classmethod
    def _archive_body(cls, document, part, extension, message_id, archive_dir):
        """Write a body part in the archive and register it in the document.

        :param dict document:
            Document to complete with ``filename``, ``path``, ``source`` and
            ``body``.
        :type  part: email.message.Message
        :param part:
            Body part to save.
        :param str extension:
            Extension, dot included, of the file to create.
        :param str message_id:
            Base name for the file.
        :param str archive_dir:
            Absolute path to the directory for these files in the archives.
        """
        payload = part.get_payload(decode=True) or b''
        document['filename'] = '{0}{1}'.format(message_id, extension)
        document['path'] = join(document['dirname'], document['filename'])
        with open(join(archive_dir, document['path']), 'wb') as hdl:
            hdl.write(payload)
        # Without attachment the source is the body file itself, otherwise
        # it is the directory holding the body and its attachments.
        document['source'] = join(archive_dir, document['path']) \
            if document['dirname'] == '.' \
            else join(archive_dir, document['dirname'])
        document['body'] = payload

    # -------------------------------------------------------------------------
    @classmethod
    def _safe_filename(cls, filename):
        """Reduce an attachment file name to a bare, safe base name.

        Directory components (POSIX or Windows style) and NUL characters are
        removed so that the file cannot be written outside its directory.

        :param str filename:
            File name announced by the message part, possibly ``None``.
        :rtype: :class:`str` or ``None``
        :return:
            The base name or ``None`` if nothing usable remains.
        """
        if not filename:
            return None
        name = filename.replace('\\', '/').rsplit('/', 1)[-1]
        name = name.replace('\x00', '').strip()
        if name in ('', '.', '..'):
            return None
        return name

    # -------------------------------------------------------------------------
    @classmethod
    def _message_date(cls, message):
        """Return the date of the message.

        The current date is used when the ``Date`` header is missing,
        unparsable or out of range.

        :type  message: email.message.EmailMessage
        :param message:
            Current message
        :rtype: datetime.datetime
        """
        raw_date = message['Date']
        if raw_date:
            # str(): the header may be a Header object, not a plain string.
            date = parsedate_tz(str(raw_date))
            if date:
                try:
                    return datetime.fromtimestamp(mktime_tz(date))
                except (ValueError, OverflowError, OSError):
                    pass
        return datetime.now()

    # -------------------------------------------------------------------------
    @classmethod
    def _decode_value(cls, encoded):
        """A decoded value.

        Every fragment of the header is decoded with its own charset and
        the fragments are concatenated.

        :param str encoded:
            Encoded value.
        :rtype: str
        :return:
            The decoded value or ``None`` if the value is empty.
        """
        if not encoded:
            return None
        try:
            fragments = decode_header(encoded)
        except HeaderParseError:
            # Malformed encoded-word: keep the raw value.
            return str(encoded)
        decoded = []
        for value, charset in fragments:
            if isinstance(value, bytes):
                try:
                    value = value.decode(charset or 'ascii')
                except (LookupError, UnicodeDecodeError):
                    # Unknown or wrong charset announced by the sender.
                    value = value.decode('utf-8', 'replace')
            decoded.append(value)
        return ''.join(decoded)