Hogsend

Events & Ingestion

Push events from any source to trigger journeys and track user behavior.

Overview

Events are the foundation of Hogsend. Every journey enrollment, exit condition check, and contact record flows through a single ingestion pipeline. You can push events via the REST API directly, or wire up webhook sources from external systems like PostHog.

When an event is ingested, five things happen concurrently:

  1. Store -- the event is persisted to the user_events table
  2. Route -- the event is pushed to Hatchet, which routes it to any journey whose trigger matches the event name
  3. Exit check -- all active/waiting journey states for the user are evaluated against exitOn rules
  4. Contact upsert -- the user's contact record is created or updated with the latest properties

This all happens in a single request -- the API returns the result synchronously.

Ingest API

POST /v1/ingest

Send events directly from your application code.

Request body

{
  event: string;        // required -- event name (e.g. "user:signed_up")
  userId: string;       // required -- unique user identifier
  userEmail?: string;   // optional -- user's email address
  properties?: Record<string, unknown>; // optional -- arbitrary event data
  timestamp?: string;   // optional -- ISO 8601 datetime
}

Response 202 Accepted

{
  stored: boolean;
  exits: Array<{
    journeyId: string;
    stateId: string;
    exited: boolean;
  }>;
}

The exits array reports every active journey state that was evaluated. Entries with exited: true indicate the user was removed from that journey because this event matched an exit condition.

Example

curl -X POST https://api.hogsend.com/v1/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "event": "user:signed_up",
    "userId": "user_abc123",
    "userEmail": "[email protected]",
    "properties": {
      "plan": "pro",
      "source": "landing-page"
    }
  }'
{
  "stored": true,
  "exits": []
}

Event data model

Events are stored in the user_events table with the following schema:

ColumnTypeDescription
iduuidAuto-generated primary key
user_idtextThe external user identifier
eventtextEvent name
propertiesjsonbArbitrary event properties
occurred_attimestamptzDefaults to now()

The table is indexed on user_id, event, occurred_at, and the composite (user_id, event, occurred_at) for efficient lookups during journey condition checks.

How routing works

When Hogsend pushes an event to Hatchet, the event payload is serialized as:

{
  userId: string;
  userEmail: string;
  properties: Record<string, string | number | boolean | null>;
}

Properties are filtered to JSON-primitive values only (strings, numbers, booleans, and null). Complex nested objects are stripped before dispatch to Hatchet.

Each journey declares its trigger event in meta.trigger.event. Hatchet routes the event to all journeys listening on onEvents: [trigger.event]. If the journey has trigger.where conditions, they are evaluated inside the journey task before the run() function executes.

Exit conditions

Every ingested event also triggers an exit condition scan. Hogsend queries all active or waiting journey states for the user, then checks each journey's exitOn rules:

exitOn?: Array<{
  event: string;              // event name to match
  where?: PropertyCondition[]; // optional property conditions
}>;

If the ingested event name matches and all where conditions pass (or none are defined), the journey state is set to "exited" and the exitedAt timestamp is recorded. This happens regardless of where the user is in the journey flow.

Contact upsert

Every ingested event upserts a record in the contacts table using userId as the external identifier. This ensures contact records are created automatically the first time a user triggers any event. The contact's email, properties, and lastSeenAt fields are updated on each event. If the upsert fails, it logs a warning but does not block the event from being processed.

Webhook sources

Webhook sources let external systems push events into Hogsend through dedicated endpoints at POST /v1/webhooks/:sourceId. Each source defines its own authentication, payload validation, and transform logic.

How webhook sources work

  1. A request arrives at /v1/webhooks/:sourceId
  2. The source is looked up by ID -- returns 404 if unknown
  3. Authentication is verified by comparing the configured header against the expected secret from environment variables
  4. If a Zod schema is defined, the payload is validated -- returns 400 with details on failure
  5. The transform() function converts the external payload into an IngestEvent
  6. If transform() returns null, the event is skipped (returns 200 with skipped: true)
  7. Otherwise, the event is passed to ingestEvent() and processed through the full pipeline

Built-in: PostHog source

Hogsend ships with a PostHog webhook source out of the box.

Endpoint: POST /v1/webhooks/posthog

Authentication: Set POSTHOG_WEBHOOK_SECRET in your environment and configure PostHog to send it via the x-posthog-webhook-secret header.

