Build resilient workflows that handle failures gracefully with compensation, rollbacks, and intelligent retry strategies.
The Saga pattern ensures data consistency across distributed operations by defining compensation actions that undo the effects of completed steps when a later step fails.
// Saga pattern for booking workflow
import { createWorkflow, step, saga } from "@/lib/workflows"
/**
 * Property booking workflow, run in saga mode.
 *
 * The four steps are chained through `dependsOn`: reserve the property,
 * charge the customer, send the confirmation email, block the owner's
 * calendar. The reserve, charge and calendar steps each declare a
 * `compensate` handler that undoes their own effect; the email step has none.
 *
 * Trigger data read by the steps: `propertyId`, `dates.start`, `dates.end`,
 * `customerId`, `amount`, `customerEmail`, `ownerId`.
 */
export const bookingWorkflow = createWorkflow({
  id: "property-booking",
  name: "Property Booking with Compensation",
  // Enable saga mode for automatic compensation
  mode: "saga",
  steps: [
    // Step 1: Reserve the property
    step({
      id: "reserve-property",
      agent: "booking-agent",
      // Creates a pending reservation and exposes its id to later steps.
      action: async (ctx) => {
        const { propertyId, dates } = ctx.trigger.data
        const reservation = await db.reservations.create({
          propertyId,
          startDate: dates.start,
          endDate: dates.end,
          status: "pending"
        })
        return { reservationId: reservation.id }
      },
      // Compensation: mark the reservation created above as cancelled
      compensate: async (ctx, stepOutput) => {
        await db.reservations.update(stepOutput.reservationId, {
          status: "cancelled",
          cancelledAt: new Date(),
          cancelReason: "workflow_compensation"
        })
      }
    }),
    // Step 2: Charge the customer
    step({
      id: "charge-payment",
      agent: "payment-agent",
      dependsOn: ["reserve-property"],
      action: async (ctx) => {
        const { customerId, amount } = ctx.trigger.data
        const charge = await stripe.charges.create({
          customer: customerId,
          // Stripe takes an integer amount in the smallest currency unit.
          // Round after scaling: in floating point 19.99 * 100 evaluates to
          // 1998.9999999999998, which is not a valid integer amount.
          amount: Math.round(amount * 100),
          currency: "usd",
          metadata: {
            reservationId: ctx.steps["reserve-property"].output.reservationId
          }
        })
        return { chargeId: charge.id }
      },
      // Compensation: refund the charge created above
      compensate: async (ctx, stepOutput) => {
        await stripe.refunds.create({
          charge: stepOutput.chargeId,
          reason: "requested_by_customer"
        })
      }
    }),
    // Step 3: Send confirmation (might fail!)
    step({
      id: "send-confirmation",
      agent: "notification-agent",
      dependsOn: ["charge-payment"],
      action: async (ctx) => {
        const { customerEmail } = ctx.trigger.data
        const { reservationId } = ctx.steps["reserve-property"].output
        // This might fail due to email service issues
        await email.send({
          to: customerEmail,
          template: "booking-confirmation",
          data: { reservationId }
        })
        return { emailSent: true }
      },
      // No compensation: an email that has been sent cannot be recalled, so
      // there is nothing for this step to undo.
      compensate: null
    }),
    // Step 4: Update calendar (might fail!)
    step({
      id: "update-calendar",
      agent: "calendar-agent",
      dependsOn: ["send-confirmation"],
      action: async (ctx) => {
        const { propertyId, dates, ownerId } = ctx.trigger.data
        // This might fail if calendar API is down
        await calendarApi.blockDates({
          calendarId: ownerId,
          propertyId,
          startDate: dates.start,
          endDate: dates.end
        })
        return { calendarUpdated: true }
      },
      // Compensation: remove the calendar block, using the same trigger data
      // the action used to create it
      compensate: async (ctx) => {
        const { propertyId, dates, ownerId } = ctx.trigger.data
        await calendarApi.unblockDates({
          calendarId: ownerId,
          propertyId,
          startDate: dates.start,
          endDate: dates.end
        })
      }
    })
  ],
  // Saga compensation configuration
  saga: {
    // Run compensations in reverse order
    compensationOrder: "reverse",
    // What to do if compensation fails. The first parameter is named
    // `failedStep` so it does not shadow the `step` factory used above.
    onCompensationError: async (failedStep, error) => {
      // Log for manual intervention
      await logging.critical("compensation-failed", {
        step: failedStep.id,
        error: error.message
      })
      // Create support ticket
      await support.createTicket({
        priority: "high",
        subject: `Compensation failed: ${failedStep.id}`,
        body: `Manual intervention required`
      })
    }
  }
})
// Multiple fallback strategies
// Valuation step with an ordered fallback chain. Each fallback carries a
// `condition` predicate over the error and an `action` that returns
// `{ value, method, confidence }` (plus `stale: true` for the last one).
step({
  id: "get-property-valuation",
  agent: "valuation-agent",
  action: async (ctx) => {
    // Primary: Use ML valuation model
    return await valuationML.estimate(ctx.trigger.data.property)
  },
  // Fallback chain - try each in order
  fallbacks: [
    {
      // Fallback 1: Use comparable sales
      condition: (error) => error.code === "MODEL_UNAVAILABLE",
      action: async (ctx) => {
        const comps = await db.sales.findComparables(ctx.trigger.data.property)
        return {
          value: calculateMedian(comps.map(c => c.price)),
          method: "comparable_sales",
          confidence: 0.7
        }
      }
    },
    {
      // Fallback 2: Use price per sqft
      condition: (error) => error.code === "INSUFFICIENT_COMPS",
      action: async (ctx) => {
        const { property } = ctx.trigger.data
        const avgPricePerSqft = await market.getAvgPricePerSqft(property.zipCode)
        return {
          value: property.sqft * avgPricePerSqft,
          method: "price_per_sqft",
          confidence: 0.5
        }
      }
    },
    {
      // Fallback 3: Use last known value
      condition: () => true, // Catch-all
      action: async (ctx) => {
        const lastValuation = await db.valuations.findLatest(ctx.trigger.data.property.id)
        return {
          // `??` rather than `||`: only a missing record or missing value
          // becomes null; a stored value of 0 is passed through unchanged.
          value: lastValuation?.value ?? null,
          method: "historical",
          confidence: 0.3,
          stale: true
        }
      }
    }
  ]
})
// When all retry attempts fail, move the workflow to a dead letter queue for manual review and intervention.
// Dead letter queue configuration
/**
 * Payment processing workflow with a global retry policy and a dead letter
 * queue (DLQ) configuration.
 *
 * `deadLetter.onDeadLetter(execution, error)` runs three actions in order:
 * store a DLQ record, post a Slack alert to #ops-alerts, and, when
 * `execution.workflow.critical` is truthy, open a PagerDuty incident.
 * Each action is awaited, so a rejection stops the ones after it.
 */
