diff --git a/mongo-ops/site/03_use_cases/01_basic_crud/index.html b/mongo-ops/site/03_use_cases/01_basic_crud/index.html index 0a3fa32..b575df8 100644 --- a/mongo-ops/site/03_use_cases/01_basic_crud/index.html +++ b/mongo-ops/site/03_use_cases/01_basic_crud/index.html @@ -617,74 +617,109 @@
Scenario: Create a simple user management API with CRUD operations.
-from fastapi import FastAPI, HTTPException
-from contextlib import asynccontextmanager
-from mongo_ops import (
- MongoConnectionManager,
- BaseDocument,
- BaseRepository,
- ModelRegistry
-)
-from pydantic import Field
-from typing import Optional
+Pythonimport os
+from fastapi import FastAPI, Depends, HTTPException
+from contextlib import asynccontextmanager
+from pydantic import Field
+from mongo_ops import (
+ MongoConnectionManager,
+ BaseDocument,
+ BaseRepository,
+ ModelRegistry,
+)
-# Define Model
-class User(BaseDocument):
- username: str = Field(..., min_length=3, max_length=50)
- email: str = Field(...)
- is_active: bool = True
-
-# Create Repository
-class UserRepository(BaseRepository[User]):
- def __init__(self):
- super().__init__("users", User)
-
-# Register Model
-ModelRegistry.register("users", User, indexes=[("email", 1)])
-
-# Setup FastAPI
-@asynccontextmanager
-async def lifespan(app: FastAPI):
- await MongoConnectionManager.connect(
- uri="mongodb://localhost:27017",
- db_name="myapp"
- )
- await ModelRegistry.initialize_all()
- yield
- await MongoConnectionManager.disconnect()
-
-app = FastAPI(lifespan=lifespan)
-user_repo = UserRepository()
-
-# Endpoints
-@app.post("/users/", response_model=User)
-async def create_user(user: User):
- return await user_repo.create(user)
-
-@app.get("/users/{user_id}", response_model=User)
-async def get_user(user_id: str):
- user = await user_repo.get_by_id(user_id)
- if not user:
- raise HTTPException(status_code=404, detail="User not found")
- return user
-
-@app.get("/users/", response_model=list[User])
-async def list_users(skip: int = 0, limit: int = 10):
- return await user_repo.get_many(skip=skip, limit=limit)
-
-@app.put("/users/{user_id}", response_model=User)
-async def update_user(user_id: str, email: str):
- user = await user_repo.update(user_id, {"email": email})
- if not user:
- raise HTTPException(status_code=404, detail="User not found")
- return user
-
-@app.delete("/users/{user_id}")
-async def delete_user(user_id: str):
- deleted = await user_repo.delete(user_id)
- if not deleted:
- raise HTTPException(status_code=404, detail="User not found")
- return {"message": "User deleted successfully"}
+# ---------------------------
+# Model Definition
+# ---------------------------
+class User(BaseDocument):
+ username: str = Field(..., min_length=3, max_length=50)
+ email: str = Field(...)
+ is_active: bool = True
+
+
+# ---------------------------
+# Repository
+# ---------------------------
+class UserRepository(BaseRepository[User]):
+ def __init__(self):
+ super().__init__("users", User)
+
+
+# Register the model only once
+ModelRegistry.register("users", User, indexes=[("email", 1)])
+
+
+# ---------------------------
+# FastAPI Lifespan
+# ---------------------------
+@asynccontextmanager
+async def lifespan(_app: FastAPI):
+ """Manage MongoDB connection during the app lifecycle."""
+ async with MongoConnectionManager.lifespan(
+ uri="mongodb://localhost:27017",
+ db_name="mydb"
+ ):
+ await ModelRegistry.initialize_all()
+ yield
+
+
+app = FastAPI(lifespan=lifespan)
+
+
+# ---------------------------
+# Dependency Injection
+# ---------------------------
+def get_user_repository() -> UserRepository:
+ """Dependency-injected repository for users."""
+ return UserRepository()
+
+
+# ---------------------------
+# Routes
+# ---------------------------
+@app.post("/users/", response_model=User)
+async def create_user(user: User, repo: UserRepository = Depends(get_user_repository)):
+ return await repo.create(user)
+
+
+@app.get("/users/{user_id}", response_model=User)
+async def get_user(user_id: str, repo: UserRepository = Depends(get_user_repository)):
+ user = await repo.get_by_id(user_id)
+ if not user:
+ raise HTTPException(status_code=404, detail="User not found")
+ return user
+
+
+@app.get("/users/", response_model=list[User])
+async def list_users(
+ skip: int = 0,
+ limit: int = 10,
+ repo: UserRepository = Depends(get_user_repository),
+):
+ return await repo.get_many(skip=skip, limit=limit)
+
+
+@app.put("/users/{user_id}", response_model=User)
+async def update_user(
+ user_id: str,
+ email: str,
+ repo: UserRepository = Depends(get_user_repository),
+):
+ user = await repo.update(user_id, {"email": email})
+ if not user:
+ raise HTTPException(status_code=404, detail="User not found")
+ return user
+
+
+@app.delete("/users/{user_id}")
+async def delete_user(
+ user_id: str,
+ repo: UserRepository = Depends(get_user_repository),
+):
+ deleted = await repo.delete(user_id)
+ if not deleted:
+ raise HTTPException(status_code=404, detail="User not found")
+ return {"message": "User deleted successfully"}
diff --git a/mongo-ops/site/index.html b/mongo-ops/site/index.html
index 8de3196..1e616fc 100644
--- a/mongo-ops/site/index.html
+++ b/mongo-ops/site/index.html
@@ -712,7 +712,7 @@ It standardizes repository patterns, async CRUD operations, and model management
Drone CI: Auto-builds and publishes tagged releases.
-© Aetoskia Internal — mongo-ops 0.1.1
+© Aetoskia Internal — mongo-ops 0.1.2
diff --git a/mongo-ops/site/search/search_index.json b/mongo-ops/site/search/search_index.json
index 72b1073..a957835 100644
--- a/mongo-ops/site/search/search_index.json
+++ b/mongo-ops/site/search/search_index.json
@@ -1 +1 @@
-{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"\ud83e\udde9 mongo-ops \u2014 Async MongoDB Operations Layer for FastAPI","text":"mongo-ops is a modular, high-performance MongoDB operations library designed for FastAPI microservices. It standardizes repository patterns, async CRUD operations, and model management \u2014 with extensible components for both Beanie and Motor.
"},{"location":"#key-features","title":"\ud83d\ude80 Key Features","text":" - \ud83e\uddf1 Unified repository pattern for MongoDB collections
- \u26a1 Fully asynchronous (Motor-based)
- \ud83e\uddec Pydantic v2 data model integration
- \ud83e\uddf0 Built-in CRUD and aggregation utilities
- \ud83d\udd12 Transaction and session helpers
- \ud83e\udde9 Optional Beanie ORM integration
- \ud83e\uddea Pytest-friendly architecture
"},{"location":"#installation","title":"\ud83d\udce6 Installation","text":"From your internal PyPI:
Bashpip install --extra-index-url https://$PYPI_USERNAME:$PYPI_PASSWORD@pip.aetoskia.com/simple mongo-ops\n
From local source:
Bashpip install -e .\n
"},{"location":"#documentation-structure","title":"\ud83d\udcc1 Documentation Structure","text":"Section Description Overview Core concept and architecture overview Core Components BaseDocument, MongoConnectionManager, BaseRepository Use Cases CRUD, transactions, pagination, and more Best Practices Recommended repo structure and patterns Common Patterns Service layer, aggregation, soft deletes Error Handling Centralized error and exception management Testing Pytest configuration and fixtures"},{"location":"#related-resources","title":"\ud83d\udd17 Related Resources","text":" - Source Code: Gitea Repository
- Internal PyPI: pip.aetoskia.com/simple/mongo-ops
- Drone CI: Auto-builds and publishes tagged releases.
\u00a9 Aetoskia Internal \u2014 mongo-ops 0.1.1
"},{"location":"01_overview/","title":"Overview","text":""},{"location":"01_overview/#library-overview","title":"Library Overview","text":"mongo-ops is a modular MongoDB operations layer for FastAPI microservices. It provides:
- Async connection management
- Base document models with auto-timestamps
- Generic CRUD repository pattern
- Transaction support
- Model registration system
"},{"location":"02_components/","title":"Core Components","text":""},{"location":"02_components/#core-components","title":"Core Components","text":""},{"location":"02_components/#1-mongoconnectionmanager","title":"1. MongoConnectionManager","text":"Manages MongoDB connections with async lifecycle. Methods:
connect(uri, db_name, **kwargs) - Connect to MongoDB disconnect() - Close connection get_database() - Get current database instance get_client() - Get current client instance lifespan(uri, db_name, **kwargs) - Context manager for FastAPI lifespan
"},{"location":"02_components/#2-basedocument","title":"2. BaseDocument","text":"Base model for all MongoDB documents. Provides:
id (aliased to _id) - ObjectId created_at - Auto-generated timestamp updated_at - Auto-updated timestamp
"},{"location":"02_components/#3-baserepositoryt","title":"3. BaseRepository[T]","text":"Generic repository with CRUD operations:
create(data: T) -> T get_by_id(id: str | ObjectId) -> Optional[T] get_many(filter, skip, limit, sort) -> List[T] update(id, data: Dict) -> Optional[T] delete(id) -> bool count(filter) -> int
"},{"location":"02_components/#4-transactionmanager","title":"4. TransactionManager","text":"Handles multi-document transactions:
start_session() - Context manager for transactions execute_transaction(operations) - Execute multiple operations atomically
"},{"location":"02_components/#5-modelregistry","title":"5. ModelRegistry","text":"Register and initialize models:
register(collection_name, model, indexes) - Register a model initialize_all() - Create all indexes get_model(collection_name) - Get registered model list_collections() - List all registered collections
"},{"location":"04_best_practices/","title":"Best Practices","text":""},{"location":"04_best_practices/#best-practices","title":"Best Practices","text":" - Always use ModelRegistry.register() before initializing the database connection
- Use lifespan context manager for proper connection lifecycle
- Inherit from BaseDocument for all models to get auto-timestamps
- Create custom repository classes for business logic instead of mixing it with models
- Use transactions for operations that modify multiple documents
- Add indexes during model registration for frequently queried fields
- Implement pagination for list endpoints to avoid performance issues
- Use type hints for better IDE support and type checking
"},{"location":"05_patterns/","title":"Common Patterns","text":""},{"location":"05_patterns/#common-patterns","title":"Common Patterns","text":""},{"location":"05_patterns/#pattern-1-service-layer-with-repository","title":"Pattern 1: Service Layer with Repository","text":"Pythonclass UserService:\n def __init__(self, user_repo: UserRepository):\n self.user_repo = user_repo\n\n async def register_user(self, username: str, email: str) -> User:\n # Check if user exists\n existing = await self.user_repo.collection.find_one({\"email\": email})\n if existing:\n raise ValueError(\"Email already exists\")\n\n # Create user\n user = User(username=username, email=email)\n return await self.user_repo.create(user)\n
"},{"location":"05_patterns/#pattern-2-aggregation-pipeline","title":"Pattern 2: Aggregation Pipeline","text":"Pythonasync def get_user_stats(self, user_id: str) -> dict:\n pipeline = [\n {\"$match\": {\"author_id\": user_id}},\n {\"$group\": {\n \"_id\": None,\n \"total_posts\": {\"$sum\": 1},\n \"total_likes\": {\"$sum\": \"$likes\"}\n }}\n ]\n result = await post_repo.collection.aggregate(pipeline).to_list(1)\n return result[0] if result else {\"total_posts\": 0, \"total_likes\": 0}\n
"},{"location":"05_patterns/#pattern-3-bulk-operations","title":"Pattern 3: Bulk Operations","text":"Pythonasync def bulk_update_status(self, ids: List[str], status: str):\n from bson import ObjectId\n object_ids = [ObjectId(id) for id in ids]\n await self.collection.update_many(\n {\"_id\": {\"$in\": object_ids}},\n {\"$set\": {\"status\": status, \"updated_at\": datetime.utcnow()}}\n )\n
"},{"location":"06_error_handling/","title":"Error Handling","text":""},{"location":"06_error_handling/#error-handling","title":"Error Handling","text":"Pythonfrom fastapi import HTTPException\nfrom pymongo.errors import DuplicateKeyError\nfrom bson.errors import InvalidId\n\n@app.post(\"/users/\")\nasync def create_user(user: User):\n try:\n return await user_repo.create(user)\n except DuplicateKeyError:\n raise HTTPException(status_code=409, detail=\"User already exists\")\n except Exception as e:\n raise HTTPException(status_code=500, detail=str(e))\n\n@app.get(\"/users/{user_id}\")\nasync def get_user(user_id: str):\n try:\n user = await user_repo.get_by_id(user_id)\n if not user:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return user\n except InvalidId:\n raise HTTPException(status_code=400, detail=\"Invalid user ID format\")\n
"},{"location":"07_testing_example/","title":"Testing Example","text":""},{"location":"07_testing_example/#testing-example","title":"Testing Example","text":"Pythonimport pytest\nfrom mongo_ops import (\n MongoConnectionManager,\n BaseDocument,\n BaseRepository,\n)\nfrom pydantic import Field\nfrom typing import Optional\n\n# Define Model\nclass User(BaseDocument):\n username: str = Field(..., min_length=3, max_length=50)\n email: str = Field(...)\n is_active: bool = True\n\n# Create Repository\nclass UserRepository(BaseRepository[User]):\n def __init__(self):\n super().__init__(\"users\", User)\n\n\n@pytest.fixture\nasync def db():\n await MongoConnectionManager.connect(\n uri=\"mongodb://localhost:27017\",\n db_name=\"test_db\"\n )\n yield MongoConnectionManager.get_database()\n await MongoConnectionManager.get_client().drop_database(\"test_db\")\n await MongoConnectionManager.disconnect()\n\n@pytest.mark.asyncio\nasync def test_create_user(db):\n repo = UserRepository()\n user = await repo.create(User(username=\"test\", email=\"test@example.com\"))\n assert user.id is not None\n assert user.username == \"test\"\n\n@pytest.mark.asyncio\nasync def test_get_by_id(db):\n repo = UserRepository()\n created = await repo.create(User(username=\"test\", email=\"test@example.com\"))\n fetched = await repo.get_by_id(str(created.id))\n assert fetched.username == created.username\n
"},{"location":"03_use_cases/01_basic_crud/","title":"Basic CRUD","text":""},{"location":"03_use_cases/01_basic_crud/#use-case-1-basic-fastapi-crud-api","title":"Use Case 1: Basic FastAPI CRUD API","text":"Scenario: Create a simple user management API with CRUD operations.
Pythonfrom fastapi import FastAPI, HTTPException\nfrom contextlib import asynccontextmanager\nfrom mongo_ops import (\n MongoConnectionManager,\n BaseDocument,\n BaseRepository,\n ModelRegistry\n)\nfrom pydantic import Field\nfrom typing import Optional\n\n# Define Model\nclass User(BaseDocument):\n username: str = Field(..., min_length=3, max_length=50)\n email: str = Field(...)\n is_active: bool = True\n\n# Create Repository\nclass UserRepository(BaseRepository[User]):\n def __init__(self):\n super().__init__(\"users\", User)\n\n# Register Model\nModelRegistry.register(\"users\", User, indexes=[(\"email\", 1)])\n\n# Setup FastAPI\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n await MongoConnectionManager.connect(\n uri=\"mongodb://localhost:27017\",\n db_name=\"myapp\"\n )\n await ModelRegistry.initialize_all()\n yield\n await MongoConnectionManager.disconnect()\n\napp = FastAPI(lifespan=lifespan)\nuser_repo = UserRepository()\n\n# Endpoints\n@app.post(\"/users/\", response_model=User)\nasync def create_user(user: User):\n return await user_repo.create(user)\n\n@app.get(\"/users/{user_id}\", response_model=User)\nasync def get_user(user_id: str):\n user = await user_repo.get_by_id(user_id)\n if not user:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return user\n\n@app.get(\"/users/\", response_model=list[User])\nasync def list_users(skip: int = 0, limit: int = 10):\n return await user_repo.get_many(skip=skip, limit=limit)\n\n@app.put(\"/users/{user_id}\", response_model=User)\nasync def update_user(user_id: str, email: str):\n user = await user_repo.update(user_id, {\"email\": email})\n if not user:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return user\n\n@app.delete(\"/users/{user_id}\")\nasync def delete_user(user_id: str):\n deleted = await user_repo.delete(user_id)\n if not deleted:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return {\"message\": \"User deleted successfully\"}\n
"},{"location":"03_use_cases/02_custom_repo/","title":"Custom Repo","text":""},{"location":"03_use_cases/02_custom_repo/#use-case-2-custom-repository-with-business-logic","title":"Use Case 2: Custom Repository with Business Logic","text":"Scenario: E-commerce product catalog with custom search and filtering.
Pythonfrom mongo_ops import BaseDocument, BaseRepository\nfrom typing import Optional, List\nfrom pydantic import Field\n\nclass Product(BaseDocument):\n name: str = Field(..., min_length=1)\n description: str\n price: float = Field(..., gt=0)\n category: str\n in_stock: bool = True\n quantity: int = Field(default=0, ge=0)\n tags: List[str] = []\n\nclass ProductRepository(BaseRepository[Product]):\n def __init__(self):\n super().__init__(\"products\", Product)\n\n async def search_by_name(self, query: str) -> List[Product]:\n \"\"\"Search products by name (case-insensitive)\"\"\"\n docs = await self.collection.find({\n \"name\": {\"$regex\": query, \"$options\": \"i\"}\n }).to_list(length=100)\n return [self.model(**doc) for doc in docs]\n\n async def get_by_category(self, category: str, in_stock_only: bool = True) -> List[Product]:\n \"\"\"Get products by category\"\"\"\n filter_query = {\"category\": category}\n if in_stock_only:\n filter_query[\"in_stock\"] = True\n return await self.get_many(filter=filter_query)\n\n async def get_low_stock(self, threshold: int = 10) -> List[Product]:\n \"\"\"Get products with low stock\"\"\"\n return await self.get_many(\n filter={\"quantity\": {\"$lt\": threshold}, \"in_stock\": True}\n )\n\n async def update_stock(self, product_id: str, quantity_delta: int) -> Optional[Product]:\n \"\"\"Update product stock (increment/decrement)\"\"\"\n result = await self.collection.find_one_and_update(\n {\"_id\": ObjectId(product_id)},\n {\"$inc\": {\"quantity\": quantity_delta}, \"$set\": {\"updated_at\": datetime.utcnow()}},\n return_document=True\n )\n return self.model(**result) if result else None\n\n# Usage in FastAPI\nfrom fastapi import FastAPI, Query\n\napp = FastAPI()\nproduct_repo = ProductRepository()\n\n@app.get(\"/products/search\", response_model=List[Product])\nasync def search_products(q: str = Query(..., min_length=1)):\n return await product_repo.search_by_name(q)\n\n@app.get(\"/products/category/{category}\", response_model=List[Product])\nasync def products_by_category(category: str, in_stock: bool = True):\n return await product_repo.get_by_category(category, in_stock)\n\n@app.get(\"/products/low-stock\", response_model=List[Product])\nasync def low_stock_products(threshold: int = 10):\n return await product_repo.get_low_stock(threshold)\n\n@app.patch(\"/products/{product_id}/stock\")\nasync def update_product_stock(product_id: str, quantity_delta: int):\n product = await product_repo.update_stock(product_id, quantity_delta)\n if not product:\n raise HTTPException(status_code=404, detail=\"Product not found\")\n return product\n
"},{"location":"03_use_cases/03_transactions/","title":"Transactions","text":""},{"location":"03_use_cases/03_transactions/#use-case-3-transaction-support-for-multi-document-operations","title":"Use Case 3: Transaction Support for Multi-Document Operations","text":"Scenario: Order processing system that updates inventory and creates order atomically.
Pythonfrom mongo_ops import BaseDocument, BaseRepository, TransactionManager\nfrom datetime import datetime\nfrom typing import List\nfrom pydantic import Field\n\nclass Order(BaseDocument):\n user_id: str\n items: List[dict] # [{\"product_id\": \"...\", \"quantity\": 2}]\n total_amount: float\n status: str = \"pending\"\n\nclass OrderRepository(BaseRepository[Order]):\n def __init__(self):\n super().__init__(\"orders\", Order)\n\nclass OrderService:\n def __init__(self, order_repo: OrderRepository, product_repo: ProductRepository):\n self.order_repo = order_repo\n self.product_repo = product_repo\n\n async def create_order_with_inventory_update(self, order: Order) -> Order:\n \"\"\"Create order and update inventory atomically\"\"\"\n\n async def create_order_transaction(session):\n # Insert order\n order_doc = order.model_dump(exclude={\"id\"}, exclude_none=True)\n order_doc[\"created_at\"] = datetime.utcnow()\n order_doc[\"updated_at\"] = datetime.utcnow()\n result = await self.order_repo.collection.insert_one(order_doc, session=session)\n\n # Update inventory for each item\n for item in order.items:\n await self.product_repo.collection.update_one(\n {\"_id\": ObjectId(item[\"product_id\"])},\n {\n \"$inc\": {\"quantity\": -item[\"quantity\"]},\n \"$set\": {\"updated_at\": datetime.utcnow()}\n },\n session=session\n )\n\n # Fetch created order\n created = await self.order_repo.collection.find_one(\n {\"_id\": result.inserted_id},\n session=session\n )\n return Order(**created)\n\n # Execute in transaction\n async with TransactionManager.start_session() as session:\n return await create_order_transaction(session)\n\n# Usage in FastAPI\n@app.post(\"/orders/\", response_model=Order)\nasync def create_order(order: Order):\n order_service = OrderService(order_repo, product_repo)\n try:\n return await order_service.create_order_with_inventory_update(order)\n except Exception as e:\n raise HTTPException(status_code=400, detail=str(e))\n
"},{"location":"03_use_cases/04_pagination/","title":"Pagination","text":""},{"location":"03_use_cases/04_pagination/#use-case-4-pagination-filtering","title":"Use Case 4: Pagination & Filtering","text":"Scenario: Blog post API with pagination and filtering.
Pythonfrom mongo_ops import BaseDocument, BaseRepository\nfrom pydantic import BaseModel, Field\nfrom typing import List, Optional, Generic, TypeVar\n\nT = TypeVar(\"T\")\n\nclass PaginatedResponse(BaseModel, Generic[T]):\n items: List[T]\n total: int\n page: int\n page_size: int\n total_pages: int\n has_next: bool\n has_prev: bool\n\nclass BlogPost(BaseDocument):\n title: str\n content: str\n author_id: str\n published: bool = False\n tags: List[str] = []\n views: int = 0\n\nclass BlogPostRepository(BaseRepository[BlogPost]):\n def __init__(self):\n super().__init__(\"blog_posts\", BlogPost)\n\n async def paginate(\n self,\n page: int = 1,\n page_size: int = 10,\n filter_dict: Optional[dict] = None,\n sort_by: str = \"created_at\",\n sort_order: int = -1\n ) -> PaginatedResponse[BlogPost]:\n \"\"\"Get paginated blog posts\"\"\"\n filter_dict = filter_dict or {}\n skip = (page - 1) * page_size\n\n # Get total count\n total = await self.count(filter_dict)\n\n # Get items\n items = await self.get_many(\n filter=filter_dict,\n skip=skip,\n limit=page_size,\n sort=[(sort_by, sort_order)]\n )\n\n total_pages = (total + page_size - 1) // page_size\n\n return PaginatedResponse(\n items=items,\n total=total,\n page=page,\n page_size=page_size,\n total_pages=total_pages,\n has_next=page < total_pages,\n has_prev=page > 1\n )\n\n async def get_by_author(self, author_id: str, published_only: bool = True):\n \"\"\"Get posts by author\"\"\"\n filter_dict = {\"author_id\": author_id}\n if published_only:\n filter_dict[\"published\"] = True\n return await self.get_many(filter=filter_dict, sort=[(\"created_at\", -1)])\n\n async def search_by_tags(self, tags: List[str]) -> List[BlogPost]:\n \"\"\"Search posts by tags\"\"\"\n return await self.get_many(filter={\"tags\": {\"$in\": tags}, \"published\": True})\n\n# Usage in FastAPI\n@app.get(\"/posts/\", response_model=PaginatedResponse[BlogPost])\nasync def list_posts(\n page: int = 1,\n page_size: int = 10,\n published: Optional[bool] = None,\n author_id: Optional[str] = None\n):\n filter_dict = {}\n if published is not None:\n filter_dict[\"published\"] = published\n if author_id:\n filter_dict[\"author_id\"] = author_id\n\n return await blog_repo.paginate(page, page_size, filter_dict)\n\n@app.get(\"/posts/author/{author_id}\", response_model=List[BlogPost])\nasync def posts_by_author(author_id: str, published: bool = True):\n return await blog_repo.get_by_author(author_id, published)\n\n@app.get(\"/posts/tags\", response_model=List[BlogPost])\nasync def posts_by_tags(tags: List[str] = Query(...)):\n return await blog_repo.search_by_tags(tags)\n
"},{"location":"03_use_cases/05_soft_deletes/","title":"Soft Deletes","text":""},{"location":"03_use_cases/05_soft_deletes/#use-case-5-soft-deletes-pattern","title":"Use Case 5: Soft Deletes Pattern","text":"Scenario: Implement soft delete functionality for data recovery.
Pythonfrom mongo_ops import BaseDocument, BaseRepository\nfrom datetime import datetime\nfrom typing import Optional\n\nclass SoftDeleteDocument(BaseDocument):\n \"\"\"Base document with soft delete support\"\"\"\n is_deleted: bool = False\n deleted_at: Optional[datetime] = None\n deleted_by: Optional[str] = None\n\nclass Task(SoftDeleteDocument):\n title: str\n description: str\n assignee_id: str\n status: str = \"pending\"\n priority: str = \"medium\"\n\nclass SoftDeleteRepository(BaseRepository[T]):\n \"\"\"Repository with soft delete operations\"\"\"\n\n async def soft_delete(self, id: str, deleted_by: str = None) -> Optional[T]:\n \"\"\"Soft delete a document\"\"\"\n return await self.update(id, {\n \"is_deleted\": True,\n \"deleted_at\": datetime.utcnow(),\n \"deleted_by\": deleted_by\n })\n\n async def restore(self, id: str) -> Optional[T]:\n \"\"\"Restore a soft-deleted document\"\"\"\n return await self.update(id, {\n \"is_deleted\": False,\n \"deleted_at\": None,\n \"deleted_by\": None\n })\n\n async def get_active(self, skip: int = 0, limit: int = 100):\n \"\"\"Get only non-deleted documents\"\"\"\n return await self.get_many(\n filter={\"is_deleted\": False},\n skip=skip,\n limit=limit\n )\n\n async def get_deleted(self, skip: int = 0, limit: int = 100):\n \"\"\"Get deleted documents\"\"\"\n return await self.get_many(\n filter={\"is_deleted\": True},\n skip=skip,\n limit=limit\n )\n\n async def permanent_delete(self, id: str) -> bool:\n \"\"\"Permanently delete a document\"\"\"\n return await self.delete(id)\n\nclass TaskRepository(SoftDeleteRepository[Task]):\n def __init__(self):\n super().__init__(\"tasks\", Task)\n\n# Usage in FastAPI\n@app.delete(\"/tasks/{task_id}\")\nasync def soft_delete_task(task_id: str, user_id: str):\n task = await task_repo.soft_delete(task_id, deleted_by=user_id)\n if not task:\n raise HTTPException(status_code=404, detail=\"Task not found\")\n return {\"message\": \"Task deleted\", \"task\": task}\n\n@app.post(\"/tasks/{task_id}/restore\")\nasync def restore_task(task_id: str):\n task = await task_repo.restore(task_id)\n if not task:\n raise HTTPException(status_code=404, detail=\"Task not found\")\n return {\"message\": \"Task restored\", \"task\": task}\n\n@app.get(\"/tasks/\", response_model=List[Task])\nasync def list_active_tasks(skip: int = 0, limit: int = 10):\n return await task_repo.get_active(skip, limit)\n\n@app.get(\"/tasks/deleted\", response_model=List[Task])\nasync def list_deleted_tasks(skip: int = 0, limit: int = 10):\n return await task_repo.get_deleted(skip, limit)\n
"},{"location":"03_use_cases/06_multi_model/","title":"Multi Model","text":""},{"location":"03_use_cases/06_multi_model/#use-case-6-multi-model-service-with-registration","title":"Use Case 6: Multi-Model Service with Registration","text":"Scenario: Complete microservice with multiple related models.
Pythonfrom mongo_ops import (\n MongoConnectionManager,\n BaseDocument,\n BaseRepository,\n ModelRegistry\n)\nfrom fastapi import FastAPI\nfrom contextlib import asynccontextmanager\n\n# Define Models\nclass User(BaseDocument):\n username: str\n email: str\n role: str = \"user\"\n\nclass Post(BaseDocument):\n title: str\n content: str\n author_id: str\n likes: int = 0\n\nclass Comment(BaseDocument):\n post_id: str\n user_id: str\n content: str\n\n# Define Repositories\nclass UserRepository(BaseRepository[User]):\n def __init__(self):\n super().__init__(\"users\", User)\n\nclass PostRepository(BaseRepository[Post]):\n def __init__(self):\n super().__init__(\"posts\", Post)\n\nclass CommentRepository(BaseRepository[Comment]):\n def __init__(self):\n super().__init__(\"comments\", Comment)\n\n# Register all models\nModelRegistry.register(\"users\", User, indexes=[(\"email\", 1), (\"username\", 1)])\nModelRegistry.register(\"posts\", Post, indexes=[(\"author_id\", 1), (\"created_at\", -1)])\nModelRegistry.register(\"comments\", Comment, indexes=[(\"post_id\", 1), (\"user_id\", 1)])\n\n# Setup FastAPI with lifespan\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n await MongoConnectionManager.connect(\n uri=\"mongodb://localhost:27017\",\n db_name=\"social_app\"\n )\n await ModelRegistry.initialize_all()\n yield\n await MongoConnectionManager.disconnect()\n\napp = FastAPI(lifespan=lifespan)\n\n# Initialize repositories\nuser_repo = UserRepository()\npost_repo = PostRepository()\ncomment_repo = CommentRepository()\n\n# Endpoints\n@app.post(\"/users/\", response_model=User)\nasync def create_user(user: User):\n return await user_repo.create(user)\n\n@app.post(\"/posts/\", response_model=Post)\nasync def create_post(post: Post):\n return await post_repo.create(post)\n\n@app.post(\"/comments/\", response_model=Comment)\nasync def create_comment(comment: Comment):\n return await comment_repo.create(comment)\n
"}]}
\ No newline at end of file
+{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"\ud83e\udde9 mongo-ops \u2014 Async MongoDB Operations Layer for FastAPI","text":"mongo-ops is a modular, high-performance MongoDB operations library designed for FastAPI microservices. It standardizes repository patterns, async CRUD operations, and model management \u2014 with extensible components for both Beanie and Motor.
"},{"location":"#key-features","title":"\ud83d\ude80 Key Features","text":" - \ud83e\uddf1 Unified repository pattern for MongoDB collections
- \u26a1 Fully asynchronous (Motor-based)
- \ud83e\uddec Pydantic v2 data model integration
- \ud83e\uddf0 Built-in CRUD and aggregation utilities
- \ud83d\udd12 Transaction and session helpers
- \ud83e\udde9 Optional Beanie ORM integration
- \ud83e\uddea Pytest-friendly architecture
"},{"location":"#installation","title":"\ud83d\udce6 Installation","text":"From your internal PyPI:
Bashpip install --extra-index-url https://$PYPI_USERNAME:$PYPI_PASSWORD@pip.aetoskia.com/simple mongo-ops\n
From local source:
Bashpip install -e .\n
"},{"location":"#documentation-structure","title":"\ud83d\udcc1 Documentation Structure","text":"Section Description Overview Core concept and architecture overview Core Components BaseDocument, MongoConnectionManager, BaseRepository Use Cases CRUD, transactions, pagination, and more Best Practices Recommended repo structure and patterns Common Patterns Service layer, aggregation, soft deletes Error Handling Centralized error and exception management Testing Pytest configuration and fixtures"},{"location":"#related-resources","title":"\ud83d\udd17 Related Resources","text":" - Source Code: Gitea Repository
- Internal PyPI: pip.aetoskia.com/simple/mongo-ops
- Drone CI: Auto-builds and publishes tagged releases.
\u00a9 Aetoskia Internal \u2014 mongo-ops 0.1.2
"},{"location":"01_overview/","title":"Overview","text":""},{"location":"01_overview/#library-overview","title":"Library Overview","text":"mongo-ops is a modular MongoDB operations layer for FastAPI microservices. It provides:
- Async connection management
- Base document models with auto-timestamps
- Generic CRUD repository pattern
- Transaction support
- Model registration system
"},{"location":"02_components/","title":"Core Components","text":""},{"location":"02_components/#core-components","title":"Core Components","text":""},{"location":"02_components/#1-mongoconnectionmanager","title":"1. MongoConnectionManager","text":"Manages MongoDB connections with async lifecycle. Methods:
connect(uri, db_name, **kwargs) - Connect to MongoDB disconnect() - Close connection get_database() - Get current database instance get_client() - Get current client instance lifespan(uri, db_name, **kwargs) - Context manager for FastAPI lifespan
"},{"location":"02_components/#2-basedocument","title":"2. BaseDocument","text":"Base model for all MongoDB documents. Provides:
id (aliased to _id) - ObjectId created_at - Auto-generated timestamp updated_at - Auto-updated timestamp
"},{"location":"02_components/#3-baserepositoryt","title":"3. BaseRepository[T]","text":"Generic repository with CRUD operations:
create(data: T) -> T get_by_id(id: str | ObjectId) -> Optional[T] get_many(filter, skip, limit, sort) -> List[T] update(id, data: Dict) -> Optional[T] delete(id) -> bool count(filter) -> int
"},{"location":"02_components/#4-transactionmanager","title":"4. TransactionManager","text":"Handles multi-document transactions:
start_session() - Context manager for transactions execute_transaction(operations) - Execute multiple operations atomically
"},{"location":"02_components/#5-modelregistry","title":"5. ModelRegistry","text":"Register and initialize models:
register(collection_name, model, indexes) - Register a model initialize_all() - Create all indexes get_model(collection_name) - Get registered model list_collections() - List all registered collections
"},{"location":"04_best_practices/","title":"Best Practices","text":""},{"location":"04_best_practices/#best-practices","title":"Best Practices","text":" - Always use ModelRegistry.register() before initializing the database connection
- Use lifespan context manager for proper connection lifecycle
- Inherit from BaseDocument for all models to get auto-timestamps
- Create custom repository classes for business logic instead of mixing it with models
- Use transactions for operations that modify multiple documents
- Add indexes during model registration for frequently queried fields
- Implement pagination for list endpoints to avoid performance issues
- Use type hints for better IDE support and type checking
"},{"location":"05_patterns/","title":"Common Patterns","text":""},{"location":"05_patterns/#common-patterns","title":"Common Patterns","text":""},{"location":"05_patterns/#pattern-1-service-layer-with-repository","title":"Pattern 1: Service Layer with Repository","text":"Pythonclass UserService:\n def __init__(self, user_repo: UserRepository):\n self.user_repo = user_repo\n\n async def register_user(self, username: str, email: str) -> User:\n # Check if user exists\n existing = await self.user_repo.collection.find_one({\"email\": email})\n if existing:\n raise ValueError(\"Email already exists\")\n\n # Create user\n user = User(username=username, email=email)\n return await self.user_repo.create(user)\n
"},{"location":"05_patterns/#pattern-2-aggregation-pipeline","title":"Pattern 2: Aggregation Pipeline","text":"Pythonasync def get_user_stats(self, user_id: str) -> dict:\n pipeline = [\n {\"$match\": {\"author_id\": user_id}},\n {\"$group\": {\n \"_id\": None,\n \"total_posts\": {\"$sum\": 1},\n \"total_likes\": {\"$sum\": \"$likes\"}\n }}\n ]\n result = await post_repo.collection.aggregate(pipeline).to_list(1)\n return result[0] if result else {\"total_posts\": 0, \"total_likes\": 0}\n
"},{"location":"05_patterns/#pattern-3-bulk-operations","title":"Pattern 3: Bulk Operations","text":"Pythonasync def bulk_update_status(self, ids: List[str], status: str):\n from bson import ObjectId\n object_ids = [ObjectId(id) for id in ids]\n await self.collection.update_many(\n {\"_id\": {\"$in\": object_ids}},\n {\"$set\": {\"status\": status, \"updated_at\": datetime.utcnow()}}\n )\n
"},{"location":"06_error_handling/","title":"Error Handling","text":""},{"location":"06_error_handling/#error-handling","title":"Error Handling","text":"Pythonfrom fastapi import HTTPException\nfrom pymongo.errors import DuplicateKeyError\nfrom bson.errors import InvalidId\n\n@app.post(\"/users/\")\nasync def create_user(user: User):\n try:\n return await user_repo.create(user)\n except DuplicateKeyError:\n raise HTTPException(status_code=409, detail=\"User already exists\")\n except Exception as e:\n raise HTTPException(status_code=500, detail=str(e))\n\n@app.get(\"/users/{user_id}\")\nasync def get_user(user_id: str):\n try:\n user = await user_repo.get_by_id(user_id)\n if not user:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return user\n except InvalidId:\n raise HTTPException(status_code=400, detail=\"Invalid user ID format\")\n
"},{"location":"07_testing_example/","title":"Testing Example","text":""},{"location":"07_testing_example/#testing-example","title":"Testing Example","text":"Pythonimport pytest\nfrom mongo_ops import (\n MongoConnectionManager,\n BaseDocument,\n BaseRepository,\n)\nfrom pydantic import Field\nfrom typing import Optional\n\n# Define Model\nclass User(BaseDocument):\n username: str = Field(..., min_length=3, max_length=50)\n email: str = Field(...)\n is_active: bool = True\n\n# Create Repository\nclass UserRepository(BaseRepository[User]):\n def __init__(self):\n super().__init__(\"users\", User)\n\n\n@pytest.fixture\nasync def db():\n await MongoConnectionManager.connect(\n uri=\"mongodb://localhost:27017\",\n db_name=\"test_db\"\n )\n yield MongoConnectionManager.get_database()\n await MongoConnectionManager.get_client().drop_database(\"test_db\")\n await MongoConnectionManager.disconnect()\n\n@pytest.mark.asyncio\nasync def test_create_user(db):\n repo = UserRepository()\n user = await repo.create(User(username=\"test\", email=\"test@example.com\"))\n assert user.id is not None\n assert user.username == \"test\"\n\n@pytest.mark.asyncio\nasync def test_get_by_id(db):\n repo = UserRepository()\n created = await repo.create(User(username=\"test\", email=\"test@example.com\"))\n fetched = await repo.get_by_id(str(created.id))\n assert fetched.username == created.username\n
"},{"location":"03_use_cases/01_basic_crud/","title":"Basic CRUD","text":""},{"location":"03_use_cases/01_basic_crud/#use-case-1-basic-fastapi-crud-api","title":"Use Case 1: Basic FastAPI CRUD API","text":"Scenario: Create a simple user management API with CRUD operations.
Pythonimport os\nfrom fastapi import FastAPI, Depends, HTTPException\nfrom contextlib import asynccontextmanager\nfrom pydantic import Field\nfrom mongo_ops import (\n MongoConnectionManager,\n BaseDocument,\n BaseRepository,\n ModelRegistry,\n)\n\n# ---------------------------\n# Model Definition\n# ---------------------------\nclass User(BaseDocument):\n username: str = Field(..., min_length=3, max_length=50)\n email: str = Field(...)\n is_active: bool = True\n\n\n# ---------------------------\n# Repository\n# ---------------------------\nclass UserRepository(BaseRepository[User]):\n def __init__(self):\n super().__init__(\"users\", User)\n\n\n# Register the model only once\nModelRegistry.register(\"users\", User, indexes=[(\"email\", 1)])\n\n\n# ---------------------------\n# FastAPI Lifespan\n# ---------------------------\n@asynccontextmanager\nasync def lifespan(_app: FastAPI):\n \"\"\"Manage MongoDB connection during the app lifecycle.\"\"\"\n async with MongoConnectionManager.lifespan(\n uri=\"mongodb://localhost:27017\",\n db_name=\"mydb\"\n ):\n await ModelRegistry.initialize_all()\n yield\n\n\napp = FastAPI(lifespan=lifespan)\n\n\n# ---------------------------\n# Dependency Injection\n# ---------------------------\ndef get_user_repository() -> UserRepository:\n \"\"\"Dependency-injected repository for users.\"\"\"\n return UserRepository()\n\n\n# ---------------------------\n# Routes\n# ---------------------------\n@app.post(\"/users/\", response_model=User)\nasync def create_user(user: User, repo: UserRepository = Depends(get_user_repository)):\n return await repo.create(user)\n\n\n@app.get(\"/users/{user_id}\", response_model=User)\nasync def get_user(user_id: str, repo: UserRepository = Depends(get_user_repository)):\n user = await repo.get_by_id(user_id)\n if not user:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return user\n\n\n@app.get(\"/users/\", response_model=list[User])\nasync def list_users(\n skip: int = 0,\n limit: int = 10,\n repo: UserRepository = Depends(get_user_repository),\n):\n return await repo.get_many(skip=skip, limit=limit)\n\n\n@app.put(\"/users/{user_id}\", response_model=User)\nasync def update_user(\n user_id: str,\n email: str,\n repo: UserRepository = Depends(get_user_repository),\n):\n user = await repo.update(user_id, {\"email\": email})\n if not user:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return user\n\n\n@app.delete(\"/users/{user_id}\")\nasync def delete_user(\n user_id: str,\n repo: UserRepository = Depends(get_user_repository),\n):\n deleted = await repo.delete(user_id)\n if not deleted:\n raise HTTPException(status_code=404, detail=\"User not found\")\n return {\"message\": \"User deleted successfully\"}\n
"},{"location":"03_use_cases/02_custom_repo/","title":"Custom Repo","text":""},{"location":"03_use_cases/02_custom_repo/#use-case-2-custom-repository-with-business-logic","title":"Use Case 2: Custom Repository with Business Logic","text":"Scenario: E-commerce product catalog with custom search and filtering.
Pythonfrom mongo_ops import BaseDocument, BaseRepository\nfrom typing import Optional, List\nfrom pydantic import Field\n\nclass Product(BaseDocument):\n name: str = Field(..., min_length=1)\n description: str\n price: float = Field(..., gt=0)\n category: str\n in_stock: bool = True\n quantity: int = Field(default=0, ge=0)\n tags: List[str] = []\n\nclass ProductRepository(BaseRepository[Product]):\n def __init__(self):\n super().__init__(\"products\", Product)\n\n async def search_by_name(self, query: str) -> List[Product]:\n \"\"\"Search products by name (case-insensitive)\"\"\"\n docs = await self.collection.find({\n \"name\": {\"$regex\": query, \"$options\": \"i\"}\n }).to_list(length=100)\n return [self.model(**doc) for doc in docs]\n\n async def get_by_category(self, category: str, in_stock_only: bool = True) -> List[Product]:\n \"\"\"Get products by category\"\"\"\n filter_query = {\"category\": category}\n if in_stock_only:\n filter_query[\"in_stock\"] = True\n return await self.get_many(filter=filter_query)\n\n async def get_low_stock(self, threshold: int = 10) -> List[Product]:\n \"\"\"Get products with low stock\"\"\"\n return await self.get_many(\n filter={\"quantity\": {\"$lt\": threshold}, \"in_stock\": True}\n )\n\n async def update_stock(self, product_id: str, quantity_delta: int) -> Optional[Product]:\n \"\"\"Update product stock (increment/decrement)\"\"\"\n result = await self.collection.find_one_and_update(\n {\"_id\": ObjectId(product_id)},\n {\"$inc\": {\"quantity\": quantity_delta}, \"$set\": {\"updated_at\": datetime.utcnow()}},\n return_document=True\n )\n return self.model(**result) if result else None\n\n# Usage in FastAPI\nfrom fastapi import FastAPI, Query\n\napp = FastAPI()\nproduct_repo = ProductRepository()\n\n@app.get(\"/products/search\", response_model=List[Product])\nasync def search_products(q: str = Query(..., min_length=1)):\n return await product_repo.search_by_name(q)\n\n@app.get(\"/products/category/{category}\", response_model=List[Product])\nasync def products_by_category(category: str, in_stock: bool = True):\n return await product_repo.get_by_category(category, in_stock)\n\n@app.get(\"/products/low-stock\", response_model=List[Product])\nasync def low_stock_products(threshold: int = 10):\n return await product_repo.get_low_stock(threshold)\n\n@app.patch(\"/products/{product_id}/stock\")\nasync def update_product_stock(product_id: str, quantity_delta: int):\n product = await product_repo.update_stock(product_id, quantity_delta)\n if not product:\n raise HTTPException(status_code=404, detail=\"Product not found\")\n return product\n
"},{"location":"03_use_cases/03_transactions/","title":"Transactions","text":""},{"location":"03_use_cases/03_transactions/#use-case-3-transaction-support-for-multi-document-operations","title":"Use Case 3: Transaction Support for Multi-Document Operations","text":"Scenario: Order processing system that updates inventory and creates order atomically.
Pythonfrom mongo_ops import BaseDocument, BaseRepository, TransactionManager\nfrom datetime import datetime\nfrom typing import List\nfrom pydantic import Field\n\nclass Order(BaseDocument):\n user_id: str\n items: List[dict] # [{\"product_id\": \"...\", \"quantity\": 2}]\n total_amount: float\n status: str = \"pending\"\n\nclass OrderRepository(BaseRepository[Order]):\n def __init__(self):\n super().__init__(\"orders\", Order)\n\nclass OrderService:\n def __init__(self, order_repo: OrderRepository, product_repo: ProductRepository):\n self.order_repo = order_repo\n self.product_repo = product_repo\n\n async def create_order_with_inventory_update(self, order: Order) -> Order:\n \"\"\"Create order and update inventory atomically\"\"\"\n\n async def create_order_transaction(session):\n # Insert order\n order_doc = order.model_dump(exclude={\"id\"}, exclude_none=True)\n order_doc[\"created_at\"] = datetime.utcnow()\n order_doc[\"updated_at\"] = datetime.utcnow()\n result = await self.order_repo.collection.insert_one(order_doc, session=session)\n\n # Update inventory for each item\n for item in order.items:\n await self.product_repo.collection.update_one(\n {\"_id\": ObjectId(item[\"product_id\"])},\n {\n \"$inc\": {\"quantity\": -item[\"quantity\"]},\n \"$set\": {\"updated_at\": datetime.utcnow()}\n },\n session=session\n )\n\n # Fetch created order\n created = await self.order_repo.collection.find_one(\n {\"_id\": result.inserted_id},\n session=session\n )\n return Order(**created)\n\n # Execute in transaction\n async with TransactionManager.start_session() as session:\n return await create_order_transaction(session)\n\n# Usage in FastAPI\n@app.post(\"/orders/\", response_model=Order)\nasync def create_order(order: Order):\n order_service = OrderService(order_repo, product_repo)\n try:\n return await order_service.create_order_with_inventory_update(order)\n except Exception as e:\n raise HTTPException(status_code=400, detail=str(e))\n
"},{"location":"03_use_cases/04_pagination/","title":"Pagination","text":""},{"location":"03_use_cases/04_pagination/#use-case-4-pagination-filtering","title":"Use Case 4: Pagination & Filtering","text":"Scenario: Blog post API with pagination and filtering.
Pythonfrom mongo_ops import BaseDocument, BaseRepository\nfrom pydantic import BaseModel, Field\nfrom typing import List, Optional, Generic, TypeVar\n\nT = TypeVar(\"T\")\n\nclass PaginatedResponse(BaseModel, Generic[T]):\n items: List[T]\n total: int\n page: int\n page_size: int\n total_pages: int\n has_next: bool\n has_prev: bool\n\nclass BlogPost(BaseDocument):\n title: str\n content: str\n author_id: str\n published: bool = False\n tags: List[str] = []\n views: int = 0\n\nclass BlogPostRepository(BaseRepository[BlogPost]):\n def __init__(self):\n super().__init__(\"blog_posts\", BlogPost)\n\n async def paginate(\n self,\n page: int = 1,\n page_size: int = 10,\n filter_dict: Optional[dict] = None,\n sort_by: str = \"created_at\",\n sort_order: int = -1\n ) -> PaginatedResponse[BlogPost]:\n \"\"\"Get paginated blog posts\"\"\"\n filter_dict = filter_dict or {}\n skip = (page - 1) * page_size\n\n # Get total count\n total = await self.count(filter_dict)\n\n # Get items\n items = await self.get_many(\n filter=filter_dict,\n skip=skip,\n limit=page_size,\n sort=[(sort_by, sort_order)]\n )\n\n total_pages = (total + page_size - 1) // page_size\n\n return PaginatedResponse(\n items=items,\n total=total,\n page=page,\n page_size=page_size,\n total_pages=total_pages,\n has_next=page < total_pages,\n has_prev=page > 1\n )\n\n async def get_by_author(self, author_id: str, published_only: bool = True):\n \"\"\"Get posts by author\"\"\"\n filter_dict = {\"author_id\": author_id}\n if published_only:\n filter_dict[\"published\"] = True\n return await self.get_many(filter=filter_dict, sort=[(\"created_at\", -1)])\n\n async def search_by_tags(self, tags: List[str]) -> List[BlogPost]:\n \"\"\"Search posts by tags\"\"\"\n return await self.get_many(filter={\"tags\": {\"$in\": tags}, \"published\": True})\n\n# Usage in FastAPI\n@app.get(\"/posts/\", response_model=PaginatedResponse[BlogPost])\nasync def list_posts(\n page: int = 1,\n page_size: int = 10,\n published: Optional[bool] = None,\n author_id: Optional[str] = None\n):\n filter_dict = {}\n if published is not None:\n filter_dict[\"published\"] = published\n if author_id:\n filter_dict[\"author_id\"] = author_id\n\n return await blog_repo.paginate(page, page_size, filter_dict)\n\n@app.get(\"/posts/author/{author_id}\", response_model=List[BlogPost])\nasync def posts_by_author(author_id: str, published: bool = True):\n return await blog_repo.get_by_author(author_id, published)\n\n@app.get(\"/posts/tags\", response_model=List[BlogPost])\nasync def posts_by_tags(tags: List[str] = Query(...)):\n return await blog_repo.search_by_tags(tags)\n
"},{"location":"03_use_cases/05_soft_deletes/","title":"Soft Deletes","text":""},{"location":"03_use_cases/05_soft_deletes/#use-case-5-soft-deletes-pattern","title":"Use Case 5: Soft Deletes Pattern","text":"Scenario: Implement soft delete functionality for data recovery.
Pythonfrom mongo_ops import BaseDocument, BaseRepository\nfrom datetime import datetime\nfrom typing import Optional\n\nclass SoftDeleteDocument(BaseDocument):\n \"\"\"Base document with soft delete support\"\"\"\n is_deleted: bool = False\n deleted_at: Optional[datetime] = None\n deleted_by: Optional[str] = None\n\nclass Task(SoftDeleteDocument):\n title: str\n description: str\n assignee_id: str\n status: str = \"pending\"\n priority: str = \"medium\"\n\nclass SoftDeleteRepository(BaseRepository[T]):\n \"\"\"Repository with soft delete operations\"\"\"\n\n async def soft_delete(self, id: str, deleted_by: str = None) -> Optional[T]:\n \"\"\"Soft delete a document\"\"\"\n return await self.update(id, {\n \"is_deleted\": True,\n \"deleted_at\": datetime.utcnow(),\n \"deleted_by\": deleted_by\n })\n\n async def restore(self, id: str) -> Optional[T]:\n \"\"\"Restore a soft-deleted document\"\"\"\n return await self.update(id, {\n \"is_deleted\": False,\n \"deleted_at\": None,\n \"deleted_by\": None\n })\n\n async def get_active(self, skip: int = 0, limit: int = 100):\n \"\"\"Get only non-deleted documents\"\"\"\n return await self.get_many(\n filter={\"is_deleted\": False},\n skip=skip,\n limit=limit\n )\n\n async def get_deleted(self, skip: int = 0, limit: int = 100):\n \"\"\"Get deleted documents\"\"\"\n return await self.get_many(\n filter={\"is_deleted\": True},\n skip=skip,\n limit=limit\n )\n\n async def permanent_delete(self, id: str) -> bool:\n \"\"\"Permanently delete a document\"\"\"\n return await self.delete(id)\n\nclass TaskRepository(SoftDeleteRepository[Task]):\n def __init__(self):\n super().__init__(\"tasks\", Task)\n\n# Usage in FastAPI\n@app.delete(\"/tasks/{task_id}\")\nasync def soft_delete_task(task_id: str, user_id: str):\n task = await task_repo.soft_delete(task_id, deleted_by=user_id)\n if not task:\n raise HTTPException(status_code=404, detail=\"Task not found\")\n return {\"message\": \"Task deleted\", \"task\": task}\n\n@app.post(\"/tasks/{task_id}/restore\")\nasync def restore_task(task_id: str):\n task = await task_repo.restore(task_id)\n if not task:\n raise HTTPException(status_code=404, detail=\"Task not found\")\n return {\"message\": \"Task restored\", \"task\": task}\n\n@app.get(\"/tasks/\", response_model=List[Task])\nasync def list_active_tasks(skip: int = 0, limit: int = 10):\n return await task_repo.get_active(skip, limit)\n\n@app.get(\"/tasks/deleted\", response_model=List[Task])\nasync def list_deleted_tasks(skip: int = 0, limit: int = 10):\n return await task_repo.get_deleted(skip, limit)\n
"},{"location":"03_use_cases/06_multi_model/","title":"Multi Model","text":""},{"location":"03_use_cases/06_multi_model/#use-case-6-multi-model-service-with-registration","title":"Use Case 6: Multi-Model Service with Registration","text":"Scenario: Complete microservice with multiple related models.
Pythonfrom mongo_ops import (\n MongoConnectionManager,\n BaseDocument,\n BaseRepository,\n ModelRegistry\n)\nfrom fastapi import FastAPI\nfrom contextlib import asynccontextmanager\n\n# Define Models\nclass User(BaseDocument):\n username: str\n email: str\n role: str = \"user\"\n\nclass Post(BaseDocument):\n title: str\n content: str\n author_id: str\n likes: int = 0\n\nclass Comment(BaseDocument):\n post_id: str\n user_id: str\n content: str\n\n# Define Repositories\nclass UserRepository(BaseRepository[User]):\n def __init__(self):\n super().__init__(\"users\", User)\n\nclass PostRepository(BaseRepository[Post]):\n def __init__(self):\n super().__init__(\"posts\", Post)\n\nclass CommentRepository(BaseRepository[Comment]):\n def __init__(self):\n super().__init__(\"comments\", Comment)\n\n# Register all models\nModelRegistry.register(\"users\", User, indexes=[(\"email\", 1), (\"username\", 1)])\nModelRegistry.register(\"posts\", Post, indexes=[(\"author_id\", 1), (\"created_at\", -1)])\nModelRegistry.register(\"comments\", Comment, indexes=[(\"post_id\", 1), (\"user_id\", 1)])\n\n# Setup FastAPI with lifespan\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n await MongoConnectionManager.connect(\n uri=\"mongodb://localhost:27017\",\n db_name=\"social_app\"\n )\n await ModelRegistry.initialize_all()\n yield\n await MongoConnectionManager.disconnect()\n\napp = FastAPI(lifespan=lifespan)\n\n# Initialize repositories\nuser_repo = UserRepository()\npost_repo = PostRepository()\ncomment_repo = CommentRepository()\n\n# Endpoints\n@app.post(\"/users/\", response_model=User)\nasync def create_user(user: User):\n return await user_repo.create(user)\n\n@app.post(\"/posts/\", response_model=Post)\nasync def create_post(post: Post):\n return await post_repo.create(post)\n\n@app.post(\"/comments/\", response_model=Comment)\nasync def create_comment(comment: Comment):\n return await comment_repo.create(comment)\n
"}]}
\ No newline at end of file
diff --git a/mongo-ops/site/sitemap.xml.gz b/mongo-ops/site/sitemap.xml.gz
index 5d8c1c4..342ef6d 100644
Binary files a/mongo-ops/site/sitemap.xml.gz and b/mongo-ops/site/sitemap.xml.gz differ