Worker Configuration
Workers are responsible for executing tasks from the queue. This guide covers worker configuration, task handling, and best practices.
Basic Worker Setup
from restq import Worker, task
@task(name="my_task")
async def my_task(arg1: str, arg2: int):
# Task implementation
pass
worker = Worker(
queue_name="my-queue",
url="redis://localhost:6379/0",
tasks=[my_task],
name="worker-1" # Optional worker name
)
await worker.start()
Task Definition
Basic Task
Task with Retries
@task(
name="reliable_task",
max_retry=3, # Maximum retry attempts
retry_delay=5 # Seconds between retries
)
async def reliable_task(data: dict):
# Implementation
pass
Worker Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
queue_name |
str | Yes | Queue to process tasks from |
url |
str | Yes | Redis connection URL |
tasks |
list | Yes | List of task handlers |
name |
str | No | Worker identifier |
Task Decorator Options
| Option | Type | Default | Description |
|---|---|---|---|
name |
str | Required | Unique task identifier |
max_retry |
int | None | Maximum retry attempts |
retry_delay |
int | 0 | Seconds between retries |
Error Handling
Task-Level Error Handling
@task(name="handle_errors")
async def handle_errors(data: dict):
try:
result = await process_data(data)
except ValueError as e:
# Handle validation errors
logger.error(f"Validation error: {e}")
raise
except ConnectionError as e:
# Handle connection issues
logger.error(f"Connection failed: {e}")
raise
Retry Behavior
@task(
name="retry_task",
max_retry=3,
retry_delay=5
)
async def retry_task(user_id: str):
try:
await external_service_call(user_id)
except TemporaryError:
# Will be retried automatically
raise
except PermanentError:
# Won't be retried
logger.error("Permanent failure")
return
Worker Lifecycle
Starting Workers
import asyncio
from restq import Worker
async def main():
worker = Worker(
queue_name="my-queue",
url="redis://localhost:6379/0",
tasks=[my_task]
)
await worker.start()
if __name__ == "__main__":
asyncio.run(main())
Graceful Shutdown
The worker handles SIGINT and SIGTERM signals automatically, ensuring: 1. Current tasks complete 2. No new tasks are accepted 3. Resources are properly cleaned up
Production Considerations
Worker Naming
Use descriptive names to identify workers:
worker = Worker(
queue_name="email-queue",
url=REDIS_URL,
tasks=[send_email],
name="email-worker-001"
)
Deployment Strategies
Single Worker
Multiple Workers
Run multiple worker processes for better throughput:
# worker1.py
worker = Worker(
queue_name="main-queue",
url=REDIS_URL,
tasks=[task1, task2],
name="worker-1"
)
# worker2.py
worker = Worker(
queue_name="main-queue",
url=REDIS_URL,
tasks=[task1, task2],
name="worker-2"
)
Specialized Workers
Dedicate workers to specific task types: