When webhook volume grows from hundreds to millions of events per day, naive synchronous processing quickly becomes a bottleneck. This guide covers architecture patterns, queue strategies, and scaling techniques to build webhook systems that handle enterprise-level traffic while maintaining reliability and low latency.
The Scaling Challenge
┌─────────────────────────────────────────────────────────────────────────┐
│ WEBHOOK VOLUME GROWTH PATH │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Stage 1: Startup Stage 2: Growth Stage 3: Scale │
│ ~100/day ~10K/day ~1M+/day │
│ ──────────────── ───────────────── ────────────────── │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Single │ │ Single │ │ Multiple│ │
│ │ Server │ │ Server │ │ Servers │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Process ┌────────┐ ┌────────┐ │
│ Sync │ Queue │ │ Queue │ │
│ └───┬────┘ │Cluster │ │
│ │ └───┬────┘ │
│ ▼ │ │
│ ┌────────┐ ▼ │
│ │ Worker │ ┌────────┐ │
│ └────────┘ │Workers │ │
│ │ Pool │ │
│ └────────┘ │
│ │
│ ✓ Simple ✓ Decoupled ✓ Horizontal scale │
│ ✗ No buffering ✓ Buffered ✓ Fault tolerant │
│ ✗ Timeouts ✓ Retries ✓ Auto-scaling │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Overview
The Fundamental Pattern: Receive Fast, Process Later
// High-performance webhook receiver
import express from 'express';
import { Queue } from 'bullmq';
import { Redis } from 'ioredis';
const redis = new Redis({
host: process.env.REDIS_HOST,
port: 6379,
maxRetriesPerRequest: 3
});
const webhookQueue = new Queue('webhooks', {
connection: redis,
defaultJobOptions: {
removeOnComplete: 1000,
removeOnFail: 5000,
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000
}
}
});
const app = express();
// Receive webhook - respond immediately
app.post('/webhooks/:provider',
express.raw({ type: 'application/json', limit: '1mb' }),
async (req, res) => {
const startTime = Date.now();
const provider = req.params.provider;
try {
      // Basic validation only - signature verification happens in the worker.
      // Trade-off: skipping verification here keeps receiver latency low, but
      // lets unauthenticated requests enqueue jobs, so pair this with edge
      // rate limiting.
      const body = req.body.toString();
// Add to queue with minimal processing
await webhookQueue.add(
`${provider}-webhook`,
{
provider,
headers: req.headers,
body,
receivedAt: Date.now()
},
{
priority: getPriority(provider, req.headers)
}
);
// Respond fast (target: < 100ms)
const duration = Date.now() - startTime;
res.status(200).json({
received: true,
queueTime: duration
});
} catch (error) {
console.error('Failed to queue webhook:', error);
res.status(500).json({ error: 'Queue unavailable' });
}
}
);
function getPriority(provider: string, headers: any): number {
  // Lower number = higher priority; critical payment events jump the queue.
  // NOTE: 'stripe-event-type' is an illustrative header. Stripe actually puts
  // the event type in the JSON body, so a real implementation would peek at
  // the parsed payload instead.
  if (provider === 'stripe') {
    const eventType = headers['stripe-event-type'];
    if (eventType?.includes('payment') || eventType?.includes('charge')) {
      return 1;
    }
  }
  return 5; // Default priority
}
app.listen(3000, () => {
console.log('Webhook receiver running on port 3000');
});
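The receiver deliberately defers signature verification to the worker. As a minimal sketch of what that worker-side check might look like, here is a generic HMAC-SHA256 verifier with a constant-time comparison. The hex-encoded signature scheme is an assumption for illustration; real providers (Stripe, GitHub, Shopify) each use their own header formats, and Stripe additionally signs a timestamp:

```typescript
import crypto from 'node:crypto';

// Generic HMAC-SHA256 signature check (illustrative; adapt per provider).
// Runs in the worker, where the extra work doesn't affect receiver latency.
export function verifySignature(
  body: string,
  signatureHex: string,
  secret: string
): boolean {
  const expected = crypto
    .createHmac('sha256', secret)
    .update(body)
    .digest('hex');
  // Constant-time comparison to avoid leaking timing information
  const a = Buffer.from(expected, 'hex');
  const b = Buffer.from(signatureHex, 'hex');
  return a.length === b.length && crypto.timingSafeEqual(a, b);
}
```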
Queue-Based Processing Architecture
┌──────────────────────────────────────────────────────────────────────────┐
│ QUEUE-BASED WEBHOOK ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ Webhook Providers Receivers Message Queue │
│ ───────────────── ───────── ───────────── │
│ │
│ ┌─────────┐ │
│ │ Stripe │───┐ │
│ └─────────┘ │ ┌──────────┐ │
│ │ │Receiver 1│────┐ │
│ ┌─────────┐ │ └──────────┘ │ ┌───────────────┐ │
│ │ GitHub │───┼──────▶ ├────▶│ │ │
│ └─────────┘ │ ┌──────────┐ │ │ Redis Queue │ │
│ │ │Receiver 2│────┤ │ (BullMQ) │ │
│ ┌─────────┐ │ └──────────┘ │ │ │ │
│ │ Shopify │───┘ │ └───────┬───────┘ │
│ └─────────┘ ┌──────────┐ │ │ │
│ │Receiver N│────┘ │ │
│ └──────────┘ │ │
│ │ │
│ Load Balancer distributes │ │
│ across receiver instances ▼ │
│ │
│ Workers Database / Services │
│ ─────── ─────────────────── │
│ │
│ ┌──────────┐ ┌──────────────┐ │
│ │ Worker 1 │────────▶│ PostgreSQL │ │
│ └──────────┘ └──────────────┘ │
│ │
│ ┌──────────┐ ┌──────────────┐ │
│ │ Worker 2 │────────▶│ External API │ │
│ └──────────┘ └──────────────┘ │
│ │
│ ┌──────────┐ ┌──────────────┐ │
│ │ Worker N │────────▶│ Webhook │ │
│ └──────────┘ │ Fanout │ │
│ └──────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────┘
Message Queue Strategies
BullMQ (Redis-Based) - Recommended for Most Cases
// worker.ts - Webhook processing worker
import { Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
const redis = new Redis({
host: process.env.REDIS_HOST,
port: 6379,
maxRetriesPerRequest: null // Required for workers
});
interface WebhookJob {
provider: string;
headers: Record<string, string>;
body: string;
receivedAt: number;
}
// Create worker with concurrency
const worker = new Worker<WebhookJob>(
'webhooks',
async (job: Job<WebhookJob>) => {
const { provider, headers, body, receivedAt } = job.data;
const queueLatency = Date.now() - receivedAt;
console.log(`Processing ${provider} webhook (queued ${queueLatency}ms)`);
try {
// Now do full validation including signature
const event = await validateAndParse(provider, headers, body);
// Check idempotency
const isDuplicate = await checkDuplicate(event.id);
if (isDuplicate) {
return { status: 'duplicate', eventId: event.id };
}
// Process based on provider and event type
await processEvent(provider, event);
// Mark as processed
await markProcessed(event.id);
return {
status: 'processed',
eventId: event.id,
processingTime: Date.now() - receivedAt
};
} catch (error) {
// Distinguish retriable from permanent failures
if (isRetriableError(error)) {
throw error; // BullMQ will retry
} else {
// Log and don't retry
await logPermanentFailure(job.id, error);
return { status: 'failed_permanent', error: String(error) };
}
}
},
{
connection: redis,
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '10'),
limiter: {
max: 100, // Max jobs per duration
duration: 1000 // Duration in ms
}
}
);
// Monitor worker events
worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
worker.on('failed', (job, error) => {
console.error(`Job ${job?.id} failed:`, error.message);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down worker...');
await worker.close();
process.exit(0);
});
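The worker above relies on an `isRetriableError` helper that isn't shown. One plausible sketch, under the assumption that errors may carry an HTTP-style `status` or a Node system error `code`:

```typescript
// Sketch of the retriable/permanent split used in the worker above.
// Assumption: errors may expose an HTTP `status` or a Node `code` field.
export function isRetriableError(error: unknown): boolean {
  const err = error as { status?: number; code?: string };
  // Network-level failures are worth retrying
  const transientCodes = ['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EAI_AGAIN'];
  if (err.code && transientCodes.includes(err.code)) {
    return true;
  }
  // 5xx and 429 from downstream services are transient; other 4xx are not
  if (typeof err.status === 'number') {
    return err.status >= 500 || err.status === 429;
  }
  // Unknown errors: retry by default so transient issues aren't silently dropped
  return true;
}
```

Erring toward "retriable" for unknown errors pairs well with a bounded `attempts` count: a genuinely permanent failure still lands in the failed set after the retries are exhausted.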
Queue Configuration for Different Scenarios
// queue-config.ts
import { Queue } from 'bullmq';
import { Redis } from 'ioredis';

// Shared connection - recent BullMQ versions require one explicitly
const connection = new Redis({
  host: process.env.REDIS_HOST,
  port: 6379,
  maxRetriesPerRequest: 3
});

// High-priority queue for payment events
const paymentQueue = new Queue('payments', {
  connection,
  defaultJobOptions: {
    attempts: 10,
    backoff: {
      type: 'exponential',
      delay: 2000
    },
    removeOnComplete: false, // Keep for audit
    removeOnFail: false
  }
});

// Standard queue for general events
const standardQueue = new Queue('standard', {
  connection,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    removeOnComplete: 1000,
    removeOnFail: 5000
  }
});

// Low-priority queue for analytics/logging
const analyticsQueue = new Queue('analytics', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'fixed',
      delay: 5000
    },
    removeOnComplete: 100,
    removeOnFail: 1000
  }
});
// Route webhooks to appropriate queue
export function routeToQueue(provider: string, eventType: string): Queue {
if (provider === 'stripe' && eventType.includes('payment')) {
return paymentQueue;
}
if (eventType.includes('analytics') || eventType.includes('log')) {
return analyticsQueue;
}
return standardQueue;
}
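BullMQ's built-in `exponential` backoff computes each retry delay as `delay * 2^(attempt - 1)`. A small helper makes the resulting schedules concrete, e.g. what the payment queue's `delay: 2000` actually means over its 10 attempts:

```typescript
// Retry delay for BullMQ's built-in 'exponential' backoff:
// delay * 2^(attempt - 1), where attempt starts at 1
export function backoffDelayMs(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * Math.pow(2, attempt - 1);
}

// Full schedule for a queue's settings. For the payment queue above
// (delay: 2000, attempts: 10) this runs 2s, 4s, 8s, ... up to ~17 minutes
// before the final attempt.
export function backoffSchedule(baseDelayMs: number, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, i) => backoffDelayMs(baseDelayMs, i + 1));
}
```

Summing the payment schedule shows a job can stay in retry for roughly half an hour before it is marked failed, which is worth knowing when sizing `removeOnFail` retention and alert thresholds.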
Amazon SQS for Higher Durability
// sqs-processor.ts
import {
SQSClient,
ReceiveMessageCommand,
DeleteMessageCommand,
SendMessageCommand
} from '@aws-sdk/client-sqs';
const sqs = new SQSClient({ region: 'us-east-1' });
const QUEUE_URL = process.env.SQS_QUEUE_URL!;
const DLQ_URL = process.env.SQS_DLQ_URL!;
interface WebhookMessage {
provider: string;
eventId: string;
body: string;
headers: Record<string, string>;
attempts: number;
}
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function pollMessages(): Promise<void> {
while (true) {
try {
const response = await sqs.send(new ReceiveMessageCommand({
QueueUrl: QUEUE_URL,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 20, // Long polling
VisibilityTimeout: 60,
MessageAttributeNames: ['All']
}));
if (response.Messages) {
await Promise.all(
response.Messages.map(async (message) => {
try {
const webhook: WebhookMessage = JSON.parse(message.Body!);
await processWebhook(webhook);
// Delete on success
await sqs.send(new DeleteMessageCommand({
QueueUrl: QUEUE_URL,
ReceiptHandle: message.ReceiptHandle!
}));
} catch (error) {
console.error('Processing failed:', error);
// Message returns to queue after visibility timeout
// SQS will move to DLQ after maxReceiveCount
}
})
);
}
} catch (error) {
console.error('Polling error:', error);
await sleep(5000);
}
}
}
// Send webhook to queue
export async function queueWebhook(webhook: WebhookMessage): Promise<void> {
await sqs.send(new SendMessageCommand({
QueueUrl: QUEUE_URL,
MessageBody: JSON.stringify(webhook),
    MessageGroupId: webhook.provider, // FIFO queues only: preserves per-provider ordering
    MessageDeduplicationId: webhook.eventId // FIFO queues only: drops duplicates within 5 minutes
}));
}
Worker Pool Management
Dynamic Worker Scaling
// worker-pool.ts
import { Worker, Queue } from 'bullmq';
import os from 'os';
interface WorkerPoolConfig {
minWorkers: number;
maxWorkers: number;
scaleUpThreshold: number; // Queue depth to trigger scale up
scaleDownThreshold: number;
scaleInterval: number; // ms
}
class WorkerPool {
private workers: Worker[] = [];
private queue: Queue;
private config: WorkerPoolConfig;
private scaleTimer?: NodeJS.Timeout;
constructor(queue: Queue, config: Partial<WorkerPoolConfig> = {}) {
this.queue = queue;
this.config = {
minWorkers: config.minWorkers || 2,
maxWorkers: config.maxWorkers || os.cpus().length * 2,
scaleUpThreshold: config.scaleUpThreshold || 1000,
scaleDownThreshold: config.scaleDownThreshold || 100,
scaleInterval: config.scaleInterval || 10000
};
}
async start(): Promise<void> {
// Start minimum workers
for (let i = 0; i < this.config.minWorkers; i++) {
await this.addWorker();
}
// Start auto-scaling
this.scaleTimer = setInterval(
() => this.autoScale(),
this.config.scaleInterval
);
console.log(`Worker pool started with ${this.workers.length} workers`);
}
private async addWorker(): Promise<void> {
const worker = new Worker(
this.queue.name,
async (job) => {
return await processWebhookJob(job);
},
{
concurrency: 10,
connection: this.queue.opts.connection
}
);
worker.on('failed', (job, error) => {
console.error(`Worker failed job ${job?.id}:`, error);
});
this.workers.push(worker);
console.log(`Added worker, total: ${this.workers.length}`);
}
private async removeWorker(): Promise<void> {
if (this.workers.length <= this.config.minWorkers) return;
const worker = this.workers.pop();
if (worker) {
await worker.close();
console.log(`Removed worker, total: ${this.workers.length}`);
}
}
private async autoScale(): Promise<void> {
const queueStatus = await this.queue.getJobCounts();
const queueDepth = queueStatus.waiting + queueStatus.active;
console.log(`Queue depth: ${queueDepth}, Workers: ${this.workers.length}`);
if (queueDepth > this.config.scaleUpThreshold) {
if (this.workers.length < this.config.maxWorkers) {
// Scale up
const toAdd = Math.min(
Math.ceil(queueDepth / 500), // Add 1 worker per 500 jobs
this.config.maxWorkers - this.workers.length
);
for (let i = 0; i < toAdd; i++) {
await this.addWorker();
}
}
} else if (queueDepth < this.config.scaleDownThreshold) {
// Scale down (remove one at a time)
await this.removeWorker();
}
}
async stop(): Promise<void> {
if (this.scaleTimer) {
clearInterval(this.scaleTimer);
}
await Promise.all(this.workers.map(w => w.close()));
this.workers = [];
console.log('Worker pool stopped');
}
}
Kubernetes Horizontal Pod Autoscaler
# kubernetes/webhook-workers.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: webhook-workers
spec:
replicas: 3
selector:
matchLabels:
app: webhook-worker
template:
metadata:
labels:
app: webhook-worker
spec:
containers:
- name: worker
image: myapp/webhook-worker:latest
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "2000m"
memory: "2Gi"
env:
- name: WORKER_CONCURRENCY
value: "20"
- name: REDIS_HOST
value: redis-master.default.svc.cluster.local
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: webhook-workers-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: webhook-workers
minReplicas: 3
maxReplicas: 50
metrics:
# Scale based on queue depth (custom metric)
- type: External
external:
metric:
name: redis_queue_depth
selector:
matchLabels:
queue: webhooks
target:
type: AverageValue
averageValue: "500"
# Also consider CPU
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Pods
value: 5
periodSeconds: 60
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
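For External metrics with an `AverageValue` target, the HPA's core formula reduces to `desiredReplicas = ceil(metricTotal / targetAverageValue)`, clamped to the min/max bounds (the `behavior` policies then limit how fast that target is approached). A quick sketch of how the settings above map queue depth to worker pods:

```typescript
// Desired replica count for an External metric with an AverageValue target,
// clamped to the HPA's minReplicas/maxReplicas. Scale-up/down behavior
// policies additionally rate-limit how quickly this target is reached.
export function desiredReplicas(
  metricTotal: number,
  targetAverageValue: number,
  minReplicas: number,
  maxReplicas: number
): number {
  const raw = Math.ceil(metricTotal / targetAverageValue);
  return Math.min(maxReplicas, Math.max(minReplicas, raw));
}

// With averageValue 500, min 3, max 50:
//   depth   1,000 ->  3 pods (the floor keeps a warm minimum)
//   depth  10,000 -> 20 pods
//   depth 100,000 -> 50 pods (capped at maxReplicas)
```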
Batching Strategies
Database Write Batching
// batch-processor.ts
import { Job } from 'bullmq';
interface BatchConfig {
maxSize: number;
maxWaitMs: number;
}
class BatchProcessor<T> {
private batch: T[] = [];
private timer: NodeJS.Timeout | null = null;
private config: BatchConfig;
private processor: (items: T[]) => Promise<void>;
constructor(
processor: (items: T[]) => Promise<void>,
config: Partial<BatchConfig> = {}
) {
this.processor = processor;
this.config = {
maxSize: config.maxSize || 100,
maxWaitMs: config.maxWaitMs || 1000
};
}
async add(item: T): Promise<void> {
this.batch.push(item);
if (this.batch.length >= this.config.maxSize) {
await this.flush();
} else if (!this.timer) {
this.timer = setTimeout(
() => this.flush(),
this.config.maxWaitMs
);
}
}
async flush(): Promise<void> {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.batch.length === 0) return;
const items = this.batch;
this.batch = [];
try {
await this.processor(items);
console.log(`Processed batch of ${items.length} items`);
    } catch (error) {
      console.error('Batch processing failed:', error);
      // Re-queue items for retry and restart the timer so they don't
      // sit in the buffer indefinitely if no further add() calls arrive
      this.batch.unshift(...items);
      if (!this.timer) {
        this.timer = setTimeout(() => this.flush(), this.config.maxWaitMs);
      }
      throw error;
    }
}
}
// Usage: Batch database inserts
const eventBatcher = new BatchProcessor<WebhookEvent>(
async (events) => {
// Single bulk insert instead of N inserts
await db.query(`
INSERT INTO webhook_events (event_id, event_type, payload, received_at)
SELECT * FROM UNNEST($1::text[], $2::text[], $3::jsonb[], $4::timestamptz[])
`, [
events.map(e => e.id),
events.map(e => e.type),
events.map(e => JSON.stringify(e.data)),
events.map(e => e.receivedAt)
]);
},
{ maxSize: 100, maxWaitMs: 500 }
);
// In worker
async function processWebhookJob(job: Job): Promise<void> {
const event = parseWebhookEvent(job.data);
await eventBatcher.add(event);
}
API Call Batching
// api-batcher.ts
import crypto from 'node:crypto'; // for crypto.randomUUID (global on Node 19+, explicit here)
interface BatchableRequest {
id: string;
endpoint: string;
payload: any;
resolve: (result: any) => void;
reject: (error: Error) => void;
}
class APIBatcher {
private pending = new Map<string, BatchableRequest[]>();
private timers = new Map<string, NodeJS.Timeout>();
private config: { maxBatch: number; maxWait: number };
constructor(config = { maxBatch: 50, maxWait: 200 }) {
this.config = config;
}
async request(endpoint: string, payload: any): Promise<any> {
return new Promise((resolve, reject) => {
const request: BatchableRequest = {
id: crypto.randomUUID(),
endpoint,
payload,
resolve,
reject
};
if (!this.pending.has(endpoint)) {
this.pending.set(endpoint, []);
}
this.pending.get(endpoint)!.push(request);
if (this.pending.get(endpoint)!.length >= this.config.maxBatch) {
this.flush(endpoint);
} else if (!this.timers.has(endpoint)) {
this.timers.set(
endpoint,
setTimeout(() => this.flush(endpoint), this.config.maxWait)
);
}
});
}
private async flush(endpoint: string): Promise<void> {
const timer = this.timers.get(endpoint);
if (timer) {
clearTimeout(timer);
this.timers.delete(endpoint);
}
const requests = this.pending.get(endpoint) || [];
this.pending.delete(endpoint);
if (requests.length === 0) return;
try {
// Make single batch API call
const response = await fetch(`${endpoint}/batch`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
requests: requests.map(r => ({
id: r.id,
payload: r.payload
}))
})
});
const results = await response.json();
// Resolve individual promises
for (const request of requests) {
const result = results.find((r: any) => r.id === request.id);
if (result?.error) {
request.reject(new Error(result.error));
} else {
request.resolve(result?.data);
}
}
} catch (error) {
// Reject all on batch failure
for (const request of requests) {
request.reject(error as Error);
}
}
}
}
Rate Limiting
Token Bucket Rate Limiter
// rate-limiter.ts
import { Redis } from 'ioredis';
class TokenBucketLimiter {
private redis: Redis;
constructor(redis: Redis) {
this.redis = redis;
}
async acquire(
key: string,
maxTokens: number,
refillRate: number, // tokens per second
tokensNeeded: number = 1
): Promise<{ allowed: boolean; retryAfter?: number }> {
const now = Date.now();
const bucketKey = `ratelimit:${key}`;
// Lua script for atomic token bucket
const script = `
local key = KEYS[1]
local max_tokens = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local tokens_needed = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or max_tokens
local last_refill = tonumber(bucket[2]) or now
-- Refill tokens based on time elapsed
local elapsed = (now - last_refill) / 1000
tokens = math.min(max_tokens, tokens + (elapsed * refill_rate))
if tokens >= tokens_needed then
-- Consume tokens
tokens = tokens - tokens_needed
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, 3600)
return {1, 0}
else
-- Calculate wait time
local wait = (tokens_needed - tokens) / refill_rate * 1000
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, 3600)
return {0, math.ceil(wait)}
end
`;
const result = await this.redis.eval(
script,
1,
bucketKey,
maxTokens,
refillRate,
tokensNeeded,
now
) as [number, number];
return {
allowed: result[0] === 1,
retryAfter: result[1] || undefined
};
}
}
// Usage in webhook processing
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const limiter = new TokenBucketLimiter(redis);
async function processWithRateLimit(
provider: string,
process: () => Promise<void>
): Promise<void> {
// Provider-specific rate limits
const limits: Record<string, { max: number; rate: number }> = {
stripe: { max: 100, rate: 100 }, // 100/sec
github: { max: 5000, rate: 83 }, // 5000/min
    shopify: { max: 40, rate: 2 } // bucket of 40 per store, refills 2/sec
};
const limit = limits[provider] || { max: 50, rate: 10 };
const { allowed, retryAfter } = await limiter.acquire(
`provider:${provider}`,
limit.max,
limit.rate
);
if (!allowed) {
// Wait and retry
await sleep(retryAfter!);
return processWithRateLimit(provider, process);
}
await process();
}
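The Lua script's refill arithmetic is easier to sanity-check with an in-memory mirror of the same logic. This single-process version is for testing only; the Redis version is what makes the bucket atomic and shared across workers:

```typescript
// In-memory mirror of the Lua token bucket above (single process only).
// The injectable `now` makes the refill math testable without real clocks.
export class LocalTokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private maxTokens: number,
    private refillRate: number, // tokens per second
    now = Date.now()
  ) {
    this.tokens = maxTokens;
    this.lastRefill = now;
  }

  acquire(tokensNeeded = 1, now = Date.now()): { allowed: boolean; retryAfter?: number } {
    // Refill based on elapsed time, capped at bucket capacity
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.maxTokens, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
    if (this.tokens >= tokensNeeded) {
      this.tokens -= tokensNeeded;
      return { allowed: true };
    }
    return {
      allowed: false,
      retryAfter: Math.ceil(((tokensNeeded - this.tokens) / this.refillRate) * 1000)
    };
  }
}
```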
Sliding Window Rate Limiter
// sliding-window.ts
import { Redis } from 'ioredis';

class SlidingWindowLimiter {
  private redis: Redis;
constructor(redis: Redis) {
this.redis = redis;
}
async isAllowed(
key: string,
limit: number,
windowMs: number
): Promise<{ allowed: boolean; remaining: number; resetAt: number }> {
const now = Date.now();
const windowKey = `sliding:${key}`;
const script = `
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local window_start = now - window
-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
-- Count current requests
local count = redis.call('ZCARD', key)
if count < limit then
-- Add new request
redis.call('ZADD', key, now, now .. '-' .. math.random())
redis.call('EXPIRE', key, math.ceil(window / 1000))
return {1, limit - count - 1, now + window}
else
-- Get oldest entry to calculate reset time
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local reset_at = oldest[2] and (tonumber(oldest[2]) + window) or (now + window)
return {0, 0, reset_at}
end
`;
const result = await this.redis.eval(
script,
1,
windowKey,
limit,
windowMs,
now
) as [number, number, number];
return {
allowed: result[0] === 1,
remaining: result[1],
resetAt: result[2]
};
}
}
Performance Optimization
Connection Pooling
// db-pool.ts
import { Pool, PoolConfig } from 'pg';
// Optimized pool for webhook processing
const poolConfig: PoolConfig = {
host: process.env.DB_HOST,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 20, // Max connections per worker
min: 5, // Keep connections warm
idleTimeoutMillis: 30000, // Close idle connections
connectionTimeoutMillis: 5000,
statement_timeout: 10000, // Query timeout
};
const pool = new Pool(poolConfig);
// Monitor pool health
pool.on('error', (err) => {
console.error('Pool error:', err);
});
pool.on('connect', () => {
console.log('New connection established');
});
// Use connection for transaction batching
export async function batchInsert<T>(
table: string,
columns: string[],
rows: T[][]
): Promise<void> {
const client = await pool.connect();
  try {
    await client.query('BEGIN');
    // Multi-value INSERT keeps this dependency-free; for very large batches,
    // COPY FROM STDIN (e.g. via pg-copy-streams) is faster still
    const placeholders = rows.map((_, i) =>
      `(${columns.map((_, j) => `$${i * columns.length + j + 1}`).join(', ')})`
    ).join(', ');
    await client.query(
      `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${placeholders}`,
      rows.flat()
    );
    await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
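The placeholder arithmetic in `batchInsert` is easy to get off by one. Extracted as a standalone helper, it can be checked directly:

```typescript
// Same placeholder generation as batchInsert above, extracted for testing.
// Row i, column j maps to parameter $(i * numColumns + j + 1).
export function buildPlaceholders(numRows: number, numColumns: number): string {
  return Array.from({ length: numRows }, (_, i) =>
    `(${Array.from({ length: numColumns }, (_, j) =>
      `$${i * numColumns + j + 1}`
    ).join(', ')})`
  ).join(', ');
}
```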
Caching Frequently Accessed Data
// cache.ts
class WebhookCache {
private redis: Redis;
private localCache = new Map<string, { value: any; expires: number }>();
constructor(redis: Redis) {
this.redis = redis;
}
// Two-tier caching: local memory + Redis
async get<T>(key: string): Promise<T | null> {
// Check local cache first
const local = this.localCache.get(key);
if (local && local.expires > Date.now()) {
return local.value;
}
// Check Redis
const cached = await this.redis.get(key);
if (cached) {
const value = JSON.parse(cached);
// Store in local cache for 10 seconds
this.localCache.set(key, {
value,
expires: Date.now() + 10000
});
return value;
}
return null;
}
async set<T>(key: string, value: T, ttlSeconds: number): Promise<void> {
await this.redis.setex(key, ttlSeconds, JSON.stringify(value));
this.localCache.set(key, {
value,
expires: Date.now() + Math.min(ttlSeconds * 1000, 10000)
});
}
// Cache webhook secrets to avoid DB lookups
async getWebhookSecret(provider: string): Promise<string | null> {
const key = `webhook_secret:${provider}`;
let secret = await this.get<string>(key);
if (!secret) {
// Load from database
const result = await db.query(
'SELECT secret FROM webhook_configs WHERE provider = $1',
[provider]
);
if (result.rows[0]) {
secret = result.rows[0].secret;
await this.set(key, secret, 3600); // Cache for 1 hour
}
}
return secret;
}
// Cache processed event IDs for idempotency
async isProcessed(eventId: string): Promise<boolean> {
const key = `processed:${eventId}`;
const exists = await this.redis.exists(key);
return exists === 1;
}
async markProcessed(eventId: string): Promise<void> {
const key = `processed:${eventId}`;
await this.redis.setex(key, 86400 * 7, '1'); // Keep for 7 days
}
}
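One subtlety in the two-tier cache: `localCache` entries are only checked lazily on `get`, so keys that are never read again accumulate in memory. A periodic sweep keeps the map bounded; this sketch assumes the same `{ value, expires }` entry shape used above:

```typescript
// Periodic eviction for the in-process tier of the cache above.
// Entries use the same { value, expires } shape as WebhookCache.localCache.
export function sweepExpired(
  cache: Map<string, { value: unknown; expires: number }>,
  now = Date.now()
): number {
  let evicted = 0;
  for (const [key, entry] of cache) {
    if (entry.expires <= now) {
      cache.delete(key); // safe: Map iteration tolerates deleting the current entry
      evicted++;
    }
  }
  return evicted;
}

// e.g. setInterval(() => sweepExpired(localCache), 30_000);
```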
Monitoring and Observability
Prometheus Metrics
// metrics.ts
import { Counter, Histogram, Gauge, Registry } from 'prom-client';
const register = new Registry();
export const metrics = {
webhooksReceived: new Counter({
name: 'webhooks_received_total',
help: 'Total webhooks received',
labelNames: ['provider', 'event_type'],
registers: [register]
}),
webhooksProcessed: new Counter({
name: 'webhooks_processed_total',
help: 'Total webhooks processed',
labelNames: ['provider', 'event_type', 'status'],
registers: [register]
}),
processingDuration: new Histogram({
name: 'webhook_processing_duration_seconds',
help: 'Webhook processing duration',
labelNames: ['provider', 'event_type'],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],
registers: [register]
}),
queueDepth: new Gauge({
name: 'webhook_queue_depth',
help: 'Current webhook queue depth',
labelNames: ['queue', 'status'],
registers: [register]
}),
queueLatency: new Histogram({
name: 'webhook_queue_latency_seconds',
help: 'Time spent in queue before processing',
labelNames: ['provider'],
buckets: [0.1, 0.5, 1, 5, 10, 30, 60],
registers: [register]
}),
activeWorkers: new Gauge({
name: 'webhook_active_workers',
help: 'Number of active worker instances',
registers: [register]
}),
batchSize: new Histogram({
name: 'webhook_batch_size',
help: 'Size of processing batches',
buckets: [1, 10, 25, 50, 100, 200, 500],
registers: [register]
})
};
// Update queue metrics periodically
async function updateQueueMetrics(): Promise<void> {
const counts = await webhookQueue.getJobCounts();
metrics.queueDepth.set({ queue: 'webhooks', status: 'waiting' }, counts.waiting);
metrics.queueDepth.set({ queue: 'webhooks', status: 'active' }, counts.active);
metrics.queueDepth.set({ queue: 'webhooks', status: 'delayed' }, counts.delayed);
metrics.queueDepth.set({ queue: 'webhooks', status: 'failed' }, counts.failed);
}
setInterval(updateQueueMetrics, 5000);
// Metrics endpoint
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.send(await register.metrics());
});
Grafana Dashboard Configuration
{
"dashboard": {
"title": "Webhook Processing",
"panels": [
{
"title": "Webhooks Received/sec",
"type": "graph",
"targets": [
{
"expr": "rate(webhooks_received_total[1m])",
"legendFormat": "{{provider}}"
}
]
},
{
"title": "Queue Depth",
"type": "graph",
"targets": [
{
"expr": "webhook_queue_depth",
"legendFormat": "{{status}}"
}
],
"alert": {
"name": "High Queue Depth",
"conditions": [
{
"evaluator": { "type": "gt", "params": [10000] },
"query": { "params": ["A", "5m", "now"] }
}
]
}
},
{
"title": "Processing Latency (p99)",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(webhook_processing_duration_seconds_bucket[5m]))",
"legendFormat": "{{provider}}"
}
]
},
{
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "rate(webhooks_processed_total{status='error'}[5m]) / rate(webhooks_processed_total[5m])",
"legendFormat": "{{provider}}"
}
]
}
]
}
}
Summary
Building scalable webhook infrastructure requires:
- Decouple receiving from processing - Respond immediately, queue for async processing
- Choose the right queue - Redis/BullMQ for most cases, SQS for durability, Kafka for extreme throughput
- Scale workers independently - Auto-scale based on queue depth and latency
- Batch for efficiency - Batch database writes and API calls to reduce overhead
- Implement rate limiting - Protect downstream services from overload
- Monitor everything - Track queue depth, latency percentiles, and error rates
The key insight is that webhook scaling is primarily about managing queues and workers effectively. The receiving layer should be as thin as possible—validate minimally and queue quickly—while the processing layer handles the heavy lifting with proper batching, rate limiting, and error handling.