Skip to content

Custom Repo

Use Case 2: Custom Repository with Business Logic

Scenario: E-commerce product catalog with custom search and filtering.

Python
import re
from datetime import datetime, timezone
from typing import Optional, List

from bson import ObjectId
from mongo_ops import BaseDocument, BaseRepository
from pydantic import Field

class Product(BaseDocument):
    """A single entry in the e-commerce product catalog."""

    # Display name; must contain at least one character.
    name: str = Field(..., min_length=1)
    # Free-form description; required, no length constraint.
    description: str
    # Unit price; must be strictly greater than zero.
    price: float = Field(..., gt=0)
    # Category label used for catalog filtering.
    category: str
    # Availability flag; products are considered in stock unless stated otherwise.
    in_stock: bool = Field(default=True)
    # Units on hand; defaults to zero and may never be negative.
    quantity: int = Field(0, ge=0)
    # Optional labels; pydantic copies field defaults per instance, so the
    # list literal is not shared between products.
    tags: List[str] = []

class ProductRepository(BaseRepository[Product]):
    """Repository for ``Product`` documents with catalog-specific queries.

    Adds name search, category and low-stock listings, and a stock adjustment
    on top of whatever generic operations ``BaseRepository`` provides.
    """

    def __init__(self):
        """Bind the repository to the ``products`` collection and ``Product`` model."""
        super().__init__("products", Product)

    async def search_by_name(self, query: str) -> List[Product]:
        """Return up to 100 products whose name contains ``query``.

        The match is case-insensitive and ``query`` is treated as literal
        text, not as a regular expression.

        Args:
            query: Text to look for inside product names.

        Returns:
            The matching products, at most 100 of them.
        """
        # Escape the caller's text before handing it to $regex: the value
        # typically comes straight from an HTTP query string, and unescaped
        # metacharacters would allow pattern injection or pathological
        # backtracking on the database server.
        name_filter = {"name": {"$regex": re.escape(query), "$options": "i"}}
        docs = await self.collection.find(name_filter).to_list(length=100)
        return [self.model(**doc) for doc in docs]

    async def get_by_category(self, category: str, in_stock_only: bool = True) -> List[Product]:
        """Return the products in ``category``.

        Args:
            category: Exact category value to match.
            in_stock_only: When true (the default), also require
                ``in_stock`` to be ``True``.

        Returns:
            Whatever the inherited ``get_many`` returns for the filter.
        """
        filter_query = {"category": category}
        if in_stock_only:
            filter_query["in_stock"] = True
        return await self.get_many(filter=filter_query)

    async def get_low_stock(self, threshold: int = 10) -> List[Product]:
        """Return in-stock products whose quantity is below ``threshold``.

        Args:
            threshold: Exclusive upper bound on ``quantity``; defaults to 10.

        Returns:
            Whatever the inherited ``get_many`` returns for the filter.
        """
        return await self.get_many(
            filter={"quantity": {"$lt": threshold}, "in_stock": True}
        )

    async def update_stock(self, product_id: str, quantity_delta: int) -> Optional[Product]:
        """Add ``quantity_delta`` to a product's stock in a single update.

        Args:
            product_id: String form of the product's ``_id``.
            quantity_delta: Amount to add; pass a negative value to remove
                stock.

        Returns:
            The product as stored after the update, or ``None`` when
            ``product_id`` is not a valid ObjectId, no such product exists,
            or a decrement would take ``quantity`` below zero.
        """
        # A malformed id can never match a document; report it as "not found"
        # instead of letting ObjectId() raise.
        if not ObjectId.is_valid(product_id):
            return None

        match = {"_id": ObjectId(product_id)}
        if quantity_delta < 0:
            # Only match when enough stock is available. Without this guard
            # the decrement is written first and the negative quantity then
            # fails the model's ``ge=0`` validation, leaving bad data stored.
            match["quantity"] = {"$gte": -quantity_delta}

        result = await self.collection.find_one_and_update(
            match,
            {
                "$inc": {"quantity": quantity_delta},
                # Timezone-aware replacement for the deprecated utcnow().
                "$set": {"updated_at": datetime.now(timezone.utc)},
            },
            # True is the value of pymongo's ReturnDocument.AFTER: hand back
            # the document as it looks after the update.
            return_document=True,
        )
        return self.model(**result) if result else None

# Usage in FastAPI
# HTTPException is imported here because the stock-update route raises it
# when a product cannot be found.
from fastapi import FastAPI, HTTPException, Query

# One application object and one repository instance shared by every route.
app = FastAPI()
product_repo = ProductRepository()

@app.get("/products/search", response_model=List[Product])
async def search_products(q: str = Query(..., min_length=1)):
    """Search the catalog by product name.

    ``q`` is a required query parameter of at least one character; the
    lookup itself is delegated to ``product_repo.search_by_name``.
    """
    matches = await product_repo.search_by_name(q)
    return matches

@app.get("/products/category/{category}", response_model=List[Product])
async def products_by_category(category: str, in_stock: bool = True):
    """List the products of one category.

    ``category`` comes from the URL path. ``in_stock`` defaults to true and
    is passed to the repository as its in-stock-only switch.
    """
    products = await product_repo.get_by_category(category, in_stock)
    return products

@app.get("/products/low-stock", response_model=List[Product])
async def low_stock_products(threshold: int = 10):
    """List products running low on stock.

    ``threshold`` (default 10) is forwarded unchanged to
    ``product_repo.get_low_stock``.
    """
    running_low = await product_repo.get_low_stock(threshold)
    return running_low

@app.patch("/products/{product_id}/stock", response_model=Product)
async def update_product_stock(product_id: str, quantity_delta: int):
    """Adjust a product's stock level and return the updated product.

    ``product_id`` comes from the URL path; ``quantity_delta`` is a required
    integer parameter (positive to add stock, negative to remove it).

    Raises:
        HTTPException: 404 when the repository returns no product.
    """
    product = await product_repo.update_stock(product_id, quantity_delta)
    # The repository signals "nothing updated" with None; test for exactly
    # that rather than relying on the model's truthiness.
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return product