Payload format:

{
  event: {
    uuid?: string;
    event: string;         // becomes the Hogsend event name
    distinct_id: string;   // becomes userId
    timestamp?: string;
    properties?: Record<string, unknown>;
    url?: string;
  };
  person?: {
    id?: string;
    name?: string;
    url?: string;
    properties?: {
      email?: string;      // used as userEmail
      [key: string]: unknown;
    };
  };
  groups?: Record<string, unknown>;
  project?: Record<string, unknown>;
}

The transform merges event.properties and person.properties into a single properties object. If event.uuid is present, it is preserved as _posthogEventId.

Creating a custom webhook source

Use defineWebhookSource() to create a new source. Each source needs a unique ID, authentication config, an optional Zod schema for validation, and a transform function.

1. Define the source

src/webhook-sources/stripe.ts
import { z } from "zod";
import { defineWebhookSource } from "./define-webhook-source.js";

const stripeEventSchema = z.object({
  id: z.string(),
  type: z.string(),
  data: z.object({
    object: z.record(z.string(), z.unknown()),
  }),
});

export const stripeSource = defineWebhookSource({
  meta: {
    id: "stripe",
    name: "Stripe",
    description: "Receives Stripe webhook events.",
  },
  auth: {
    header: "x-stripe-secret",
    envKey: "STRIPE_WEBHOOK_SECRET",
    type: "match",
  },
  schema: stripeEventSchema,
  async transform(payload) {
    const userId = payload.data.object.customer as string;
    if (!userId) return null; // skip events without a customer

    return {
      event: `stripe:${payload.type}`,
      userId,
      userEmail: (payload.data.object.email as string) ?? "",
      properties: {
        stripeEventId: payload.id,
        ...payload.data.object,
      },
    };
  },
});

2. Register it

Add the source to the allSources array in src/webhook-sources/index.ts:

src/webhook-sources/index.ts
import { posthogSource } from "./posthog.js";
import { stripeSource } from "./stripe.js";

const allSources: DefinedWebhookSource[] = [posthogSource, stripeSource];

3. Set the environment variable

Add STRIPE_WEBHOOK_SECRET to your .env and deployment configuration.

The new source is immediately available at POST /v1/webhooks/stripe.

The defineWebhookSource API

interface WebhookSourceMeta {
  id: string;          // URL-safe identifier, used in the endpoint path
  name: string;        // Human-readable name
  description?: string;
}

interface WebhookSourceAuth {
  header: string;      // HTTP header containing the secret
  envKey: string;      // Environment variable name for the expected secret
  type: "match";       // Auth strategy (exact string match)
}

interface WebhookSourceCtx {
  db: Database;        // Drizzle database instance
  logger: Logger;      // Structured logger
}

function defineWebhookSource<T>(def: {
  meta: WebhookSourceMeta;
  auth: WebhookSourceAuth;
  schema?: z.ZodSchema<T>;       // Optional Zod schema for payload validation
  transform(
    payload: T,
    ctx: WebhookSourceCtx
  ): Promise<IngestEvent | null>; // Return null to skip the event
}): DefinedWebhookSource<T>;

The transform function receives the validated payload and a context object with database and logger access. Return an IngestEvent to process the event, or null to silently skip it.

The IngestEvent type

Every event -- whether from the API or a webhook source -- must conform to this shape before entering the pipeline:

interface IngestEvent {
  event: string;                     // event name
  userId: string;                    // unique user identifier
  userEmail: string;                 // user email (empty string if unknown)
  properties: Record<string, unknown>; // arbitrary event data
}

Best practices

  • Use namespaced event names -- prefer user:signed_up or feature:first_use over generic names. Journeys match on exact event names.
  • Keep properties flat -- nested objects are stripped when routing to Hatchet. Use flat key-value pairs for properties that journey conditions need to evaluate.
  • Include email when possible -- the userEmail field powers contact upserts and journey email delivery. Without it, Hogsend can track behavior but cannot send emails.
  • Return null from transform for irrelevant events -- webhook sources often receive events you do not care about. Returning null from transform() skips processing cleanly without errors.
  • Validate with Zod schemas -- defining a schema on your webhook source catches malformed payloads at the boundary, before they reach your transform logic.

On this page