Building blocks

Drain Pipeline

Batch events, retry on failure, and protect against buffer overflow with the shared drain pipeline. Supports fan-out to multiple adapters.

In production, sending one HTTP request per log event is wasteful. The drain pipeline buffers events and sends them in batches, retries on transient failures, and drops the oldest events when the buffer overflows.

Add the drain pipeline (batch + retry + fan-out)

Quick Start

The pipeline wraps any drain. The wiring depends on your framework — pick the tab that matches yours; every other example below uses the same shape.

// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>()
  const drain = pipeline(createAxiomDrain())

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})
Always flush the pipeline before the process exits (drain.flush()). On Nitro use the close hook; on standalone scripts call it before process.exit; on serverless runtimes use waitUntil(drain.flush()).

How It Works

  1. Events are buffered in memory as they arrive via the evlog:drain hook
  2. A batch is flushed when either the batch size is reached or the interval expires (whichever comes first)
  3. If the drain function fails, the batch is retried with the configured backoff strategy
  4. If all retries are exhausted, onDropped is called with the lost events
  5. If the buffer exceeds maxBufferSize, the oldest events are dropped to prevent memory leaks

Configuration

The options below apply to any framework — wire the resulting drain the same way you did in Quick Start.

pipeline-config.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

const pipeline = createDrainPipeline<DrainContext>({
  batch: {
    size: 50,          // Flush every 50 events
    intervalMs: 5000,  // Or every 5 seconds, whichever comes first
  },
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelayMs: 1000,
    maxDelayMs: 30000,
  },
  maxBufferSize: 1000,
  onDropped: (events, error) => {
    console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
  },
})

export const drain = pipeline(createAxiomDrain())
// Then wire `drain` to your framework — see Quick Start above.

Options Reference

OptionDefaultDescription
batch.size50Maximum events per batch
batch.intervalMs5000Max time (ms) before flushing a partial batch
retry.maxAttempts3Total attempts including the initial one
retry.backoff'exponential''exponential' | 'linear' | 'fixed'
retry.initialDelayMs1000Base delay for the first retry
retry.maxDelayMs30000Upper bound for any retry delay
maxBufferSize1000Max buffered events before dropping oldest
onDropped-Callback when events are dropped (overflow or retry exhaustion)

Backoff Strategies

StrategyDelay PatternUse Case
exponential1s, 2s, 4s, 8s...Default. Best for transient failures that may need time to recover
linear1s, 2s, 3s, 4s...Predictable delay growth
fixed1s, 1s, 1s, 1s...Same delay every time. Useful for rate-limited APIs

Returned Drain Function

The function returned by pipeline(drain) is hook-compatible and exposes:

PropertyTypeDescription
drain(ctx)(ctx: T) => voidPush a single event into the buffer
drain.flush()() => Promise<void>Force-flush all buffered events
drain.pendingnumberNumber of events currently buffered

Multiple Destinations

Wrap multiple adapters with a single pipeline (one batch flushed in parallel to every destination):

pipeline-fan-out.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createOTLPDrain } from 'evlog/otlp'

const axiom = createAxiomDrain()
const otlp = createOTLPDrain()

const pipeline = createDrainPipeline<DrainContext>()
export const drain = pipeline(async (batch) => {
  await Promise.allSettled([axiom(batch), otlp(batch)])
})
// Wire `drain` exactly like in Quick Start — Nitro hook, framework middleware, or initLogger.

Custom Drain Function

You don't need an adapter. Pass any async function that accepts a batch:

pipeline-custom.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 100 } })

export const drain = pipeline(async (batch) => {
  await fetch('https://your-service.com/logs', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch.map(ctx => ctx.event)),
  })
})
// Wire `drain` to your framework — see Quick Start above.
See the full bun-script example for a complete working script using the standalone wiring, and the Next.js guide for an App Router implementation.

Next Steps