Migration Guide¶
How to migrate from Dramatiq's standard RabbitMQ broker to dramatiq-kombu-broker.
Why Migrate?¶
Common reasons:
- Hitting connection limits
- Need better connection pooling
- Want topology flexibility
- Getting "Connection limit reached" errors
- Need channel pooling for threaded apps
Quick Migration¶
1. Install¶
pip install dramatiq-kombu-broker
2. Update Broker¶
Before:
from dramatiq.brokers.rabbitmq import RabbitmqBroker
broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672/")
After:
from dramatiq_kombu_broker import ConnectionPooledKombuBroker
broker = ConnectionPooledKombuBroker(
    kombu_connection_options={
        "hostname": "amqp://guest:guest@localhost:5672/"
    }
)
3. Test¶
Run your workers and verify messages process correctly.
Django Migration¶
Before (django-dramatiq):
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.rabbitmq.RabbitmqBroker",
    "OPTIONS": {
        "url": "amqp://guest:guest@localhost:5672/",
    },
}
After:
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq_kombu_broker.broker.ConnectionSharedKombuBroker",
    "OPTIONS": {
        "kombu_connection_options": {
            "hostname": "amqp://guest:guest@localhost:5672/",
        },
    },
}
Parameter Mapping¶
Connection URL¶
Standard broker:
broker = RabbitmqBroker(url="amqp://user:pass@host:5672/vhost")
Kombu broker:
broker = ConnectionPooledKombuBroker(
    kombu_connection_options={
        "hostname": "amqp://user:pass@host:5672/vhost"
    }
)
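If you would rather pass discrete fields than a full URL, the URL can be split into the same option names used under Connection Parameters. The helper below is a hypothetical sketch built on the standard library: `kombu_options_from_url` is not part of dramatiq-kombu-broker, and the `virtual_host` key is assumed to follow Kombu's `Connection` keyword arguments.

```python
from urllib.parse import unquote, urlsplit


def kombu_options_from_url(url: str) -> dict:
    """Split an AMQP URL into discrete connection options (illustrative helper)."""
    parts = urlsplit(url)
    if parts.scheme not in ("amqp", "amqps"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")

    options = {
        "hostname": parts.hostname or "localhost",
        # Fall back to the conventional AMQP ports when the URL omits one.
        "port": parts.port or (5671 if parts.scheme == "amqps" else 5672),
    }
    if parts.username:
        options["userid"] = unquote(parts.username)
    if parts.password:
        options["password"] = unquote(parts.password)

    # The URL path carries the vhost; an empty path or a bare "/" is treated
    # here as "leave the vhost at its default".
    vhost = unquote(parts.path[1:])
    if vhost:
        options["virtual_host"] = vhost
    return options


print(kombu_options_from_url("amqp://user:pass@host:5672/vhost"))
```

The resulting dictionary could then be passed as `kombu_connection_options`, which keeps credentials out of a single opaque string when they come from separate settings.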
Connection Parameters¶
Standard broker:
import pika
broker = RabbitmqBroker(
    host="localhost",
    port=5672,
    credentials=pika.PlainCredentials("guest", "guest"),
    heartbeat=60,
)
Kombu broker:
broker = ConnectionPooledKombuBroker(
    kombu_connection_options={
        "hostname": "localhost",
        "port": 5672,
        "userid": "guest",
        "password": "guest",
        "heartbeat": 60,
    }
)
Note: Starting from version 0.3.0, heartbeat=60 is set by default. Specify it only if you want a different value.
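The renames above are mechanical, so existing keyword arguments can be translated in one place. The sketch below assumes the credentials object exposes `username` and `password` attributes, as `pika.PlainCredentials` does. `to_kombu_options` and the `Credentials` stand-in are hypothetical names, not part of either library.

```python
from dataclasses import dataclass


@dataclass
class Credentials:
    """Stand-in for pika.PlainCredentials so the example runs without pika."""
    username: str
    password: str


# Old keyword name -> Kombu option name (most keep their name, "host" does not).
RENAMED = {"host": "hostname", "port": "port",
           "virtual_host": "virtual_host", "heartbeat": "heartbeat"}


def to_kombu_options(**old_kwargs) -> dict:
    """Translate RabbitmqBroker-style keyword arguments into kombu_connection_options."""
    options = {}
    for key, value in old_kwargs.items():
        if key == "credentials":
            # Kombu takes the user name and password as two flat options.
            options["userid"] = value.username
            options["password"] = value.password
        elif key in RENAMED:
            options[RENAMED[key]] = value
        else:
            # Fail loudly rather than silently dropping an unmapped setting.
            raise KeyError(f"no known Kombu equivalent for {key!r}")
    return options


print(to_kombu_options(host="localhost", port=5672,
                       credentials=Credentials("guest", "guest"), heartbeat=60))
```

Raising on unknown keys is a deliberate choice here: a setting that has no obvious counterpart is better reviewed by hand than dropped during migration.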
Max Priority¶
Standard broker:
broker = RabbitmqBroker(url="...", max_priority=10)
Kombu broker:
broker = ConnectionPooledKombuBroker(
    kombu_connection_options={"hostname": "..."},
    max_priority=10,
)
Queue Name Migration¶
If your old setup used a different default queue name (e.g., "dramatiq" instead of "default"), you can migrate without changing actor code:
Before (actors with explicit queue names):
# You had to specify queue_name on every actor
@dramatiq.actor(queue_name="dramatiq")
def task_one():
    pass
@dramatiq.actor(queue_name="dramatiq")
def task_two():
    pass
After (using default_queue_name):
broker = ConnectionPooledKombuBroker(
    kombu_connection_options={"hostname": "..."},
    default_queue_name="dramatiq",  # Replace "default" with "dramatiq"
)
dramatiq.set_broker(broker)
# No need to specify queue_name anymore - it's automatic
@dramatiq.actor
def task_one():
    pass
@dramatiq.actor
def task_two():
    pass
# Actors with explicit non-default queues still work as expected
@dramatiq.actor(queue_name="critical")
def urgent_task():
    pass
This approach:
- Removes boilerplate from actor definitions
- Centralizes queue naming in broker configuration
- Makes it easier to change queue names across all actors
See Configuration for detailed explanation.
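The effect of default_queue_name can be pictured as a substitution applied to each actor's queue name. The function below is a mental model only, assuming the substitution targets Dramatiq's built-in "default" name. `resolve_queue_name` is hypothetical and is not the broker's actual implementation.

```python
DRAMATIQ_DEFAULT = "default"  # queue name Dramatiq gives actors that do not set queue_name


def resolve_queue_name(actor_queue: str, default_queue_name: str = DRAMATIQ_DEFAULT) -> str:
    """Return the queue an actor would end up on under this mental model."""
    if actor_queue == DRAMATIQ_DEFAULT:
        return default_queue_name
    return actor_queue


print(resolve_queue_name("default", "dramatiq"))   # actor declared without queue_name
print(resolve_queue_name("critical", "dramatiq"))  # explicit queue is left alone
```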
Common Issues¶
Issue: Topology Precondition Failed¶
Error:
Cause: Delay queues have different arguments between brokers.
Solution 1 - Clean slate:
# Delete existing queues via RabbitMQ management UI
# Or use rabbitmqctl:
rabbitmqctl delete_queue myqueue.DQ
rabbitmqctl delete_queue myqueue
rabbitmqctl delete_queue myqueue.XQ
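With many queues, the three commands per queue can be generated rather than typed. This sketch assumes the `.DQ` and `.XQ` suffixes shown above and only prints the commands so they can be reviewed before anything is deleted. `delete_commands` is a hypothetical helper.

```python
import shlex

SUFFIXES = ("", ".DQ", ".XQ")  # main queue, delay queue, dead-letter queue


def delete_commands(queue_names):
    """Build one rabbitmqctl delete_queue command per queue variant."""
    commands = []
    for name in queue_names:
        for suffix in SUFFIXES:
            # shlex.quote keeps unusual queue names safe to paste into a shell.
            commands.append(f"rabbitmqctl delete_queue {shlex.quote(name + suffix)}")
    return commands


for command in delete_commands(["myqueue", "emails"]):
    print(command)
```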
Solution 2 - Let broker handle it:
The broker sets ignore_different_topology=True by default, which logs warnings but continues. This works if you're not changing queue structure.
Issue: Connection Pools¶
Standard broker doesn't have real connection pooling. After migrating:
- Monitor connection count in RabbitMQ UI
- Adjust max_connections if needed
- For Django, use ConnectionSharedKombuBroker instead
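Connection counts can also be checked from a script instead of the UI. The sketch below assumes the RabbitMQ management plugin is enabled and that each entry returned by its /api/connections endpoint carries a `user` field. The function names and the sample payload are illustrative, and `fetch_connections` is defined but not called because it needs a reachable broker.

```python
import base64
import json
from collections import Counter
from urllib.request import Request, urlopen


def fetch_connections(base_url: str, user: str, password: str) -> list:
    """GET /api/connections from the management plugin (needs network access)."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = Request(f"{base_url}/api/connections",
                      headers={"Authorization": f"Basic {token}"})
    with urlopen(request, timeout=10) as response:
        return json.load(response)


def connections_per_user(connections: list) -> Counter:
    """Count open connections per RabbitMQ user."""
    return Counter(conn.get("user", "<unknown>") for conn in connections)


# Sample payload trimmed to the one field used here; real entries carry many more.
sample = [{"user": "guest"}, {"user": "guest"}, {"user": "worker"}]
print(connections_per_user(sample))
```

Comparing the tally before and after switching brokers gives a concrete number to tune max_connections against.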
Zero-Downtime Migration¶
For production with no downtime:
Step 1: Run Both Brokers¶
Deploy new code that can use both brokers:
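# config.py
import os
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq_kombu_broker import ConnectionPooledKombuBroker
USE_NEW_BROKER = os.getenv("USE_NEW_BROKER", "false") == "true"
if USE_NEW_BROKER:
    broker = ConnectionPooledKombuBroker(...)
else:
    broker = RabbitmqBroker(...)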
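The comparison above only accepts the exact string "true", so a value such as "True" or "1" would quietly select the old broker. A stricter reader for the flag might look like the sketch below. `env_flag` and the accepted spellings are assumptions made for illustration, not part of either library.

```python
import os

TRUE_VALUES = {"1", "true", "yes", "on"}
FALSE_VALUES = {"0", "false", "no", "off", ""}


def env_flag(name: str, environ=os.environ) -> bool:
    """Read a boolean switch from the environment, rejecting unrecognized values."""
    raw = environ.get(name, "").strip().lower()
    if raw in TRUE_VALUES:
        return True
    if raw in FALSE_VALUES:
        return False
    raise ValueError(f"{name}={raw!r} is not a recognized boolean")


print(env_flag("USE_NEW_BROKER", {"USE_NEW_BROKER": "True"}))
print(env_flag("USE_NEW_BROKER", {}))
```

Failing on an unrecognized value means a typo in a deployment manifest stops the worker at startup instead of leaving it on the wrong broker.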
Step 2: Test New Broker¶
Start one worker with the new broker:
USE_NEW_BROKER=true dramatiq tasks
Monitor for errors. Leave it running for a while.
Step 3: Gradual Rollout¶
Slowly increase workers using new broker:
# Old broker workers
dramatiq tasks &
dramatiq tasks &
# New broker workers
USE_NEW_BROKER=true dramatiq tasks &
Step 4: Clean Up Queues¶
Once all workers use new broker and queues are empty:
Step 5: Remove Old Broker¶
After a week with no issues, remove old broker code.
Testing Migration¶
Before production:
# test_migration.py
import dramatiq
from dramatiq_kombu_broker import ConnectionPooledKombuBroker
broker = ConnectionPooledKombuBroker(
    kombu_connection_options={
        "hostname": "amqp://guest:guest@localhost:5672/test"  # test vhost
    }
)
dramatiq.set_broker(broker)
@dramatiq.actor
def test_task(x):
    return x * 2
# Test immediate send
test_task.send(5)
# Test delayed send
test_task.send_with_options(args=(10,), delay=5000)
# Run worker
# dramatiq test_migration
Rollback Plan¶
If issues occur:
- Stop new broker workers
- Revert code to old broker
- Start old broker workers
- Messages in queue will process normally
The queue structure is compatible, so rollback is safe.
Feature Comparison¶
| Feature | Standard Broker | Kombu Broker |
|---|---|---|
| Connection pooling | Limited | Yes |
| Channel pooling | No | Yes (ConnectionSharedKombuBroker) |
| Topology mismatch handling | Fails | Configurable |
| Delayed messages | Works | Works |
| Priority queues | Yes | Yes |
| Middleware | Same | Same |
| Message format | Same | Same |
Next Steps¶
After migration:
- Monitor connection count
- Check queue depths
- Verify delayed messages work
- Test retry logic
- Update monitoring dashboards
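Queue depths can be checked from the output of `rabbitmqctl list_queues name messages`. The parser below is a minimal sketch that assumes tab-separated rows and skips any banner or header lines. The sample text is made up for illustration and `parse_queue_depths` is a hypothetical helper.

```python
def parse_queue_depths(output: str) -> dict:
    """Map queue name -> message count from tab-separated `name<TAB>messages` rows."""
    depths = {}
    for line in output.splitlines():
        name, sep, count = line.rpartition("\t")
        # Banner and header lines have no tab or a non-numeric second column; skip them.
        if sep and count.strip().isdigit():
            depths[name] = int(count)
    return depths


# Illustrative output only; the exact banner text varies between RabbitMQ versions.
sample = (
    "Listing queues for vhost / ...\n"
    "name\tmessages\n"
    "default\t0\n"
    "default.DQ\t3\n"
    "default.XQ\t0\n"
)
depths = parse_queue_depths(sample)
print({name: count for name, count in depths.items() if count > 0})
```

Filtering for non-zero counts makes it easy to spot a delay or dead-letter queue that is still holding messages after the switch.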
Getting Help¶
If you hit issues:
- Check Troubleshooting
- Enable debug logging: PYTHONUNBUFFERED=1 dramatiq tasks --verbose
- Ask on GitHub Discussions
- Report bugs on GitHub Issues