Connect
Overview
Section titled “Overview”Connect is how your app talks to third-party systems—Stripe, Salesforce, Slack, GitHub, and hundreds more via Airbyte-compatible manifests. One HTTP surface covers three primitives:
- Actions — Typed RPC calls (
createLead,postMessage,refundCharge) with dry-run support. - Streams — Bidirectional data replication using the Airbyte wire protocol.
- Webhooks — Inbound event receivers with signature verification and dispatch to jobs, streams, or functions.
Connect is a composition capability: it orchestrates Identity, Vault (credentials), Data (destinations), Storage (large payloads), Functions (transforms), Jobs (durable syncs), and the shared policy engine. Sandbox mode lets agents test syncs and actions before touching production data.
Connect is in beta: native connectors and the manifest adapter are production-ready; the full connector catalog and bidirectional conflict DSL continue to expand.
Key features
Section titled “Key features”- Unified connector contract — Every connector exposes Streams, Actions, and Webhooks through one descriptor.
- Native Rust connectors — Stripe, Slack, Linear, GitHub, Salesforce (phased), Postgres-CDC, S3.
- Manifest adapter — Airbyte Low-Code YAML interpreted in pure Rust (~100+ connectors at v0).
- Sandbox verb — Structured diff, schema inference, and
promote_tokenworkflow. - Airbyte wire protocol —
AirbyteRecordMessage,AirbyteStateMessage,ConfiguredAirbyteCatalog. - Bidirectional sync — Two stream bindings plus conflict policies (
last_write_wins, source of truth). - Webhook ingress — Anonymous hot path gated by receiver token + HMAC verification.
Quickstart
Section titled “Quickstart”Create a Stripe instance, check credentials, and invoke an action.
reactor auth login user@example.com --password '...'
reactor connect catalog show stripe
reactor connect instances create my-stripe --type stripereactor connect instances credentials my-stripe \ --input '{"api_key":"sk_test_..."}'
reactor connect instances check my-stripe
reactor connect actions invoke my-stripe createCustomer \ --input '{"email":"ada@example.com","name":"Ada Lovelace"}' \ --dry-runimport { createClient } from '@reactor/client';import { ReactorConnect } from '@reactor/connect-sdk';
const reactor = createClient({ url: process.env.REACTOR_URL! });await reactor.auth.signInWithPassword({ email, password });reactor.auth.setOrg('acme');
const connect = new ReactorConnect({ client: reactor });
const instance = await connect.instances.create({ typeId: 'stripe', name: 'my-stripe',});
await connect.instances.setCredentials('my-stripe', { api_key: process.env.STRIPE_SECRET_KEY!,});
await connect.instances.check('my-stripe');
const preview = await connect.instances .action('my-stripe', 'createCustomer') .sandbox({ email: 'ada@example.com', name: 'Ada Lovelace' });
console.log(preview.validation);# Browse catalogcurl -s "$REACTOR_URL/connect/v1/catalog/stripe" \ -H "Authorization: Bearer $REACTOR_TOKEN" \ -H "X-Reactor-Org: acme"
# Create instancecurl -s -X POST "$REACTOR_URL/connect/v1/instances" \ -H "Authorization: Bearer $REACTOR_TOKEN" \ -H "X-Reactor-Org: acme" \ -H "Content-Type: application/json" \ -d '{"type_id":"stripe","name":"my-stripe"}'
# Check connectioncurl -s -X POST "$REACTOR_URL/connect/v1/instances/my-stripe/check" \ -H "Authorization: Bearer $REACTOR_TOKEN" \ -H "X-Reactor-Org: acme"
# Invoke action (dry-run)curl -s -X POST "$REACTOR_URL/connect/v1/instances/my-stripe/actions/createCustomer/sandbox" \ -H "Authorization: Bearer $REACTOR_TOKEN" \ -H "X-Reactor-Org: acme" \ -H "Content-Type: application/json" \ -d '{"input":{"email":"ada@example.com","name":"Ada Lovelace"}}'How-to guides
Section titled “How-to guides”Sync Salesforce leads to Data (with sandbox)
Section titled “Sync Salesforce leads to Data (with sandbox)”The CRM migration demo flow: discover, connect, sandbox, promote.
# OAuth flow for Salesforcereactor connect instances create sf-prod --type salesforcereactor connect instances credentials sf-prod # opens browser for OAuth
reactor connect instances discover sf-prod
reactor connect connections create sf-leads-to-pg \ --from sf-prod \ --streams Lead,Contact \ --dest data:salesforce_leads \ --schedule "*/15 * * * *"
reactor connect connections sandbox sf-leads-to-pg --limit-rows 100reactor connect connections promote sf-leads-to-pg --token "$PROMOTE_TOKEN"await connect.instances.create({ typeId: 'salesforce', name: 'sf-prod' });const catalog = await connect.instances.discover('sf-prod');
await connect.connections.create({ name: 'sf-leads-to-pg', source: { instance: 'sf-prod', streams: [{ name: 'Lead', mode: 'incremental_dedup' }] }, destination: { kind: 'data', table: 'salesforce_leads' }, schedule: { cron: '*/15 * * * *' },});
const sandbox = await connect.connections.sandbox('sf-leads-to-pg', { limitRows: 100 });if (sandbox.errors.length === 0) { await connect.connections.promote('sf-leads-to-pg', sandbox.promoteToken);}Create a webhook receiver
Section titled “Create a webhook receiver”reactor connect receivers create my-stripe \ --webhook events \ --dispatch job:process-stripe-event
# Returns ingress_url: /connect/v1/ingress/{token}curl -s -X POST "$REACTOR_URL/connect/v1/instances/my-stripe/receivers" \ -H "Authorization: Bearer $REACTOR_TOKEN" \ -H "X-Reactor-Org: acme" \ -H "Content-Type: application/json" \ -d '{ "webhook": "events", "dispatch": { "kind": "job", "name": "process-stripe-event" } }'Configure the ingress URL in your vendor dashboard (e.g. Stripe webhook endpoint). Connect verifies signatures per the connector descriptor.
Set a bidirectional conflict policy
Section titled “Set a bidirectional conflict policy”When inbound and outbound connections share streams, define how writes conflict.
reactor connect policies conflict create \ --connection-a sf-leads-to-pg \ --connection-b pg-to-sf \ --rules 'Lead:last_write_wins,Account:source_of_truth_a'curl -s -X POST "$REACTOR_URL/connect/v1/policies/conflict" \ -H "Authorization: Bearer $REACTOR_TOKEN" \ -H "X-Reactor-Org: acme" \ -H "Content-Type: application/json" \ -d '{ "pair": { "connection_a": "sf-leads-to-pg", "connection_b": "pg-to-sf" }, "rules": [ { "stream": "Lead", "policy": "last_write_wins", "tiebreak": "source_of_truth_a" } ] }'Configuration
Section titled “Configuration”[connect]# Vault integration for credentialsvault_url = "http://localhost:8010"
[connect.data]url = "http://localhost:8002"api_key = "internal-data-api-key"
[connect.jobs]url = "http://localhost:8005"api_key = "internal-jobs-api-key"
[connect.storage]url = "http://localhost:8003"api_key = "internal-storage-api-key"
# Airbyte container adapter (size 5+ only — requires Docker/Firecracker)[connect.airbyte]enabled = falseEnvironment variables:
| Variable | Description |
|---|---|
REACTOR_CONNECT_BIND | Default 0.0.0.0:8007 (may vary) |
REACTOR_CONNECT_DATABASE_URL | Metadata Postgres |
REACTOR_CONNECT_VAULT_URL | Credential storage |
REACTOR_CONNECT_JOBS_URL | Durable sync execution |
REACTOR_CONNECT_DATA_URL | Default stream destination |
Limits and quotas
Section titled “Limits and quotas”| Limit | Value | Notes |
|---|---|---|
| Sandbox row limit | Configurable per request | Default 100 rows |
| Sandbox TTL | 1 hour | Ephemeral schema auto-cleaned |
| Discover cache | 1 hour | Re-discover to refresh catalog |
| Native connectors (v0.1) | Stripe, Slack, Linear, GitHub | Salesforce + bidirectional in v0.3 |
| Manifest connectors (v0.2) | 30+ Airbyte YAML | Pure Rust interpreter |
| Airbyte container (v0.4) | size 5+ only | Long-tail catalog |
| Webhook replay window | Per descriptor | Typically 5 minutes |
| OAuth credentials | User-provided | Stored in Vault, never in metadata DB |
Native connector feature matrix (v0.1):
| Connector | Actions | Streams | Webhooks |
|---|---|---|---|
| Stripe | ✅ | ✅ | ✅ |
| Slack | ✅ | — | ✅ |
| Linear | ✅ | ✅ | ✅ |
| GitHub | ✅ | ✅ | ✅ |
Dry-run support levels:
| Level | Behavior |
|---|---|
native | Vendor test mode (Stripe test keys, SF sandbox) |
synthesized | Returns outbound request without sending |
unsupported | 422 with suggested_fix |
API and SDK links
Section titled “API and SDK links”- HTTP base path:
/connect/v1/ - OpenAPI reference: Connect API
- TypeScript SDK:
@reactor/connect-sdk - CLI:
reactor connect
| Method | Path | Description |
|---|---|---|
GET | /connect/v1/catalog | List connector types |
GET | /connect/v1/catalog/{type_id} | Full descriptor |
POST | /connect/v1/instances | Create configured instance |
POST | /connect/v1/instances/{name}/discover | Schema discovery |
POST | /connect/v1/connections | Create stream binding |
POST | /connect/v1/connections/{name}/sandbox | Sandbox sync |
POST | /connect/v1/instances/{name}/actions/{action}/invoke | Run action |
POST | /connect/v1/ingress/{token} | Webhook hot path (anonymous) |
Codegen typed clients:
reactor connect codegen --instance sf-prod --out src/connect/sf-prod.tsTroubleshooting
Section titled “Troubleshooting”auth_token_expired during sync or action
Section titled “auth_token_expired during sync or action”OAuth refresh failed. Re-authenticate the instance.
reactor connect instances credentials sf-prod # re-run OAuth flowreactor connect instances check sf-prodStructured errors include suggested_fix and optional docs_url—agents should surface these to users directly.
Sandbox shows schema drift you did not expect
Section titled “Sandbox shows schema drift you did not expect”Review diff_vs_current.added_columns and type_changes. Promote only when the diff is acceptable. Set options.schema_drift on the connection to block_until_approved for production safety.
Webhook returns 401
Section titled “Webhook returns 401”Signature verification failed. Confirm the signing secret in Vault matches the vendor dashboard. Check clock skew for replay protection (timestamp within replay_window).
dry_run_unsupported (422)
Section titled “dry_run_unsupported (422)”The action’s descriptor marks dry-run as unsupported. Use a vendor test instance (e.g. Stripe test mode keys) or invoke in sandbox mode on a stream binding instead.
Sync run failed—where to look
Section titled “Sync run failed—where to look”reactor connect connections runs list sf-leads-to-pg --status failedreactor connect connections runs logs sf-leads-to-pg run_01HZ... --followRuns delegate to Jobs internally; cancel and retry via the Connect façade or Jobs admin API.