"""
High-level mail ingestion orchestration for Mail Intake.

This module provides the primary, provider-agnostic entry point for
reading and processing mail data.

It coordinates:
- Mail adapter access
- Message and thread iteration
- Header and body parsing
- Normalization and model construction

No provider-specific logic or API semantics are permitted in this layer.
"""

from collections.abc import Mapping
from datetime import datetime
from typing import Iterator, Dict, Any

from mail_intake.adapters.base import MailIntakeAdapter
from mail_intake.models.message import MailIntakeMessage
from mail_intake.models.thread import MailIntakeThread
from mail_intake.parsers.headers import parse_headers, extract_sender
from mail_intake.parsers.body import extract_body
from mail_intake.parsers.subject import normalize_subject
from mail_intake.exceptions import MailIntakeParsingError


class MailIntakeReader:
    """
    High-level read-only ingestion interface.

    This class is the **primary entry point** for consumers of the Mail
    Intake library.

    It orchestrates the full ingestion pipeline:
    - Querying the adapter for message references
    - Fetching raw provider messages
    - Parsing and normalizing message data
    - Constructing domain models

    This class is intentionally:
    - Provider-agnostic
    - Stateless beyond iteration scope
    - Read-only
    """

    def __init__(self, adapter: MailIntakeAdapter):
        """
        Initialize the mail reader.

        Args:
            adapter: Mail adapter implementation used to retrieve raw
                messages and threads from a mail provider.
        """
        self._adapter = adapter

    def iter_messages(self, query: str) -> Iterator[MailIntakeMessage]:
        """
        Iterate over parsed messages matching a provider query.

        Messages are fetched lazily: each reference returned by the adapter
        is fetched and parsed only when the caller advances the iterator.

        Args:
            query: Provider-specific query string used to filter messages.

        Yields:
            Fully parsed and normalized `MailIntakeMessage` instances.

        Raises:
            MailIntakeParsingError: If a message cannot be parsed.
        """
        for ref in self._adapter.iter_message_refs(query):
            raw = self._adapter.fetch_message(ref["message_id"])
            yield self._parse_message(raw)

    def iter_threads(self, query: str) -> Iterator[MailIntakeThread]:
        """
        Iterate over threads constructed from messages matching a query.

        Messages are grouped by `thread_id` and yielded as complete thread
        objects containing all associated messages. Unlike `iter_messages`,
        this method is eager: every matching message is fetched and parsed
        before it returns, because a thread is only complete once all of its
        messages have been seen. Threads are returned in the order in which
        their first message was encountered.

        Args:
            query: Provider-specific query string used to filter messages.

        Returns:
            An iterator of `MailIntakeThread` instances.

        Raises:
            MailIntakeParsingError: If a message cannot be parsed.
        """
        threads: Dict[str, MailIntakeThread] = {}

        # Reuse the single-message pipeline so fetch/parse logic lives in
        # exactly one place.
        for message in self.iter_messages(query):
            thread = threads.get(message.thread_id)
            if thread is None:
                # The thread's normalized subject comes from the first message
                # seen for that thread.
                thread = MailIntakeThread(
                    thread_id=message.thread_id,
                    normalized_subject=normalize_subject(message.subject),
                )
                threads[message.thread_id] = thread

            thread.add_message(message)

        return iter(threads.values())

    def _parse_message(self, raw_message: Dict[str, Any]) -> MailIntakeMessage:
        """
        Parse a raw provider message into a `MailIntakeMessage`.

        Args:
            raw_message: Provider-native message payload.

        Returns:
            A fully populated `MailIntakeMessage` instance.

        Raises:
            MailIntakeParsingError: If the message payload is missing required
                fields or cannot be parsed. Any exception raised while parsing
                is wrapped and chained as the cause.
        """
        # NOTE(review): the keys read below ("threadId", "internalDate",
        # "payload", "snippet") follow the Gmail API message shape, which sits
        # uneasily with the module's "no provider-specific logic" rule.
        # Consider moving this mapping into the adapter layer.
        try:
            message_id = raw_message["id"]
            thread_id = raw_message["threadId"]

            # Gmail internalDate is milliseconds since epoch.
            # NOTE(review): a missing internalDate falls back to 0 (the Unix
            # epoch), and fromtimestamp() without a tz argument yields a naive
            # datetime in the host's local timezone. Confirm both are intended.
            timestamp_ms = int(raw_message.get("internalDate", 0))
            timestamp = datetime.fromtimestamp(timestamp_ms / 1000)

            payload = raw_message.get("payload", {})
            raw_headers_list = payload.get("headers", [])

            headers = parse_headers(raw_headers_list)
            from_email, from_name = extract_sender(headers)

            subject = headers.get("subject", "")
            body_text = extract_body(payload)
            snippet = raw_message.get("snippet", "")

            return MailIntakeMessage(
                message_id=message_id,
                thread_id=thread_id,
                timestamp=timestamp,
                from_email=from_email,
                from_name=from_name,
                subject=subject,
                body_text=body_text,
                snippet=snippet,
                raw_headers=headers,
            )

        except Exception as exc:
            # Look up the id defensively: if the adapter handed us something
            # that is not a mapping (e.g. None), calling .get() here would
            # raise AttributeError from inside the handler and break the
            # documented MailIntakeParsingError contract.
            failed_id = (
                raw_message.get("id") if isinstance(raw_message, Mapping) else None
            )
            raise MailIntakeParsingError(
                f"Failed to parse message {failed_id}"
            ) from exc