Troubleshooting¶
Common issues and how to fix them.
Connection Issues¶
Connection Refused¶
Error: ConnectionRefusedError: [Errno 111] Connection refused
Causes: - RabbitMQ not running - Wrong hostname/port - Firewall blocking connection
Fix:
# Check RabbitMQ is running
systemctl status rabbitmq-server
# Check port is open
telnet localhost 5672
# Check connection string
broker = ConnectionPooledKombuBroker(
kombu_connection_options={
"hostname": "amqp://guest:guest@localhost:5672/" # Verify this
}
)
Connection Limit Reached¶
Error: connection_limit_reached
Fix 1 - Use Shared Connection:
# Instead of ConnectionPooledKombuBroker
from dramatiq_kombu_broker import ConnectionSharedKombuBroker
broker = ConnectionSharedKombuBroker(...)
Fix 2 - Reduce Pool Size:
broker = ConnectionPooledKombuBroker(
connection_holder_options={
"max_connections": 5, # Lower than default
}
)
Fix 3 - Increase RabbitMQ Limit:
Heartbeat Failures¶
Error: Connection drops unexpectedly
By default, heartbeat is set to 60 seconds. To reduce it for unreliable networks:
broker = ConnectionPooledKombuBroker(
kombu_connection_options={
"hostname": "amqp://...",
"heartbeat": 30, # Lower than default 60s
}
)
Publish Deadlocks¶
Symptoms: - Worker threads hang indefinitely - Message publishing never completes - Application becomes unresponsive during RabbitMQ issues
Cause: When confirm_delivery=True, the broker waits for RabbitMQ to confirm message receipt. If the connection drops during this wait (before heartbeat detects it), the thread can block forever.
Solution: Use confirm_timeout (enabled by default):
broker = ConnectionPooledKombuBroker(
kombu_connection_options={
"hostname": "amqp://...",
"heartbeat": 60, # Transport-level protection
},
confirm_delivery=True,
confirm_timeout=5.0, # Application-level protection (default)
)
If you're getting timeout errors:
-
Check network latency - High-latency networks may need longer timeout:
-
Check RabbitMQ load - Overloaded RabbitMQ may delay confirmations. Monitor RabbitMQ metrics.
-
Check connection stability - Frequent timeouts indicate connection issues. Investigate network or RabbitMQ health.
Relationship with heartbeat:
| Parameter | Purpose | When It Helps |
|---|---|---|
heartbeat=60 |
Detects dead connections | Idle connections, no activity |
confirm_timeout=5.0 |
Prevents publish blocking | Active publishing, connection drops mid-publish |
Both parameters work together to provide comprehensive connection protection.
See Delivery Guarantees for more details.
Queue Issues¶
Precondition Failed¶
Error: amqp.exceptions.PreconditionFailed: inequivalent arg 'x-dead-letter-exchange'
Cause: Queue already exists with different arguments
Fix: Delete queue and recreate
# Via rabbitmqctl
rabbitmqctl delete_queue myqueue
rabbitmqctl delete_queue myqueue.DQ
rabbitmqctl delete_queue myqueue.XQ
# Or via management UI
# Queues tab → Select queue → Delete
Delayed Messages Not Working¶
Symptoms: - Messages sent with delay never appear - Delay queue has messages stuck
Check 1 - Delay Queue Configuration:
# Ensure using DefaultDramatiqTopology (not custom)
from dramatiq_kombu_broker import ConnectionPooledKombuBroker
broker = ConnectionPooledKombuBroker(...) # Uses DefaultDramatiqTopology
Check 2 - Dead Letter Parameters:
# Check delay queue has x-dead-letter-routing-key
rabbitmqctl list_queues name arguments | grep ".DQ"
Should show:
Fix: Delete delay queue, let it recreate with correct parameters
Messages Going to Wrong Queue¶
Check routing key:
def debug_message(broker, message):
print(f"Queue: {message.queue_name}")
print(f"Routing key: {message.options.get('routing_key')}")
Worker Issues¶
Worker Not Processing¶
Check 1 - Worker Running:
Check 2 - Correct Queue:
Check 3 - Actor Discovered:
High Memory Usage¶
Causes: - Too many worker threads - Memory leaks in actors - Large messages
Fix 1 - Reduce Threads:
Fix 2 - Process Messages in Batches:
@dramatiq.actor
def process_large_dataset(chunk_ids):
for chunk_id in chunk_ids:
process_chunk(chunk_id)
# Free memory between chunks
del chunk_data
Slow Processing¶
Profile actor:
import cProfile
@dramatiq.actor
def slow_task(data):
profiler = cProfile.Profile()
profiler.enable()
# Your code
process(data)
profiler.disable()
profiler.print_stats()
Check: - Database queries (N+1 problem) - External API calls - CPU-intensive operations
Message Issues¶
Messages Lost¶
Check 1 - Confirm Delivery:
broker = ConnectionPooledKombuBroker(
kombu_connection_options={"hostname": "amqp://..."},
confirm_delivery=True, # Ensure this is True
)
Check 2 - Message Persistence: Messages are persistent by default. Check RabbitMQ logs for crashes during publishing.
Check 3 - Dead Letter Queue:
def check_dlq(broker, queue_name):
main, delay, dlq = broker.get_queue_message_counts(queue_name)
print(f"DLQ has {dlq} messages")
Messages Stuck in DLQ¶
View DLQ messages:
Reprocess:
# Move back to main queue
rabbitmqadmin get queue=myqueue.XQ requeue=false count=1 | \
rabbitmqadmin publish routing_key=myqueue
Deserialization Errors¶
Error: JSONDecodeError or similar
Causes: - Message format changed - Corrupted message
Fix:
@dramatiq.actor
def my_task(data):
try:
process(data)
except (ValueError, TypeError) as e:
# Log and skip bad message
logger.error(f"Bad message: {e}")
return # Don't raise, let it ACK
Django Issues¶
Django DB Connection Errors¶
Error: OperationalError: server closed the connection
Fix - Use DB Middleware:
DRAMATIQ_BROKER["OPTIONS"]["middleware"] = [
"django_dramatiq.middleware.DbConnectionsMiddleware",
# ... other middleware
]
App Not Found¶
Error: django.core.exceptions.AppRegistryNotReady
Fix - Ensure Django Setup:
Performance Issues¶
High Latency¶
Check 1 - Connection Pool:
Check 2 - Worker Threads:
Check 3 - Prefetch:
Queue Backlog¶
Symptoms: Messages piling up
Fix 1 - More Workers:
Fix 2 - Optimize Actors: - Remove unnecessary work - Batch operations - Use asyncio for I/O-bound tasks
Debugging¶
Enable Verbose Logging¶
Python Debugging¶
RabbitMQ Management UI¶
Access at http://localhost:15672 (guest/guest)
- View queue depths
- Inspect messages
- Check connection count
- Monitor memory usage
Connection Info¶
def debug_connections(broker):
with broker.connection_holder.acquire_consumer_channel() as channel:
print(f"Connection: {channel.connection}")
print(f"Is open: {channel.connection.connected}")
Getting Help¶
If you're still stuck:
- Check logs - Worker logs, RabbitMQ logs
- Minimal reproducer - Isolate the problem
- Search issues - GitHub Issues
- Ask for help - GitHub Discussions
When asking for help, include: - dramatiq-kombu-broker version - RabbitMQ version - Python version - Full error message and traceback - Minimal code to reproduce
Common Solutions Summary¶
| Issue | Quick Fix |
|---|---|
| Connection refused | Check RabbitMQ is running |
| Connection limit | Use ConnectionSharedKombuBroker |
| Publish deadlock | Use confirm_timeout=5.0 (default) |
| Precondition failed | Delete queue, let it recreate |
| Delays not working | Delete .DQ queue, use DefaultDramatiqTopology |
| High memory | Reduce worker threads |
| Slow processing | Profile code, optimize |
| Messages lost | Enable confirm_delivery |
| Queue backlog | More workers, optimize actors |
Next Steps¶
- Configuration - Tune settings
- Performance - Optimize performance
- Examples - See working code