lib init
This commit is contained in:
83
mail_intake/parsers/body.py
Normal file
83
mail_intake/parsers/body.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import base64
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from mail_intake.exceptions import MailIntakeParsingError
|
||||
|
||||
|
||||
def _decode_base64(data: str) -> str:
|
||||
"""
|
||||
Decode Gmail URL-safe base64 payload into UTF-8 text.
|
||||
"""
|
||||
try:
|
||||
padded = data.replace("-", "+").replace("_", "/")
|
||||
decoded = base64.b64decode(padded)
|
||||
return decoded.decode("utf-8", errors="replace")
|
||||
except Exception as exc:
|
||||
raise MailIntakeParsingError("Failed to decode message body") from exc
|
||||
|
||||
|
||||
def _extract_from_part(part: Dict[str, Any]) -> Optional[str]:
|
||||
"""
|
||||
Extract text content from a single MIME part.
|
||||
"""
|
||||
mime_type = part.get("mimeType")
|
||||
body = part.get("body", {})
|
||||
data = body.get("data")
|
||||
|
||||
if not data:
|
||||
return None
|
||||
|
||||
text = _decode_base64(data)
|
||||
|
||||
if mime_type == "text/plain":
|
||||
return text
|
||||
|
||||
if mime_type == "text/html":
|
||||
# soup = BeautifulSoup(text, "lxml")
|
||||
soup = BeautifulSoup(text, "html.parser")
|
||||
return soup.get_text(separator="\n", strip=True)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def extract_body(payload: Dict[str, Any]) -> str:
|
||||
"""
|
||||
Extract the best-effort message body from a Gmail payload.
|
||||
|
||||
Priority:
|
||||
1. text/plain
|
||||
2. text/html (stripped to text)
|
||||
3. empty string (if nothing usable found)
|
||||
"""
|
||||
if not payload:
|
||||
return ""
|
||||
|
||||
# Multipart message
|
||||
if "parts" in payload:
|
||||
text_plain = None
|
||||
text_html = None
|
||||
|
||||
for part in payload.get("parts", []):
|
||||
content = _extract_from_part(part)
|
||||
if not content:
|
||||
continue
|
||||
|
||||
if part.get("mimeType") == "text/plain" and text_plain is None:
|
||||
text_plain = content
|
||||
elif part.get("mimeType") == "text/html" and text_html is None:
|
||||
text_html = content
|
||||
|
||||
if text_plain:
|
||||
return text_plain
|
||||
if text_html:
|
||||
return text_html
|
||||
|
||||
# Single-part message
|
||||
body = payload.get("body", {})
|
||||
data = body.get("data")
|
||||
if data:
|
||||
return _decode_base64(data)
|
||||
|
||||
return ""
|
||||
Reference in New Issue
Block a user