Files
mail-intake/mail_intake/ingestion/reader.py

172 lines
5.4 KiB
Python

"""
# Summary
High-level mail ingestion orchestration for Mail Intake.
This module provides the primary, provider-agnostic entry point for
reading and processing mail data.
It coordinates:
- Mail adapter access.
- Message and thread iteration.
- Header and body parsing.
- Normalization and model construction.
No provider-specific logic or API semantics are permitted in this layer.
"""
from datetime import datetime
from typing import Iterator, Dict, Any
from mail_intake.adapters.base import MailIntakeAdapter
from mail_intake.models.message import MailIntakeMessage
from mail_intake.models.thread import MailIntakeThread
from mail_intake.parsers.headers import parse_headers, extract_sender
from mail_intake.parsers.body import extract_body
from mail_intake.parsers.subject import normalize_subject
from mail_intake.exceptions import MailIntakeParsingError
class MailIntakeReader:
    """
    High-level read-only ingestion interface.

    Notes:
        **Responsibilities:**

            - This class is the primary entry point for consumers of the
              Mail Intake library.
            - It orchestrates the full ingestion pipeline:
                - Querying the adapter for message references.
                - Fetching raw provider messages.
                - Parsing and normalizing message data.
                - Constructing domain models.

        **Constraints:**

            - This class is intentionally: Provider-agnostic, stateless beyond
              iteration scope, read-only.
    """

    def __init__(self, adapter: MailIntakeAdapter):
        """
        Initialize the mail reader.

        Args:
            adapter (MailIntakeAdapter):
                Mail adapter implementation used to retrieve raw messages
                and threads from a mail provider.
        """
        self._adapter = adapter

    def iter_messages(self, query: str) -> Iterator[MailIntakeMessage]:
        """
        Iterate over parsed messages matching a provider query.

        Args:
            query (str):
                Provider-specific query string used to filter messages.

        Yields:
            MailIntakeMessage:
                Fully parsed and normalized `MailIntakeMessage` instances.

        Raises:
            MailIntakeParsingError:
                If a message cannot be parsed.

        Notes:
            **Guarantees:**

                - This is a generator: each message is fetched and parsed
                  only when the consumer advances the iterator.
                - Errors raised by the adapter are not intercepted here.
        """
        for ref in self._adapter.iter_message_refs(query):
            raw = self._adapter.fetch_message(ref["message_id"])
            yield self._parse_message(raw)

    def iter_threads(self, query: str) -> Iterator[MailIntakeThread]:
        """
        Build threads from messages matching a query and iterate over them.

        Args:
            query (str):
                Provider-specific query string used to filter messages.

        Returns:
            Iterator[MailIntakeThread]:
                An iterator of `MailIntakeThread` instances.

        Raises:
            MailIntakeParsingError:
                If a message cannot be parsed.

        Notes:
            **Guarantees:**

                - Messages are grouped by `thread_id` and yielded as complete
                  thread objects containing all associated messages.
                - Every matching message is fetched and parsed before this
                  method returns; only the iteration over the finished
                  threads is deferred to the caller.
                - A thread's normalized subject is derived from the first
                  message encountered for that thread.
        """
        threads: Dict[str, MailIntakeThread] = {}

        # Reuse iter_messages so the fetch-and-parse step lives in one place.
        for message in self.iter_messages(query):
            thread = threads.get(message.thread_id)
            if thread is None:
                thread = MailIntakeThread(
                    thread_id=message.thread_id,
                    normalized_subject=normalize_subject(message.subject),
                )
                threads[message.thread_id] = thread
            thread.add_message(message)

        return iter(threads.values())

    def _parse_message(self, raw_message: Dict[str, Any]) -> MailIntakeMessage:
        """
        Parse a raw provider message into a `MailIntakeMessage`.

        Args:
            raw_message (Dict[str, Any]):
                Provider-native message payload.

        Returns:
            MailIntakeMessage:
                A fully populated `MailIntakeMessage` instance.

        Raises:
            MailIntakeParsingError:
                If the message payload is missing required fields, is not
                a mapping at all, or cannot be parsed.

        Notes:
            **Defaults:**

                - A missing `internalDate` is treated as `0` (the epoch).
                - Missing `payload`, `headers`, `subject` and `snippet`
                  fall back to empty values.
        """
        try:
            message_id = raw_message["id"]
            thread_id = raw_message["threadId"]

            # Gmail internalDate is milliseconds since epoch
            timestamp_ms = int(raw_message.get("internalDate", 0))
            # NOTE(review): fromtimestamp() without a tz argument returns a
            # naive datetime in the host's local timezone. Left unchanged so
            # existing callers comparing against naive datetimes keep working.
            timestamp = datetime.fromtimestamp(timestamp_ms / 1000)

            payload = raw_message.get("payload", {})
            raw_headers_list = payload.get("headers", [])
            headers = parse_headers(raw_headers_list)

            from_email, from_name = extract_sender(headers)
            subject = headers.get("subject", "")
            body_text = extract_body(payload)
            snippet = raw_message.get("snippet", "")

            return MailIntakeMessage(
                message_id=message_id,
                thread_id=thread_id,
                timestamp=timestamp,
                from_email=from_email,
                from_name=from_name,
                subject=subject,
                body_text=body_text,
                snippet=snippet,
                raw_headers=headers,
            )
        except Exception as exc:
            # The payload may not be a mapping (e.g. None). Looking up the id
            # unguarded here would raise AttributeError from inside the
            # handler and hide the documented MailIntakeParsingError.
            id_getter = getattr(raw_message, "get", None)
            failed_id = id_getter("id") if callable(id_getter) else None
            raise MailIntakeParsingError(
                f"Failed to parse message {failed_id}"
            ) from exc