Filas e streams são padrões fundamentais para desacoplar sistemas e processar trabalho de forma assíncrona. Embora pareçam similares, têm características de performance muito diferentes. Escolher errado pode limitar severamente a escalabilidade.
Fila é para tarefas. Stream é para eventos. Confundir os dois é receita para problemas.
Filas vs Streams
Filas (Message Queues)
Produtor → [Fila] → Consumidor
Características:
- Mensagem consumida = removida
- Um consumidor por mensagem
- Ordenação por consumidor
- Foco em processamento de tarefas
Exemplos: RabbitMQ, SQS, ActiveMQ
Streams (Event Streams)
Produtor → [Stream] → Consumidor A
→ Consumidor B
→ Consumidor C
Características:
- Mensagem permanece no stream
- Múltiplos consumidores leem mesma mensagem
- Ordenação garantida por partição
- Foco em eventos e histórico
Exemplos: Kafka, Kinesis, Pulsar
Comparação de Performance
| Aspecto | Filas | Streams |
|---|---|---|
| Throughput | Médio (~10K msg/s) | Alto (~100K+ msg/s) |
| Latência | Baixa (ms) | Variável |
| Retenção | Até consumir | Configurável (horas/dias) |
| Replay | Não | Sim |
| Ordenação | Por fila | Por partição |
| Escala | Vertical | Horizontal |
Otimizando Filas
RabbitMQ
1. Prefetch adequado
# ❌ Prefetch 1: muito lento
channel.basic_qos(prefetch_count=1)
# ✅ Prefetch balanceado
channel.basic_qos(prefetch_count=50)
Trade-off:
Prefetch alto: melhor throughput, risco de perda se consumer falhar
Prefetch baixo: menor risco, pior throughput
2. Batching de acks
# ❌ Ack por mensagem
def callback(message):
process(message)
channel.basic_ack(delivery_tag)
# ✅ Batch ack
messages_processed = 0
def callback(message):
process(message)
messages_processed += 1
if messages_processed >= 100:
channel.basic_ack(delivery_tag, multiple=True)
messages_processed = 0
3. Persistência consciente
# ❌ Tudo persistente (lento)
channel.basic_publish(
properties=pika.BasicProperties(delivery_mode=2)
)
# ✅ Persistência só quando necessário
if message.is_critical:
properties = pika.BasicProperties(delivery_mode=2)
else:
properties = pika.BasicProperties(delivery_mode=1)
4. Múltiplas filas para paralelismo
❌ Uma fila, múltiplos consumers
→ Contenção no lock da fila
✅ Múltiplas filas com sharding
→ Paralelismo real
SQS
1. Long polling
# ❌ Short polling (desperdício)
response = sqs.receive_message(
WaitTimeSeconds=0 # Retorna imediatamente
)
# ✅ Long polling
response = sqs.receive_message(
WaitTimeSeconds=20 # Espera até 20s por mensagens
)
2. Batch operations
# ❌ Envio individual
for message in messages:
sqs.send_message(MessageBody=message)
# ✅ Batch send (até 10)
sqs.send_message_batch(
Entries=[
{'Id': str(i), 'MessageBody': msg}
for i, msg in enumerate(messages[:10])
]
)
3. Visibility timeout
# Timeout baseado no tempo de processamento
sqs.receive_message(
VisibilityTimeout=processing_time * 2
)
Otimizando Streams
Kafka
1. Particionamento adequado
# Partições = paralelismo máximo
# Mais partições = mais consumers paralelos
# ❌ Poucas partições
kafka-topics --create --topic orders --partitions 3
# ✅ Partições baseadas em throughput esperado
# Se precisa 100K msg/s e cada consumer faz 10K msg/s
# Precisa de no mínimo 10 partições
kafka-topics --create --topic orders --partitions 12
2. Partition key inteligente
# ❌ Sem key (round-robin, sem garantia de ordem)
producer.send('orders', value=order)
# ❌ Key com hot spot
producer.send('orders', key=tenant_id, value=order)
# Se um tenant tem 90% do tráfego, uma partição fica sobrecarregada
# ✅ Key balanceada
producer.send('orders', key=order_id, value=order)
3. Batching no producer
producer = KafkaProducer(
batch_size=16384, # 16KB por batch
linger_ms=5, # Espera 5ms para acumular
compression_type='lz4', # Comprime batch
)
4. Consumer configuration
consumer = KafkaConsumer(
'topic',
group_id='my-group',
# Fetch optimization
fetch_min_bytes=1024, # Espera acumular 1KB
fetch_max_wait_ms=500, # Ou 500ms
max_poll_records=500, # Até 500 records por poll
# Commit strategy
enable_auto_commit=False, # Commit manual para controle
)
5. Consumer group rebalancing
# ❌ Rebalance lento para de processar
# ✅ Cooperative rebalancing
consumer = KafkaConsumer(
partition_assignment_strategy=[
CooperativeStickyAssignor
]
)
Kinesis
1. Shards adequados
# 1 shard = 1MB/s write, 2MB/s read
# Calcule baseado em throughput
throughput_mb_s = messages_per_second * avg_message_size_kb / 1024
shards_needed = ceil(throughput_mb_s)
2. Enhanced fan-out
# ❌ Polling padrão (compartilha throughput)
# ✅ Enhanced fan-out (2MB/s dedicado por consumer)
kinesis.register_stream_consumer(
StreamARN=stream_arn,
ConsumerName='my-consumer'
)
Padrões de Performance
1. Dead Letter Queue (DLQ)
def process_with_dlq(message, max_retries=3):
retries = message.get_header('retry_count', 0)
try:
process(message)
ack(message)
except Exception as e:
if retries >= max_retries:
send_to_dlq(message, error=str(e))
ack(message) # Remove da fila principal
else:
message.set_header('retry_count', retries + 1)
requeue_with_delay(message, delay=2**retries)
2. Competing Consumers
→ Consumer 1
Fila → Dispatcher → Consumer 2
→ Consumer 3
Cada mensagem processada por apenas um consumer
Escala horizontal natural
3. Fan-out
→ Fila A → Consumer A
Evento → SNS → Fila B → Consumer B
→ Fila C → Consumer C
Mesmo evento processado por múltiplos sistemas
4. Aggregation
Eventos → Stream → Aggregator → Batch → Destino
Agrupa eventos pequenos em batches maiores
Reduz overhead de I/O
def aggregate(events, window_seconds=10, max_batch=1000):
batch = []
window_start = time.time()
for event in events:
batch.append(event)
if len(batch) >= max_batch or time.time() - window_start > window_seconds:
yield batch
batch = []
window_start = time.time()
Métricas Essenciais
Para Filas
# Tamanho da fila
queue_messages_ready
queue_messages_unacked
# Taxa de processamento
messages_published_per_second
messages_consumed_per_second
# Latência
message_age_seconds
processing_time_seconds
Para Streams
# Consumer lag (mais importante!)
kafka_consumer_group_lag
# Throughput
records_consumed_per_second
records_produced_per_second
# Partição health
under_replicated_partitions
offline_partitions
Alertas
# Fila crescendo
- alert: QueueBacklog
expr: queue_messages_ready > 10000
for: 5m
# Consumer lag alto
- alert: ConsumerLag
expr: kafka_consumer_group_lag > 100000
for: 5m
# Mensagens velhas
- alert: OldMessages
expr: message_age_seconds > 300
for: 2m
Quando Usar Cada Um
Use Filas quando:
✓ Processamento de tarefas (jobs)
✓ Trabalho que precisa ser feito "exatamente uma vez"
✓ Não precisa de replay
✓ Ordem não é crítica (ou é por consumidor)
✓ Volume moderado
Use Streams quando:
✓ Eventos que múltiplos sistemas consomem
✓ Precisa de replay (reprocessar histórico)
✓ Alto throughput
✓ Ordem é crítica por entidade
✓ Event sourcing
✓ Real-time analytics
Use Ambos:
Eventos → Kafka → Consumer → SQS → Workers
↓
(transformação)
Stream para eventos brutos
Fila para tarefas derivadas
Conclusão
Filas e streams são ferramentas complementares:
| Filas | Streams |
|---|---|
| Tarefas | Eventos |
| Consumido = deletado | Retido |
| Um consumer | Múltiplos consumers |
| Simples | Complexo |
Para performance:
- Dimensione corretamente: partições/shards baseados em throughput
- Batch operations: reduza overhead de I/O
- Monitor lag: métrica mais importante em ambos
- DLQ sempre: não deixe mensagens envenenadas travarem sistema
- Teste sob carga: comportamento muda drasticamente em escala
A escolha entre fila e stream não é técnica, é conceitual. O que você está modelando: uma tarefa ou um fato?