Examples¶
Real-world usage examples for dramatiq-kombu-broker.
Basic Task Processing¶
import dramatiq
from dramatiq_kombu_broker import ConnectionPooledKombuBroker
broker = ConnectionPooledKombuBroker(
kombu_connection_options={
"hostname": "amqp://guest:guest@localhost:5672/"
}
)
dramatiq.set_broker(broker)
@dramatiq.actor
def send_welcome_email(user_id: int, email: str):
# Send email logic here
print(f"Sending welcome email to {email}")
return True
# Usage
send_welcome_email.send(123, "user@example.com")
Delayed Tasks¶
@dramatiq.actor
def send_reminder(user_id: int):
print(f"Sending reminder to user {user_id}")
# Send after 1 hour
send_reminder.send_with_options(
args=(123,),
delay=3600000 # 1 hour in milliseconds
)
# Send tomorrow at 9 AM
import datetime
now = datetime.datetime.now()
tomorrow_9am = now.replace(hour=9, minute=0, second=0) + datetime.timedelta(days=1)
delay_ms = int((tomorrow_9am - now).total_seconds() * 1000)
send_reminder.send_with_options(args=(123,), delay=delay_ms)
Priority Queue¶
broker = ConnectionPooledKombuBroker(
kombu_connection_options={"hostname": "amqp://..."},
max_priority=10, # Enable priorities 0-10
)
dramatiq.set_broker(broker)
@dramatiq.actor
def process_order(order_id: int):
print(f"Processing order {order_id}")
# Normal priority (0)
process_order.send(100)
# High priority (10)
process_order.send_with_options(args=(200,), broker_priority=10)
# Low priority (1)
process_order.send_with_options(args=(300,), broker_priority=1)
Multiple Queues¶
@dramatiq.actor(queue_name="critical")
def urgent_task(data):
print(f"Processing urgent: {data}")
@dramatiq.actor(queue_name="background")
def slow_task(data):
print(f"Processing background: {data}")
# Start workers for specific queues:
# dramatiq tasks --queues critical
# dramatiq tasks --queues background
Custom Default Queue Name¶
Replace the default queue name for all actors without changing their code:
broker = ConnectionPooledKombuBroker(
kombu_connection_options={"hostname": "amqp://..."},
default_queue_name="myapp", # Replace "default" with "myapp"
)
dramatiq.set_broker(broker)
# This actor will use "myapp" queue (automatically replaced from "default")
@dramatiq.actor
def send_email(to: str, subject: str):
print(f"Sending email to {to}")
# This actor keeps its explicit "notifications" queue (no replacement)
@dramatiq.actor(queue_name="notifications")
def send_push_notification(user_id: int, message: str):
print(f"Push to user {user_id}: {message}")
# Usage
send_email.send("user@example.com", "Welcome!") # Goes to "myapp" queue
send_push_notification.send(123, "Hello!") # Goes to "notifications" queue
When to use:
- Namespace queues by application name in shared RabbitMQ
- Migrate from another broker that used different queue naming
- Run multiple environments (dev, staging, prod) on same RabbitMQ
See Configuration for detailed explanation.
Retries and Error Handling¶
@dramatiq.actor(
max_retries=3,
min_backoff=1000, # Start with 1 second
max_backoff=60000, # Max 60 seconds
)
def flaky_api_call(url: str):
import requests
response = requests.get(url, timeout=30)
response.raise_for_status()
return response.json()
# Will retry up to 3 times with exponential backoff
flaky_api_call.send("https://api.example.com/data")
Django Integration¶
# settings.py
INSTALLED_APPS = [
# ...
'django_dramatiq',
]
DRAMATIQ_BROKER = {
"BROKER": "dramatiq_kombu_broker.broker.ConnectionSharedKombuBroker",
"OPTIONS": {
"kombu_connection_options": {
"hostname": "amqp://guest:guest@localhost:5672/",
},
"connection_holder_options": {
"consumer_channel_pool_size": 5,
},
"max_priority": 10,
},
}
# myapp/tasks.py
import dramatiq
@dramatiq.actor
def process_user_signup(user_id: int):
from myapp.models import User
user = User.objects.get(id=user_id)
# Send welcome email, create profile, etc.
# myapp/views.py
from myapp.tasks import process_user_signup
def signup_view(request):
user = User.objects.create(...)
process_user_signup.send(user.id)
return HttpResponse("Signup successful!")
Batch Processing¶
@dramatiq.actor
def process_batch(item_ids: list[int]):
for item_id in item_ids:
process_item(item_id)
# Send batches
items = list(range(1000))
batch_size = 100
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
process_batch.send(batch)
Periodic Tasks¶
# Use APScheduler with Dramatiq
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@dramatiq.actor
def daily_report():
# Generate and send daily report
print("Generating daily report...")
@scheduler.scheduled_job('cron', hour=9, minute=0)
def schedule_daily_report():
daily_report.send()
# Run scheduler
scheduler.start()
Chain Tasks¶
@dramatiq.actor
def download_file(url: str) -> str:
# Download and return file path
return "/tmp/downloaded_file.pdf"
@dramatiq.actor
def process_file(file_path: str) -> dict:
# Process file and return results
return {"processed": True, "path": file_path}
@dramatiq.actor
def send_notification(results: dict):
print(f"Processing complete: {results}")
# Chain using callbacks
from dramatiq.middleware import Callbacks
download_file.send_with_options(
args=("https://example.com/file.pdf",),
on_success=process_file.message(),
)
Custom Topology¶
from dramatiq_kombu_broker import DLXRoutingTopology
import datetime as dt
# Route delayed messages through DLX for monitoring
topology = DLXRoutingTopology(
delay_queue_ttl=dt.timedelta(hours=24), # Max 24h delay
dead_letter_message_ttl=None, # No TTL on DLX
)
broker = ConnectionPooledKombuBroker(
kombu_connection_options={"hostname": "amqp://..."},
topology=topology,
)
Health Checks¶
from flask import Flask, jsonify
from dramatiq_kombu_broker.testing import ensure_consumer_connection_rabbitmq
app = Flask(__name__)
@app.route('/health')
def health_check():
try:
ensure_consumer_connection_rabbitmq(broker)
return jsonify({"status": "healthy", "broker": "connected"}), 200
except Exception as e:
return jsonify({"status": "unhealthy", "error": str(e)}), 503
Production Configuration¶
import os
from dramatiq_kombu_broker import ConnectionPooledKombuBroker
broker = ConnectionPooledKombuBroker(
kombu_connection_options={
"hostname": os.environ["RABBITMQ_URL"],
"heartbeat": 60, # Default value, shown explicitly for documentation
"ssl": True,
"ssl_options": {
"ca_certs": "/etc/ssl/certs/ca.pem",
},
"transport_options": {
"max_retries": 3,
"interval_start": 0,
"interval_step": 2,
"interval_max": 30,
"confirm_publish": True,
},
},
connection_holder_options={
"max_connections": 20,
},
default_queue_name="myapp",
max_priority=10,
confirm_delivery=True,
max_enqueue_attempts=3,
max_declare_attempts=5,
)
Testing¶
import pytest
from dramatiq_kombu_broker import ConnectionPooledKombuBroker
@pytest.fixture
def test_broker():
broker = ConnectionPooledKombuBroker(
kombu_connection_options={
"hostname": "amqp://guest:guest@localhost:5672/test"
}
)
yield broker
broker.flush_all() # Clean up
broker.close()
def test_task_processing(test_broker):
import dramatiq
dramatiq.set_broker(test_broker)
@dramatiq.actor
def add(x, y):
return x + y
add.send(2, 3)
# Process messages
worker = dramatiq.Worker(test_broker, worker_threads=1)
worker.start()
test_broker.join(add.queue_name, timeout=5000)
worker.stop()
Monitoring with Prometheus¶
from dramatiq.middleware import Prometheus
broker = ConnectionPooledKombuBroker(
kombu_connection_options={"hostname": "amqp://..."},
middleware=[
Prometheus(http_host="0.0.0.0", http_port=9191),
# ... other middleware
],
)
# Metrics available at http://localhost:9191
Error Tracking with Sentry¶
from dramatiq.middleware import Callbacks
import sentry_sdk
sentry_sdk.init(dsn="your-sentry-dsn")
@dramatiq.actor
def task_with_error_tracking(data):
try:
# Process data
pass
except Exception as e:
sentry_sdk.capture_exception(e)
raise
More Examples¶
See the tests directory for more examples.