export const paymentWorkflow = createWorkflow({
  id: "payment-processing",
  // Global retry policy
  retries: {
    maxAttempts: 5,
    strategy: "exponential",
    maxDelay: 300000, // 5 minutes max
  },
  // Dead letter configuration
  deadLetter: {
    enabled: true,
    // When to move to DLQ
    conditions: {
      maxRetries: true, // After exhausting retries
      specificErrors: ["FRAUD_DETECTED", "ACCOUNT_CLOSED"],
      timeout: "24h" // If stuck for 24 hours
    },
    // DLQ handler
    onDeadLetter: async (execution, error) => {
      // Store in DLQ table
      await db.deadLetterQueue.create({
        workflowId: execution.workflowId,
        executionId: execution.id,
        error: error.message,
        context: execution.context,
        createdAt: new Date()
      })
      // Notify operations team
      await slack.send({
        channel: "#ops-alerts",
        text: `🚨 Workflow moved to DLQ: ${execution.workflowId}`,
        attachments: [{
          color: "danger",
          fields: [
            { title: "Execution ID", value: execution.id },
            { title: "Error", value: error.message },
            { title: "Retry Count", value: execution.retryCount }
          ]
        }]
      })
      // Create PagerDuty incident for critical workflows
      if (execution.workflow.critical) {
        await pagerduty.createIncident({
          title: `Critical workflow failed: ${execution.workflowId}`,
          severity: "high"
        })
      }
    }
  },
  // Step definitions are omitted from this example. The placeholder is a
  // comment inside the array so the module still parses; a bare `[...]` is a
  // syntax error.
  steps: [/* ...workflow steps go here... */]
})
// Manual DLQ processing
/**
 * Works through every pending dead letter queue item and applies the first
 * resolution that matches: retry (`item.retryable`), approved skip
 * (`item.skipApproved`), or manual compensation (`item.compensationRequired`).
 * Items matching none of the three are left untouched.
 *
 * Items are handled independently: an error while resolving one item is
 * logged and recorded, and the loop moves on, so a single failing item cannot
 * block the items queued behind it.
 *
 * @returns {Promise<void>}
 * @throws The original error when exactly one item failed, or an
 *   `AggregateError` wrapping every error when several items failed. Thrown
 *   only after all pending items have been attempted.
 */
