Task Decorator API Reference
Overview
The @task decorator is used to define task handlers in RestQ. It provides configuration options for task execution and retry behavior.
Basic Usage
from restq import task
@task(name="my_task")
async def my_task(arg1: str, arg2: int):
# Task implementation
pass
Constructor
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
name |
str | Yes | - | Unique task identifier |
max_retry |
int | No | None | Maximum retry attempts |
retry_delay |
int | No | None | Seconds between retries |
Examples
Basic Task
Task with Retries
@task(
name="reliable_task",
max_retry=3,
retry_delay=5
)
async def reliable_task(data: dict):
try:
await process_data(data)
except TemporaryError:
# Will be retried up to 3 times
raise
Synchronous Task
Task Handler Requirements
Function Signature
- Can be async or sync
- Must accept kwargs defined in task call
- Return value is ignored
Error Handling
- Unhandled exceptions trigger retry mechanism
- After max retries, task is marked as failed
Best Practices
1. Unique Names
# Good - Clear, unique names
@task(name="send_welcome_email")
async def send_welcome_email(user_id: str):
pass
@task(name="send_password_reset")
async def send_password_reset(user_id: str):
pass
# Bad - Generic, potentially conflicting names
@task(name="send_email")
async def send_welcome(user_id: str):
pass
@task(name="send_email") # Name conflict!
async def send_reset(user_id: str):
pass
2. Type Hints
@task(name="process_user")
async def process_user(
user_id: str,
update_data: dict[str, Any],
notify: bool = False
) -> None:
# Implementation with clear type hints
pass
3. Documentation
@task(name="process_payment")
async def process_payment(
amount: Decimal,
currency: str,
user_id: str
) -> None:
"""Process a payment transaction.
Args:
amount: Transaction amount
currency: Three-letter currency code
user_id: User identifier
Raises:
PaymentError: If payment processing fails
ValidationError: If input data is invalid
"""
pass
4. Retry Configuration
# Short-lived task, no retries needed
@task(name="cache_data")
async def cache_data(key: str, value: Any):
pass
# Network operation, retry with backoff
@task(
name="external_api_call",
max_retry=3,
retry_delay=5
)
async def external_api_call(endpoint: str, data: dict):
pass
Advanced Usage
Custom Error Handling
@task(
name="handle_errors",
max_retry=3,
retry_delay=5
)
async def handle_errors(data: dict):
try:
await process_data(data)
except TemporaryError:
# Will be retried
raise
except PermanentError:
# Log error but don't retry
logger.error("Permanent error occurred")
return
except Exception as e:
# Unexpected error
logger.critical(f"Unexpected error: {e}")
raise
Resource Management
@task(name="db_operation")
async def db_operation(query: str, params: dict):
async with get_db_connection() as conn:
async with conn.transaction():
return await conn.execute(query, **params)