close
Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(webapp): address review on per-org basin migration
- Use `org.id` (cuid, fixed-length, unique-by-construction) as the
  basin-name suffix instead of a truncated `org.slug`. The slug
  approach could silently collide two orgs whose slugs share a prefix
  past the truncation point, since the create call treats S2's 409 as
  success — a real cross-tenant isolation risk.
- `resolveRetentionForOrg` now distinguishes "billing not configured"
  from "billing call failed". OSS / self-hosted installs (no billing
  client) get `defaultRetention()` and the worker job converges; cloud
  installs that experience a transient billing failure throw and get
  retried by redis-worker. Previously every install without billing
  hit a permafail loop.
- `reconfigureBasinForOrg` throws when no S2 access token is
  configured instead of silently returning, so a misconfigured cloud
  install surfaces as a worker failure rather than stale retention.
- Duration env vars (`*_RETENTION*`, `*_DELETE_ON_EMPTY_MIN_AGE`)
  validated at boot via a `durationString()` Zod schema, so a
  misconfigured value fails fast at startup instead of at first basin
  operation.
- Admin reconfigure route's `retention` body field validated against
  the same duration shape — bad input is now a clean 400 rather than
  a 500 from `parseDuration`.
- Extract duration parsing into a shared `duration.server.ts` so the
  env validator and the provisioner share one source of truth.

Verified end-to-end with chat.agent locally — fresh chat lands in the
per-org basin, no leakage to the global fallback.
  • Loading branch information
ericallam committed May 4, 2026
commit 054d1afcdb8260e0951d57007d8f4b91a78cb27c
23 changes: 18 additions & 5 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@ import { MachinePresetName } from "@trigger.dev/core/v3";
import { BoolEnv } from "./utils/boolEnv";
import { isValidDatabaseUrl } from "./utils/db";
import { isValidRegex } from "./utils/regex";
import { isValidDuration } from "./services/realtime/duration.server";

/**
* `z.string()` constrained to a duration string parseable by
* `parseDuration` (e.g. `7d`, `30d`, `365d`, `1h`). Validated at boot
* so a typo'd retention env var fails fast at startup rather than
* lurking until the first basin operation.
*/
function durationString() {
return z
.string()
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
}

// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
Expand Down Expand Up @@ -1521,19 +1534,19 @@ const EnvironmentSchema = z
/// Used at org-create and as the fallback when no plan-specific
/// retention is resolved. Operators that don't run a billing API
/// only need this one.
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: z.string().default("30d"),
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: durationString().default("30d"),
/// Plan-specific retention overrides — only consulted by the
/// optional `streamBasinRetentionByPlan` shim. Operators that
/// don't map plans to retention (OSS, self-hosted) can ignore
/// these and rely on the default above.
REALTIME_STREAMS_BASIN_RETENTION_FREE: z.string().default("7d"),
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: z.string().default("30d"),
REALTIME_STREAMS_BASIN_RETENTION_PRO: z.string().default("365d"),
REALTIME_STREAMS_BASIN_RETENTION_FREE: durationString().default("7d"),
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: durationString().default("30d"),
REALTIME_STREAMS_BASIN_RETENTION_PRO: durationString().default("365d"),
/// Storage class applied to per-org basins at create time.
REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
/// `delete_on_empty_min_age` applied to per-org basins. Streams
/// that go empty for this long are reaped automatically.
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: z.string().default("1h"),
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: durationString().default("1h"),
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),

