Workflows

Workflows orchestrate multi-step operations with LLM reasoning, parallel execution, and complex data transformations.

Workflow Structure

json
{
  "id": "analyze-ticket",
  "name": "Analyze JIRA Ticket",
  "workflows": [{
    "name": "main",
    "steps": [
      {
        "id": "fetch",
        "action": "jira.getIssue",
        "params": { "issueKey": "{{input.ticketId}}" }
      },
      {
        "id": "analyze",
        "action": "llm",
        "params": {
          "prompt": "Analyze: {{steps.fetch.result}}",
          "model": "claude-sonnet-4-20250514"
        }
      }
    ]
  }],
  "tools": [
    { "name": "llm", "type": "llm_call" },
    { "name": "jira.getIssue", "type": "api_call", "endpoint": "..." }
  ]
}
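
The structure above can be modeled with a few TypeScript types. This is a minimal sketch, not the SDK's own typings: the type names and the findUndeclaredActions helper are hypothetical, and the idea that every step action should match a declared tool name is inferred from the example rather than stated by the documentation.

```typescript
// Hypothetical types mirroring the JSON example; not taken from @backflow/sdk.
interface WorkflowStep {
  id: string;
  action: string;
  params?: Record<string, unknown>;
}

interface ToolDefinition {
  name: string;
  type: string;
  endpoint?: string;
}

interface WorkflowConfig {
  id: string;
  name?: string;
  workflows: { name: string; steps: WorkflowStep[] }[];
  tools?: ToolDefinition[];
}

// Returns the IDs of steps whose action matches no declared tool name.
// Assumption: actions are resolved by tool name, as the example suggests.
function findUndeclaredActions(config: WorkflowConfig): string[] {
  const declared = new Set((config.tools ?? []).map((tool) => tool.name));
  const missing: string[] = [];
  for (const workflow of config.workflows) {
    for (const step of workflow.steps) {
      if (!declared.has(step.action)) missing.push(step.id);
    }
  }
  return missing;
}

const analyzeTicket: WorkflowConfig = {
  id: 'analyze-ticket',
  name: 'Analyze JIRA Ticket',
  workflows: [{
    name: 'main',
    steps: [
      { id: 'fetch', action: 'jira.getIssue', params: { issueKey: '{{input.ticketId}}' } },
      { id: 'analyze', action: 'llm', params: { prompt: 'Analyze: {{steps.fetch.result}}' } },
    ],
  }],
  tools: [
    { name: 'llm', type: 'llm_call' },
    { name: 'jira.getIssue', type: 'api_call', endpoint: '...' },
  ],
};

console.log(findUndeclaredActions(analyzeTicket)); // prints []
```

A check like this can catch a misspelled action before the workflow is submitted.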

Tool Types

llm_call

LLM inference with configurable providers.

json
{
  "name": "generate",
  "type": "llm_call",
  "params": {
    "temperature": 0.7,
    "maxTokens": 4000,
    "provider": "anthropic",
    "model": "claude-sonnet-4-20250514"
  }
}

Step usage:

json
{
  "id": "analyze",
  "action": "generate",
  "params": {
    "prompt": "Analyze: {{steps.fetch.result}}",
    "temperature": 0.3
  }
}

Supported providers: anthropic, openai, google, lmstudio, ollama-cli, claude-cli.
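
In the example above, the tool sets temperature 0.7 and the step sets temperature 0.3. One plausible reading is that step-level params override the tool's defaults. The documentation does not state the precedence, so the sketch below is an assumption, and resolveParams is a hypothetical helper rather than an SDK function.

```typescript
type Params = Record<string, unknown>;

// Assumed precedence: step params win over tool-level defaults.
function resolveParams(toolDefaults: Params, stepParams: Params): Params {
  return { ...toolDefaults, ...stepParams };
}

const toolDefaults: Params = {
  temperature: 0.7,
  maxTokens: 4000,
  provider: 'anthropic',
  model: 'claude-sonnet-4-20250514',
};

const stepParams: Params = {
  prompt: 'Analyze: {{steps.fetch.result}}',
  temperature: 0.3,
};

const effectiveParams = resolveParams(toolDefaults, stepParams);
console.log(effectiveParams.temperature); // 0.3
console.log(effectiveParams.maxTokens);   // 4000
```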

api_call

External API requests.

json
{
  "name": "fetchData",
  "type": "api_call",
  "endpoint": "https://api.example.com/data",
  "method": "POST",
  "params": {
    "headers": { "Authorization": "Bearer {{env.API_KEY}}" }
  }
}

mcp_tool

Model Context Protocol tools.

json
{
  "name": "filesystem",
  "type": "mcp_tool",
  "params": {
    "mcpServer": "filesystem",
    "toolName": "read_file"
  }
}

shell_command

Execute shell commands with CLI provider support.

json
{
  "name": "runScript",
  "type": "shell_command",
  "params": {
    "command": "node scripts/process.js",
    "cwd": "/app",
    "timeout": 30000
  }
}

CLI-based LLM providers:

json
{
  "id": "claude-analyze",
  "action": "shell",
  "params": {
    "command": "claude --print '{{steps.data.result}}' --output-format json",
    "cliProvider": "claude",
    "parseJSON": true
  }
}

git_command

Git operations.

json
{
  "name": "gitStatus",
  "type": "git_command",
  "params": {
    "command": "git status --porcelain",
    "cwd": "/repo"
  }
}

storage

File storage operations (R2, Supabase Storage).

json
{
  "id": "upload",
  "action": "storage",
  "params": {
    "operation": "upload",
    "bucket": "documents",
    "path": "reports/{{input.reportId}}.pdf",
    "data": "{{steps.generate.result}}"
  }
}

Operations: get, put, list, delete, signedUrl, publicUrl.

database

Database operations (Firestore, Supabase).

json
{
  "id": "save",
  "action": "database",
  "params": {
    "provider": "firebase",
    "operation": "insert",
    "collection": "reports",
    "data": "{{steps.analyze.result}}"
  }
}

Operations: get, query, insert, update, upsert, delete.

Multi-Provider Database Workflows

Override the default database provider per step using the provider field:

json
{
  "workflows": [{
    "name": "cross-db-sync",
    "steps": [
      {
        "id": "fetch-users",
        "action": "database",
        "params": {
          "provider": "supabase",
          "operation": "query",
          "collection": "users",
          "where": [{ "field": "active", "operator": "eq", "value": true }]
        }
      },
      {
        "id": "log-audit",
        "action": "database",
        "params": {
          "provider": "firebase",
          "operation": "insert",
          "collection": "audit_log",
          "data": {
            "action": "user_sync",
            "count": "{{steps.fetch-users.result.length}}"
          }
        }
      }
    ]
  }]
}

Supported providers: supabase, firebase, sqlite.

Field               Type                                   Required           Description
provider            supabase|firebase|sqlite               No                 Override default provider
operation           get|query|insert|update|upsert|delete  Yes                Database operation
collection / table  string                                 Yes                Table/collection name
data                object                                 For insert/update  Data to write
where / filters     array                                  For queries        Query filters
orderBy             string                                 No                 Sort order
limit               number                                 No                 Max results
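
The field table translates naturally into a TypeScript type. The following sketch is based only on the table and examples above: the names DatabaseParams and validateDatabaseParams are hypothetical, and the rule that insert and update need data is taken from the "For insert/update" note.

```typescript
type DatabaseProvider = 'supabase' | 'firebase' | 'sqlite';
type DatabaseOperation = 'get' | 'query' | 'insert' | 'update' | 'upsert' | 'delete';

interface WhereClause {
  field: string;
  operator: string;
  value: unknown;
}

// Hypothetical shape of a database step's params.
interface DatabaseParams {
  provider?: DatabaseProvider;            // overrides the default provider
  operation: DatabaseOperation;
  collection?: string;                    // either collection or table is expected
  table?: string;
  data?: Record<string, unknown> | string; // object or a template string
  where?: WhereClause[];
  orderBy?: string;
  limit?: number;
}

// Returns a list of human-readable problems; an empty list means the params look valid.
function validateDatabaseParams(params: DatabaseParams): string[] {
  const problems: string[] = [];
  if (!params.collection && !params.table) {
    problems.push('collection or table is required');
  }
  const needsData = params.operation === 'insert' || params.operation === 'update';
  if (needsData && params.data === undefined) {
    problems.push(`data is required for ${params.operation}`);
  }
  return problems;
}

console.log(validateDatabaseParams({ operation: 'insert', collection: 'audit_log' }));
// prints [ 'data is required for insert' ]
```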

notification

Multi-channel notifications.

json
{
  "id": "notify",
  "action": "notification",
  "params": {
    "channels": ["push", "email"],
    "title": "Report Ready",
    "body": "Your report is ready: {{steps.upload.result.url}}",
    "userId": "{{input.userId}}"
  }
}

workflow

Nested workflow execution.

json
{
  "id": "nested",
  "action": "nested-workflow",
  "params": {
    "config": {
      "id": "sub-workflow",
      "workflows": [...]
    }
  }
}

rag

Vector search with RAG modes.

json
{
  "id": "search",
  "action": "rag",
  "input": {
    "query": "{{input.question}}",
    "mode": "single",
    "limit": 5
  }
}

Modes: single, accumulate, iterative.

cache

Cache operations.

json
{
  "id": "invalidate",
  "action": "cache",
  "params": {
    "operation": "invalidate",
    "pattern": "reports:*"
  }
}

Operations: get, set, delete, invalidate.

Parallel Execution

Using parallel: true

Steps with parallel: true run concurrently.

json
{
  "workflows": [{
    "name": "parallel-fetch",
    "steps": [
      {
        "id": "fetch-users",
        "action": "api.getUsers",
        "parallel": true
      },
      {
        "id": "fetch-orders",
        "action": "api.getOrders",
        "parallel": true
      },
      {
        "id": "combine",
        "action": "transform",
        "dependsOn": ["fetch-users", "fetch-orders"],
        "params": {
          "users": "{{steps.fetch-users.result}}",
          "orders": "{{steps.fetch-orders.result}}"
        }
      }
    ]
  }]
}

Using dependsOn

Define step dependencies for DAG-based execution.

json
{
  "steps": [
    { "id": "A", "action": "step-a" },
    { "id": "B", "action": "step-b" },
    { "id": "C", "action": "step-c", "dependsOn": ["A"] },
    { "id": "D", "action": "step-d", "dependsOn": ["A", "B"] },
    { "id": "E", "action": "step-e", "dependsOn": ["C", "D"] }
  ]
}

Execution order: A and B run in parallel → C waits for A, D waits for A+B → E waits for C+D.
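
The execution order above can be reproduced by grouping steps into "waves", where each wave contains the steps whose dependencies have all completed. This is an illustrative sketch, not the engine's actual scheduler; executionWaves and the DagStep type are hypothetical names.

```typescript
interface DagStep {
  id: string;
  dependsOn?: string[];
}

// Groups steps into waves: every step in a wave has all of its dependencies
// satisfied by earlier waves, so steps within one wave can run concurrently.
function executionWaves(steps: DagStep[]): string[][] {
  const done = new Set<string>();
  let remaining = steps.slice();
  const waves: string[][] = [];

  while (remaining.length > 0) {
    const ready = remaining.filter((step) =>
      (step.dependsOn ?? []).every((dep) => done.has(dep)));
    if (ready.length === 0) {
      throw new Error('Cycle or unknown dependency in dependsOn');
    }
    waves.push(ready.map((step) => step.id));
    ready.forEach((step) => done.add(step.id));
    remaining = remaining.filter((step) => !done.has(step.id));
  }
  return waves;
}

const dagSteps: DagStep[] = [
  { id: 'A' },
  { id: 'B' },
  { id: 'C', dependsOn: ['A'] },
  { id: 'D', dependsOn: ['A', 'B'] },
  { id: 'E', dependsOn: ['C', 'D'] },
];

console.log(JSON.stringify(executionWaves(dagSteps)));
// prints [["A","B"],["C","D"],["E"]]
```

Waves are a simplification: a real DAG scheduler could start C as soon as A finishes, without waiting for B.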

Early Return (Background Execution)

Return immediately to the client while the workflow continues in the background. This is useful for long-running operations where the client only needs a quick acknowledgment.

Using earlyReturn: true

json
{
  "steps": [
    {
      "id": "delete-files",
      "action": "r2",
      "earlyReturn": true,
      "params": { "operation": "deleteByPrefix", "prefix": "{{input.tenantId}}/projects/{{input.projectId}}/" }
    },
    { "id": "cleanup-db", "action": "database", "params": { "operation": "delete", "collection": "projects", "id": "{{input.projectId}}" } },
    { "id": "invalidate-cache", "action": "cache", "dependsOn": ["cleanup-db"], "params": { "operation": "invalidate", "pattern": "projects:*" } }
  ]
}

When the workflow encounters a step with earlyReturn: true, it returns immediately:

json
{
  "status": "accepted",
  "executionId": "exec-123",
  "channel": "workflow:exec-123",
  "message": "Workflow started in background"
}

All steps (including the earlyReturn step) continue executing in the background.

WebSocket Progress

Subscribe to the channel for real-time progress:

typescript
bf.workflows.subscribe('exec-123', {
  onProgress: (e) => console.log(`${e.stepId}: ${e.status}`),
  onComplete: (r) => console.log('All steps done'),
  onError: (e) => console.error('Workflow failed:', e)
});

Events:

  • step_started - Step begins execution
  • step_completed - Step succeeds
  • step_failed - Step fails
  • workflow_completed - All steps done
  • workflow_failed - Workflow failed
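
A client can fold these events into a running view of the workflow. The sketch below assumes each event carries its name in a type field and, for step events, a stepId. The documentation only shows stepId and status on progress events, so the payload shape and the names WorkflowEvent, ProgressState, and applyEvent are hypothetical.

```typescript
// Assumed event payloads; only the event names come from the list above.
type WorkflowEvent =
  | { type: 'step_started'; stepId: string }
  | { type: 'step_completed'; stepId: string }
  | { type: 'step_failed'; stepId: string }
  | { type: 'workflow_completed' }
  | { type: 'workflow_failed' };

interface ProgressState {
  running: string[];
  completed: string[];
  failed: string[];
  finished: boolean;
}

// Pure reducer: returns a new state for each incoming event.
function applyEvent(state: ProgressState, event: WorkflowEvent): ProgressState {
  switch (event.type) {
    case 'step_started':
      return { ...state, running: [...state.running, event.stepId] };
    case 'step_completed':
      return {
        ...state,
        running: state.running.filter((id) => id !== event.stepId),
        completed: [...state.completed, event.stepId],
      };
    case 'step_failed':
      return {
        ...state,
        running: state.running.filter((id) => id !== event.stepId),
        failed: [...state.failed, event.stepId],
      };
    case 'workflow_completed':
    case 'workflow_failed':
      return { ...state, finished: true };
    default:
      return state;
  }
}

const initialState: ProgressState = { running: [], completed: [], failed: [], finished: false };
const sampleEvents: WorkflowEvent[] = [
  { type: 'step_started', stepId: 'delete-files' },
  { type: 'step_completed', stepId: 'delete-files' },
  { type: 'step_started', stepId: 'cleanup-db' },
  { type: 'step_failed', stepId: 'cleanup-db' },
  { type: 'workflow_failed' },
];

const finalState = sampleEvents.reduce(applyEvent, initialState);
console.log(JSON.stringify(finalState));
// prints {"running":[],"completed":["delete-files"],"failed":["cleanup-db"],"finished":true}
```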

Polling Status

bash
GET /agent/workflow/{executionId}/status

Response:

json
{
  "executionId": "exec-123",
  "workflowId": "project-delete",
  "status": "running",
  "completedSteps": ["delete-files", "cleanup-db"],
  "pendingSteps": ["invalidate-cache"]
}

SDK Usage

typescript
const result = await bf.workflows.execute(config);

if (result.status === 'accepted') {
  // Workflow running in background
  console.log('Started:', result.executionId);

  // Poll for status
  const status = await bf.workflows.getStatus(result.executionId);
  console.log('Progress:', status.completedSteps.length, '/', status.completedSteps.length + status.pendingSteps.length);
}
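
The single status check above can be extended into a polling loop. In this sketch the status function is passed in as a parameter so the code does not depend on the SDK. Treating any status other than "running" as terminal is an assumption, as are the pollUntilDone name and its interval and attempt parameters.

```typescript
interface WorkflowStatus {
  executionId: string;
  status: string;
  completedSteps: string[];
  pendingSteps: string[];
}

// Polls until the workflow is no longer "running" or the attempt budget runs out.
// Assumption: every status other than "running" means polling can stop.
async function pollUntilDone(
  getStatus: (executionId: string) => Promise<WorkflowStatus>,
  executionId: string,
  intervalMs = 1000,
  maxAttempts = 60,
): Promise<WorkflowStatus> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await getStatus(executionId);
    if (current.status !== 'running') return current;
    // Wait before the next poll.
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Workflow ${executionId} still running after ${maxAttempts} polls`);
}
```

With the SDK client from this page, a call might look like pollUntilDone((id) => bf.workflows.getStatus(id), result.executionId). For long workflows, subscribing to the WebSocket channel avoids repeated requests.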

Queue as Job

For persistent, distributed background execution, add queueAsJob: true to the early return step. This queues the workflow continuation as a job instead of running it immediately in-process.

json
{
  "id": "delete-r2-files",
  "action": "r2",
  "earlyReturn": true,
  "queueAsJob": true,
  "params": { "operation": "deleteByPrefix", "prefix": "projects/{{input.projectId}}" }
}

When queueAsJob is enabled:

  • The workflow is queued to the workflows queue
  • The response includes a jobId for tracking
  • Execution is handled by a dedicated job worker
  • The workflow survives server restarts (if using a Redis/BullMQ queue)

Response with queueAsJob:

json
{
  "status": "accepted",
  "executionId": "exec-123",
  "channel": "workflow:exec-123",
  "jobId": "workflows:1234567890:abc",
  "message": "Workflow queued for execution"
}

Use queueAsJob when you:

  • Need persistence across server restarts
  • Run in a distributed environment
  • Want job-level retry/failure handling
  • Need to limit concurrent background executions

Parallel JIRA Ticket Analysis

json
{
  "id": "multi-ticket-analysis",
  "name": "Analyze Multiple Tickets",
  "workflows": [{
    "name": "parallel-analysis",
    "steps": [
      {
        "id": "ticket1",
        "action": "jira.getIssue",
        "parallel": true,
        "params": { "issueKey": "PROJ-101" }
      },
      {
        "id": "ticket2",
        "action": "jira.getIssue",
        "parallel": true,
        "params": { "issueKey": "PROJ-102" }
      },
      {
        "id": "ticket3",
        "action": "jira.getIssue",
        "parallel": true,
        "params": { "issueKey": "PROJ-103" }
      },
      {
        "id": "analyze-all",
        "action": "llm",
        "dependsOn": ["ticket1", "ticket2", "ticket3"],
        "params": {
          "prompt": "Analyze these tickets and find common themes:\n{{steps.ticket1.result | json}}\n{{steps.ticket2.result | json}}\n{{steps.ticket3.result | json}}"
        }
      }
    ]
  }]
}

Conditional Execution

Step Conditions

json
{
  "id": "send-alert",
  "action": "slack.sendMessage",
  "conditions": {
    "steps.analyze.result.severity": "high"
  },
  "params": {
    "channel": "#alerts",
    "text": "High severity issue detected"
  }
}

Condition Operators

json
{
  "conditions": {
    "steps.count.result": { "$gt": 100 },
    "steps.status.result": { "$in": ["active", "pending"] },
    "steps.data.result.enabled": { "$eq": true }
  }
}

Operator  Description
$eq       Equal
$ne       Not equal
$gt       Greater than
$gte      Greater than or equal
$lt       Less than
$lte      Less than or equal
$in       In array
$exists   Field exists
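
To make the operator semantics concrete, here is a small evaluator for condition objects. It is a sketch under stated assumptions, not the engine's implementation: all conditions are assumed to be combined with AND, a bare value is treated as shorthand for $eq, and getPath, matchesCondition, and evaluateConditions are hypothetical names.

```typescript
type ConditionMap = Record<string, unknown>;

// Resolves a dotted path such as "steps.count.result" against a context object.
function getPath(context: unknown, path: string): unknown {
  let current: unknown = context;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Checks one value against either a bare expected value or an operator object.
function matchesCondition(actual: unknown, expected: unknown): boolean {
  const isOperatorObject =
    expected !== null && typeof expected === 'object' && !Array.isArray(expected);
  if (!isOperatorObject) return actual === expected; // shorthand equality

  const spec = expected as Record<string, unknown>;
  return Object.keys(spec).every((op) => {
    const operand = spec[op];
    switch (op) {
      case '$eq': return actual === operand;
      case '$ne': return actual !== operand;
      case '$gt': return (actual as number) > (operand as number);
      case '$gte': return (actual as number) >= (operand as number);
      case '$lt': return (actual as number) < (operand as number);
      case '$lte': return (actual as number) <= (operand as number);
      case '$in': return Array.isArray(operand) && operand.indexOf(actual) !== -1;
      case '$exists': return (actual !== undefined) === Boolean(operand);
      default: throw new Error(`Unknown operator: ${op}`);
    }
  });
}

// Assumption: every entry must match (logical AND).
function evaluateConditions(conditions: ConditionMap, context: unknown): boolean {
  return Object.keys(conditions).every((path) =>
    matchesCondition(getPath(context, path), conditions[path]));
}

const runContext = {
  steps: {
    count: { result: 150 },
    status: { result: 'active' },
    data: { result: { enabled: true } },
  },
};

const sampleConditions: ConditionMap = {
  'steps.count.result': { $gt: 100 },
  'steps.status.result': { $in: ['active', 'pending'] },
  'steps.data.result.enabled': { $eq: true },
};

console.log(evaluateConditions(sampleConditions, runContext)); // true
console.log(evaluateConditions({ 'steps.count.result': { $lt: 100 } }, runContext)); // false
```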

Expression Conditions

json
{
  "conditions": {
    "if": "{{steps.analyze.result.score}} > 80"
  }
}

Template Variables

Basic Variables

Variable                 Description
{{input.field}}          Workflow input
{{steps.stepId.result}}  Step result
{{steps.stepId.output}}  Step output
{{steps.stepId.stdout}}  Shell command stdout
{{env.VAR}}              Environment variable
{{secret:KEY}}           Tenant secret

Template Filters

json
{
  "prompt": "Data: {{steps.fetch.result | json}}"
}

Filter            Description
json              JSON stringify
default: 'value'  Default if undefined
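
The variable and filter rules can be illustrated with a small renderer. This sketch covers only dotted-path variables and the json and default filters. It does not handle {{secret:KEY}} or {{#if}} blocks, and renderTemplate and lookup are hypothetical names rather than engine functions. Rendering an undefined value without a default as an empty string is also an assumption.

```typescript
// Resolves a dotted path such as "steps.fetch.result" against a context object.
function lookup(context: unknown, path: string): unknown {
  let current: unknown = context;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Replaces {{path}} and {{path | filter}} placeholders in a template string.
function renderTemplate(template: string, context: unknown): string {
  const pattern = /\{\{\s*([^}|]+?)\s*(?:\|\s*([^}]+?))?\s*\}\}/g;
  return template.replace(pattern, (_match: string, path: string, filter?: string) => {
    const value = lookup(context, path.trim());
    if (filter === 'json') {
      return value === undefined ? 'null' : JSON.stringify(value);
    }
    // Matches a filter of the form: default: 'value'
    const fallback = filter ? /^default:\s*'([^']*)'$/.exec(filter) : null;
    if (value === undefined) return fallback ? fallback[1] : '';
    return String(value);
  });
}

const templateContext = {
  input: {},
  steps: { fetch: { result: { key: 'PROJ-1' } } },
};

console.log(renderTemplate('Data: {{steps.fetch.result | json}}', templateContext));
// prints Data: {"key":"PROJ-1"}
console.log(renderTemplate("Hello {{input.user | default: 'guest'}}", templateContext));
// prints Hello guest
```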

Conditional Templates

json
{
  "prompt": "{{#if steps.data.result}}Has data: {{steps.data.result}}{{else}}No data{{/if}}"
}

Pause and Resume

Pause Points

json
{
  "id": "approval",
  "action": "analyze",
  "pausePoint": {
    "message": "Review analysis before proceeding",
    "requireApproval": true,
    "notify": {
      "enabled": true,
      "userId": "{{input.approver}}",
      "channels": ["push", "email"]
    }
  }
}

Resume Execution

bash
POST /agent/workflow/resume
Content-Type: application/json

{
  "executionId": "exec-123",
  "continueExecution": true,
  "userInput": { "approved": true }
}

Cancel Execution

bash
POST /agent/workflow/cancel
Content-Type: application/json

{
  "executionId": "exec-123"
}
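
The resume and cancel requests can be assembled with a small helper. This is a sketch only: buildControlCall and HttpCall are hypothetical names, the base URL is the local development endpoint used in the SDK examples, and any authentication headers the server requires are not covered here because this page does not document them.

```typescript
interface HttpCall {
  url: string;
  method: 'POST';
  headers: Record<string, string>;
  body: string;
}

// Builds the request for POST /agent/workflow/resume or /agent/workflow/cancel.
function buildControlCall(
  baseUrl: string,
  action: 'resume' | 'cancel',
  payload: Record<string, unknown>,
): HttpCall {
  return {
    url: `${baseUrl.replace(/\/+$/, '')}/agent/workflow/${action}`,
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  };
}

const resumeCall = buildControlCall('http://localhost:3000', 'resume', {
  executionId: 'exec-123',
  continueExecution: true,
  userInput: { approved: true },
});
const cancelCall = buildControlCall('http://localhost:3000/', 'cancel', {
  executionId: 'exec-123',
});

console.log(resumeCall.url); // http://localhost:3000/agent/workflow/resume
console.log(cancelCall.url); // http://localhost:3000/agent/workflow/cancel
```

The resulting object can be passed to fetch, for example fetch(resumeCall.url, { method: resumeCall.method, headers: resumeCall.headers, body: resumeCall.body }).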

Execute Workflow

Run Inline Workflow

bash
POST /agent/workflow
Content-Type: application/json

{
  "workflow": {
    "id": "my-workflow",
    "workflows": [{ "name": "main", "steps": [...] }],
    "tools": [...]
  },
  "input": { "ticketId": "PROJ-123" }
}

Run Saved Workflow

bash
POST /agent/workflow/run
Content-Type: application/json

{
  "workflowId": "analyze-ticket",
  "input": { "ticketId": "PROJ-123" }
}

Plan Only (Dry Run)

bash
POST /agent/workflow
Content-Type: application/json

{
  "workflow": { ... },
  "planOnly": true
}

WebSocket Progress

json
{
  "broadcast": true
}

Connect to ws://server/workflow:exec-123 (the channel for execution exec-123) to receive progress updates.

Workflow Status

bash
GET /agent/workflow/status?workflowId=abc123

Response:

json
{
  "workflowId": "abc123",
  "status": "completed",
  "executionId": "exec-456",
  "duration": 1234,
  "steps": [
    { "id": "fetch", "status": "success", "output": {...} },
    { "id": "analyze", "status": "success", "output": {...} }
  ]
}

Error Handling

json
{
  "id": "risky-step",
  "action": "external-api",
  "onError": {
    "action": "continue",
    "fallback": {
      "id": "fallback-step",
      "action": "llm",
      "params": { "prompt": "Generate fallback response" }
    }
  }
}

Advanced Examples

Multi-Agent Orchestration

json
{
  "id": "multi-agent",
  "name": "Multi-Agent Analysis",
  "agent": {
    "llm_prompt": "You are analyzing code and providing recommendations."
  },
  "workflows": [{
    "name": "orchestration",
    "steps": [
      {
        "id": "code-review",
        "action": "llm",
        "parallel": true,
        "params": { "prompt": "Review code quality: {{input.code}}" }
      },
      {
        "id": "security-scan",
        "action": "llm",
        "parallel": true,
        "params": { "prompt": "Identify security issues: {{input.code}}" }
      },
      {
        "id": "performance-check",
        "action": "llm",
        "parallel": true,
        "params": { "prompt": "Find performance issues: {{input.code}}" }
      },
      {
        "id": "synthesize",
        "action": "llm",
        "dependsOn": ["code-review", "security-scan", "performance-check"],
        "params": {
          "prompt": "Combine findings:\n- Code Review: {{steps.code-review.result}}\n- Security: {{steps.security-scan.result}}\n- Performance: {{steps.performance-check.result}}"
        }
      }
    ]
  }],
  "tools": [{ "name": "llm", "type": "llm_call" }]
}

E-Commerce Order Processing

json
{
  "id": "process-order",
  "name": "Process E-Commerce Order",
  "workflows": [{
    "name": "order-flow",
    "steps": [
      {
        "id": "validate-inventory",
        "action": "database",
        "params": {
          "operation": "query",
          "collection": "inventory",
          "where": [{ "field": "sku", "operator": "in", "value": "{{input.skus}}" }]
        }
      },
      {
        "id": "check-stock",
        "action": "llm",
        "params": {
          "prompt": "Check if all items are in stock: {{steps.validate-inventory.result}}"
        }
      },
      {
        "id": "process-payment",
        "action": "stripe.charge",
        "conditions": { "steps.check-stock.result.inStock": true },
        "params": {
          "amount": "{{input.total}}",
          "currency": "usd",
          "customer": "{{input.customerId}}"
        }
      },
      {
        "id": "create-shipment",
        "action": "shipping.createLabel",
        "dependsOn": ["process-payment"],
        "params": {
          "address": "{{input.shippingAddress}}",
          "items": "{{input.items}}"
        }
      },
      {
        "id": "send-confirmation",
        "action": "sendgrid.sendTemplate",
        "dependsOn": ["create-shipment"],
        "params": {
          "to": "{{input.email}}",
          "templateId": "order-confirmation",
          "data": {
            "orderId": "{{input.orderId}}",
            "trackingNumber": "{{steps.create-shipment.result.trackingNumber}}"
          }
        }
      }
    ]
  }]
}

Batch Processing with ForEach

json
{
  "id": "process-batch",
  "workflows": [{
    "name": "batch",
    "steps": [
      {
        "id": "fetch-items",
        "action": "jira.search",
        "params": { "jql": "status = Open" }
      },
      {
        "id": "process-each",
        "type": "forEach",
        "items": "{{steps.fetch-items.result.issues}}",
        "step": {
          "action": "llm",
          "params": { "prompt": "Summarize: {{item.fields.summary}}" }
        }
      },
      {
        "id": "aggregate",
        "action": "llm",
        "params": {
          "prompt": "Create summary report from: {{steps.process-each.result}}"
        }
      }
    ]
  }]
}

SDK Usage

Execute Workflows via SDK

typescript
import { createBackflow } from '@backflow/sdk';

const bf = createBackflow({
  tenantId: 'my-tenant',
  apiKey: process.env.BACKFLOW_API_KEY,
  endpoint: 'http://localhost:3000'  // Local development
});

// Execute inline workflow
const result = await bf.workflows.execute({
  steps: [
    { id: 'fetch', action: 'jira.getIssue', params: { issueKey: 'PROJ-123' } },
    { id: 'analyze', action: 'llm', params: { prompt: 'Analyze: {{steps.fetch.result}}' } }
  ]
});

// Run saved workflow (uses a separate name because `result` is already declared above)
const savedResult = await bf.workflows.run('analyze-ticket', { ticketId: 'PROJ-123' });

// Subscribe to progress
bf.workflows.subscribe(result.executionId, {
  onProgress: (e) => console.log(`${e.stepId}: ${e.status}`),
  onComplete: (r) => console.log('Done:', r),
  onError: (e) => console.error(e)
});

// Control execution
await bf.workflows.pause(result.executionId);
await bf.workflows.resume(result.executionId);
await bf.workflows.cancel(result.executionId);

CLI Script for Workflows

typescript
// scripts/run-workflow.ts
import { createBackflow } from '@backflow/sdk';

const bf = createBackflow({
  tenantId: process.env.BACKFLOW_TENANT_ID!,
  apiKey: process.env.BACKFLOW_API_KEY!,
  endpoint: process.env.BACKFLOW_ENDPOINT || 'http://localhost:3000'
});

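// argv[2] is the workflow ID; argv[3] is an optional JSON-encoded input object
const [,, workflowId, inputJson] = process.argv;
if (!workflowId) {
  console.error('Usage: run-workflow.ts <workflowId> [inputJson]');
  process.exit(1);
}
const input = inputJson ? JSON.parse(inputJson) : {};

bf.workflows.run(workflowId, input)
  .then(console.log)
  .catch((err) => {
    // Exit non-zero so shell scripts and CI can detect the failure
    console.error(err);
    process.exit(1);
  });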

Run workflows from the terminal:

bash
npx tsx scripts/run-workflow.ts analyze-ticket '{"ticketId":"PROJ-123"}'

CLI-Based Code Analysis

json
{
  "id": "code-analysis",
  "name": "Analyze Code with Claude CLI",
  "workflows": [{
    "name": "analysis",
    "steps": [
      {
        "id": "read-file",
        "action": "filesystem",
        "params": { "path": "{{input.filePath}}" }
      },
      {
        "id": "analyze",
        "action": "shell",
        "params": {
          "command": "claude --print 'Analyze this code for bugs and improvements:\n{{steps.read-file.result}}' --output-format json",
          "cliProvider": "claude",
          "parseJSON": true
        }
      },
      {
        "id": "save-report",
        "action": "storage",
        "params": {
          "operation": "upload",
          "bucket": "reports",
          "path": "analysis/{{input.fileName}}.json",
          "data": "{{steps.analyze.result}}"
        }
      }
    ]
  }],
  "tools": [
    { "name": "filesystem", "type": "mcp_tool" },
    { "name": "shell", "type": "shell_command" },
    { "name": "storage", "type": "storage" }
  ]
}

Released under the ISC License.