Skip to content

Latest commit

 

History

History
214 lines (157 loc) · 7.77 KB

File metadata and controls

214 lines (157 loc) · 7.77 KB

pg-workflows

The simplest Postgres workflow engine for TypeScript. Durable execution, event-driven orchestration, and automatic retries - powered entirely by PostgreSQL. No Redis, no message broker, no new infrastructure.

npm version License: MIT Node.js PostgreSQL

npm install pg-workflows pg

A complete workflow

import { WorkflowEngine, workflow } from 'pg-workflows'
import { z } from 'zod'

const onboardUser = workflow(
  'onboard-user',
  async ({ step, input }) => {
    const user = await step.run('create-account', () => db.users.create(input))
    await step.run('send-welcome', () => sendEmail(user.email, 'Welcome!'))
    return { userId: user.id }
  },
  { inputSchema: z.object({ email: z.string().email() }) },
)

const engine = new WorkflowEngine({
  connectionString: process.env.DATABASE_URL,
  workflows: [onboardUser],
})
await engine.start()

await engine.startWorkflow({
  workflowId: 'onboard-user',
  input: { email: 'alice@example.com' },
})

That's it. Each step runs exactly once. Crash, redeploy, or retry - the workflow resumes from where it left off. State lives in your existing PostgreSQL database.


Why pg-workflows

  • Zero new infrastructure - if you have Postgres, you're done. No Redis, no Temporal server, no SaaS bill.
  • Feels like plain TypeScript - workflows are async functions. No DSL, no YAML, no DAG builder.
  • Durable by default - step results are persisted. Process crashes don't lose work or repeat expensive calls.
  • Pause, wait, resume - step.waitFor('event-name') pauses the workflow until your API fires an event. Zero resources consumed while waiting.
  • Schedules & polling built in - step.delay('3 days'), step.waitUntil('2025-01-01'), step.poll(...) - no cron, no external scheduler.
  • Built for AI agents - cache expensive LLM calls, retry on 429s, pause for human review. See AI patterns →
  • Client/worker separation - keep your API service light; run handlers in a worker. See architecture →
  • Idempotent starts - pass an idempotencyKey and duplicate calls safely return the same run.

Quick start

1. Install

npm install pg-workflows pg

pg is a peer dependency. pg-boss is bundled - nothing else to configure. The engine runs migrations automatically on start.

2. Define a workflow

import { workflow } from 'pg-workflows'
import { z } from 'zod'

export const sendWelcome = workflow(
  'send-welcome',
  async ({ step, input }) => {
    const user = await step.run('create-user', async () => {
      return { id: '123', email: input.email }
    })

    await step.run('send-email', async () => {
      await sendEmail(user.email, 'Welcome!')
    })

    // Pause until your API confirms the user. Zero cost while waiting.
    const confirmation = await step.waitFor('await-confirmation', {
      eventName: 'user-confirmed',
      timeout: 24 * 60 * 60 * 1000, // 24 hours
    })

    return { success: true, user, confirmation }
  },
  {
    inputSchema: z.object({ email: z.string().email() }),
    retries: 3,
  },
)

3. Start the engine and run it

import { WorkflowEngine } from 'pg-workflows'
import { sendWelcome } from './workflows'

const engine = new WorkflowEngine({
  connectionString: process.env.DATABASE_URL,
  workflows: [sendWelcome],
})
await engine.start()

const run = await engine.startWorkflow({
  workflowId: 'send-welcome',
  input: { email: 'user@example.com' },
})

// Later - resume the workflow with an event:
await engine.triggerEvent({
  runId: run.id,
  eventName: 'user-confirmed',
  data: { confirmedAt: new Date() },
})

// Track progress anytime:
const progress = await engine.checkProgress({ runId: run.id })
console.log(`${progress.completionPercentage}% complete`)

That's the whole loop. No extra services. Everything durable. Everything queryable with plain SQL.


What can you build?

  • AI agents & LLM pipelines - durable multi-step agents, cached expensive calls, human-in-the-loop
  • User onboarding - signup flows with email verification and conditional paths
  • Payment & checkout - retry-safe payment processing with event-driven confirmations
  • Background job orchestration - replace fragile cron jobs with observable workflows
  • Approval workflows - pause indefinitely until a human reviews
  • Data pipelines - ETL with step-by-step durability and progress tracking

See runnable examples and common patterns →


Documentation

  • Architecture - single-service and microservices (client/worker) setups
  • Core Concepts - workflows, steps, events, delays, polling, pause/resume, idempotency, input validation
  • AI & Agent Workflows - durable LLM pipelines, human-in-the-loop, RAG
  • Examples - conditional steps, batch loops, scheduled reminders, retries, monitoring
  • API Reference - WorkflowEngine, WorkflowClient, WorkflowRef, types
  • Configuration - env vars, database setup, requirements
  • Observability - OpenTelemetry tracing via otelPlugin

Observability with OpenTelemetry

pg-workflows ships a first-party plugin that emits OTel spans for workflow and step execution. @opentelemetry/api is an optional peer dependency — install it only if you want tracing.

npm install @opentelemetry/api @opentelemetry/sdk-node
import { NodeSDK } from '@opentelemetry/sdk-node'
import { trace } from '@opentelemetry/api'
import { workflow, otelPlugin } from 'pg-workflows'

// Initialize your OTel SDK however you normally do — for Node apps the
// NodeSDK registers an AsyncHooks context manager, which is required for
// hierarchical (parent/child) spans across async boundaries.
new NodeSDK({ /* exporters, resource, ... */ }).start()

const tracedWorkflow = workflow.use(otelPlugin())

const myWorkflow = tracedWorkflow('checkout', async ({ step }) => {
  await step.run('charge', async () => { /* ... */ })
  await step.waitFor('await-shipment', { eventName: 'shipped' })
})

The plugin emits a pg_workflows.workflow.run span per worker execution (one per resume cycle), with child spans per step kind (pg_workflows.step.run, pg_workflows.step.waitFor, etc.). Spans carry workflow.id, workflow.run_id, workflow.attempt and, where set, workflow.resource_id. Steps replayed from cache after a pause emit no spans.

Options:

otelPlugin({
  tracer: trace.getTracer('my-app'),                // default: trace.getTracer('pg-workflows')
  spanNamePrefix: 'pg_workflows',                   // default shown
  attributes: (ctx) => ({ tenant: ctx.resourceId }), // extra static attrs on workflow.run
})

Metrics, distributed trace context propagation across child workflows, and HTTP-caller context propagation are not in v1 — see the observability docs for the deferral rationale.


Requirements

  • Node.js >= 18
  • PostgreSQL >= 10

Acknowledgments

Special thanks to the teams behind Temporal, Inngest, Trigger.dev, and DBOS for pioneering durable execution patterns and inspiring this project.

License

MIT