Keep Codex chat updates attached to real sessions #705
anyongjin wants to merge 1 commit into siteboon:main
Conversation
Codex may emit the usable thread id only on thread.started, and its SDK can deliver assistant text as full item updates instead of token deltas. The UI previously announced a synthetic session id and then rendered stale realtime entries next to persisted messages, so users could miss updates until refresh or see duplicated content.

This waits for the real Codex thread id before announcing a new session, converts full assistant message updates into synthetic stream_delta chunks, refreshes persisted messages after completion, and dedupes short-lived realtime text echoes against server messages.

Constraint: Codex SDK event shape currently exposes the real id as thread_id on thread.started.
Rejected: Announce codex-pending ids immediately | route navigation can attach the UI to an id that never has persisted messages.
Confidence: high
Scope-risk: moderate
Tested: npm run typecheck; npm run lint; npm run build
Not-tested: Live Codex SDK integration in this checkout; verified equivalent patch manually on deployed CloudCLI UI.
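The "converts full assistant message updates into synthetic stream_delta chunks" step can be sketched as a prefix diff: when the SDK resends the whole assistant message on each update, emit only the unseen suffix. This is a minimal illustration, not the PR's actual code; `diffDelta` and the update values are invented for the example.

```typescript
// Sketch: derive incremental stream_delta chunks from full-text item updates.
// (diffDelta is an illustrative name, not a function from this PR.)
function diffDelta(previous: string, full: string): string {
  // Common case: the new text extends the old one; emit only the new suffix.
  if (full.startsWith(previous)) return full.slice(previous.length);
  // Fallback: the text was rewritten; re-emit everything.
  return full;
}

let seen = '';
const deltas: string[] = [];
for (const update of ['Hel', 'Hello, wor', 'Hello, world!']) {
  const chunk = diffDelta(seen, update);
  if (chunk) deltas.push(chunk); // each chunk would become a stream_delta event
  seen = update;
}
console.log(deltas); // ['Hel', 'lo, wor', 'ld!']
```

Concatenating the emitted chunks reproduces the final message, so the client's existing delta-buffering path can render these updates unchanged.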
📝 Walkthrough

The PR enhances session lifecycle management in streaming scenarios: session keys are tracked with placeholder IDs and remapped to real thread IDs when they become available, text-echo messages are deduplicated within a 10-second window, and a delayed server refresh with project metadata is scheduled on completion.
Sequence Diagram

```mermaid
sequenceDiagram
    participant UI as Client/UI
    participant Codex as OpenAI Codex Server
    participant Stream as Real-time Stream
    participant Store as Session Store
    UI->>Codex: initiate query with pending session id
    Codex->>Codex: create activeSessionKey (codex-pending-*)
    Codex->>UI: emit session_created (deferred)
    Codex->>Stream: connect streaming
    Stream->>Codex: thread.started event with real thread_id
    Codex->>Codex: moveActiveSession (remap pending→real id)
    Codex->>UI: emit session_created (with real threadId)
    Stream->>Codex: agent_message updates
    Codex->>Codex: synthesize stream_delta via text diffing
    Codex->>UI: emit stream_delta events
    Stream->>Codex: completion event
    Codex->>UI: emit complete message (with actualSessionId)
    UI->>Store: refreshFromServer(sid, provider, project)
    Store->>Store: deduplicate text-echo messages (10s window)
    Store->>UI: update session state
```
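The pending→real session remap in the diagram can be sketched as a keyed-state move: work starts under a synthetic `codex-pending-*` key, and once thread.started delivers the real thread_id, buffered state is moved under the real key before session_created is announced. This is a simplified stand-in; `startPendingSession`, `moveActiveSession`, and the `SessionState` shape are illustrative, not the server's actual code.

```typescript
// Sketch of the pending -> real session-key remap.
type SessionState = { events: string[] };

const sessions = new Map<string, SessionState>();

function startPendingSession(): string {
  const key = `codex-pending-${Math.random().toString(36).slice(2, 8)}`;
  sessions.set(key, { events: [] });
  return key;
}

// Move everything buffered under the pending key to the real thread id;
// session_created would only be emitted after this, with the real id.
function moveActiveSession(pendingKey: string, realThreadId: string): void {
  const state = sessions.get(pendingKey);
  if (!state) return;
  sessions.delete(pendingKey);
  sessions.set(realThreadId, state);
}

const pending = startPendingSession();
sessions.get(pending)!.events.push('stream_delta');
moveActiveSession(pending, 'thread_abc123');
console.log(sessions.has(pending)); // false
console.log(sessions.get('thread_abc123')!.events.length); // 1
```

Because the UI never sees the pending key, route navigation cannot attach to an id that has no persisted messages, which is the failure mode the PR description rejects.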
🚥 Pre-merge checks: 5 passed
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/components/chat/hooks/useChatRealtimeHandlers.ts (1)
266-274: Track the timeout handle so duplicate `complete` events don't pile on redundant fetches.

The 250ms `setTimeout` here isn't tracked or cleared. If a `complete` event arrives more than once for the same `sid` (e.g., reconnects, retries, or the existing legacy `complete` path at lines 285-296 that also schedules `window.refreshProjects` 500ms later), each fires its own `refreshFromServer`, generating redundant `/api/sessions/.../messages` calls within the same window. Storing the handle in a ref (keyed by `sid`) and clearing it on each new `complete` would coalesce them.

Not a correctness bug — `refreshFromServer` is idempotent — but worth tightening for hot reconnect paths.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/components/chat/hooks/useChatRealtimeHandlers.ts` around lines 266-274, wrap the 250ms setTimeout in a tracked timeout map so duplicate "complete" events for the same sid don't schedule redundant refreshes: create a ref (e.g., timeoutsRef: React.MutableRefObject<Record<string, number | null>>) inside useChatRealtimeHandlers, before calling setTimeout clear any existing timeout for the sid (clearTimeout on timeoutsRef.current[sid]) then store the new handle in timeoutsRef.current[sid], and ensure you clear the timeout (and delete the entry) when the refresh runs or when the component/unsubscribe logic runs (also clear the handle where the legacy complete path schedules window.refreshProjects) so multiple complete events coalesce into a single sessionStore.refreshFromServer call.

server/openai-codex.js (1)
243-260: Synthetic streaming delays stall the event loop and exaggerate latency for short messages.

`sendSyntheticStream` is `await`-ed inside the `for await (const event of streamedTurn.events)` loop, so subsequent SDK events (including `turn.completed`, `item.completed`, and the abort checks at lines 308-311) cannot be processed until the artificial chunking finishes. With `chunkSize=1, delayMs=45` for ≤160-char texts, a 120-character agent reply costs ~5.4s of pure setTimeout sleeping; a back-to-back stream of small `item.updated` deltas (e.g., 5 chars each at 45ms/char) compounds the lag and makes abort responsiveness worse.

The client already buffers `stream_delta` events on a 100ms timer (`useChatRealtimeHandlers.ts` lines 189-196), so the artificial pacing mostly trades latency for a smoother typewriter effect. Consider:

- Capping the total artificial duration (e.g., max 500-800 ms regardless of text length), or
- Skipping the per-chunk delay for `item.updated` deltas (they already arrive progressively) and only chunk-streaming the `item.completed` fallback at line 353, or
- Running `sendSyntheticStream` without `await` (fire-and-forget) so it does not block subsequent events — but then ordering with `stream_end` needs care.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/openai-codex.js` around lines 243-260, sendSyntheticStream currently sleeps per-chunk and is awaited inside the for-await loop, blocking subsequent SDK events; change it to cap total artificial streaming time and avoid blocking for progressive deltas: inside sendSyntheticStream (the function defined at the top of the diff) compute the total planned delay and, if it exceeds a configurable max (e.g., 500-800ms), scale down delayMs or increase chunkSize so total duration ≤ max; also alter call sites in the for-await (where sendSyntheticStream is awaited for streamedTurn.events / item.updated / item.completed) so you do not await sendSyntheticStream for incremental/item.updated events (fire-and-forget) but continue to await it for the final/item.completed fallback to preserve ordering with stream_end/turn.completed; ensure you reference sendSyntheticStream and the for-await over streamedTurn.events when making these changes.
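The capping option from this comment can be sketched as a pure planning step: given the text length and the configured chunking, shrink the per-chunk delay so the total artificial duration never exceeds a ceiling. This is a rough sketch under the review's numbers; `planSyntheticStream` and its signature are illustrative, not the server's actual function.

```typescript
// Sketch: cap the total synthetic-stream duration regardless of text length.
function planSyntheticStream(
  textLength: number,
  chunkSize: number,
  delayMs: number,
  maxTotalMs = 800, // ceiling suggested in the review (500-800 ms)
): { chunkSize: number; delayMs: number } {
  const chunks = Math.ceil(textLength / chunkSize);
  const total = chunks * delayMs;
  if (total <= maxTotalMs) return { chunkSize, delayMs };
  // Too slow: keep the chunk count but shrink the per-chunk sleep to fit the cap.
  return { chunkSize, delayMs: Math.floor(maxTotalMs / chunks) };
}

// 120 chars at 1 char / 45 ms would take ~5.4 s; the cap brings it under 800 ms.
const plan = planSyntheticStream(120, 1, 45);
console.log(plan); // { chunkSize: 1, delayMs: 6 }
```

Because the plan is computed before the chunk loop runs, the fire-and-forget question from the third bullet stays separate: even an awaited stream now blocks the for-await loop for at most `maxTotalMs`.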
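The timeout-coalescing suggestion for `useChatRealtimeHandlers.ts` can be sketched as a per-sid timer map: each new `complete` event replaces, rather than stacks, the scheduled refresh. This is a framework-free stand-in; `scheduleRefresh`, `pendingRefresh`, and the `refreshCalls` counter are illustrative (the real fix would hold the map in a React ref and call `sessionStore.refreshFromServer`).

```typescript
// Sketch: coalesce duplicate `complete` events into one delayed refresh per sid.
const pendingRefresh = new Map<string, ReturnType<typeof setTimeout>>();
let refreshCalls = 0; // stand-in for counting refreshFromServer invocations

function scheduleRefresh(sid: string, delayMs = 250): void {
  const existing = pendingRefresh.get(sid);
  if (existing !== undefined) clearTimeout(existing); // drop the stale timer
  const handle = setTimeout(() => {
    pendingRefresh.delete(sid); // clean up once the refresh actually runs
    refreshCalls += 1;          // sessionStore.refreshFromServer(sid, ...) here
  }, delayMs);
  pendingRefresh.set(sid, handle);
}

// Three rapid `complete` events for the same sid yield a single refresh.
scheduleRefresh('s1');
scheduleRefresh('s1');
scheduleRefresh('s1');
setTimeout(() => console.log(refreshCalls), 400); // 1
```

On unmount or unsubscribe, the real hook would also iterate the map and clear every outstanding handle so no refresh fires against a torn-down session.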
📒 Files selected for processing (3)

- server/openai-codex.js
- src/components/chat/hooks/useChatRealtimeHandlers.ts
- src/stores/useSessionStore.ts
```diff
 const REALTIME_DUPLICATE_WINDOW_MS = 10_000;

 function getTextDuplicateKey(message: NormalizedMessage): string | null {
   if (message.kind !== 'text') return null;
   if (!message.content) return null;
   return `${message.provider}:${message.role ?? ''}:${message.content}`;
 }

 function getTimestampMs(message: NormalizedMessage): number | null {
   const value = new Date(message.timestamp).getTime();
   return Number.isFinite(value) ? value : null;
 }

 function isDuplicatePersistedText(realtimeMessage: NormalizedMessage, persistedMessage: NormalizedMessage): boolean {
   const realtimeKey = getTextDuplicateKey(realtimeMessage);
   if (!realtimeKey || realtimeKey !== getTextDuplicateKey(persistedMessage)) {
     return false;
   }

   const realtimeTime = getTimestampMs(realtimeMessage);
   const persistedTime = getTimestampMs(persistedMessage);
   if (realtimeTime === null || persistedTime === null) {
     return false;
   }

   return Math.abs(realtimeTime - persistedTime) <= REALTIME_DUPLICATE_WINDOW_MS;
 }

 /**
- * Compute merged messages: server + realtime, deduped by id.
+ * Compute merged messages: server + realtime, deduped by id and short-lived
+ * text echoes.
  * Server messages take priority (they're the persisted source of truth).
  * Realtime messages that aren't yet in server stay (in-flight streaming).
  */
 function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] {
   if (realtime.length === 0) return server;
   if (server.length === 0) return realtime;
   const serverIds = new Set(server.map(m => m.id));
-  const extra = realtime.filter(m => !serverIds.has(m.id));
+  const extra = realtime.filter(m => (
+    !serverIds.has(m.id) &&
+    !server.some(serverMessage => isDuplicatePersistedText(m, serverMessage))
+  ));
```
Text-echo dedup can briefly hide a legitimate repeat assistant message within the 10s window.

Two issues with this dedup, in priority order:

1. False positives for legitimate same-content messages. The dedup matches purely on `{provider}:{role}:{content}` plus a 10s timestamp window. If the assistant emits the same short content twice within 10 seconds (e.g., two consecutive "Done." or "OK" replies), the second realtime echo is filtered against the first persisted server message and disappears from the UI until `refreshFromServer` runs (250ms after `complete`, per `useChatRealtimeHandlers.ts` lines 266-274). The window is brief, but the user can see content "vanish" mid-render. Consider tightening the window (e.g., 1-2s, sized to realistic realtime→persistence lag) and/or also requiring the realtime message's timestamp be `>=` the persisted one (an echo never predates its own persistence).

2. `O(R * S)` merge cost. `realtime.filter(m => server.some(...))` runs `server.some` per realtime message. With `MAX_REALTIME_MESSAGES = 500` and server lists that grow over long sessions, this is invoked on every `appendRealtime`. Pre-bucketing server messages once into `Map<duplicateKey, timestampMs[]>` (or a `Set` of keys plus a min timestamp index) keeps this near `O(R + S)`.
♻️ Sketch of bucketed dedup

```diff
 function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] {
   if (realtime.length === 0) return server;
   if (server.length === 0) return realtime;
   const serverIds = new Set(server.map(m => m.id));
+  const serverTextIndex = new Map<string, number[]>();
+  for (const s of server) {
+    const key = getTextDuplicateKey(s);
+    const ts = key ? getTimestampMs(s) : null;
+    if (key && ts !== null) {
+      const arr = serverTextIndex.get(key);
+      if (arr) arr.push(ts);
+      else serverTextIndex.set(key, [ts]);
+    }
+  }
   const extra = realtime.filter(m => {
     if (serverIds.has(m.id)) return false;
-    return !server.some(serverMessage => isDuplicatePersistedText(m, serverMessage));
+    const key = getTextDuplicateKey(m);
+    const rt = key ? getTimestampMs(m) : null;
+    if (!key || rt === null) return true;
+    const candidates = serverTextIndex.get(key);
+    if (!candidates) return true;
+    return !candidates.some(ts => Math.abs(rt - ts) <= REALTIME_DUPLICATE_WINDOW_MS);
   });
   if (extra.length === 0) return server;
   return [...server, ...extra];
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
const REALTIME_DUPLICATE_WINDOW_MS = 10_000;

function getTextDuplicateKey(message: NormalizedMessage): string | null {
  if (message.kind !== 'text') return null;
  if (!message.content) return null;
  return `${message.provider}:${message.role ?? ''}:${message.content}`;
}

function getTimestampMs(message: NormalizedMessage): number | null {
  const value = new Date(message.timestamp).getTime();
  return Number.isFinite(value) ? value : null;
}

function isDuplicatePersistedText(realtimeMessage: NormalizedMessage, persistedMessage: NormalizedMessage): boolean {
  const realtimeKey = getTextDuplicateKey(realtimeMessage);
  if (!realtimeKey || realtimeKey !== getTextDuplicateKey(persistedMessage)) {
    return false;
  }

  const realtimeTime = getTimestampMs(realtimeMessage);
  const persistedTime = getTimestampMs(persistedMessage);
  if (realtimeTime === null || persistedTime === null) {
    return false;
  }

  return Math.abs(realtimeTime - persistedTime) <= REALTIME_DUPLICATE_WINDOW_MS;
}

/**
 * Compute merged messages: server + realtime, deduped by id and short-lived
 * text echoes.
 * Server messages take priority (they're the persisted source of truth).
 * Realtime messages that aren't yet in server stay (in-flight streaming).
 */
function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] {
  if (realtime.length === 0) return server;
  if (server.length === 0) return realtime;
  const serverIds = new Set(server.map(m => m.id));
  const serverTextIndex = new Map<string, number[]>();
  for (const s of server) {
    const key = getTextDuplicateKey(s);
    const ts = key ? getTimestampMs(s) : null;
    if (key && ts !== null) {
      const arr = serverTextIndex.get(key);
      if (arr) arr.push(ts);
      else serverTextIndex.set(key, [ts]);
    }
  }
  const extra = realtime.filter(m => {
    if (serverIds.has(m.id)) return false;
    const key = getTextDuplicateKey(m);
    const rt = key ? getTimestampMs(m) : null;
    if (!key || rt === null) return true;
    const candidates = serverTextIndex.get(key);
    if (!candidates) return true;
    return !candidates.some(ts => Math.abs(rt - ts) <= REALTIME_DUPLICATE_WINDOW_MS);
  });
  if (extra.length === 0) return server;
  return [...server, ...extra];
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/stores/useSessionStore.ts` around lines 105 - 146, The dedupe logic in
computeMerged/isDuplicatePersistedText is causing false positives and is O(R*S);
update it by (1) tightening the duplicate window constant
(REALTIME_DUPLICATE_WINDOW_MS) to a realistic latency (e.g., 1000–2000 ms) and
change isDuplicatePersistedText (which uses getTextDuplicateKey and
getTimestampMs) to also require realtimeTime >= persistedTime before considering
it an echo, and (2) make computeMerged linear by pre-bucketing server messages
into a Map keyed by getTextDuplicateKey => minimum persisted timestamp (and
still keep serverIds as a Set) so each realtime message only does O(1) lookups
against that map instead of server.some(...) per message.