Fundamentos8 min

Filas e Streams: performance em processamento assíncrono

Filas e streams são fundamentais para sistemas escaláveis. Entenda as diferenças, quando usar cada um e como otimizar performance.

Filas e streams são padrões fundamentais para desacoplar sistemas e processar trabalho de forma assíncrona. Embora pareçam similares, têm características de performance muito diferentes. Escolher errado pode limitar severamente a escalabilidade.

Fila é para tarefas. Stream é para eventos. Confundir os dois é receita para problemas.

Filas vs Streams

Filas (Message Queues)

Produtor → [Fila] → Consumidor

Características:
- Mensagem consumida = removida
- Um consumidor por mensagem
- Ordenação por consumidor
- Foco em processamento de tarefas

Exemplos: RabbitMQ, SQS, ActiveMQ

Streams (Event Streams)

Produtor → [Stream] → Consumidor A
                   → Consumidor B
                   → Consumidor C

Características:
- Mensagem permanece no stream
- Múltiplos consumidores leem mesma mensagem
- Ordenação garantida por partição
- Foco em eventos e histórico

Exemplos: Kafka, Kinesis, Pulsar

Comparação de Performance

Aspecto Filas Streams
Throughput Médio (~10K msg/s) Alto (~100K+ msg/s)
Latência Baixa (ms) Variável
Retenção Até consumir Configurável (horas/dias)
Replay Não Sim
Ordenação Por fila Por partição
Escala Vertical Horizontal

Otimizando Filas

RabbitMQ

1. Prefetch adequado

# ❌ Prefetch 1: muito lento
channel.basic_qos(prefetch_count=1)

# ✅ Prefetch balanceado
channel.basic_qos(prefetch_count=50)

Trade-off:

Prefetch alto: melhor throughput, risco de perda se consumer falhar
Prefetch baixo: menor risco, pior throughput

2. Batching de acks

# ❌ Ack por mensagem
def callback(message):
    process(message)
    channel.basic_ack(delivery_tag)

# ✅ Batch ack
messages_processed = 0
def callback(message):
    process(message)
    messages_processed += 1
    if messages_processed >= 100:
        channel.basic_ack(delivery_tag, multiple=True)
        messages_processed = 0

3. Persistência consciente

# ❌ Tudo persistente (lento)
channel.basic_publish(
    properties=pika.BasicProperties(delivery_mode=2)
)

# ✅ Persistência só quando necessário
if message.is_critical:
    properties = pika.BasicProperties(delivery_mode=2)
else:
    properties = pika.BasicProperties(delivery_mode=1)

4. Múltiplas filas para paralelismo

❌ Uma fila, múltiplos consumers
   → Contenção no lock da fila

✅ Múltiplas filas com sharding
   → Paralelismo real

SQS

1. Long polling

# ❌ Short polling (desperdício)
response = sqs.receive_message(
    WaitTimeSeconds=0  # Retorna imediatamente
)

# ✅ Long polling
response = sqs.receive_message(
    WaitTimeSeconds=20  # Espera até 20s por mensagens
)

2. Batch operations

# ❌ Envio individual
for message in messages:
    sqs.send_message(MessageBody=message)

# ✅ Batch send (até 10)
sqs.send_message_batch(
    Entries=[
        {'Id': str(i), 'MessageBody': msg}
        for i, msg in enumerate(messages[:10])
    ]
)

3. Visibility timeout

# Timeout baseado no tempo de processamento
sqs.receive_message(
    VisibilityTimeout=processing_time * 2
)

Otimizando Streams

Kafka

1. Particionamento adequado

# Partições = paralelismo máximo
# Mais partições = mais consumers paralelos

# ❌ Poucas partições
kafka-topics --create --topic orders --partitions 3

# ✅ Partições baseadas em throughput esperado
# Se precisa 100K msg/s e cada consumer faz 10K msg/s
# Precisa de no mínimo 10 partições
kafka-topics --create --topic orders --partitions 12

2. Partition key inteligente

# ❌ Sem key (round-robin, sem garantia de ordem)
producer.send('orders', value=order)

# ❌ Key com hot spot
producer.send('orders', key=tenant_id, value=order)
# Se um tenant tem 90% do tráfego, uma partição fica sobrecarregada

# ✅ Key balanceada
producer.send('orders', key=order_id, value=order)

3. Batching no producer

producer = KafkaProducer(
    batch_size=16384,       # 16KB por batch
    linger_ms=5,            # Espera 5ms para acumular
    compression_type='lz4', # Comprime batch
)

4. Consumer configuration

