Files
mail-intake/mail_intake/adapters/gmail.py

191 lines
5.3 KiB
Python

"""
Gmail adapter implementation for Mail Intake.
---
## Summary
This module provides a **Gmail-specific implementation** of the
`MailIntakeAdapter` contract.
It is the only place in the codebase where:
- `googleapiclient` is imported
- Gmail REST API semantics are known
- Low-level `.execute()` calls are made
All Gmail-specific behavior must be strictly contained within this module.
"""
from typing import Iterator, Dict, Any
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from mail_intake.adapters.base import MailIntakeAdapter
from mail_intake.exceptions import MailIntakeAdapterError
from mail_intake.auth.base import MailIntakeAuthProvider
class MailIntakeGmailAdapter(MailIntakeAdapter):
"""
Gmail read-only adapter.
This adapter implements the `MailIntakeAdapter` interface using the
Gmail REST API. It translates the generic mail intake contract into
Gmail-specific API calls.
Notes:
**Responsibilities:**
- This class is the ONLY place where googleapiclient is imported
- Gmail REST semantics are known
- .execute() is called
**Constraints:**
- Must remain thin and imperative
- Must not perform parsing or interpretation
- Must not expose Gmail-specific types beyond this class
"""
def __init__(
self,
auth_provider: MailIntakeAuthProvider,
user_id: str = "me",
):
"""
Initialize the Gmail adapter.
Args:
auth_provider (MailIntakeAuthProvider):
Authentication provider capable of supplying valid Gmail API credentials.
user_id (str):
Gmail user identifier. Defaults to `"me"`.
"""
self._auth_provider = auth_provider
self._user_id = user_id
self._service = None
@property
def service(self):
"""
Lazily initialize and return the Gmail API service client.
Returns:
Any:
Initialized Gmail API service instance.
Raises:
MailIntakeAdapterError:
If the Gmail service cannot be initialized.
"""
if self._service is None:
try:
creds = self._auth_provider.get_credentials()
self._service = build("gmail", "v1", credentials=creds)
except Exception as exc:
raise MailIntakeAdapterError(
"Failed to initialize Gmail service"
) from exc
return self._service
def iter_message_refs(self, query: str) -> Iterator[Dict[str, str]]:
"""
Iterate over message references matching the query.
Args:
query (str):
Gmail search query string.
Yields:
Dict[str, str]:
Dictionaries containing ``message_id`` and ``thread_id``.
Raises:
MailIntakeAdapterError:
If the Gmail API returns an error.
"""
try:
request = (
self.service.users()
.messages()
.list(userId=self._user_id, q=query)
)
while request is not None:
response = request.execute()
for msg in response.get("messages", []):
yield {
"message_id": msg["id"],
"thread_id": msg["threadId"],
}
request = (
self.service.users()
.messages()
.list_next(request, response)
)
except HttpError as exc:
raise MailIntakeAdapterError(
"Gmail API error while listing messages"
) from exc
def fetch_message(self, message_id: str) -> Dict[str, Any]:
"""
Fetch a full Gmail message by message ID.
Args:
message_id (str):
Gmail message identifier.
Returns:
Dict[str, Any]:
Provider-native Gmail message payload.
Raises:
MailIntakeAdapterError:
If the Gmail API returns an error.
"""
try:
return (
self.service.users()
.messages()
.get(userId=self._user_id, id=message_id)
.execute()
)
except HttpError as exc:
raise MailIntakeAdapterError(
f"Gmail API error while fetching message {message_id}"
) from exc
def fetch_thread(self, thread_id: str) -> Dict[str, Any]:
"""
Fetch a full Gmail thread by thread ID.
Args:
thread_id (str):
Gmail thread identifier.
Returns:
Dict[str, Any]:
Provider-native Gmail thread payload.
Raises:
MailIntakeAdapterError:
If the Gmail API returns an error.
"""
try:
return (
self.service.users()
.threads()
.get(userId=self._user_id, id=thread_id)
.execute()
)
except HttpError as exc:
raise MailIntakeAdapterError(
f"Gmail API error while fetching thread {thread_id}"
) from exc