Expand Down
1 change: 0 additions & 1 deletion apps/webapp/app/models/organization.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ export async function createOrganization(
try {
await provisionBasinForOrg({
id: organization.id,
slug: organization.slug,
streamBasinName: organization.streamBasinName,
// No `retention` — provisioner uses `defaultRetention()`.
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { json, type ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import { isValidDuration } from "~/services/realtime/duration.server";
import {
isPerOrgBasinsEnabled,
reconfigureBasinForOrg,
Expand All @@ -23,7 +24,10 @@ import { commonWorker } from "~/v3/commonWorker.server";
const BodySchema = z
.object({
orgId: z.string(),
retention: z.string().optional(),
retention: z
.string()
.refine(isValidDuration, "retention must be a duration like 7d, 30d, 365d, 1h, 1y")
.optional(),
})
.strict();

Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/services/platform.v3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ function initializeClient() {
}

const client = singleton("billingClient", initializeClient);

/**
* `true` when the billing client was instantiated — i.e. we're running
* in a cloud-style install with `BILLING_API_URL` + `BILLING_API_KEY`
* configured. OSS / self-hosted installs return `false` here, which
* lets callers distinguish "no billing wired up, fall back to
* defaults" from "billing wired up but the call failed, retry."
*/
export function isBillingConfigured(): boolean {
return client !== undefined;
}
// Failures from @trigger.dev/platform billing client calls are tracked via
// this metric (with low-cardinality {function, kind} labels) rather than
// logged. Every task invocation hits these paths, so per-call logs were too
Expand Down
49 changes: 49 additions & 0 deletions apps/webapp/app/services/realtime/duration.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Duration string parsing for stream-basin retention / delete-on-empty
* configuration. Used by `streamBasinProvisioner` (to convert to S2's
* integer-seconds wire format) and by `env.server.ts` (to validate
* duration-shaped env vars at boot rather than at first use).
*
* Accepts the short forms (`7d`, `30d`, `365d`, `1h`, `90m`, `45s`,
* `2w`, `1y`) and the human forms (`7days`, `1week`, `1year`).
*/

const PATTERN =
/^(\d+)\s*(s|sec|secs|seconds?|m|min|mins|minutes?|h|hour|hours?|d|day|days?|w|week|weeks?|y|year|years?)$/;

export function isValidDuration(input: string): boolean {
return PATTERN.test(input.trim().toLowerCase());
}

/**
* Parse a duration string into seconds. Throws on garbage so a
* misconfigured env var fails loudly. Use {@link isValidDuration}
* for non-throwing validation (e.g. inside a Zod `.refine()`).
*/
export function parseDuration(input: string): number {
const trimmed = input.trim().toLowerCase();
const match = trimmed.match(PATTERN);
if (!match) {
throw new Error(`Invalid duration string: ${input}`);
}
const value = parseInt(match[1]!, 10);
const unit = match[2]!;
const multiplier =
/^s/.test(unit)
? 1
: /^m(?:in|ins|inute|inutes)?$/.test(unit)
? 60
: /^h/.test(unit)
? 3600
: /^d/.test(unit)
? 86400
: /^w/.test(unit)
? 604800
: /^y/.test(unit)
? 31_536_000
: NaN;
if (!Number.isFinite(multiplier)) {
throw new Error(`Invalid duration unit: ${unit}`);
}
return value * multiplier;
}
79 changes: 27 additions & 52 deletions apps/webapp/app/services/realtime/streamBasinProvisioner.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import type { PrismaClientOrTransaction } from "~/db.server";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { parseDuration } from "./duration.server";

export function isPerOrgBasinsEnabled(): boolean {
return env.REALTIME_STREAMS_PER_ORG_BASINS_ENABLED === "true";
Expand All @@ -48,36 +49,28 @@ export function defaultRetention(): string {
}

/**
* Build the basin name for an org. Format: `{prefix}-{env}-org-{slug}`.
* The org slug is already lowercase-and-hyphenated by
* `createOrganization`, so it satisfies S2 basin-name rules without
* further normalization. We truncate defensively to keep total length
* under 63 chars (a common bucket convention; verify against S2 docs
* before raising).
* Build the basin name for an org. Format: `{prefix}-{env}-org-{id}`.
*
* Throws if `REALTIME_STREAMS_BASIN_NAME_PREFIX` +
* `REALTIME_STREAMS_BASIN_NAME_ENV` are configured so long that no
* room remains for the slug — without this guard, `slice(0, 0)` would
* return an empty string and every org would share the same name,
* silently colliding via S2's 409-on-create.
* We use the org's `id` (cuid, fixed-length, unique-by-construction)
* rather than the slug. Slugs are user-influenced, can change, and —
* critically — could collide across orgs once truncated to fit the
* S2 basin-name length cap. cuid is short (25 chars) and never
* collides, so the basin name is stable and tenant-isolated by
* construction.
*
* Format check: `triggerdotdev-prod-org-{25 chars}` is 47 chars total,
* comfortably under the conventional 63-char cap. If you change the
* prefix / env-name to something extreme, this still fails fast at
* S2's validator.
*/
export function basinNameForOrg(org: { slug: string }): string {
export function basinNameForOrg(org: { id: string }): string {
const prefix = env.REALTIME_STREAMS_BASIN_NAME_PREFIX;
const envName = env.REALTIME_STREAMS_BASIN_NAME_ENV;
const head = `${prefix}-${envName}-org-`;
const budget = 63 - head.length;
if (budget <= 0) {
throw new Error(
`[streamBasinProvisioner] REALTIME_STREAMS_BASIN_NAME_PREFIX + REALTIME_STREAMS_BASIN_NAME_ENV too long: head="${head}" leaves no room for the org slug (budget=${budget}). Shorten the prefix or env-name values.`
);
}
const slug = org.slug.slice(0, budget);
return `${head}${slug}`;
return `${prefix}-${envName}-org-${org.id}`;
}

type ProvisionInput = {
id: string;
slug: string;
/// Duration string passed straight to S2. Defaults to
/// `defaultRetention()` when omitted. Caller decides; the provisioner
/// has no opinion about what retention is appropriate.
Expand Down Expand Up @@ -156,7 +149,15 @@ export async function reconfigureBasinForOrg(
if (!isPerOrgBasinsEnabled()) return;

const accessToken = env.REALTIME_STREAMS_S2_ACCESS_TOKEN;
if (!accessToken) return;
if (!accessToken) {
// Per-org basins are enabled but no token is configured — that's a
// misconfiguration, not a no-op condition. Throw so the worker job
// surfaces in the queue's failure log instead of silently leaving
// retention stale on the basin.
throw new Error(
"REALTIME_STREAMS_S2_ACCESS_TOKEN must be set when REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=true"
);
}

Comment thread
ericallam marked this conversation as resolved.
const org = await prisma.organization.findFirst({
where: { id: orgId },
Expand Down Expand Up @@ -197,8 +198,8 @@ async function s2CreateBasin(name: string, opts: CreateBasinOptions): Promise<vo
create_stream_on_read: true,
default_stream_config: {
storage_class: opts.storageClass,
retention_policy: { age: durationToSeconds(opts.retentionPolicy) },
delete_on_empty: { min_age_secs: durationToSeconds(opts.deleteOnEmptyMinAge) },
retention_policy: { age: parseDuration(opts.retentionPolicy) },
delete_on_empty: { min_age_secs: parseDuration(opts.deleteOnEmptyMinAge) },
},
},
};
Expand Down Expand Up @@ -235,7 +236,7 @@ async function s2ReconfigureBasin(name: string, opts: ReconfigureBasinOptions):
const url = `https://aws.s2.dev/v1/basins/${encodeURIComponent(name)}`;
const body = {
default_stream_config: {
retention_policy: { age: durationToSeconds(opts.retentionPolicy) },
retention_policy: { age: parseDuration(opts.retentionPolicy) },
},
};

Expand All @@ -258,29 +259,3 @@ async function s2ReconfigureBasin(name: string, opts: ReconfigureBasinOptions):
throw new Error(`S2 reconfigureBasin failed: ${res.status} ${res.statusText} ${text}`);
}

/**
* Parse a short duration string (e.g. `7d`, `30d`, `365d`, `1h`, `90m`,
* `45s`, `2w`) into seconds. Tolerant of `7days` and `1week` forms too.
* Throws on garbage so a misconfigured env var fails loudly at first use.
*/
function durationToSeconds(input: string): number {
const trimmed = input.trim().toLowerCase();
const match = trimmed.match(/^(\d+)\s*(s|sec|secs|seconds?|m|min|mins|minutes?|h|hour|hours?|d|day|days?|w|week|weeks?|y|year|years?)$/);
if (!match) {
throw new Error(`Invalid duration string: ${input}`);
}
const value = parseInt(match[1]!, 10);
const unit = match[2]!;
const multiplier =
/^s/.test(unit) ? 1
: /^m(?:in|ins|inute|inutes)?$/.test(unit) ? 60
: /^h/.test(unit) ? 3600
: /^d/.test(unit) ? 86400
: /^w/.test(unit) ? 604800
: /^y/.test(unit) ? 31_536_000
: NaN;
if (!Number.isFinite(multiplier)) {
throw new Error(`Invalid duration unit: ${unit}`);
}
return value * multiplier;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,36 @@
* path falls back to `defaultRetention()`.
*/
import { env } from "~/env.server";
import { getCurrentPlan } from "~/services/platform.v3.server";
import { getCurrentPlan, isBillingConfigured } from "~/services/platform.v3.server";
import { defaultRetention } from "./streamBasinProvisioner.server";

/**
* Resolve the retention duration for an org based on its current plan.
*
* - Returns the configured retention for the plan when the billing
* API has data.
* - Returns `defaultRetention()` when no billing client is configured
* (OSS / non-cloud installs that flipped per-org basins on without
* wiring billing).
* - **Throws** when billing is configured but the call failed, so
* the redis-worker retry kicks in and we don't silently downgrade
* a paid org's retention.
* - When billing is **not configured** (OSS / self-hosted installs),
* returns `defaultRetention()` — the worker job converges, the
* backfill completes, and operators get a sane default without
* having to wire up a billing API.
* - When billing **is configured** and the call succeeds, maps the
* plan code to a retention duration.
* - When billing **is configured** but the call failed (transient
* outage / 5xx), **throws** so the redis-worker retry kicks in
* and we don't silently downgrade a paid org's retention.
*/
export async function resolveRetentionForOrg(orgId: string): Promise<string> {
const plan = await getCurrentPlan(orgId);
if (!isBillingConfigured()) {
// No billing wired up — operator either runs OSS or hasn't set
// BILLING_API_URL / BILLING_API_KEY. Fall back to the default;
// the org-create path uses the same default, so this is just the
// backfill's catch-up path arriving at the same answer.
return defaultRetention();
}

const plan = await getCurrentPlan(orgId);
if (plan === undefined) {
// We can't tell from `getCurrentPlan` alone whether the billing
// client isn't configured (OSS) or whether the call failed
// (transient cloud outage). Today we conservatively throw so
// cloud installs retry. OSS installs that hit this path either:
// (a) flipped the per-org-basins flag on without wiring billing
// and should configure `BILLING_API_URL` / `BILLING_API_KEY`,
// or
// (b) shouldn't be calling this at all and should pass an
// explicit retention to the provisioner.
// Billing client exists but the call failed. Throw so redis-worker
// retries — silently defaulting to free would clip a paid org's
// retention if a backfill landed during a transient billing outage.
throw new Error(
`[streamBasinRetentionByPlan] billing plan unavailable for org ${orgId}; will retry`
);
Expand Down
1 change: 0 additions & 1 deletion apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ function initializeWorker() {
where: { id: payload.orgId },
select: {
id: true,
slug: true,
streamBasinName: true,
},
});
Expand Down
Loading