consumer = KafkaConsumer(
    'topic',
    group_id='my-group',

    # Fetch optimization
    fetch_min_bytes=1024,      # Espera acumular 1KB
    fetch_max_wait_ms=500,     # Ou 500ms
    max_poll_records=500,      # Até 500 records por poll

    # Commit strategy
    enable_auto_commit=False,  # Commit manual para controle
)

5. Consumer group rebalancing

# ❌ Rebalance lento para de processar
# ✅ Cooperative rebalancing
consumer = KafkaConsumer(
    partition_assignment_strategy=[
        CooperativeStickyAssignor
    ]
)

Kinesis

1. Shards adequados

# 1 shard = 1MB/s write, 2MB/s read
# Calcule baseado em throughput

throughput_mb_s = messages_per_second * avg_message_size_kb / 1024
shards_needed = ceil(throughput_mb_s)

2. Enhanced fan-out

# ❌ Polling padrão (compartilha throughput)
# ✅ Enhanced fan-out (2MB/s dedicado por consumer)
kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName='my-consumer'
)

Padrões de Performance

1. Dead Letter Queue (DLQ)

def process_with_dlq(message, max_retries=3):
    retries = message.get_header('retry_count', 0)

    try:
        process(message)
        ack(message)
    except Exception as e:
        if retries >= max_retries:
            send_to_dlq(message, error=str(e))
            ack(message)  # Remove da fila principal
        else:
            message.set_header('retry_count', retries + 1)
            requeue_with_delay(message, delay=2**retries)

2. Competing Consumers

                    → Consumer 1
Fila → Dispatcher → Consumer 2
                    → Consumer 3

Cada mensagem processada por apenas um consumer
Escala horizontal natural

3. Fan-out

              → Fila A → Consumer A
Evento → SNS → Fila B → Consumer B
              → Fila C → Consumer C

Mesmo evento processado por múltiplos sistemas

4. Aggregation

Eventos → Stream → Aggregator → Batch → Destino

Agrupa eventos pequenos em batches maiores
Reduz overhead de I/O
def aggregate(events, window_seconds=10, max_batch=1000):
    batch = []
    window_start = time.time()

    for event in events:
        batch.append(event)

        if len(batch) >= max_batch or time.time() - window_start > window_seconds:
            yield batch
            batch = []
            window_start = time.time()

Métricas Essenciais

Para Filas

# Tamanho da fila
queue_messages_ready
queue_messages_unacked

# Taxa de processamento
messages_published_per_second
messages_consumed_per_second

# Latência
message_age_seconds
processing_time_seconds

Para Streams

# Consumer lag (mais importante!)
kafka_consumer_group_lag

# Throughput
records_consumed_per_second
records_produced_per_second

# Partição health
under_replicated_partitions
offline_partitions

Alertas

# Fila crescendo
- alert: QueueBacklog
  expr: queue_messages_ready > 10000
  for: 5m

# Consumer lag alto
- alert: ConsumerLag
  expr: kafka_consumer_group_lag > 100000
  for: 5m

# Mensagens velhas
- alert: OldMessages
  expr: message_age_seconds > 300
  for: 2m

Quando Usar Cada Um

Use Filas quando:

✓ Processamento de tarefas (jobs)
✓ Trabalho que precisa ser feito "exatamente uma vez"
✓ Não precisa de replay
✓ Ordem não é crítica (ou é por consumidor)
✓ Volume moderado

Use Streams quando:

✓ Eventos que múltiplos sistemas consomem
✓ Precisa de replay (reprocessar histórico)
✓ Alto throughput
✓ Ordem é crítica por entidade
✓ Event sourcing
✓ Real-time analytics

Use Ambos:

Eventos → Kafka → Consumer → SQS → Workers
                     ↓
              (transformação)

Stream para eventos brutos
Fila para tarefas derivadas

Conclusão

Filas e streams são ferramentas complementares:

Filas Streams
Tarefas Eventos
Consumido = deletado Retido
Um consumer Múltiplos consumers
Simples Complexo

Para performance:

  1. Dimensione corretamente: partições/shards baseados em throughput
  2. Batch operations: reduza overhead de I/O
  3. Monitor lag: métrica mais importante em ambos
  4. DLQ sempre: não deixe mensagens envenenadas travarem sistema
  5. Teste sob carga: comportamento muda drasticamente em escala

A escolha entre fila e stream não é técnica, é conceitual. O que você está modelando: uma tarefa ou um fato?

filasstreamskafkarabbitmqmensageria
Compartilhar:
Read in English

Quer entender os limites da sua plataforma?

Entre em contato para uma avaliação de performance.

Fale Conosco