---
name: tasks-module
description: myfy TasksModule for background job processing with SQL-based queue. Use when working with TasksModule, @task decorator, background jobs, task workers, TaskContext, task retries, or async task dispatch.
---
# TasksModule - Background Jobs
TasksModule provides SQL-based async task processing with DI injection and automatic retries.
## Quick Start
```python
from myfy.core import Application
from myfy.data import DataModule
from myfy.tasks import TasksModule, task
app = Application()
app.add_module(DataModule())
app.add_module(TasksModule(auto_create_tables=True))
# Define a task
@task
async def send_email(to: str, subject: str, body: str) -> None:
await email_service.send(to, subject, body)
# Dispatch from a route
@route.post("/notifications")
async def notify_user(body: NotifyRequest) -> dict:
task_id = await send_email.send(
to=body.email,
subject="Welcome!",
body="Thanks for signing up.",
)
return {"task_id": task_id}
```
## Configuration
Environment variables use the `MYFY_TASKS_` prefix:
| Variable | Default | Description |
|----------|---------|-------------|
| `MYFY_TASKS_DEFAULT_MAX_RETRIES` | `3` | Default retry attempts |
| `MYFY_TASKS_RETRY_DELAY_SECONDS` | `60.0` | Seconds between retries |
| `MYFY_TASKS_WORKER_CONCURRENCY` | `4` | Concurrent tasks per worker |
| `MYFY_TASKS_POLL_INTERVAL` | `1.0` | Seconds between queue polls |
| `MYFY_TASKS_TASK_TIMEOUT` | `300.0` | Max seconds per task |
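For example, a deployment handling a busier queue might tune these via environment variables before starting a worker (the values below are illustrative, not recommendations):

```shell
# Example values only -- tune for your workload
export MYFY_TASKS_DEFAULT_MAX_RETRIES=5
export MYFY_TASKS_RETRY_DELAY_SECONDS=30.0
export MYFY_TASKS_WORKER_CONCURRENCY=8
export MYFY_TASKS_POLL_INTERVAL=0.5
```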
## Defining Tasks
### Basic Task
```python
from myfy.tasks import task
@task
async def process_order(order_id: int) -> str:
# Process the order
return f"Processed order {order_id}"
```
### Task with DI Injection
Services are automatically injected at runtime:
```python
from myfy.tasks import task
from myfy.data import AsyncSession
@task
async def sync_user_data(user_id: int, session: AsyncSession) -> None:
# session is TASK-scoped (injected per task execution)
user = await session.get(User, user_id)
await sync_to_external_service(user)
```
### Task with Custom Options
```python
@task(max_retries=5, retry_on=[ConnectionError, TimeoutError])
async def upload_file(file_path: str) -> str:
# Retries up to 5 times on connection/timeout errors
return await s3.upload(file_path)
```
## Dispatching Tasks
### Basic Dispatch
```python
# Returns immediately with task_id
task_id = await send_email.send(to="user@example.com", subject="Hi", body="Hello!")
```
### Dispatch Options
```python
task_id = await send_email.send(
    to="user@example.com",
    subject="Hi",
    body="Hello!",
_priority=10, # Higher priority = executes first
_delay=60, # Wait 60 seconds before executing
_max_retries=5, # Override default retries
)
```
### Getting Results
```python
result = await send_email.get_result(task_id, timeout=60)
if result.is_completed:
print(f"Success: {result.value}")
elif result.is_failed:
print(f"Error: {result.error}")
elif result.is_pending:
print("Still processing...")
```
## TaskContext for Progress
Report progress from long-running tasks:
```python
from myfy.tasks import task, TaskContext
@task
async def import_users(file_path: str, ctx: TaskContext) -> int:
users = load_users_from_file(file_path)
total = len(users)
for i, user in enumerate(users):
await create_user(user)
await ctx.update_progress(
current=i + 1,
total=total,
message=f"Importing user {i + 1}/{total}",
)
return total
```
Check progress from caller:
```python
result = await import_users.get_result(task_id)
if result.progress:
current, total = result.progress
print(f"Progress: {current}/{total} - {result.progress_message}")
```
## Running Workers
Start a worker process:
```bash
myfy tasks worker
```
With options:
```bash
myfy tasks worker --concurrency 8 --poll-interval 0.5
```
Workers:
- Poll the database for pending tasks
- Execute tasks with full DI injection
- Handle retries automatically
- Report progress and results
- Shut down gracefully on SIGTERM
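The bullets above can be sketched as a simplified poll cycle. This is a conceptual illustration only, not myfy's actual worker code, which also handles DI injection, retries, timeouts, and progress reporting:

```python
import asyncio


async def run_worker_cycle(queue: list, concurrency: int = 4) -> list:
    """One illustrative poll cycle: claim up to `concurrency` pending
    tasks from the front of the queue and execute them in order."""
    batch = queue[:concurrency]
    del queue[:concurrency]
    results = []
    for fn, args in batch:
        results.append(await fn(*args))  # real workers also record status/results
    return results


async def demo() -> list:
    async def double(x: int) -> int:
        return x * 2

    queue = [(double, (1,)), (double, (2,)), (double, (3,))]
    # With concurrency=2, one task stays queued for the next cycle
    return await run_worker_cycle(queue, concurrency=2)
```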
## Task States
| Status | Description |
|--------|-------------|
| `pending` | Queued, waiting for worker |
| `running` | Being executed by worker |
| `completed` | Finished successfully |
| `failed` | Failed after all retries |
| `cancelled` | Manually cancelled |
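Callers that track raw status strings can treat `completed`, `failed`, and `cancelled` as terminal states. A hedged sketch, where `fetch_status` is a hypothetical stand-in for however your application reads a task's status:

```python
import asyncio

TERMINAL = {"completed", "failed", "cancelled"}


async def wait_until_terminal(fetch_status, task_id: str,
                              poll: float = 1.0, max_polls: int = 60) -> str:
    """Poll `fetch_status(task_id)` until the task reaches a terminal
    state, sleeping `poll` seconds between checks."""
    for _ in range(max_polls):
        status = await fetch_status(task_id)
        if status in TERMINAL:
            return status
        await asyncio.sleep(poll)
    raise TimeoutError(f"task {task_id} did not reach a terminal state")
```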
## Error Handling
Tasks automatically retry on failure:
```python
@task(max_retries=3, retry_on=[APIError])
async def call_api(url: str) -> dict:
response = await http.get(url)
if response.status >= 500:
raise APIError("Server error") # Will retry
return response.json()
```
After all retries fail:
- Task status becomes `failed`
- Error message and traceback are stored
- Can be retrieved via `get_result()`
## Parameter Classification
| Type | Behavior |
|------|----------|
| Primitives (`str`, `int`, `float`, `bool`) | Serialized as task args |
| Lists, dicts | Serialized as task args |
| TaskContext | Injected by worker |
| Services (other types) | DI injected at runtime |
```python
@task
async def complex_task(
order_id: int, # Serialized (primitive)
items: list[str], # Serialized (list)
ctx: TaskContext, # Injected (context)
session: AsyncSession, # DI injected (service)
settings: AppSettings, # DI injected (service)
) -> None:
...
```
## Best Practices
1. **Keep tasks idempotent** - Safe to retry on failure
2. **Serialize only primitives** - Load complex objects inside the task
3. **Use TaskContext** - Report progress for long tasks
4. **Set appropriate timeouts** - Prevent zombie tasks
5. **Monitor worker logs** - Watch for repeated failures
6. **Use priorities** - Critical tasks get processed first
7. **Handle cleanup** - TaskContext supports cleanup callbacks