Webhook systems must handle failures gracefully—events will fail due to bugs, network issues, rate limits, and downstream outages. This guide covers comprehensive error handling strategies including dead letter queues, circuit breakers, automatic recovery, and alerting patterns for production-grade webhook infrastructure.
Error Handling Architecture
┌──────────────────────────────────────────────────────────────────────────┐
│                       WEBHOOK ERROR HANDLING FLOW                        │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────┐     ┌──────────┐     ┌──────────────┐     ┌────────────┐   │
│  │ Webhook  │────▶│  Queue   │────▶│    Worker    │────▶│  Success   │   │
│  │ Received │     │          │     │              │     │  Handler   │   │
│  └──────────┘     └──────────┘     └──────┬───────┘     └────────────┘   │
│                                           │                              │
│                                    ┌──────▼───────┐                      │
│                                    │    Error?    │                      │
│                                    └──────┬───────┘                      │
│                                           │                              │
│                    ┌──────────────────────┼──────────────────────┐       │
│                    ▼                      ▼                      ▼       │
│             ┌────────────┐         ┌────────────┐         ┌────────────┐ │
│             │ Transient  │         │ Permanent  │         │  Circuit   │ │
│             │   Error    │         │   Error    │         │    Open    │ │
│             └──────┬─────┘         └──────┬─────┘         └──────┬─────┘ │
│                    │                      │                      │       │
│                    ▼                      ▼                      ▼       │
│             ┌────────────┐         ┌────────────┐         ┌────────────┐ │
│             │   Retry    │         │    DLQ     │         │   DLQ +    │ │
│             │ w/Backoff  │         │ (no retry) │         │   Alert    │ │
│             └──────┬─────┘         └────────────┘         └────────────┘ │
│                    │                                                     │
│          ┌─────────▼─────────┐                                           │
│          │   Max Retries?    │                                           │
│          └─────────┬─────────┘                                           │
│                    │                                                     │
│         ┌──────────▼──────────┐                                          │
│         │         DLQ         │                                          │
│         │   (after retries)   │                                          │
│         └─────────────────────┘                                          │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘
Error Classification
Categorizing Errors
// errors.ts
export enum ErrorCategory {
TRANSIENT = 'transient', // Retry with backoff
PERMANENT = 'permanent', // Send to DLQ immediately
RATE_LIMITED = 'rate_limited', // Retry with longer delay
CIRCUIT_OPEN = 'circuit_open' // Skip until circuit closes
}
export interface ClassifiedError {
category: ErrorCategory;
original: Error;
retryable: boolean;
retryDelay?: number;
metadata?: Record<string, any>;
}
export function classifyError(error: any): ClassifiedError {
// HTTP status-based classification
if (error.status || error.statusCode) {
const status = error.status || error.statusCode;
// Rate limited
if (status === 429) {
const retryAfter = parseInt(error.headers?.['retry-after'] || '60');
return {
category: ErrorCategory.RATE_LIMITED,
original: error,
retryable: true,
retryDelay: retryAfter * 1000
};
}
// Client errors - permanent (except rate limit)
if (status >= 400 && status < 500) {
return {
category: ErrorCategory.PERMANENT,
original: error,
retryable: false,
metadata: { status, reason: 'Client error' }
};
}
// Server errors - transient
if (status >= 500) {
return {
category: ErrorCategory.TRANSIENT,
original: error,
retryable: true
};
}
}
// Network/connection errors - transient
if (
error.code === 'ECONNREFUSED' ||
error.code === 'ECONNRESET' ||
error.code === 'ETIMEDOUT' ||
error.code === 'ENOTFOUND' ||
error.message?.includes('timeout')
) {
return {
category: ErrorCategory.TRANSIENT,
original: error,
retryable: true
};
}
// Database errors
if (error.code === '23505') { // Unique violation
return {
category: ErrorCategory.PERMANENT,
original: error,
retryable: false,
metadata: { reason: 'Duplicate key - likely already processed' }
};
}
if (error.code === '40001') { // Serialization failure
return {
category: ErrorCategory.TRANSIENT,
original: error,
retryable: true
};
}
// Validation errors - permanent
if (error.name === 'ValidationError' || error.type === 'validation') {
return {
category: ErrorCategory.PERMANENT,
original: error,
retryable: false,
metadata: { reason: 'Invalid payload' }
};
}
// Default to transient (safer to retry)
return {
category: ErrorCategory.TRANSIENT,
original: error,
retryable: true
};
}
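A quick way to sanity-check the classifier is to feed it representative error shapes directly; the sample objects below are illustrative, not taken from any specific library:

// classify-example.ts (illustrative usage of classifyError)
import { classifyError, ErrorCategory } from './errors';

// A 429 response becomes RATE_LIMITED, with the delay taken from Retry-After
const rateLimited = classifyError({ status: 429, headers: { 'retry-after': '120' } });
console.log(rateLimited.category === ErrorCategory.RATE_LIMITED); // true
console.log(rateLimited.retryDelay); // 120000

// A connection reset is transient and retryable
const reset = classifyError({ code: 'ECONNRESET', message: 'socket hang up' });
console.log(reset.category === ErrorCategory.TRANSIENT, reset.retryable); // true true

// A validation failure is permanent and goes straight to the DLQ
const invalid = classifyError({ name: 'ValidationError', message: 'missing field: id' });
console.log(invalid.retryable); // false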
Error Handler Middleware
// error-handler.ts
import { Job } from 'bullmq';
import { classifyError, ErrorCategory } from './errors';
import { metrics } from './metrics';
import { deadLetterQueue } from './dlq';
import { alerting } from './alerting';
export async function handleProcessingError(
job: Job,
error: Error
): Promise<void> {
const classified = classifyError(error);
const eventId = job.data.eventId;
const provider = job.data.provider;
const eventType = job.data.eventType;
// Log the error
console.error({
msg: 'Webhook processing error',
eventId,
provider,
eventType,
category: classified.category,
error: error.message,
attempt: job.attemptsMade,
maxAttempts: job.opts.attempts
});
// Update metrics
metrics.webhookErrors.inc({
provider,
eventType,
category: classified.category
});
switch (classified.category) {
case ErrorCategory.PERMANENT:
// Send directly to DLQ - no more retries
await deadLetterQueue.add(job, {
reason: 'Permanent error',
error: error.message,
metadata: classified.metadata
});
// Mark job as completed to prevent retries
await job.moveToCompleted('sent_to_dlq', job.token!, false);
break;
case ErrorCategory.RATE_LIMITED:
// Update job delay for next retry
if (classified.retryDelay && job.attemptsMade < (job.opts.attempts || 3)) {
await job.updateData({
...job.data,
rateLimitDelay: classified.retryDelay
});
}
// Let the error propagate to trigger retry
throw error;
case ErrorCategory.CIRCUIT_OPEN:
// Queue for later when circuit might be closed
await deadLetterQueue.add(job, {
reason: 'Circuit breaker open',
error: error.message,
scheduledRetry: Date.now() + 60000 // Retry in 1 minute
});
await job.moveToCompleted('circuit_open', job.token!, false);
break;
case ErrorCategory.TRANSIENT:
default:
// Check if we've exhausted retries
if (job.attemptsMade >= (job.opts.attempts || 3) - 1) {
await deadLetterQueue.add(job, {
reason: 'Max retries exceeded',
error: error.message,
attempts: job.attemptsMade + 1
});
await alerting.notify({
level: 'warning',
title: 'Webhook sent to DLQ',
message: `${provider}/${eventType} exhausted retries`,
eventId
});
}
// Let the error propagate to trigger retry
throw error;
}
}
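To put the handler to work, the queue worker catches processing errors and delegates to it, so transient failures are rethrown (and retried by BullMQ) while permanent ones are diverted to the DLQ. A minimal sketch; the 'webhooks' queue name and processWebhookEvent are assumptions:

// worker.ts (sketch: wiring handleProcessingError into a BullMQ worker)
import { Worker, Job } from 'bullmq';
import { handleProcessingError } from './error-handler';
import { processWebhookEvent } from './processor'; // per-provider business logic (assumed module)

const worker = new Worker(
  'webhooks', // queue name assumed; must match the queue the receiver enqueues to
  async (job: Job) => {
    try {
      await processWebhookEvent(job.data);
    } catch (error) {
      // Classifies the error, sends permanent failures to the DLQ,
      // and rethrows transient ones so BullMQ applies its retry/backoff settings
      await handleProcessingError(job, error as Error);
    }
  },
  {
    connection: { host: process.env.REDIS_HOST || 'localhost', port: 6379 },
    concurrency: 10
  }
);

worker.on('failed', (job, err) => {
  console.error({ msg: 'Job failed', jobId: job?.id, error: err.message });
});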
Dead Letter Queue Implementation
DLQ Structure
// dlq.ts
import * as crypto from 'crypto';
import { Queue, Job } from 'bullmq';
import { db } from './database';
import { metrics } from './metrics';
import { webhookQueue } from './queue'; // main processing queue (module path assumed)

// Small helper used to pace bulk reprocessing
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
interface DLQEntry {
id: string;
originalJobId: string;
provider: string;
eventId: string;
eventType: string;
payload: any;
headers: Record<string, string>;
receivedAt: Date;
failedAt: Date;
reason: string;
errorMessage: string;
attempts: number;
metadata?: Record<string, any>;
status: 'pending' | 'investigating' | 'resolved' | 'reprocessed';
}
class DeadLetterQueue {
private queue: Queue;
constructor() {
this.queue = new Queue('webhook-dlq', {
defaultJobOptions: {
removeOnComplete: false, // Keep for audit
removeOnFail: false
}
});
}
async add(
originalJob: Job,
context: {
reason: string;
error: string;
metadata?: Record<string, any>;
scheduledRetry?: number;
}
): Promise<void> {
const entry: DLQEntry = {
id: crypto.randomUUID(),
originalJobId: originalJob.id!,
provider: originalJob.data.provider,
eventId: originalJob.data.eventId,
eventType: originalJob.data.eventType,
payload: originalJob.data.payload,
headers: originalJob.data.headers,
receivedAt: new Date(originalJob.data.receivedAt),
failedAt: new Date(),
reason: context.reason,
errorMessage: context.error,
attempts: originalJob.attemptsMade + 1,
metadata: context.metadata,
status: 'pending'
};
// Store in database for persistence
await db.query(`
INSERT INTO webhook_dlq (
id, original_job_id, provider, event_id, event_type,
payload, headers, received_at, failed_at, reason,
error_message, attempts, metadata, status
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
`, [
entry.id,
entry.originalJobId,
entry.provider,
entry.eventId,
entry.eventType,
JSON.stringify(entry.payload),
JSON.stringify(entry.headers),
entry.receivedAt,
entry.failedAt,
entry.reason,
entry.errorMessage,
entry.attempts,
JSON.stringify(entry.metadata),
entry.status
]);
// Also add to queue for potential scheduled retry
if (context.scheduledRetry) {
await this.queue.add('dlq-entry', entry, {
delay: context.scheduledRetry - Date.now(),
jobId: entry.id
});
}
// Update metrics
metrics.dlqDepth.inc({ provider: entry.provider });
}
async getEntries(filters: {
provider?: string;
eventType?: string;
status?: string;
since?: Date;
limit?: number;
}): Promise<DLQEntry[]> {
const conditions = ['1=1'];
const params: any[] = [];
if (filters.provider) {
params.push(filters.provider);
conditions.push(`provider = $${params.length}`);
}
if (filters.eventType) {
params.push(filters.eventType);
conditions.push(`event_type = $${params.length}`);
}
if (filters.status) {
params.push(filters.status);
conditions.push(`status = $${params.length}`);
}
if (filters.since) {
params.push(filters.since);
conditions.push(`failed_at >= $${params.length}`);
}
const limit = filters.limit || 100;
params.push(limit);
const result = await db.query(`
SELECT * FROM webhook_dlq
WHERE ${conditions.join(' AND ')}
ORDER BY failed_at DESC
LIMIT $${params.length}
`, params);
return result.rows;
}
async reprocess(entryId: string): Promise<{ success: boolean; error?: string }> {
const entry = await this.getEntry(entryId);
if (!entry) {
return { success: false, error: 'Entry not found' };
}
// Check if already reprocessed
if (entry.status === 'reprocessed') {
return { success: false, error: 'Already reprocessed' };
}
try {
// Add back to main queue
await webhookQueue.add(`${entry.provider}-webhook`, {
provider: entry.provider,
eventId: entry.eventId,
eventType: entry.eventType,
payload: entry.payload,
headers: entry.headers,
receivedAt: entry.receivedAt.getTime(),
reprocessedFrom: entryId,
reprocessedAt: Date.now()
});
// Update DLQ entry
await db.query(`
UPDATE webhook_dlq
SET status = 'reprocessed', reprocessed_at = NOW()
WHERE id = $1
`, [entryId]);
metrics.dlqDepth.dec({ provider: entry.provider });
return { success: true };
} catch (error) {
return { success: false, error: String(error) };
}
}
async bulkReprocess(filters: {
provider?: string;
eventType?: string;
reason?: string;
}): Promise<{ processed: number; failed: number }> {
const entries = await this.getEntries({
...filters,
status: 'pending',
limit: 1000
});
let processed = 0;
let failed = 0;
for (const entry of entries) {
const result = await this.reprocess(entry.id);
if (result.success) {
processed++;
} else {
failed++;
}
// Rate limit to avoid overwhelming the system
await sleep(100);
}
return { processed, failed };
}
private async getEntry(id: string): Promise<DLQEntry | null> {
const result = await db.query(
'SELECT * FROM webhook_dlq WHERE id = $1',
[id]
);
return result.rows[0] || null;
}
}
export const deadLetterQueue = new DeadLetterQueue();
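The persistence calls above assume a webhook_dlq table. A schema sketch inferred from the columns referenced throughout this guide (including reprocessed_at, resolved_at, and resolution_note used by the management API); adjust types and indexes to your database:

// dlq-schema.ts (schema sketch inferred from the queries in this guide)
import { db } from './database';

export async function createDLQTable(): Promise<void> {
  await db.query(`
    CREATE TABLE IF NOT EXISTS webhook_dlq (
      id               UUID PRIMARY KEY,
      original_job_id  TEXT NOT NULL,
      provider         TEXT NOT NULL,
      event_id         TEXT NOT NULL,
      event_type       TEXT NOT NULL,
      payload          JSONB NOT NULL,
      headers          JSONB,
      received_at      TIMESTAMPTZ NOT NULL,
      failed_at        TIMESTAMPTZ NOT NULL,
      reason           TEXT NOT NULL,
      error_message    TEXT,
      attempts         INTEGER NOT NULL DEFAULT 0,
      metadata         JSONB,
      status           TEXT NOT NULL DEFAULT 'pending',
      reprocessed_at   TIMESTAMPTZ,
      resolved_at      TIMESTAMPTZ,
      resolution_note  TEXT
    )
  `);

  // Supports the filtered listing, stats, and auto-resolution queries used elsewhere
  await db.query(`
    CREATE INDEX IF NOT EXISTS idx_webhook_dlq_status_failed_at
      ON webhook_dlq (status, failed_at DESC)
  `);
  await db.query(`
    CREATE INDEX IF NOT EXISTS idx_webhook_dlq_provider_event_type
      ON webhook_dlq (provider, event_type)
  `);
}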
DLQ Management API
// dlq-api.ts
import express from 'express';
import { deadLetterQueue } from './dlq';
import { db } from './database';
const router = express.Router();
// List DLQ entries
router.get('/dlq', async (req, res) => {
const entries = await deadLetterQueue.getEntries({
provider: req.query.provider as string,
eventType: req.query.eventType as string,
status: req.query.status as string,
since: req.query.since ? new Date(req.query.since as string) : undefined,
limit: parseInt(req.query.limit as string) || 100
});
res.json({
count: entries.length,
entries
});
});
// Get DLQ stats
router.get('/dlq/stats', async (req, res) => {
const stats = await db.query(`
SELECT
provider,
event_type,
reason,
status,
COUNT(*) as count,
MIN(failed_at) as oldest,
MAX(failed_at) as newest
FROM webhook_dlq
WHERE failed_at > NOW() - INTERVAL '7 days'
GROUP BY provider, event_type, reason, status
ORDER BY count DESC
`);
const totals = await db.query(`
SELECT status, COUNT(*) as count
FROM webhook_dlq
GROUP BY status
`);
res.json({
byCategory: stats.rows,
totals: totals.rows
});
});
// Reprocess single entry
router.post('/dlq/:id/reprocess', async (req, res) => {
const result = await deadLetterQueue.reprocess(req.params.id);
if (result.success) {
res.json({ success: true, message: 'Event queued for reprocessing' });
} else {
res.status(400).json({ success: false, error: result.error });
}
});
// Bulk reprocess
router.post('/dlq/bulk-reprocess', async (req, res) => {
const { provider, eventType, reason } = req.body;
// Require at least one filter
if (!provider && !eventType && !reason) {
return res.status(400).json({
error: 'Must specify at least one filter (provider, eventType, or reason)'
});
}
const result = await deadLetterQueue.bulkReprocess({
provider,
eventType,
reason
});
res.json({
message: `Reprocessed ${result.processed} events, ${result.failed} failed`,
...result
});
});
// Mark as resolved (won't reprocess)
router.post('/dlq/:id/resolve', async (req, res) => {
await db.query(`
UPDATE webhook_dlq
SET status = 'resolved', resolved_at = NOW(), resolution_note = $2
WHERE id = $1
`, [req.params.id, req.body.note || 'Manually resolved']);
res.json({ success: true });
});
export default router;
Circuit Breaker Pattern
Circuit Breaker Implementation
// circuit-breaker.ts
import { Redis } from 'ioredis';
import { alerting } from './alerting';
import { metrics } from './metrics';
import { redis } from './redis'; // shared ioredis connection (module path assumed)
enum CircuitState {
CLOSED = 'closed',
OPEN = 'open',
HALF_OPEN = 'half_open'
}
interface CircuitBreakerConfig {
failureThreshold: number; // Failures before opening
recoveryTimeout: number; // Time before half-open (ms)
successThreshold: number; // Successes to close from half-open
windowSize: number; // Rolling window for counting (ms)
}
class CircuitBreaker {
private redis: Redis;
private name: string;
private config: CircuitBreakerConfig;
constructor(
redis: Redis,
name: string,
config: Partial<CircuitBreakerConfig> = {}
) {
this.redis = redis;
this.name = name;
this.config = {
failureThreshold: config.failureThreshold || 5,
recoveryTimeout: config.recoveryTimeout || 30000,
successThreshold: config.successThreshold || 3,
windowSize: config.windowSize || 60000
};
}
private key(suffix: string): string {
return `circuit:${this.name}:${suffix}`;
}
async getState(): Promise<CircuitState> {
const state = await this.redis.get(this.key('state'));
return (state as CircuitState) || CircuitState.CLOSED;
}
async isAllowed(): Promise<boolean> {
const state = await this.getState();
switch (state) {
case CircuitState.CLOSED:
return true;
case CircuitState.OPEN:
// Check if recovery timeout has passed
const openedAt = await this.redis.get(this.key('opened_at'));
if (openedAt && Date.now() - parseInt(openedAt) > this.config.recoveryTimeout) {
// Transition to half-open
await this.setState(CircuitState.HALF_OPEN);
return true;
}
return false;
case CircuitState.HALF_OPEN:
// Allow limited requests to test
return true;
default:
return true;
}
}
async recordSuccess(): Promise<void> {
const state = await this.getState();
if (state === CircuitState.HALF_OPEN) {
// Increment success counter
const successes = await this.redis.incr(this.key('half_open_successes'));
if (successes >= this.config.successThreshold) {
// Close the circuit
await this.setState(CircuitState.CLOSED);
await this.redis.del(
this.key('half_open_successes'),
this.key('failures')
);
console.log(`Circuit ${this.name} closed after recovery`);
}
} else if (state === CircuitState.CLOSED) {
// Clear any old failure counts
const now = Date.now();
await this.redis.zremrangebyscore(
this.key('failures'),
0,
now - this.config.windowSize
);
}
}
async recordFailure(): Promise<void> {
const state = await this.getState();
if (state === CircuitState.HALF_OPEN) {
// Any failure in half-open reopens the circuit
await this.setState(CircuitState.OPEN);
await this.redis.set(this.key('opened_at'), Date.now().toString());
console.log(`Circuit ${this.name} reopened after half-open failure`);
return;
}
if (state === CircuitState.CLOSED) {
// Add to rolling failure window
const now = Date.now();
await this.redis.zadd(this.key('failures'), now, now.toString());
// Clean old failures
await this.redis.zremrangebyscore(
this.key('failures'),
0,
now - this.config.windowSize
);
// Check if threshold exceeded
const failureCount = await this.redis.zcard(this.key('failures'));
if (failureCount >= this.config.failureThreshold) {
await this.setState(CircuitState.OPEN);
await this.redis.set(this.key('opened_at'), now.toString());
console.log(`Circuit ${this.name} opened after ${failureCount} failures`);
// Alert
await alerting.notify({
level: 'critical',
title: `Circuit breaker opened: ${this.name}`,
message: `${failureCount} failures in ${this.config.windowSize}ms window`
});
}
}
}
private async setState(state: CircuitState): Promise<void> {
await this.redis.set(this.key('state'), state);
metrics.circuitBreakerState.set(
{ circuit: this.name },
state === CircuitState.CLOSED ? 0 : state === CircuitState.HALF_OPEN ? 1 : 2
);
}
// Wrap an async function with circuit breaker
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (!(await this.isAllowed())) {
throw new CircuitOpenError(`Circuit ${this.name} is open`);
}
try {
const result = await fn();
await this.recordSuccess();
return result;
} catch (error) {
await this.recordFailure();
throw error;
}
}
}
export class CircuitOpenError extends Error {
constructor(message: string) {
super(message);
this.name = 'CircuitOpenError';
}
}
// Circuit breakers for external services
export const circuits = {
database: new CircuitBreaker(redis, 'database', {
failureThreshold: 3,
recoveryTimeout: 10000
}),
stripeApi: new CircuitBreaker(redis, 'stripe-api', {
failureThreshold: 5,
recoveryTimeout: 30000
}),
emailService: new CircuitBreaker(redis, 'email-service', {
failureThreshold: 5,
recoveryTimeout: 60000
})
};
Using Circuit Breakers in Handlers
// webhook-handler-with-circuit.ts
import { circuits, CircuitOpenError } from './circuit-breaker';
import { db } from './database';
import { emailQueue } from './queue';            // delayed email queue (module path assumed)
import { sendOrderConfirmation } from './email'; // notification helper (module path assumed)
import type { StripeEvent } from './types';      // provider event type (module path assumed)
async function handlePaymentSucceeded(event: StripeEvent): Promise<void> {
const paymentIntent = event.data.object;
// Database operations with circuit breaker
await circuits.database.execute(async () => {
await db.query(`
UPDATE orders SET status = 'paid' WHERE payment_intent_id = $1
`, [paymentIntent.id]);
});
// Email with circuit breaker (non-critical, can fail gracefully)
try {
await circuits.emailService.execute(async () => {
await sendOrderConfirmation(paymentIntent.metadata.order_id);
});
} catch (error) {
if (error instanceof CircuitOpenError) {
// Queue for later
await emailQueue.add('order-confirmation', {
orderId: paymentIntent.metadata.order_id,
delayedDueToCircuit: true
}, { delay: 60000 });
} else {
throw error;
}
}
}
Alerting System
Alert Configuration
// alerting.ts
import { WebClient } from '@slack/web-api';
import { webhookQueue } from './queue'; // main processing queue (module path assumed)
interface Alert {
level: 'info' | 'warning' | 'critical';
title: string;
message: string;
eventId?: string;
provider?: string;
metadata?: Record<string, any>;
}
interface AlertRule {
name: string;
condition: (metrics: any) => boolean;
level: Alert['level'];
message: (metrics: any) => string;
cooldown: number; // ms between alerts
}
class AlertingSystem {
private slack: WebClient;
private lastAlerts = new Map<string, number>();
private rules: AlertRule[] = [];
constructor() {
this.slack = new WebClient(process.env.SLACK_ALERT_TOKEN);
this.initializeRules();
}
private initializeRules(): void {
this.rules = [
{
name: 'high_error_rate',
condition: (m) => m.errorRate > 0.05, // 5%
level: 'warning',
message: (m) => `Error rate: ${(m.errorRate * 100).toFixed(1)}%`,
cooldown: 300000 // 5 minutes
},
{
name: 'critical_error_rate',
condition: (m) => m.errorRate > 0.20, // 20%
level: 'critical',
message: (m) => `Critical error rate: ${(m.errorRate * 100).toFixed(1)}%`,
cooldown: 60000 // 1 minute
},
{
name: 'dlq_growing',
condition: (m) => m.dlqGrowthRate > 10, // 10 events/minute
level: 'warning',
message: (m) => `DLQ growing at ${m.dlqGrowthRate} events/min`,
cooldown: 600000 // 10 minutes
},
{
name: 'queue_backlog',
condition: (m) => m.queueDepth > 10000,
level: 'critical',
message: (m) => `Queue backlog: ${m.queueDepth} events`,
cooldown: 300000
},
{
name: 'high_latency',
condition: (m) => m.p99Latency > 10000, // 10 seconds
level: 'warning',
message: (m) => `P99 latency: ${m.p99Latency}ms`,
cooldown: 600000
}
];
}
async notify(alert: Alert): Promise<void> {
// Check cooldown
const lastAlert = this.lastAlerts.get(alert.title);
if (lastAlert && Date.now() - lastAlert < 60000) {
return; // Skip - too recent
}
this.lastAlerts.set(alert.title, Date.now());
// Log
console.log({
msg: 'Alert',
...alert
});
// Send to Slack
await this.sendSlackAlert(alert);
// PagerDuty for critical
if (alert.level === 'critical') {
await this.triggerPagerDuty(alert);
}
}
private async sendSlackAlert(alert: Alert): Promise<void> {
const colors = {
info: '#36a64f',
warning: '#ffcc00',
critical: '#ff0000'
};
await this.slack.chat.postMessage({
channel: '#webhook-alerts',
text: `${this.getLevelEmoji(alert.level)} ${alert.title}`,
attachments: [{
color: colors[alert.level],
fields: [
{
title: 'Message',
value: alert.message,
short: false
},
...(alert.eventId ? [{
title: 'Event ID',
value: alert.eventId,
short: true
}] : []),
...(alert.provider ? [{
title: 'Provider',
value: alert.provider,
short: true
}] : [])
],
ts: Math.floor(Date.now() / 1000).toString()
}]
});
}
private getLevelEmoji(level: Alert['level']): string {
switch (level) {
case 'critical': return '🚨';
case 'warning': return '⚠️';
case 'info': return 'ℹ️';
}
}
private async triggerPagerDuty(alert: Alert): Promise<void> {
await fetch('https://events.pagerduty.com/v2/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
routing_key: process.env.PAGERDUTY_ROUTING_KEY,
event_action: 'trigger',
payload: {
summary: alert.title,
severity: alert.level,
source: 'webhook-system',
custom_details: {
message: alert.message,
eventId: alert.eventId,
provider: alert.provider,
...alert.metadata
}
}
})
});
}
// Periodic check against rules
async checkRules(): Promise<void> {
const metrics = await this.collectMetrics();
for (const rule of this.rules) {
if (rule.condition(metrics)) {
const lastAlert = this.lastAlerts.get(rule.name);
if (!lastAlert || Date.now() - lastAlert > rule.cooldown) {
await this.notify({
level: rule.level,
title: `Alert: ${rule.name}`,
message: rule.message(metrics)
});
this.lastAlerts.set(rule.name, Date.now());
}
}
}
}
  private async collectMetrics(): Promise<any> {
    // getErrorRate, getLatencyStats, and getDLQStats are helper methods (not shown)
    // that read from the metrics store and the webhook_dlq table
    const [queueStats, errorStats, latencyStats, dlqStats] = await Promise.all([
      webhookQueue.getJobCounts(),
      this.getErrorRate(),
      this.getLatencyStats(),
      this.getDLQStats()
    ]);
return {
queueDepth: queueStats.waiting + queueStats.active,
errorRate: errorStats.rate,
p99Latency: latencyStats.p99,
dlqGrowthRate: dlqStats.growthRate
};
}
}
export const alerting = new AlertingSystem();
// Run checks every minute
setInterval(() => alerting.checkRules(), 60000);
Recovery Procedures
Automated Recovery
// recovery.ts
import { alerting } from './alerting';
import { db } from './database';
import { redis } from './redis'; // shared ioredis connection (module path assumed)
interface RecoveryPlan {
name: string;
trigger: (metrics: any) => boolean;
actions: (() => Promise<void>)[];
cooldown: number;
}
class RecoveryManager {
private lastRecovery = new Map<string, number>();
private plans: RecoveryPlan[] = [];
constructor() {
this.initializePlans();
}
private initializePlans(): void {
this.plans = [
{
name: 'queue_backlog_recovery',
trigger: (m) => m.queueDepth > 5000,
actions: [
async () => {
// Scale up workers
await this.scaleWorkers(10);
console.log('Scaled up workers to 10');
},
async () => {
// Enable aggressive batching
await this.enableBatching();
console.log('Enabled aggressive batching');
}
],
cooldown: 600000 // 10 minutes
},
{
name: 'high_dlq_recovery',
trigger: (m) => m.dlqDepth > 1000,
actions: [
async () => {
// Notify team
await alerting.notify({
level: 'warning',
title: 'DLQ depth high - review required',
message: 'Manual review recommended for DLQ entries'
});
},
async () => {
// Auto-resolve known patterns
await this.autoResolveDLQ();
}
],
cooldown: 1800000 // 30 minutes
}
];
}
  async checkAndRecover(): Promise<void> {
    // collectMetrics() (not shown) gathers queueDepth and dlqDepth for the triggers above
    const metrics = await this.collectMetrics();
for (const plan of this.plans) {
if (plan.trigger(metrics)) {
const lastRun = this.lastRecovery.get(plan.name);
if (!lastRun || Date.now() - lastRun > plan.cooldown) {
console.log(`Executing recovery plan: ${plan.name}`);
for (const action of plan.actions) {
try {
await action();
} catch (error) {
console.error(`Recovery action failed:`, error);
}
}
this.lastRecovery.set(plan.name, Date.now());
}
}
}
}
private async autoResolveDLQ(): Promise<void> {
// Auto-resolve known safe patterns
const patterns = [
{
reason: 'Duplicate key',
resolution: 'Auto-resolved: Already processed'
},
{
reason: 'Entity not found',
resolution: 'Auto-resolved: Target entity deleted'
}
];
for (const pattern of patterns) {
await db.query(`
UPDATE webhook_dlq
SET status = 'resolved',
resolved_at = NOW(),
resolution_note = $1
WHERE status = 'pending'
AND error_message LIKE $2
`, [pattern.resolution, `%${pattern.reason}%`]);
}
}
private async scaleWorkers(count: number): Promise<void> {
// Implementation depends on infrastructure
// Kubernetes: kubectl scale deployment webhook-workers --replicas=count
// AWS: Update ECS service desired count
}
private async enableBatching(): Promise<void> {
// Update configuration to enable more aggressive batching
await redis.set('config:batch_size', '100');
await redis.set('config:batch_timeout', '200');
}
}
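The scaleWorkers stub depends on your deployment platform. One possible sketch for Kubernetes shells out to kubectl; the webhook-workers deployment name and cluster access from the running process are assumptions:

// scale-workers.ts (sketch of scaleWorkers for a Kubernetes deployment)
import { execFile } from 'child_process';
import { promisify } from 'util';

const execFileAsync = promisify(execFile);

export async function scaleWorkers(count: number): Promise<void> {
  // Assumes a deployment named webhook-workers in the current kubectl context/namespace
  await execFileAsync('kubectl', [
    'scale',
    'deployment/webhook-workers',
    `--replicas=${count}`
  ]);
  console.log(`Requested ${count} webhook-worker replicas`);
}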
Manual Recovery Runbook
// recovery-cli.ts
import { Command } from 'commander';
import { deadLetterQueue } from './dlq';
import { db } from './database';
import { webhookQueue } from './queue'; // main processing queue (module path assumed)

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const program = new Command();
program
.command('status')
.description('Get system status')
.action(async () => {
const [queue, dlq, errors] = await Promise.all([
webhookQueue.getJobCounts(),
db.query('SELECT COUNT(*) FROM webhook_dlq WHERE status = $1', ['pending']),
db.query(`
SELECT provider, event_type, COUNT(*) as count
FROM webhook_dlq
WHERE status = 'pending' AND failed_at > NOW() - INTERVAL '1 hour'
GROUP BY provider, event_type
ORDER BY count DESC
LIMIT 10
`)
]);
console.log('\n=== System Status ===');
console.log(`Queue: ${queue.waiting} waiting, ${queue.active} active, ${queue.failed} failed`);
console.log(`DLQ Pending: ${dlq.rows[0].count}`);
console.log('\nTop DLQ Failures (last hour):');
console.table(errors.rows);
});
program
.command('drain')
.description('Drain queue for maintenance')
.action(async () => {
console.log('Pausing queue...');
await webhookQueue.pause();
console.log('Waiting for active jobs to complete...');
let active = (await webhookQueue.getJobCounts()).active;
while (active > 0) {
console.log(` ${active} jobs still active...`);
await sleep(5000);
active = (await webhookQueue.getJobCounts()).active;
}
console.log('Queue drained. Remember to resume with: recovery resume');
});
program
.command('resume')
.description('Resume paused queue')
.action(async () => {
await webhookQueue.resume();
console.log('Queue resumed');
});
program
.command('reprocess-dlq')
.description('Reprocess DLQ entries')
.option('--provider <provider>', 'Filter by provider')
.option('--event-type <type>', 'Filter by event type')
.option('--dry-run', 'Show what would be reprocessed')
.action(async (options) => {
const entries = await deadLetterQueue.getEntries({
provider: options.provider,
eventType: options.eventType,
status: 'pending',
limit: options.dryRun ? 100 : 1000
});
if (options.dryRun) {
console.log(`Would reprocess ${entries.length} entries:`);
console.table(entries.slice(0, 10).map(e => ({
id: e.id,
provider: e.provider,
eventType: e.eventType,
reason: e.reason
})));
return;
}
console.log(`Reprocessing ${entries.length} entries...`);
const result = await deadLetterQueue.bulkReprocess({
provider: options.provider,
eventType: options.eventType
});
console.log(`Processed: ${result.processed}, Failed: ${result.failed}`);
});
program.parse();
Monitoring Dashboard
Key Metrics to Display
// dashboard-data.ts
export async function getDashboardData(): Promise<DashboardData> {
const [
queueStats,
processingStats,
errorStats,
dlqStats,
providerStats
] = await Promise.all([
getQueueStats(),
getProcessingStats(),
getErrorStats(),
getDLQStats(),
getProviderStats()
]);
return {
overview: {
eventsProcessed24h: processingStats.total24h,
successRate: processingStats.successRate,
avgLatency: processingStats.avgLatency,
dlqPending: dlqStats.pending
},
queue: {
waiting: queueStats.waiting,
active: queueStats.active,
delayed: queueStats.delayed,
failed: queueStats.failed
},
errors: {
byType: errorStats.byType,
byProvider: errorStats.byProvider,
trend: errorStats.trend
},
dlq: {
pending: dlqStats.pending,
resolved: dlqStats.resolved,
reprocessed: dlqStats.reprocessed,
oldestPending: dlqStats.oldestPending
},
providers: providerStats
};
}
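The DashboardData type and the individual stat helpers are not shown above. Two of them can be sketched from pieces already in this guide, the BullMQ job counts and the webhook_dlq table; the './queue' module path is an assumption:

// dashboard-helpers.ts (sketch of two of the stat helpers used by getDashboardData)
import { db } from './database';
import { webhookQueue } from './queue'; // main processing queue (module path assumed)

export async function getQueueStats() {
  // BullMQ exposes per-state counts directly
  return webhookQueue.getJobCounts('waiting', 'active', 'delayed', 'failed');
}

export async function getDLQStats() {
  const result = await db.query(`
    SELECT
      COUNT(*) FILTER (WHERE status = 'pending')        AS pending,
      COUNT(*) FILTER (WHERE status = 'resolved')       AS resolved,
      COUNT(*) FILTER (WHERE status = 'reprocessed')    AS reprocessed,
      MIN(failed_at) FILTER (WHERE status = 'pending')  AS oldest_pending
    FROM webhook_dlq
  `);

  const row = result.rows[0];
  return {
    pending: Number(row.pending),
    resolved: Number(row.resolved),
    reprocessed: Number(row.reprocessed),
    oldestPending: row.oldest_pending
  };
}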
Summary
Comprehensive webhook error handling requires:
- Error Classification - Distinguish transient from permanent errors
- Dead Letter Queues - Preserve failed events for investigation and retry
- Circuit Breakers - Protect failing dependencies and enable graceful degradation
- Alerting - Proactive notification of issues with intelligent thresholds
- Recovery Automation - Automated responses to common failure patterns
- Observability - Metrics, dashboards, and tooling for manual intervention
The key principle is to fail gracefully: acknowledge webhooks quickly, process them asynchronously, retry transient failures, and preserve failed events in the DLQ when processing ultimately fails. This ensures no webhook events are lost while the system stays stable under adverse conditions.