- docs(mail_intake/__init__.py): document module-based public API and usage patterns - docs(mail_intake/ingestion/reader.py): document high-level ingestion orchestration - docs(mail_intake/adapters/base.py): document adapter contract for mail providers - docs(mail_intake/adapters/gmail.py): document Gmail adapter implementation and constraints - docs(mail_intake/auth/base.py): document authentication provider contract - docs(mail_intake/auth/google.py): document Google OAuth authentication provider - docs(mail_intake/models/message.py): document canonical email message model - docs(mail_intake/models/thread.py): document canonical email thread model - docs(mail_intake/parsers/body.py): document message body extraction logic - docs(mail_intake/parsers/headers.py): document message header normalization utilities - docs(mail_intake/parsers/subject.py): document subject normalization utilities - docs(mail_intake/config.py): document global configuration model - docs(mail_intake/exceptions.py): document library exception hierarchy
123 lines
3.1 KiB
Python
123 lines
3.1 KiB
Python
"""
Message body extraction utilities for Mail Intake.

This module contains helper functions for extracting a best-effort
plain-text body from provider-native message payloads.

The logic is intentionally tolerant of malformed or partial data and
prefers human-readable text over fidelity to original formatting.
"""
|
|
|
|
import base64
|
|
from typing import Dict, Any, Optional
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
from mail_intake.exceptions import MailIntakeParsingError
|
|
|
|
|
|
def _decode_base64(data: str) -> str:
    """
    Decode a Gmail URL-safe base64 payload into UTF-8 text.

    Gmail message bodies use the URL-safe base64 alphabet (``-`` and ``_``
    in place of ``+`` and ``/``) and may omit the trailing ``=`` padding.
    The padding is restored before decoding so that unpadded payloads
    decode instead of being rejected.

    Args:
        data: URL-safe base64-encoded string, with or without padding.

    Returns:
        Decoded text. Bytes that are not valid UTF-8 are substituted with
        the Unicode replacement character rather than raising.

    Raises:
        MailIntakeParsingError: If the payload cannot be base64-decoded.
    """
    try:
        # A base64 string must be a multiple of 4 characters long; add back
        # whatever "=" padding the producer stripped (0-3 characters).
        padded = data + "=" * (-len(data) % 4)
        decoded = base64.urlsafe_b64decode(padded)
        return decoded.decode("utf-8", errors="replace")
    except Exception as exc:
        # Broad on purpose: any failure here (bad input type, invalid
        # base64) is surfaced to callers as a single parsing error.
        raise MailIntakeParsingError("Failed to decode message body") from exc
|
|
|
|
|
|
def _extract_from_part(part: Dict[str, Any]) -> Optional[str]:
    """
    Extract text content from a single MIME part.

    Supports:
    - text/plain
    - text/html (converted to plain text)

    The MIME type is checked before the payload is decoded, so parts of any
    other type (for example inline attachments) are skipped without being
    decoded at all.

    Args:
        part: MIME part dictionary from a provider payload.

    Returns:
        Extracted plain-text content, or None if the part has an
        unsupported MIME type or carries no inline data.

    Raises:
        MailIntakeParsingError: If the data of a supported part cannot be
            decoded.
    """
    mime_type = part.get("mimeType")
    if mime_type not in ("text/plain", "text/html"):
        return None

    # "body" may be missing or explicitly null in partial payloads.
    data = (part.get("body") or {}).get("data")
    if not data:
        return None

    text = _decode_base64(data)
    if mime_type == "text/plain":
        return text

    # text/html: strip the markup and keep only the visible text, one
    # text node per line.
    soup = BeautifulSoup(text, "html.parser")
    return soup.get_text(separator="\n", strip=True)
|
|
|
|
|
|
def extract_body(payload: Dict[str, Any]) -> str:
    """
    Extract the best-effort message body from a Gmail payload.

    Priority:
    1. text/plain
    2. text/html (stripped to text)
    3. Single-part body
    4. empty string (if nothing usable found)

    Multipart payloads are searched depth-first in document order, so text
    parts nested inside container parts (e.g. a multipart/alternative
    wrapped in a multipart/mixed alongside attachments) are found as well
    as top-level ones.

    Args:
        payload: Provider-native message payload dictionary.

    Returns:
        Extracted plain-text message body, or an empty string when the
        payload is empty or contains no usable text.

    Raises:
        MailIntakeParsingError: If the data of an inspected part cannot be
            decoded.
    """
    if not payload:
        return ""

    # Multipart message
    if "parts" in payload:
        text_plain = None
        text_html = None

        # Explicit stack instead of recursion; children are pushed in
        # reverse so that parts are visited in their original order.
        pending = list(reversed(payload.get("parts") or []))
        while pending:
            part = pending.pop()

            if "parts" in part:
                # Container part: descend into its children.
                pending.extend(reversed(part.get("parts") or []))
                continue

            content = _extract_from_part(part)
            if not content:
                continue

            mime_type = part.get("mimeType")
            if mime_type == "text/plain":
                # Highest-priority content found; no need to look further.
                text_plain = content
                break
            if mime_type == "text/html" and text_html is None:
                text_html = content

        if text_plain:
            return text_plain
        if text_html:
            return text_html

    # Single-part message (also the fallback when no part yielded text).
    # The data is returned decoded as-is, whatever the MIME type.
    data = (payload.get("body") or {}).get("data")
    if data:
        return _decode_base64(data)

    return ""
|