Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .server-changes/realtime-redis-connection-leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
area: webapp
type: fix
---

Fix Redis connection leak in realtime streams and broken abort signal propagation.

**Redis connections**: Non-blocking methods (ingestData, appendPart, getLastChunkIndex) now share a single Redis connection instead of creating one per request. streamResponse still uses dedicated connections (required for XREAD BLOCK) but now tears them down immediately via disconnect() instead of graceful quit(), with a 15s inactivity fallback.
Comment thread
ericallam marked this conversation as resolved.

**Abort signal**: request.signal is broken in Remix/Express due to a Node.js undici GC bug (nodejs/node#55428) that severs the signal chain when Remix clones the Request internally. Added getRequestAbortSignal() wired to Express res.on("close") via httpAsyncStorage, which fires reliably on client disconnect. All SSE/streaming routes updated to use it.
11 changes: 11 additions & 0 deletions apps/webapp/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ Use the `chrome-devtools` MCP server to visually verify local dashboard changes.
Routes use Remix flat-file convention with dot-separated segments:
`api.v1.tasks.$taskId.trigger.ts` -> `/api/v1/tasks/:taskId/trigger`

## Abort Signals

**Never use `request.signal`** for detecting client disconnects. It is broken due to a Node.js bug ([nodejs/node#55428](https://github.com/nodejs/node/issues/55428)) where the AbortSignal chain is severed when Remix internally clones the Request object. Instead, use `getRequestAbortSignal()` from `app/services/httpAsyncStorage.server.ts`, which is wired directly to Express `res.on("close")` and fires reliably.

```typescript
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";

// In route handlers, SSE streams, or any server-side code:
const signal = getRequestAbortSignal();
```

## Environment Variables

Access via `env` export from `app/env.server.ts`. **Never use `process.env` directly.**
Expand Down
7 changes: 5 additions & 2 deletions apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type TaskRunAttempt } from "@trigger.dev/database";
import { eventStream } from "remix-utils/sse/server";
import { type PrismaClient, prisma } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { logger } from "~/services/logger.server";
import { projectPubSub } from "~/v3/services/projectPubSub.server";

Expand Down Expand Up @@ -63,7 +64,9 @@ export class TasksStreamPresenter {

const subscriber = await projectPubSub.subscribe(`project:${project.id}:*`);

return eventStream(request.signal, (send, close) => {
const signal = getRequestAbortSignal();

return eventStream(signal, (send, close) => {
const safeSend = (args: { event?: string; data: string }) => {
try {
send(args);
Expand Down Expand Up @@ -95,7 +98,7 @@ export class TasksStreamPresenter {
});

pinger = setInterval(() => {
if (request.signal.aborted) {
if (signal.aborted) {
return close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
Expand Down Expand Up @@ -129,7 +130,7 @@ export const loader = createLoaderApiRoute(
run.realtimeStreamsVersion
);

return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, request.signal, {
return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), {
lastEventId,
timeoutInSeconds,
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import {
getInputStreamWaitpoint,
deleteInputStreamWaitpoint,
Expand Down Expand Up @@ -162,7 +163,7 @@ const loader = createLoaderApiRoute(
request,
run.friendlyId,
`$trigger.input:${params.streamId}`,
request.signal,
getRequestAbortSignal(),
{
lastEventId,
timeoutInSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { $replica } from "~/db.server";
import { useEnvironment } from "~/hooks/useEnvironment";
import { useOrganization } from "~/hooks/useOrganizations";
import { useProject } from "~/hooks/useProject";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { requireUserId } from "~/services/session.server";
import { cn } from "~/utils/cn";
Expand Down Expand Up @@ -89,7 +90,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
run.realtimeStreamsVersion
);

return realtimeStream.streamResponse(request, run.friendlyId, streamKey, request.signal, {
return realtimeStream.streamResponse(request, run.friendlyId, streamKey, getRequestAbortSignal(), {
lastEventId,
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { z } from "zod";
import { env } from "~/env.server";
import { findProjectBySlug } from "~/models/project.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { requireUserId } from "~/services/session.server";
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
import { inflate } from "node:zlib";
Expand Down Expand Up @@ -92,7 +93,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
const result = streamText({
model: openai(env.AI_RUN_FILTER_MODEL ?? "gpt-5-mini"),
temperature: 1,
abortSignal: request.signal,
abortSignal: getRequestAbortSignal(),
system: systemPrompt,
prompt,
tools: {
Expand Down
13 changes: 13 additions & 0 deletions apps/webapp/app/services/httpAsyncStorage.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type HttpLocalStorage = {
path: string;
host: string;
method: string;
abortController: AbortController;
};

const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();
Expand All @@ -18,3 +19,15 @@ export function runWithHttpContext<T>(context: HttpLocalStorage, fn: () => T): T
export function getHttpContext(): HttpLocalStorage | undefined {
return httpLocalStorage.getStore();
}

// Fallback signal that is never aborted, safe for tests and non-Express contexts.
const neverAbortedSignal = new AbortController().signal;

/**
* Returns an AbortSignal wired to the Express response's "close" event.
* This bypasses the broken request.signal chain in @remix-run/express
* (caused by Node.js undici GC bug nodejs/node#55428).
*/
export function getRequestAbortSignal(): AbortSignal {
return httpLocalStorage.getStore()?.abortController.signal ?? neverAbortedSignal;
}
48 changes: 23 additions & 25 deletions apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
Comment thread
ericallam marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export type RealtimeStreamsOptions = {
redis: RedisOptions | undefined;
logger?: Logger;
logLevel?: LogLevel;
inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 60000)
inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 15000)
};

// Legacy constant for backward compatibility (no longer written, but still recognized when reading)
Expand All @@ -23,10 +23,23 @@ type StreamChunk =
export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
private logger: Logger;
private inactivityTimeoutMs: number;
// Shared connection for short-lived non-blocking operations (XADD, XREVRANGE, EXPIRE).
// Lazily created on first use so we don't open a connection if only streamResponse is called.
private _sharedRedis: Redis | undefined;

constructor(private options: RealtimeStreamsOptions) {
this.logger = options.logger ?? new Logger("RedisRealtimeStreams", options.logLevel ?? "info");
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 60000; // Default: 60 seconds
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 15000; // Default: 15 seconds
}

private get sharedRedis(): Redis {
if (!this._sharedRedis) {
this._sharedRedis = new Redis({
...this.options.redis,
connectionName: "realtime:shared",
});
}
return this._sharedRedis;
Comment thread
ericallam marked this conversation as resolved.
}

async initializeStream(
Expand All @@ -43,7 +56,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
signal: AbortSignal,
options?: StreamResponseOptions
): Promise<Response> {
const redis = new Redis(this.options.redis ?? {});
const redis = new Redis({ ...this.options.redis, connectionName: "realtime:streamResponse" });
const streamKey = `stream:${runId}:${streamId}`;
let isCleanedUp = false;

Expand Down Expand Up @@ -269,7 +282,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
async function cleanup() {
if (isCleanedUp) return;
isCleanedUp = true;
await redis.quit().catch(console.error);
// disconnect() tears down the TCP socket immediately, which causes any
// pending XREAD BLOCK to reject right away instead of waiting for the
// block timeout to elapse. quit() would queue behind the blocking command.
redis.disconnect();
}

signal.addEventListener("abort", cleanup, { once: true });
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Expand All @@ -290,22 +306,12 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
clientId: string,
resumeFromChunk?: number
): Promise<Response> {
const redis = new Redis(this.options.redis ?? {});
const redis = this.sharedRedis;
const streamKey = `stream:${runId}:${streamId}`;
const startChunk = resumeFromChunk ?? 0;
// Start counting from the resume point, not from 0
let currentChunkIndex = startChunk;

const self = this;

async function cleanup() {
try {
await redis.quit();
} catch (error) {
self.logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
}
}

try {
const textStream = stream.pipeThrough(new TextDecoderStream());
const reader = textStream.getReader();
Expand Down Expand Up @@ -361,13 +367,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
this.logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error });

return new Response(null, { status: 500 });
} finally {
await cleanup();
}
}

async appendPart(part: string, partId: string, runId: string, streamId: string): Promise<void> {
const redis = new Redis(this.options.redis ?? {});
const redis = this.sharedRedis;
const streamKey = `stream:${runId}:${streamId}`;

await redis.xadd(
Expand All @@ -386,12 +390,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {

// Set TTL for cleanup when stream is done
await redis.expire(streamKey, env.REALTIME_STREAM_TTL);

await redis.quit();
}

async getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number> {
const redis = new Redis(this.options.redis ?? {});
const redis = this.sharedRedis;
const streamKey = `stream:${runId}:${streamId}`;

try {
Expand Down Expand Up @@ -460,10 +462,6 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
});
// Return -1 to indicate we don't know what the server has
return -1;
} finally {
await redis.quit().catch((err) => {
this.logger.error("[RedisRealtimeStreams][getLastChunkIndex] Error in cleanup:", { err });
});
}
}

Expand Down
9 changes: 6 additions & 3 deletions apps/webapp/app/utils/sse.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { eventStream } from "remix-utils/sse/server";
import { env } from "~/env.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { logger } from "~/services/logger.server";

type SseProps = {
Expand All @@ -22,6 +23,8 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
return new Response("SSE disabled", { status: 200 });
}

const signal = getRequestAbortSignal();

let pinger: NodeJS.Timeout | undefined = undefined;
let updater: NodeJS.Timeout | undefined = undefined;
let timeout: NodeJS.Timeout | undefined = undefined;
Expand All @@ -32,7 +35,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
clearTimeout(timeout);
};

return eventStream(request.signal, (send, close) => {
return eventStream(signal, (send, close) => {
const safeSend = (args: { event?: string; data: string }) => {
try {
send(args);
Expand Down Expand Up @@ -60,15 +63,15 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
};

pinger = setInterval(() => {
if (request.signal.aborted) {
if (signal.aborted) {
return abort();
}

safeSend({ event: "ping", data: new Date().toISOString() });
}, pingInterval);

updater = setInterval(() => {
if (request.signal.aborted) {
if (signal.aborted) {
return abort();
}

Expand Down
7 changes: 5 additions & 2 deletions apps/webapp/app/utils/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type LoaderFunctionArgs } from "@remix-run/node";
import { type Params } from "@remix-run/router";
import { eventStream } from "remix-utils/sse/server";
import { setInterval } from "timers/promises";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";

export type SendFunction = Parameters<Parameters<typeof eventStream>[1]>[0];

Expand Down Expand Up @@ -89,15 +90,17 @@ export function createSSELoader(options: SSEOptions) {
throw new Response("Internal Server Error", { status: 500 });
});

const requestAbortSignal = getRequestAbortSignal();

const combinedSignal = AbortSignal.any([
request.signal,
requestAbortSignal,
timeoutSignal,
internalController.signal,
]);

log("Start");

request.signal.addEventListener(
requestAbortSignal.addEventListener(
"abort",
() => {
log(`request signal aborted`);
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/remix.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ module.exports = {
"redlock",
"parse-duration",
"uncrypto",
"std-env",
],
browserNodeBuiltinsPolyfill: {
modules: {
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ if (ENABLE_CLUSTER && cluster.isPrimary) {
app.use((req, res, next) => {
// Generate a unique request ID for each request
const requestId = nanoid();
const abortController = new AbortController();
res.on("close", () => abortController.abort());

runWithHttpContext(
{ requestId, path: req.url, host: req.hostname, method: req.method },
{ requestId, path: req.url, host: req.hostname, method: req.method, abortController },
next
);
});
Expand Down
Loading