92 lines
2.2 KiB
Python
92 lines
2.2 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
from mail_intake.models.message import MailIntakeMessage
|
|
from mail_intake.models.thread import MailIntakeThread
|
|
|
|
|
|
def test_message_is_immutable():
    """Assigning to a field of a constructed MailIntakeMessage must raise.

    The test fails if the assignment succeeds silently, and also checks
    that the field still holds its original value afterwards.
    """
    msg = MailIntakeMessage(
        message_id="m1",
        thread_id="t1",
        # Fixed naive timestamp: deterministic, and avoids the deprecated
        # datetime.utcnow() while keeping the same (naive) datetime type.
        timestamp=datetime(2024, 1, 1, 12, 0, 0),
        from_email="alice@example.com",
        from_name="Alice",
        subject="Hello",
        body_text="Body",
        snippet="Snippet",
        raw_headers={"from": "Alice <alice@example.com>"},
    )

    try:
        msg.subject = "Changed"
    except Exception:
        # Expected path: the model rejected the mutation. The concrete
        # exception type depends on how the model enforces immutability,
        # so it is intentionally not narrowed here.
        pass
    else:
        # The failure must be raised OUTSIDE the try body: AssertionError is
        # a subclass of Exception, so asserting inside the try would be
        # swallowed by the handler and the test could never fail.
        raise AssertionError("Message should be immutable")

    # A rejected assignment must leave the original value untouched.
    assert msg.subject == "Hello"
|
|
|
|
|
|
def test_thread_add_message_updates_participants_and_timestamp():
    """Adding messages to a thread tracks senders and the latest timestamp.

    Two messages from different senders, five minutes apart, are added in
    chronological order. After each add, ``last_activity_at`` must equal
    that message's timestamp and the sender's address must appear in
    ``participants``; both messages must end up in ``messages``.
    """
    # Fixed naive timestamps keep the test deterministic and avoid the
    # deprecated datetime.utcnow() while preserving the naive datetime type.
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    t1 = t0 + timedelta(minutes=5)

    msg1 = MailIntakeMessage(
        message_id="m1",
        thread_id="t1",
        timestamp=t0,
        from_email="alice@example.com",
        from_name="Alice",
        subject="Hello",
        body_text="Body",
        snippet="Snippet",
        raw_headers={},
    )

    msg2 = MailIntakeMessage(
        message_id="m2",
        thread_id="t1",
        timestamp=t1,
        from_email="bob@example.com",
        from_name="Bob",
        subject="Re: Hello",
        body_text="Reply",
        snippet="Reply",
        raw_headers={},
    )

    thread = MailIntakeThread(
        thread_id="t1",
        normalized_subject="Hello",
    )

    # First message: establishes the initial activity time and participant.
    thread.add_message(msg1)
    assert thread.last_activity_at == t0
    assert "alice@example.com" in thread.participants

    # Second, later message: activity time advances and the new sender is added.
    thread.add_message(msg2)
    assert thread.last_activity_at == t1
    assert "bob@example.com" in thread.participants
    assert len(thread.messages) == 2
|
|
|
|
|
|
def test_thread_handles_messages_without_sender():
    """A message with an empty sender address adds no participant.

    The message is built with ``from_email=""`` and ``from_name=None``.
    After adding it, the thread must have zero participants but must still
    have a ``last_activity_at`` value set.
    """
    msg = MailIntakeMessage(
        message_id="m1",
        thread_id="t1",
        # Fixed naive timestamp: deterministic, and avoids the deprecated
        # datetime.utcnow() while keeping the same (naive) datetime type.
        timestamp=datetime(2024, 1, 1, 12, 0, 0),
        from_email="",
        from_name=None,
        subject="System Message",
        body_text="Body",
        snippet="Snippet",
        raw_headers={},
    )

    thread = MailIntakeThread(
        thread_id="t1",
        normalized_subject="System Message",
    )

    thread.add_message(msg)

    # The empty sender must not be recorded as a participant...
    assert len(thread.participants) == 0
    # ...but the message still counts as thread activity.
    assert thread.last_activity_at is not None
|