Basic CRUD
Use Case 1: Basic FastAPI CRUD API
Scenario: Create a simple user management API with CRUD operations.
Python
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from mongo_ops import (
MongoConnectionManager,
BaseDocument,
BaseRepository,
ModelRegistry
)
from pydantic import Field
from typing import Optional
# Define Model
class User(BaseDocument):
username: str = Field(..., min_length=3, max_length=50)
email: str = Field(...)
is_active: bool = True
# Create Repository
class UserRepository(BaseRepository[User]):
def __init__(self):
super().__init__("users", User)
# Register Model
ModelRegistry.register("users", User, indexes=[("email", 1)])
# Setup FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
await MongoConnectionManager.connect(
uri="mongodb://localhost:27017",
db_name="myapp"
)
await ModelRegistry.initialize_all()
yield
await MongoConnectionManager.disconnect()
app = FastAPI(lifespan=lifespan)
user_repo = UserRepository()
# Endpoints
@app.post("/users/", response_model=User)
async def create_user(user: User):
return await user_repo.create(user)
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: str):
user = await user_repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.get("/users/", response_model=list[User])
async def list_users(skip: int = 0, limit: int = 10):
return await user_repo.get_many(skip=skip, limit=limit)
@app.put("/users/{user_id}", response_model=User)
async def update_user(user_id: str, email: str):
user = await user_repo.update(user_id, {"email": email})
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.delete("/users/{user_id}")
async def delete_user(user_id: str):
deleted = await user_repo.delete(user_id)
if not deleted:
raise HTTPException(status_code=404, detail="User not found")
return {"message": "User deleted successfully"}