84 lines
2.1 KiB
Python
84 lines
2.1 KiB
Python
import base64
|
|
from typing import Dict, Any, Optional
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
from mail_intake.exceptions import MailIntakeParsingError
|
|
|
|
|
|
def _decode_base64(data: str) -> str:
|
|
"""
|
|
Decode Gmail URL-safe base64 payload into UTF-8 text.
|
|
"""
|
|
try:
|
|
padded = data.replace("-", "+").replace("_", "/")
|
|
decoded = base64.b64decode(padded)
|
|
return decoded.decode("utf-8", errors="replace")
|
|
except Exception as exc:
|
|
raise MailIntakeParsingError("Failed to decode message body") from exc
|
|
|
|
|
|
def _extract_from_part(part: Dict[str, Any]) -> Optional[str]:
|
|
"""
|
|
Extract text content from a single MIME part.
|
|
"""
|
|
mime_type = part.get("mimeType")
|
|
body = part.get("body", {})
|
|
data = body.get("data")
|
|
|
|
if not data:
|
|
return None
|
|
|
|
text = _decode_base64(data)
|
|
|
|
if mime_type == "text/plain":
|
|
return text
|
|
|
|
if mime_type == "text/html":
|
|
# soup = BeautifulSoup(text, "lxml")
|
|
soup = BeautifulSoup(text, "html.parser")
|
|
return soup.get_text(separator="\n", strip=True)
|
|
|
|
return None
|
|
|
|
|
|
def extract_body(payload: Dict[str, Any]) -> str:
|
|
"""
|
|
Extract the best-effort message body from a Gmail payload.
|
|
|
|
Priority:
|
|
1. text/plain
|
|
2. text/html (stripped to text)
|
|
3. empty string (if nothing usable found)
|
|
"""
|
|
if not payload:
|
|
return ""
|
|
|
|
# Multipart message
|
|
if "parts" in payload:
|
|
text_plain = None
|
|
text_html = None
|
|
|
|
for part in payload.get("parts", []):
|
|
content = _extract_from_part(part)
|
|
if not content:
|
|
continue
|
|
|
|
if part.get("mimeType") == "text/plain" and text_plain is None:
|
|
text_plain = content
|
|
elif part.get("mimeType") == "text/html" and text_html is None:
|
|
text_html = content
|
|
|
|
if text_plain:
|
|
return text_plain
|
|
if text_html:
|
|
return text_html
|
|
|
|
# Single-part message
|
|
body = payload.get("body", {})
|
|
data = body.get("data")
|
|
if data:
|
|
return _decode_base64(data)
|
|
|
|
return ""
|