py-async-patterns by aiskillstore
Async/await patterns for FastAPI and SQLAlchemy. Use when working with async code, database sessions, concurrent operations, or debugging async issues in Python.
Updated Jan 19, 2026, 04:39 AM
---
name: py-async-patterns
description: Async/await patterns for FastAPI and SQLAlchemy. Use when working with async code, database sessions, concurrent operations, or debugging async issues in Python.
---
# Python Async Patterns
## Problem Statement
Async Python is powerful but error-prone. Race conditions, session leaks, and connection pool issues are common pitfalls in async codebases.
---
## Pattern: AsyncSession Lifecycle
**Problem:** A session must be scoped to a single request. Leaked or shared sessions cause stale data and connection exhaustion.
```python
from collections.abc import AsyncGenerator
from uuid import UUID

from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# ✅ CORRECT: Session scoped to request via dependency
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
        # Session automatically closed after request

# Usage in endpoint
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> UserRead:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one()

# ❌ WRONG: Global session (stale data, connection leaks)
_global_session = None  # NEVER do this

async def get_user(user_id: UUID):
    result = await _global_session.execute(...)  # Stale, shared state
```
**Why it matters:** Each request needs isolated database state. Shared sessions see stale data and can't be safely committed.
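The cleanup guarantee behind the dependency pattern can be seen without a database. The following is a minimal stdlib-only sketch, assuming a hypothetical `FakeSession` class and `fake_session_factory` helper that stand in for `AsyncSession` and `async_session`; it shows only that the `async with` block closes the session whether the handler succeeds or raises.

```python
import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession; only tracks open/closed state."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def fake_session_factory() -> AsyncGenerator[FakeSession, None]:
    session = FakeSession()
    try:
        yield session
    finally:
        # Runs on success and on error, like `async with async_session()`
        await session.close()


async def handle_request(fail: bool) -> FakeSession:
    """Simulates one request: open a session, do work, always clean up."""
    seen = None
    try:
        async with fake_session_factory() as session:
            seen = session
            if fail:
                raise ValueError("handler error")
    except ValueError:
        pass  # The error passed through the cleanup path before landing here
    return seen


ok_session = asyncio.run(handle_request(fail=False))
failed_session = asyncio.run(handle_request(fail=True))
```

Each simulated request gets its own session object, and both end up closed, which is the isolation property the request-scoped dependency is meant to provide.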
---
## Pattern: Concurrent vs Sequential Queries
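**Problem:** Running independent queries sequentially wastes time, but dependent queries must run in order. A single `AsyncSession` is not safe to share across concurrent tasks, so queries that run concurrently each need their own session.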
```python
# ✅ CORRECT: Concurrent independent queries, one session per task.
# An AsyncSession must not be shared between concurrent tasks, so each
# query runs in its own short-lived session from the async_session factory.
# (`fetch` is an illustrative helper name.)
async def fetch(stmt, extract):
    async with async_session() as session:
        return extract(await session.execute(stmt))


async def get_dashboard_data(user_id: UUID):
    # These don't depend on each other - run concurrently
    user, stats, recent = await asyncio.gather(
        fetch(select(User).where(User.id == user_id), lambda r: r.scalar_one()),
        fetch(
            select(UserStats).where(UserStats.user_id == user_id),
            lambda r: r.scalar_one_or_none(),
        ),
        fetch(
            select(Activity)
            .where(Activity.user_id == user_id)
            .order_by(Activity.created_at.desc())
            .limit(10),
            lambda r: r.scalars().all(),
        ),
    )
    return {"user": user, "stats": stats, "recent": recent}


# ❌ SLOW: Sequential awaits for independent queries
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
    user = await session.execute(...)    # Wait...
    stats = await session.execute(...)   # Wait more...
    recent = await session.execute(...)  # Even more waiting
    # Total time = sum of all queries


# ✅ CORRECT: Sequential when queries depend on each other
async def get_user_with_team(user_id: UUID, session: AsyncSession):
    # Must get user first to know team_id
    user_result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = user_result.scalar_one()

    # Now we can query team
    team_result = await session.execute(
        select(Team).where(Team.id == user.team_id)
    )
    return user, team_result.scalar_one()
```
**Decision framework:**
| Does one query need another's result? | Use |
|---------------------------------------|-----|
| No (independent) | `asyncio.gather()`, one session per task |
| Yes (dependent) | Sequential `await` |
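The timing difference behind this table can be reproduced with the standard library alone. This is a rough sketch, assuming a hypothetical `fake_query` coroutine that simulates I/O latency with `asyncio.sleep`; it involves no database and says nothing about real query times.

```python
import asyncio
import time


async def fake_query(name: str, delay: float = 0.05) -> str:
    """Hypothetical stand-in for an independent database query."""
    await asyncio.sleep(delay)
    return name


async def sequential() -> list[str]:
    # Each await finishes before the next one starts
    return [
        await fake_query("user"),
        await fake_query("stats"),
        await fake_query("recent"),
    ]


async def concurrent() -> list[str]:
    # gather() returns results in argument order, not completion order
    return list(
        await asyncio.gather(
            fake_query("user"), fake_query("stats"), fake_query("recent")
        )
    )


def timed(coro_fn):
    """Run a coroutine function and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start


seq_result, seq_elapsed = timed(sequential)
con_result, con_elapsed = timed(concurrent)
```

Both variants return the same list, but the sequential one waits for roughly the sum of the delays while the concurrent one waits for roughly the longest single delay.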
---
## Pattern: Transaction Boundaries
**Problem:** It is easy to get wrong when to commit, when to roll back, and when to refresh.
```python
# ✅ CORRECT: Explicit transaction for multi-step operations
async def transfer_player(
    player_id: UUID,
    from_team_id: UUID,
    to_team_id: UUID,
    session: AsyncSession,
):
    try:
        # All operations in one transaction
        player = await session.get(Player, player_id)
        player.team_id = to_team_id

        from_team = await session.get(Team, from_team_id)
        from_team.player_count -= 1

        to_team = await session.get(Team, to_team_id)
        to_team.player_count += 1

        await session.commit()
    except Exception:
        await session.rollback()
        raise


# ✅ CORRECT: Using context manager
# begin() raises if the session has already started a transaction,
# so load the rows inside the block rather than before it
async with session.begin():
    # All operations here are in a transaction
    # Auto-commits on success, auto-rolls back on exception
    player = await session.get(Player, player_id)
    from_team = await session.get(Team, from_team_id)
    to_team = await session.get(Team, to_team_id)
    player.team_id = to_team_id
    from_team.player_count -= 1
    to_team.player_count += 1

# ✅ CORRECT: Refresh after commit to get DB-generated values
await session.commit()
await session.refresh(new_entity)  # Get id, created_at, etc.
return new_entity
```
**When to use what:**
| Scenario | Pattern |
|----------|---------|
| Single create/update | `session.add()` + `commit()` at request end |
| Multi-step operation | Explicit `begin()` / `commit()` / `rollback()` |
| Need DB-generated values | `refresh()` after commit |
| Read-only query | No commit needed |
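The commit-on-success, rollback-on-error behavior described above can be sketched as a small context manager. This is an illustration only, assuming a hypothetical `FakeSession` that records calls and a hypothetical `transaction` helper; it is not how SQLAlchemy implements `session.begin()`.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical session that records commit/rollback calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")


@asynccontextmanager
async def transaction(session: FakeSession):
    """Commit if the block succeeds; roll back and re-raise if it fails."""
    try:
        yield session
    except Exception:
        await session.rollback()
        raise
    else:
        await session.commit()


async def run(fail: bool) -> list[str]:
    session = FakeSession()
    try:
        async with transaction(session):
            if fail:
                raise RuntimeError("step two failed")
    except RuntimeError:
        pass  # The caller still sees the original error after rollback
    return session.events


success_events = asyncio.run(run(fail=False))
failure_events = asyncio.run(run(fail=True))
```

Exactly one of commit or rollback runs per block, and the exception is re-raised after rollback so the failure is not silently swallowed.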
---
## Pattern: Connection Pool Management
**Problem:** Exhausting the connection pool (or, with `NullPool`, the database server's connection limit) causes requests to hang or fail.
```python
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

# This codebase uses NullPool for async - understand why
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # No connection pooling
)
# NullPool: Each request gets new connection, closes after
# Why: Avoids issues with asyncpg + connection reuse
# Tradeoff: Slightly more connection overhead

# ✅ CORRECT: Always close sessions (handled by Depends)
async with async_session() as session:
    # Work with session
    pass  # Session closed here

# ❌ WRONG: Forgetting to close
session = async_session()
result = await session.execute(query)
# Session never closed - connection leak!
```
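How a single leaked connection starves later requests can be shown with a toy pool. This is a stdlib-only sketch, assuming a hypothetical `TinyPool` class built on `asyncio.Semaphore`; it is not SQLAlchemy's pool implementation and the timeout values are arbitrary.

```python
import asyncio


class TinyPool:
    """Hypothetical fixed-size pool built on a semaphore."""

    def __init__(self, size: int) -> None:
        self._slots = asyncio.Semaphore(size)

    async def acquire(self, timeout: float) -> None:
        # Raises a timeout error when every slot is checked out
        await asyncio.wait_for(self._slots.acquire(), timeout)

    def release(self) -> None:
        self._slots.release()


async def demo() -> tuple[bool, bool]:
    pool = TinyPool(size=1)

    # Leak: check out the only connection and never return it
    await pool.acquire(timeout=0.05)
    try:
        await pool.acquire(timeout=0.05)
        starved = False
    except asyncio.TimeoutError:
        starved = True

    # Returning the connection makes the pool usable again
    pool.release()
    await pool.acquire(timeout=0.05)
    recovered = True
    pool.release()
    return starved, recovered


starved, recovered = asyncio.run(demo())
```

The second checkout times out only because the first was never released, which is why closing sessions with `async with` matters even when the work itself succeeds.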
---
## Pattern: Background Tasks
**Problem:** Long-running work shouldn't block the response.
```python
import asyncio

from fastapi import BackgroundTasks

# ✅ CORRECT: FastAPI BackgroundTasks for request-scoped work
# (parameters without defaults must come before those with defaults)
@router.post("/assessments/{id}/submit")
async def submit_assessment(
    id: UUID,
    background_tasks: BackgroundTasks,
    session: AsyncSession = Depends(get_session),
) -> AssessmentResult:
    # Quick work - return response
    result = await process_submission(id, session)

    # Slow work - do after response
    background_tasks.add_task(send_completion_email, result.user_email)
    background_tasks.add_task(update_analytics, result)
    return result


# ✅ CORRECT: asyncio.create_task for fire-and-forget
_pending_tasks: set[asyncio.Task] = set()

async def process_with_side_effect():
    result = await main_operation()
    # Fire and forget - don't await, but keep a reference so the task
    # is not garbage-collected before it finishes
    task = asyncio.create_task(log_to_external_service(result))
    _pending_tasks.add(task)
    task.add_done_callback(_pending_tasks.discard)
    return result


# ❌ WRONG: Awaiting non-critical slow operations
async def slow_endpoint():
    result = await main_operation()
    await send_email(result)        # User waits for email...
    await update_analytics(result)  # User still waiting...
    return result
```
**When to use what:**
| Scenario | Pattern |
|----------|---------|
| Post-response cleanup | `BackgroundTasks` |
| Fire-and-forget logging | `asyncio.create_task()` (keep a reference to the task) |
| Must complete before response | Direct `await` |
---
## Pattern: Avoiding Deadlocks
**Problem:** Concurrent operations that acquire the same locks in different orders can deadlock.
```python
# ❌ WRONG: Potential deadlock
async def transfer_both_ways():
    # Task 1: Lock A, then B
    # Task 2: Lock B, then A
    # = Deadlock if interleaved
    pass


# ✅ CORRECT: Consistent lock ordering
async def transfer_credits(
    from_id: UUID,
    to_id: UUID,
    amount: int,
    session: AsyncSession,
):
    # Always lock in consistent order (e.g., by UUID)
    first_id, second_id = sorted([from_id, to_id])

    # Lock in consistent order
    first = await session.get(Account, first_id, with_for_update=True)
    second = await session.get(Account, second_id, with_for_update=True)

    # Now safe to modify
    if from_id == first_id:
        first.balance -= amount
        second.balance += amount
    else:
        second.balance -= amount
        first.balance += amount

    await session.commit()
```
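The same ordering rule applies to in-process locks. The sketch below uses `asyncio.Lock` instead of row locks, with hypothetical `balances` and `locks` dictionaries, and runs two opposite transfers at once; the `wait_for` timeout is only there so a deadlock would fail fast instead of hanging.

```python
import asyncio

balances = {"a": 100, "b": 100}
locks: dict[str, asyncio.Lock] = {}


async def transfer(src: str, dst: str, amount: int) -> None:
    # Acquire locks in sorted key order regardless of transfer direction
    first, second = sorted([src, dst])
    async with locks[first]:
        await asyncio.sleep(0.01)  # Widen the window where tasks interleave
        async with locks[second]:
            balances[src] -= amount
            balances[dst] += amount


async def main() -> None:
    locks.update({name: asyncio.Lock() for name in balances})
    # Opposite directions at the same time: deadlock-prone without ordering
    await asyncio.wait_for(
        asyncio.gather(transfer("a", "b", 30), transfer("b", "a", 10)),
        timeout=1.0,
    )


asyncio.run(main())
```

If each task instead locked its source account first, the sleep would let both grab their first lock and then wait forever on the other's.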
---
## Pattern: Post-Condition Validation
As with frontend code, verify that an async operation actually succeeded instead of assuming it did:
```python
from fastapi import HTTPException

# ✅ CORRECT: Validate after async operations
async def create_assessment(data: AssessmentCreate, session: AsyncSession):
    assessment = Assessment(**data.model_dump())
    session.add(assessment)
    await session.commit()
    await session.refresh(assessment)

    # Validate post-condition
    if assessment.id is None:
        raise RuntimeError("Assessment creation failed - no ID assigned")
    return assessment


# ✅ CORRECT: Validate data was actually loaded
async def get_user_or_fail(user_id: UUID, session: AsyncSession) -> User:
    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=404, detail=f"User {user_id} not found")
    return user
```
---
## Pattern: Logging Async Operations
```python
import structlog

logger = structlog.get_logger()


async def complex_operation(user_id: UUID, session: AsyncSession):
    logger.info("complex_operation.start", user_id=str(user_id))
    try:
        result = await step_one(session)
        logger.debug("complex_operation.step_one_complete", result_count=len(result))

        await step_two(result, session)
        logger.debug("complex_operation.step_two_complete")

        await session.commit()
        logger.info("complex_operation.success", user_id=str(user_id))
    except Exception as e:
        logger.error(
            "complex_operation.failed",
            user_id=str(user_id),
            error=str(e),
            step="unknown",
        )
        raise
```
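The example above always logs `step="unknown"` on failure. One possible way to record which stage failed is to track the current step in a local variable. The sketch below uses the standard `logging` module rather than `structlog` so it runs without extra dependencies; `ListHandler`, `step_one`, and `step_two` are hypothetical stand-ins.

```python
import asyncio
import logging

records: list[str] = []


class ListHandler(logging.Handler):
    """Collects formatted messages so the sketch can inspect them."""

    def emit(self, record: logging.LogRecord) -> None:
        records.append(record.getMessage())


logger = logging.getLogger("complex_operation_sketch")
logger.setLevel(logging.DEBUG)
logger.addHandler(ListHandler())
logger.propagate = False


async def step_one() -> list[int]:
    await asyncio.sleep(0)
    return [1, 2, 3]


async def step_two(items: list[int]) -> None:
    await asyncio.sleep(0)
    raise RuntimeError("boom")


async def complex_operation() -> None:
    step = "step_one"
    try:
        items = await step_one()
        step = "step_two"
        await step_two(items)
    except Exception as exc:
        # `step` names the stage that was running when the error occurred
        logger.error("complex_operation.failed step=%s error=%s", step, exc)
        raise


try:
    asyncio.run(complex_operation())
except RuntimeError:
    pass
```

With `structlog`, the same variable could be passed as the `step=` keyword argument in place of the hard-coded value.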
---
## Common Issues
| Issue | Likely Cause | Solution |
|-------|--------------|----------|
| "Session is closed" | Using session after request ends | Keep session in request scope |
| Connection timeout | Pool exhausted | Check for session leaks |
| Stale data | Shared session or missing refresh | Scope session to request, refresh after commit |
| Deadlock | Inconsistent lock ordering | Always acquire locks in same order |
| Slow endpoint | Sequential queries that could be parallel | Use `asyncio.gather()` with one session per task |
---
## Detection Commands
```bash
# Find potential session leaks (global sessions)
grep -rn "async_session()" --include="*.py" | grep -v "async with\|Depends"
# Find sequential queries that might be parallelizable
grep -rn "await session.execute" --include="*.py" -A2 | grep -B1 "await session.execute"
# Find fire-and-forget tasks whose reference is dropped
ruff check --select=RUF006  # asyncio-dangling-task
```