Hardonian/JobForge

JobForge

Agent Router for Multi-Tenant SaaS

JobForge routes autonomous agent workloads through PostgreSQL. No Redis, no Kafka, no message bus—just SQL, RPC, and determinism.

Built for engineers who need agents that complete work with observability and guarantees.

What It Does

JobForge is an agent router: it takes jobs from AI agents, SaaS webhooks, or internal services and guarantees they run exactly once, in order, with full observability.

┌──────────────────────────────────────────────────────────────┐
│  Agent / Webhook / Service                                    │
└──────────────────┬───────────────────────────────────────────┘
                   │ HTTP / RPC
                   ▼
┌──────────────────────────────────────────────────────────────┐
│  PostgreSQL/Supabase                                         │
│  ┌─────────────────────┐  ┌──────────────────────────────┐    │
│  │ jobforge_jobs       │  │ RPC Functions              │    │
│  │ - job queue         │  │ - enqueue (idempotent)     │    │
│  │ - result storage    │  │ - claim (SKIP LOCKED)      │    │
│  │ - attempt tracking  │  │ - complete / fail          │    │
│  └─────────────────────┘  └──────────────────────────────┘    │
└──────────────────┬───────────────────────────────────────────┘
                   │ Poll via RPC
                   ▼
┌──────────────────────────────────────────────────────────────┐
│  Workers (TypeScript / Python)                              │
│  - Poll for jobs via claim()                                 │
│  - Execute with trace_id correlation                         │
│  - Return results or retry with backoff                      │
└──────────────────────────────────────────────────────────────┘

Key Design Decisions:

  • Postgres as Router: Job state, ordering, and durability are Postgres's problem
  • Idempotent Enqueue: Same (tenant, type, key) = same job_id, no duplicates
  • RLS Isolation: Workers only see jobs for tenants they have access to
  • Deterministic Traces: Every execution produces input snapshot + decision trace + output artifact
  • No External Dependencies: Works with stock Postgres 14+, no extensions needed
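The idempotent-enqueue rule above can be illustrated with a small in-memory model. This is only a sketch: JobForge enforces the rule inside Postgres, and the `InMemoryQueue` class, its `Map`, and the `job-N` id format are hypothetical stand-ins for a uniqueness constraint on (tenant, type, key).

```typescript
// In-memory model of idempotent enqueue. The Map stands in for a unique
// constraint on (tenant_id, type, idempotency_key); it is not JobForge code.
interface EnqueueRequest {
  tenant_id: string
  type: string
  idempotency_key: string
  payload: unknown
}

interface Job extends EnqueueRequest {
  id: string
}

class InMemoryQueue {
  private jobsByKey = new Map<string, Job>()
  private nextId = 1

  enqueue(req: EnqueueRequest): Job {
    // Serializing the tuple as JSON avoids collisions such as
    // ("a:b", "c") versus ("a", "b:c") that naive string joins allow.
    const key = JSON.stringify([req.tenant_id, req.type, req.idempotency_key])
    const existing = this.jobsByKey.get(key)
    if (existing) return existing // duplicate request: hand back the original job

    const job: Job = { ...req, id: `job-${this.nextId++}` }
    this.jobsByKey.set(key, job)
    return job
  }

  get size(): number {
    return this.jobsByKey.size
  }
}
```

Note that in this sketch a second enqueue with the same key returns the first job unchanged, even if the payload differs; whether JobForge behaves the same way for changed payloads is not stated here.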

Quick Start

1. Database Setup

# Using Supabase CLI
cd supabase
supabase db push

# Or plain psql
psql "$DATABASE_URL" -f supabase/migrations/001_jobforge_core.sql

2. Enqueue Work

import { JobForgeClient } from '@jobforge/sdk-ts'

const client = new JobForgeClient({
  supabaseUrl: process.env.SUPABASE_URL!,
  supabaseKey: process.env.SUPABASE_SERVICE_ROLE_KEY!,
})

// Route an AI agent task
const job = await client.enqueueJob({
  tenant_id: 'tenant-uuid',
  type: 'autopilot.ops.scan',
  payload: {
    target: 'production',
    scan_type: 'cost_optimization',
  },
  idempotency_key: 'daily-cost-scan-2024-01-15',
})

console.log(`Routed: ${job.id}`)
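The `idempotency_key` in the example embeds a date, so repeated enqueues on the same day collapse into one job. A minimal sketch of building such a key follows; `dailyIdempotencyKey` is a hypothetical helper, not part of the SDK, and it assumes the day boundary should be UTC.

```typescript
// Hypothetical helper: builds a date-scoped idempotency key such as
// "daily-cost-scan-2024-01-15" so repeated enqueues on the same UTC day dedupe.
function dailyIdempotencyKey(prefix: string, now: Date = new Date()): string {
  const day = now.toISOString().slice(0, 10) // "YYYY-MM-DD" in UTC
  return `${prefix}-${day}`
}
```

Passing the clock in as a parameter keeps the helper deterministic under test.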

3. Run Worker

# TypeScript Worker
cd services/worker-ts
cp .env.example .env
# Add SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY
pnpm install
pnpm start

# Python Worker
cd services/worker-py
cp .env.example .env
pip install -r requirements.txt
python -m jobforge_worker.cli run

For Contributors

Add a Runner

Runners are job processors grouped by domain:

  1. Create runner config in packages/shared/src/runner-contract-enforcement.ts:
const myRunner: RunnerConfig = {
  runner_id: 'my-custom-runner',
  runner_type: 'ops',
  version: '1.0.0',
  methods: {
    execute: true,
    validate: true,
    health: true,
    trace: true,
  },
  determinism: {
    input_snapshot: true,
    decision_trace: true,
    output_artifact: true,
    replayable: true,
  },
  // ... see runner-contract-enforcement.ts for full schema
}
  2. Add handler in services/worker-ts/src/handlers/my-domain/:
export const myJobHandler: JobHandler = async (payload, context) => {
  // 1. Validate input (schema already checked, but validate business rules)
  // 2. Execute with trace logging
  // 3. Return deterministic output
  return { success: true, result: 'processed' }
}
  3. Register handler in services/worker-ts/src/lib/registry.ts:
registerHandler('my.job.type', myJobHandler)
  4. Add contract tests in packages/shared/test/contract-tests.ts:
// Golden test: input → expected output
GOLDEN_CONTRACT_TESTS.ops.push({
  name: 'my_job_valid_input',
  input: {
    /* ... */
  },
  expected_output: { success: true },
  expected_trace_keys: ['timestamp', 'runner_id', 'decision'],
  expected_artifact_keys: ['result'],
  deterministic: true,
})
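To make the golden-test fields concrete, here is a sketch of how such an entry could be checked against a runner. The `checkGolden` function, the `RunResult` shape, and the subset-matching rule for `expected_output` are assumptions for illustration; the actual harness in packages/shared may work differently.

```typescript
// Hypothetical checker for a golden contract test entry.
interface GoldenTest {
  name: string
  input: unknown
  expected_output: Record<string, unknown>
  expected_trace_keys: string[]
  expected_artifact_keys: string[]
  deterministic: boolean
}

interface RunResult {
  output: Record<string, unknown>
  trace: Record<string, unknown>
  artifact: Record<string, unknown>
}

type Runner = (input: unknown) => RunResult

function checkGolden(test: GoldenTest, run: Runner): string[] {
  const failures: string[] = []
  const first = run(test.input)

  // Every expected output field must match; extra output fields are tolerated.
  for (const [key, expected] of Object.entries(test.expected_output)) {
    if (JSON.stringify(first.output[key]) !== JSON.stringify(expected)) {
      failures.push(`output.${key} mismatch`)
    }
  }
  for (const key of test.expected_trace_keys) {
    if (!(key in first.trace)) failures.push(`trace missing ${key}`)
  }
  for (const key of test.expected_artifact_keys) {
    if (!(key in first.artifact)) failures.push(`artifact missing ${key}`)
  }
  // Determinism: a second run on the same input must produce the same output.
  // The trace is not compared because it may carry a timestamp.
  if (test.deterministic) {
    const second = run(test.input)
    if (JSON.stringify(second.output) !== JSON.stringify(first.output)) {
      failures.push('output differs between runs')
    }
  }
  return failures
}
```

An empty return value means the runner satisfied the entry; each string describes one violated expectation.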

Add a Connector

Connectors define integration capabilities:

  1. Create metadata in connectors/my-connector/metadata.json:
{
  "connector_id": "my.api",
  "version": "1.0.0",
  "status": "stable",
  "maturity": "production",
  "supported_job_types": ["my.api.call"],
  "capabilities": {
    "bidirectional": false,
    "streaming": false,
    "batch": true,
    "real_time": false,
    "webhook": true,
    "polling": true
  },
  "auth": {
    "required": true,
    "methods": ["api_key"],
    "credentials_storage": "env"
  },
  "rate_limits": {
    "requests_per_second": 10,
    "burst_size": 20,
    "quota_period": "minute",
    "retry_after_header": true
  },
  "failure_modes": [
    {
      "type": "rate_limit_exceeded",
      "retryable": true,
      "retry_strategy": "exponential_backoff",
      "max_retries": 3,
      "fallback_behavior": "queue",
      "circuit_breaker": true
    }
  ],
  "observability": {
    "metrics": true,
    "logs": true,
    "traces": true,
    "health_check": true
  }
}
  2. Run registry validation:
pnpm exec tsx scripts/validate-connector.ts connectors/my-connector
  3. Generate registry files:
pnpm exec tsx scripts/generate-registry.ts --output docs/connectors/
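For a sense of what registry validation might check, the sketch below validates a few of the metadata fields shown above. It is not the logic of scripts/validate-connector.ts; `validateMetadata`, the chosen rules, and the issue format are hypothetical.

```typescript
// Hypothetical, partial validator for a connector metadata.json object.
interface ValidationIssue {
  path: string
  message: string
}

function validateMetadata(meta: Record<string, unknown>): ValidationIssue[] {
  const issues: ValidationIssue[] = []

  const connectorId = meta['connector_id']
  if (typeof connectorId !== 'string' || connectorId.length === 0) {
    issues.push({ path: 'connector_id', message: 'must be a non-empty string' })
  }

  const version = meta['version']
  if (typeof version !== 'string' || !/^\d+\.\d+\.\d+$/.test(version)) {
    issues.push({ path: 'version', message: 'must look like MAJOR.MINOR.PATCH' })
  }

  const jobTypes = meta['supported_job_types']
  const jobTypesOk =
    Array.isArray(jobTypes) && jobTypes.length > 0 && jobTypes.every((t) => typeof t === 'string')
  if (!jobTypesOk) {
    issues.push({ path: 'supported_job_types', message: 'must be a non-empty array of strings' })
  }

  // rate_limits is treated as optional here; when present, both numbers must be positive.
  const limits = meta['rate_limits']
  if (limits !== undefined) {
    if (typeof limits !== 'object' || limits === null) {
      issues.push({ path: 'rate_limits', message: 'must be an object' })
    } else {
      const record = limits as Record<string, unknown>
      for (const field of ['requests_per_second', 'burst_size']) {
        const value = record[field]
        if (typeof value !== 'number' || !(value > 0)) {
          issues.push({ path: `rate_limits.${field}`, message: 'must be a positive number' })
        }
      }
    }
  }
  return issues
}
```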

Build a Connector in 5 Minutes

JobForge connectors are deterministic functions of type ConnectorFn that are executed through runConnector. Here's how to build one:

1. Install the SDK

npm install @jobforge/sdk
# or
pnpm add @jobforge/sdk

2. Create Your Connector

import { z } from 'zod'
import { type ConnectorFn, EvidenceBuilder } from '@jobforge/sdk'

// Define input schema
const MyConnectorInputSchema = z.object({
  message: z.string().min(1),
  uppercase: z.boolean().default(false),
})

// Implement the connector function
export const myConnector: ConnectorFn = async (params) => {
  const builder = new EvidenceBuilder({
    connector_id: params.config.connector_id,
    trace_id: params.context.trace_id,
    tenant_id: params.context.tenant_id,
    input: params.input,
  })

  try {
    // Validate input
    const validated = MyConnectorInputSchema.parse(params.input.payload)

    // Process
    const result = validated.uppercase ? validated.message.toUpperCase() : validated.message

    // Return success with evidence
    return {
      ok: true,
      data: { result },
      evidence: builder.buildSuccess({ result }),
    }
  } catch (error) {
    // Return failure with evidence
    return {
      ok: false,
      error: {
        code: 'VALIDATION_ERROR',
        message: 'Invalid input',
        retryable: false,
      },
      evidence: builder.buildFailure({
        code: 'VALIDATION_ERROR',
        message: 'Invalid input',
        retryable: false,
      }),
    }
  }
}

3. Test Your Connector

import { runConnector } from '@jobforge/sdk'

const result = await runConnector(myConnector, {
  config: {
    connector_id: 'my-connector',
    auth_type: 'none',
    settings: {},
    retry_policy: {
      max_retries: 0,
      base_delay_ms: 1000,
      max_delay_ms: 1000,
      backoff_multiplier: 1,
    },
    timeout_ms: 5000,
  },
  input: {
    operation: 'process',
    payload: { message: 'hello', uppercase: true },
  },
  context: {
    trace_id: 'test-123',
    tenant_id: '00000000-0000-0000-0000-000000000001',
    dry_run: false,
    attempt_no: 1,
  },
})

console.log(result.data?.result) // "HELLO"
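Because the connector returns either `ok: true` with `data` or `ok: false` with `error`, callers can branch on `ok` instead of relying on optional chaining. The sketch below assumes a result shape inferred from the connector example (evidence omitted for brevity); `ConnectorResult` and `summarizeResult` are hypothetical names, not SDK exports.

```typescript
// Hypothetical result shape, inferred from the connector example.
type ConnectorResult<T> =
  | { ok: true; data: T }
  | { ok: false; error: { code: string; message: string; retryable: boolean } }

function summarizeResult<T>(result: ConnectorResult<T>): string {
  if (result.ok) {
    // Inside this branch TypeScript knows `data` exists.
    return `ok: ${JSON.stringify(result.data)}`
  }
  // Here TypeScript knows `error` exists.
  const hint = result.error.retryable ? 'will retry' : 'will not retry'
  return `${result.error.code}: ${result.error.message} (${hint})`
}
```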

4. Add Metadata (Optional)

Create metadata.json for the connector registry:

{
  "connector_id": "my-connector",
  "version": "1.0.0",
  "status": "stable",
  "name": "My Connector",
  "description": "Processes messages with optional uppercase transformation",
  "supported_job_types": ["process"],
  "capabilities": {
    "bidirectional": false,
    "streaming": false,
    "batch": false,
    "real_time": false,
    "webhook": false,
    "polling": false
  },
  "auth": {
    "required": false,
    "methods": ["none"]
  },
  "rate_limits": {
    "requests_per_second": 10,
    "burst_size": 20
  }
}
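One common way to interpret `requests_per_second` and `burst_size` is as a token bucket: the bucket holds up to `burst_size` tokens and refills at `requests_per_second`. The sketch below shows that technique under this assumption; the README does not say which algorithm JobForge actually uses, and `TokenBucket` is a hypothetical class.

```typescript
// Token-bucket sketch. Timestamps are passed in so behavior is deterministic in tests.
class TokenBucket {
  private readonly requestsPerSecond: number
  private readonly burstSize: number
  private tokens: number
  private lastRefillMs: number

  constructor(requestsPerSecond: number, burstSize: number, nowMs: number = Date.now()) {
    this.requestsPerSecond = requestsPerSecond
    this.burstSize = burstSize
    this.tokens = burstSize // start full so an initial burst is allowed
    this.lastRefillMs = nowMs
  }

  tryAcquire(nowMs: number = Date.now()): boolean {
    // Refill in proportion to elapsed time, capped at the burst size.
    const elapsedSec = Math.max(0, nowMs - this.lastRefillMs) / 1000
    this.tokens = Math.min(this.burstSize, this.tokens + elapsedSec * this.requestsPerSecond)
    this.lastRefillMs = nowMs

    if (this.tokens < 1) return false
    this.tokens -= 1
    return true
  }
}
```

With the values from the metadata above (10 requests per second, burst of 20), a caller could send 20 requests immediately and then sustain 10 per second.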

5. Publish to Registry

Submit a PR to add your connector to the JobForge registry. ControlPlane will automatically load it!

How ControlPlane Loads Connectors

ControlPlane discovers and loads connectors through the JobForge registry:

Registry Discovery

  1. Registry Index: ControlPlane fetches https://registry.jobforge.dev/index.json containing all published connectors
  2. Metadata Validation: Each connector's metadata.json is validated against the schema
  3. Capability Filtering: Connectors are filtered by runtime capabilities (Node.js version, network access, etc.)
  4. Dependency Resolution: Required packages are installed in isolated environments

Runtime Loading

  1. Container Provisioning: Each connector runs in a secure container with resource limits
  2. Code Execution: Connectors are loaded via import() with timeout protection
  3. Evidence Collection: All executions produce tamper-evident evidence packets
  4. Health Monitoring: Connectors are monitored for performance and reliability
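The "import() with timeout protection" step can be sketched with `Promise.race`. This is an illustration of the general technique, not ControlPlane's loader; `withTimeout` is a hypothetical helper.

```typescript
// Rejects if `work` has not settled within `timeoutMs` milliseconds.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${timeoutMs} ms`)), timeoutMs)
  })
  // Whichever promise settles first wins. The timer is always cleared so it
  // cannot keep the process alive after the work finishes.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer)
  })
}
```

A loader could then call something like `withTimeout(import(specifier), 5000)`. The race only stops waiting; it does not cancel the underlying import, which is one reason to pair it with the container-level limits described above.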

Security Model

  • Isolated Execution: Each connector runs in its own container
  • Resource Limits: CPU, memory, and network access are restricted
  • Evidence Verification: All outputs are cryptographically signed
  • Audit Logging: Every execution is logged with tenant attribution
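As an illustration of evidence verification, the sketch below authenticates an evidence record with an HMAC-SHA256 tag using Node's built-in crypto module. This is a swapped-in technique chosen for brevity: the README does not specify the signing scheme, and an HMAC requires a shared secret, unlike a public-key signature. The `Evidence` shape, `signEvidence`, and `verifyEvidence` are hypothetical.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

// Hypothetical evidence shape; the real packet format is not shown in this README.
interface Evidence {
  connector_id: string
  trace_id: string
  tenant_id: string
  output: unknown
}

function signEvidence(evidence: Evidence, secret: string): string {
  // Serialize top-level fields in a fixed order so equal evidence yields equal bytes.
  // Key order inside `output` is not canonicalized in this sketch.
  const canonical = JSON.stringify([
    evidence.connector_id,
    evidence.trace_id,
    evidence.tenant_id,
    evidence.output,
  ])
  return createHmac('sha256', secret).update(canonical).digest('hex')
}

function verifyEvidence(evidence: Evidence, signature: string, secret: string): boolean {
  const expected = Buffer.from(signEvidence(evidence, secret), 'hex')
  const actual = Buffer.from(signature, 'hex')
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === actual.length && timingSafeEqual(expected, actual)
}
```

Any change to the output, trace ID, or tenant after signing makes verification fail, which is what makes the record tamper-evident.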

Integration Examples

See examples/integrations/ for working code:

  • ReadyLayer (readylayer-example.ts) - CDN cache warming, asset optimization
  • Settler (settler-example.ts) - Contract lifecycle management
  • AIAS (aias-example.ts) - AI agent task routing
  • TruthCore (truthcore-example.ts) - Data verification pipelines

Architecture

JobForge separates concerns into layers:

Router Layer (Postgres)

  • jobforge_jobs - Queue with SKIP LOCKED claim
  • jobforge_job_results - Immutable execution results
  • jobforge_triggers - Event-to-job routing
  • RLS policies enforce tenant boundaries
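The SKIP LOCKED claim mentioned above relies on Postgres's `FOR UPDATE SKIP LOCKED` row-locking clause, which lets concurrent workers each lock a different set of rows without blocking one another. The query below is a sketch of that general pattern only: the column names (`status`, `run_at`, `locked_by`, `attempts`) and status values are assumptions, not taken from the JobForge migrations; see ARCHITECTURE.md for the real RPC definitions.

```typescript
// Hypothetical claim query, held as a string a worker could pass to its Postgres client.
// $1 = worker id, $2 = maximum number of jobs to claim.
const CLAIM_JOBS_SQL = `
  UPDATE jobforge_jobs
  SET status = 'running', locked_by = $1, attempts = attempts + 1
  WHERE id IN (
    SELECT id
    FROM jobforge_jobs
    WHERE status = 'queued' AND run_at <= now()
    ORDER BY run_at, id
    LIMIT $2
    FOR UPDATE SKIP LOCKED
  )
  RETURNING id, tenant_id, type, payload
`
```

Rows already locked by another worker's in-flight claim are skipped rather than waited on, so two workers polling at the same moment receive disjoint batches.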

Runner Layer (Workers)

  • Poll via claim_jobs() RPC
  • Execute with trace_id correlation
  • Retry with exponential backoff
  • Dead-letter after max attempts
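The retry and dead-letter rules above can be sketched as a pure function over the `retry_policy` fields used in the connector test config. The `decideRetry` name, the 1-based attempt convention, and the absence of jitter are assumptions made for illustration; the workers' actual policy may differ.

```typescript
interface RetryPolicy {
  max_retries: number
  base_delay_ms: number
  max_delay_ms: number
  backoff_multiplier: number
}

type RetryDecision =
  | { action: 'retry'; delay_ms: number }
  | { action: 'dead_letter' }

// attemptNo is 1-based and refers to the attempt that just failed.
function decideRetry(policy: RetryPolicy, attemptNo: number): RetryDecision {
  // After max_retries retries have been used up, stop and dead-letter the job.
  if (attemptNo > policy.max_retries) return { action: 'dead_letter' }

  // Exponential growth: base * multiplier^(attemptNo - 1), capped at max_delay_ms.
  const raw = policy.base_delay_ms * Math.pow(policy.backoff_multiplier, attemptNo - 1)
  return { action: 'retry', delay_ms: Math.min(raw, policy.max_delay_ms) }
}
```

With `base_delay_ms: 1000`, `backoff_multiplier: 2`, and `max_retries: 3`, failed attempts 1 through 3 are retried after 1000, 2000, and 4000 ms, and a fourth failure is dead-lettered.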

Contract Layer (Validation)

  • Runner schemas enforce determinism
  • Golden tests validate behavior
  • Registry tracks connector metadata
  • CI blocks merge on contract drift

See ARCHITECTURE.md for RPC definitions, RLS policies, and concurrency model.

Monorepo Structure

jobforge/
├── supabase/
│   ├── migrations/          # SQL schema and RPC functions
│   └── tests/               # RLS isolation tests
├── packages/
│   ├── sdk-ts/              # TypeScript client SDK
│   ├── sdk-py/              # Python client SDK
│   ├── shared/              # Contract enforcement, validation
│   │   ├── src/
│   │   │   ├── runner-contract-enforcement.ts
│   │   │   ├── connector-registry.ts
│   │   │   └── invocation-determinism.ts
│   │   └── test/
│   │       └── contract-tests.ts       # Golden tests
│   └── adapters/
│       ├── readylayer/       # CDN integration
│       ├── settler/          # Contract management
│       ├── aias/             # AI agent routing
│       └── keys/             # API key management
├── services/
│   ├── worker-ts/           # TypeScript worker
│   └── worker-py/           # Python worker
├── examples/
│   └── integrations/        # Working integration examples
└── docs/
    ├── ARCHITECTURE.md      # Design docs
    ├── RUNBOOK.md           # Ops guide
    └── integrations/        # Adapter-specific guides

CLI

Command                               Purpose
pnpm jobforge:doctor                  Health checks
pnpm jobforge:impact:show --run <id>  View execution impact
pnpm run contract-tests               Validate runner contracts
pnpm run test                         Unit tests
pnpm run verify:fast                  Lint + typecheck + build

See docs/cli.md for full reference.

Development

# Setup
pnpm install

# Verify (fast)
pnpm run verify:fast

# Full verification (includes tests)
pnpm run verify:full

# Run specific checks
pnpm run lint
pnpm run typecheck
pnpm run test
pnpm run build

Health Check

The web app exposes a health endpoint:

GET /api/health

The response includes a correlation ID and is rate-limited in memory.

E2E Tests

Smoke + failure-path workflows run via the shared E2E suite:

pnpm run e2e:smoke

Documentation

License

MIT - See LICENSE

Contributing

See CONTRIBUTING.md


JobForge - Route agent work through PostgreSQL. Idempotent execution with tenant isolation.

Repository Operations Standards

  • Squash-only merges
  • Auto-delete branches
  • Security scanning in CI

About

Language-agnostic job orchestrator for Postgres. A Postgres-native job framework with language-agnostic workers (Python, Node, Go); idempotency, retries, and backoff built in; RPC-first and RLS-aware. Core spec in SQL + JSON, with reference workers in Python and TypeScript. No Redis, no Kafka: boringly correct. A good fit for Supabase users tired of ad-hoc queues.
