Common Patterns
Pattern 1: Service Layer with Repository
Python
class UserService:
    """Application-level user operations backed by a user repository."""

    def __init__(self, user_repo: UserRepository):
        """Keep a reference to the repository used for lookups and inserts."""
        self.user_repo = user_repo

    async def register_user(self, username: str, email: str) -> User:
        """Create and persist a new user unless the email is already in use.

        Args:
            username: Username passed to the ``User`` constructor.
            email: Email passed to the ``User`` constructor; also the field
                used for the duplicate lookup.

        Returns:
            Whatever ``self.user_repo.create`` resolves to for the new user.

        Raises:
            ValueError: If the lookup by email returns a truthy result.
        """
        # NOTE(review): the lookup and the insert are separate awaits, so two
        # concurrent registrations could both pass this check -- presumably a
        # unique index on email backs this up; confirm.
        email_filter = {"email": email}
        duplicate = await self.user_repo.collection.find_one(email_filter)
        if duplicate:
            raise ValueError("Email already exists")

        new_user = User(username=username, email=email)
        created = await self.user_repo.create(new_user)
        return created
Pattern 2: Aggregation Pipeline
Python
async def get_user_stats(self, user_id: str) -> dict:
    """Return post and like totals for one author.

    Aggregates over ``post_repo.collection``: documents whose ``author_id``
    equals ``user_id`` are collapsed into a single group that counts the
    documents and sums their ``likes`` field.

    Args:
        user_id: Value matched against each document's ``author_id`` field.

    Returns:
        A dict with exactly the keys ``"total_posts"`` and ``"total_likes"``.
        Both are 0 when the aggregation yields no group (no matching
        documents).
    """
    # NOTE(review): ``post_repo`` is a free name here rather than an attribute
    # of ``self`` -- confirm it is defined in the enclosing module.
    pipeline = [
        {"$match": {"author_id": user_id}},
        {"$group": {
            "_id": None,  # constant key: fold every matched document into one group
            "total_posts": {"$sum": 1},
            "total_likes": {"$sum": "$likes"},
        }},
        # Drop the constant group key so a populated result has the same keys
        # as the zero fallback below.
        {"$project": {"_id": 0}},
    ]
    # At most one group can come back, so a single-element fetch is enough.
    result = await post_repo.collection.aggregate(pipeline).to_list(1)
    return result[0] if result else {"total_posts": 0, "total_likes": 0}
Pattern 3: Bulk Operations
Python
async def bulk_update_status(self, ids: List[str], status: str):
    """Set ``status`` on every document whose ``_id`` is listed in ``ids``.

    Each id string is converted with ``bson.ObjectId`` and all matching
    documents are updated in one ``update_many`` call on ``self.collection``,
    which also stamps ``updated_at`` with the current UTC time.

    Args:
        ids: String ids to convert to ``ObjectId`` values. An empty list is a
            no-op and issues no database call.
        status: New value for the ``status`` field.

    Returns:
        None.

    Raises:
        Any error raised by ``ObjectId`` for an id it cannot parse propagates
        to the caller before any update is issued.
    """
    if not ids:
        # An empty ``$in`` list cannot match anything; skip the round trip.
        return

    from datetime import datetime, timezone

    from bson import ObjectId

    object_ids = [ObjectId(raw_id) for raw_id in ids]
    await self.collection.update_many(
        {"_id": {"$in": object_ids}},
        # ``datetime.utcnow()`` is deprecated and returns a naive value; use an
        # explicit UTC-aware timestamp instead.
        {"$set": {"status": status, "updated_at": datetime.now(timezone.utc)}},
    )