export async function processDLQ() {
  const items = await db.deadLetterQueue.findPending()
  const failures = []

  for (const item of items) {
    console.log(`Processing DLQ item: ${item.id}`)
    try {
      // Option 1: Retry the workflow
      if (item.retryable) {
        await orchestrator.retry(item.executionId)
        await db.deadLetterQueue.update(item.id, { status: "retried" })
      }
      // Option 2: Skip and mark resolved
      else if (item.skipApproved) {
        await db.deadLetterQueue.update(item.id, {
          status: "skipped",
          resolvedBy: item.approver,
          resolvedAt: new Date()
        })
      }
      // Option 3: Manual compensation
      else if (item.compensationRequired) {
        await manualCompensation(item)
        await db.deadLetterQueue.update(item.id, { status: "compensated" })
      }
    } catch (err) {
      // Keep going: the remaining items are unrelated to this failure.
      console.error(`Failed to process DLQ item: ${item.id}`, err)
      failures.push(err)
    }
  }

  // Still surface failures to the caller once every item has been attempted.
  if (failures.length === 1) {
    throw failures[0]
  }
  if (failures.length > 1) {
    throw new AggregateError(failures, `${failures.length} DLQ items failed to process`)
  }
}
// Prevent cascading failures by temporarily disabling steps that are consistently failing.
// Circuit breaker for external API calls
import { circuitBreaker } from "@/lib/workflows/resilience"
// Step that fetches a valuation from the external API, with the call wrapped
// in a circuit breaker.
step({
  id: "external-api-call",
  agent: "integration-agent",
  // The breaker wraps the real call and supplies the step's action
  action: circuitBreaker({
    name: "external-valuation-api",
    // Breaker tuning
    failureThreshold: 5, // 5 failures open the circuit
    successThreshold: 3, // 3 successes close it again
    timeout: 30000, // 30 seconds per call
    resetTimeout: 60000, // wait 1 minute before trying again
    // Protected call: forwards the trigger payload to the external API
    action: async ({ trigger }) => {
      const valuation = await externalApi.getValuation(trigger.data)
      return valuation
    },
    // Placeholder result returned while the circuit is open
    fallback: async (ctx) => ({
      value: null,
      source: "circuit_open",
      message: "External API temporarily unavailable"
    }),
    // Alert only on the transition to "open"; other states are ignored
    onStateChange: async (state, stats) => {
      if (state !== "open") {
        return
      }
      await alerting.send({
        severity: "warning",
        message: "Circuit breaker opened: external-valuation-api",
        stats
      })
    }
  })
})