- docs(mail_intake/__init__.py): document module-based public API and usage patterns - docs(mail_intake/ingestion/reader.py): document high-level ingestion orchestration - docs(mail_intake/adapters/base.py): document adapter contract for mail providers - docs(mail_intake/adapters/gmail.py): document Gmail adapter implementation and constraints - docs(mail_intake/auth/base.py): document authentication provider contract - docs(mail_intake/auth/google.py): document Google OAuth authentication provider - docs(mail_intake/models/message.py): document canonical email message model - docs(mail_intake/models/thread.py): document canonical email thread model - docs(mail_intake/parsers/body.py): document message body extraction logic - docs(mail_intake/parsers/headers.py): document message header normalization utilities - docs(mail_intake/parsers/subject.py): document subject normalization utilities - docs(mail_intake/config.py): document global configuration model - docs(mail_intake/exceptions.py): document library exception hierarchy
156 lines
5.0 KiB
Python
156 lines
5.0 KiB
Python
"""
|
|
High-level mail ingestion orchestration for Mail Intake.
|
|
|
|
This module provides the primary, provider-agnostic entry point for
|
|
reading and processing mail data.
|
|
|
|
It coordinates:
|
|
- Mail adapter access
|
|
- Message and thread iteration
|
|
- Header and body parsing
|
|
- Normalization and model construction
|
|
|
|
No provider-specific logic or API semantics are permitted in this layer.
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import Iterator, Dict, Any
|
|
|
|
from mail_intake.adapters.base import MailIntakeAdapter
|
|
from mail_intake.models.message import MailIntakeMessage
|
|
from mail_intake.models.thread import MailIntakeThread
|
|
from mail_intake.parsers.headers import parse_headers, extract_sender
|
|
from mail_intake.parsers.body import extract_body
|
|
from mail_intake.parsers.subject import normalize_subject
|
|
from mail_intake.exceptions import MailIntakeParsingError
|
|
|
|
|
|
class MailIntakeReader:
    """
    High-level read-only ingestion interface.

    This class is the **primary entry point** for consumers of the Mail
    Intake library.

    It orchestrates the full ingestion pipeline:
        - Querying the adapter for message references
        - Fetching raw provider messages
        - Parsing and normalizing message data
        - Constructing domain models

    This class is intentionally:
        - Provider-agnostic
        - Stateless beyond iteration scope
        - Read-only

    NOTE(review): `_parse_message` currently reads Gmail-shaped payload keys
    (`id`, `threadId`, `internalDate`, `payload`, `snippet`), which conflicts
    with the provider-agnostic goal. Consider moving that mapping into the
    adapter layer.
    """

    def __init__(self, adapter: MailIntakeAdapter):
        """
        Initialize the mail reader.

        Args:
            adapter: Mail adapter implementation used to retrieve raw
                messages and threads from a mail provider.
        """
        self._adapter = adapter

    def iter_messages(self, query: str) -> Iterator[MailIntakeMessage]:
        """
        Iterate over parsed messages matching a provider query.

        Each message is fetched from the adapter and parsed only when the
        iterator is advanced (this method is a generator).

        Args:
            query: Provider-specific query string used to filter messages.

        Yields:
            Fully parsed and normalized `MailIntakeMessage` instances.

        Raises:
            MailIntakeParsingError: If a message cannot be parsed.
        """
        for ref in self._adapter.iter_message_refs(query):
            raw = self._adapter.fetch_message(ref["message_id"])
            yield self._parse_message(raw)

    def iter_threads(self, query: str) -> Iterator[MailIntakeThread]:
        """
        Iterate over threads constructed from messages matching a query.

        Messages are grouped by `thread_id` and yielded as complete thread
        objects containing all associated messages.

        Unlike `iter_messages`, this method is eager: every matching message
        is fetched and parsed before the iterator is returned, because a
        thread is only complete once all of its messages have been seen.
        Threads are returned in the order in which their first message was
        encountered.

        Args:
            query: Provider-specific query string used to filter messages.

        Returns:
            An iterator of `MailIntakeThread` instances.

        Raises:
            MailIntakeParsingError: If a message cannot be parsed. Because
                the method is eager, this is raised from the call itself.
        """
        threads: Dict[str, MailIntakeThread] = {}

        # Reuse the single-message pipeline so that ref iteration, fetching,
        # and parsing live in exactly one place.
        for message in self.iter_messages(query):
            thread = threads.get(message.thread_id)
            if thread is None:
                # The thread's normalized subject comes from the first
                # message seen for that thread.
                thread = MailIntakeThread(
                    thread_id=message.thread_id,
                    normalized_subject=normalize_subject(message.subject),
                )
                threads[message.thread_id] = thread

            thread.add_message(message)

        # Dicts preserve insertion order, so threads come out first-seen first.
        return iter(threads.values())

    def _parse_message(self, raw_message: Dict[str, Any]) -> MailIntakeMessage:
        """
        Parse a raw provider message into a `MailIntakeMessage`.

        Args:
            raw_message: Provider-native message payload.

        Returns:
            A fully populated `MailIntakeMessage` instance.

        Raises:
            MailIntakeParsingError: If the message payload is missing required
                fields or cannot be parsed. Any exception raised while
                parsing is wrapped and chained as the cause.
        """
        try:
            message_id = raw_message["id"]
            thread_id = raw_message["threadId"]

            # Gmail internalDate is milliseconds since epoch; a missing value
            # falls back to 0 (the Unix epoch).
            # NOTE(review): `fromtimestamp` without `tz` returns a naive
            # datetime in the host's local timezone, so results depend on
            # host configuration. Passing tz=timezone.utc would change the
            # value from naive to aware for callers; confirm downstream
            # expectations before changing.
            timestamp_ms = int(raw_message.get("internalDate", 0))
            timestamp = datetime.fromtimestamp(timestamp_ms / 1000)

            payload = raw_message.get("payload", {})
            raw_headers_list = payload.get("headers", [])

            headers = parse_headers(raw_headers_list)
            from_email, from_name = extract_sender(headers)

            subject = headers.get("subject", "")
            body_text = extract_body(payload)
            snippet = raw_message.get("snippet", "")

            return MailIntakeMessage(
                message_id=message_id,
                thread_id=thread_id,
                timestamp=timestamp,
                from_email=from_email,
                from_name=from_name,
                subject=subject,
                body_text=body_text,
                snippet=snippet,
                raw_headers=headers,
            )

        except Exception as exc:
            from collections.abc import Mapping

            # Only look up the id on mapping payloads. Calling `.get` on
            # anything else (e.g. None from a misbehaving adapter) would raise
            # AttributeError here, escaping in place of the documented
            # MailIntakeParsingError.
            message_ref = (
                raw_message.get("id") if isinstance(raw_message, Mapping) else None
            )
            raise MailIntakeParsingError(
                f"Failed to parse message {message_ref}"
            ) from exc