999 lines
33 KiB
TypeScript
999 lines
33 KiB
TypeScript
import { randomUUID } from 'crypto'
|
||
import type {
|
||
SDKPartialAssistantMessage,
|
||
StdoutMessage,
|
||
} from 'src/entrypoints/sdk/controlTypes.js'
|
||
import { decodeJwtExpiry } from '../../bridge/jwtUtils.js'
|
||
import { logForDebugging } from '../../utils/debug.js'
|
||
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
|
||
import { errorMessage, getErrnoCode } from '../../utils/errors.js'
|
||
import { createAxiosInstance } from '../../utils/proxy.js'
|
||
import {
|
||
registerSessionActivityCallback,
|
||
unregisterSessionActivityCallback,
|
||
} from '../../utils/sessionActivity.js'
|
||
import {
|
||
getSessionIngressAuthHeaders,
|
||
getSessionIngressAuthToken,
|
||
} from '../../utils/sessionIngressAuth.js'
|
||
import type {
|
||
RequiresActionDetails,
|
||
SessionState,
|
||
} from '../../utils/sessionState.js'
|
||
import { sleep } from '../../utils/sleep.js'
|
||
import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
|
||
import {
|
||
RetryableError,
|
||
SerialBatchEventUploader,
|
||
} from './SerialBatchEventUploader.js'
|
||
import type { SSETransport, StreamClientEvent } from './SSETransport.js'
|
||
import { WorkerStateUploader } from './WorkerStateUploader.js'
|
||
|
||
/** Default interval between heartbeat events (20s; server TTL is 60s). */
|
||
const DEFAULT_HEARTBEAT_INTERVAL_MS = 20_000
|
||
|
||
/**
|
||
* stream_event messages accumulate in a delay buffer for up to this many ms
|
||
* before enqueue. Mirrors HybridTransport's batching window. text_delta
|
||
* events for the same content block accumulate into a single full-so-far
|
||
* snapshot per flush — each emitted event is self-contained so a client
|
||
* connecting mid-stream sees complete text, not a fragment.
|
||
*/
|
||
const STREAM_EVENT_FLUSH_INTERVAL_MS = 100
|
||
|
||
/** Hoisted axios validateStatus callback to avoid per-request closure allocation. */
|
||
function alwaysValidStatus(): boolean {
|
||
return true
|
||
}
|
||
|
||
export type CCRInitFailReason =
|
||
| 'no_auth_headers'
|
||
| 'missing_epoch'
|
||
| 'worker_register_failed'
|
||
|
||
/** Thrown by initialize(); carries a typed reason for the diag classifier. */
|
||
export class CCRInitError extends Error {
|
||
constructor(readonly reason: CCRInitFailReason) {
|
||
super(`CCRClient init failed: ${reason}`)
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Consecutive 401/403 with a VALID-LOOKING token before giving up. An
|
||
* expired JWT short-circuits this (exits immediately — deterministic,
|
||
* retry is futile). This threshold is for the uncertain case: token's
|
||
* exp is in the future but server says 401 (userauth down, KMS hiccup,
|
||
* clock skew). 10 × 20s heartbeat ≈ 200s to ride it out.
|
||
*/
|
||
const MAX_CONSECUTIVE_AUTH_FAILURES = 10
|
||
|
||
type EventPayload = {
|
||
uuid: string
|
||
type: string
|
||
[key: string]: unknown
|
||
}
|
||
|
||
type ClientEvent = {
|
||
payload: EventPayload
|
||
ephemeral?: boolean
|
||
}
|
||
|
||
/**
|
||
* Structural subset of a stream_event carrying a text_delta. Not a narrowing
|
||
* of SDKPartialAssistantMessage — RawMessageStreamEvent's delta is a union and
|
||
* narrowing through two levels defeats the discriminant.
|
||
*/
|
||
type CoalescedStreamEvent = {
|
||
type: 'stream_event'
|
||
uuid: string
|
||
session_id: string
|
||
parent_tool_use_id: string | null
|
||
event: {
|
||
type: 'content_block_delta'
|
||
index: number
|
||
delta: { type: 'text_delta'; text: string }
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Accumulator state for text_delta coalescing. Keyed by API message ID so
|
||
* lifetime is tied to the assistant message — cleared when the complete
|
||
* SDKAssistantMessage arrives (writeEvent), which is reliable even when
|
||
* abort/error paths skip content_block_stop/message_stop delivery.
|
||
*/
|
||
export type StreamAccumulatorState = {
|
||
/** API message ID (msg_...) → blocks[blockIndex] → chunk array. */
|
||
byMessage: Map<string, string[][]>
|
||
/**
|
||
* {session_id}:{parent_tool_use_id} → active message ID.
|
||
* content_block_delta events don't carry the message ID (only
|
||
* message_start does), so we track which message is currently streaming
|
||
* for each scope. At most one message streams per scope at a time.
|
||
*/
|
||
scopeToMessage: Map<string, string>
|
||
}
|
||
|
||
export function createStreamAccumulator(): StreamAccumulatorState {
|
||
return { byMessage: new Map(), scopeToMessage: new Map() }
|
||
}
|
||
|
||
function scopeKey(m: {
|
||
session_id: string
|
||
parent_tool_use_id: string | null
|
||
}): string {
|
||
return `${m.session_id}:${m.parent_tool_use_id ?? ''}`
|
||
}
|
||
|
||
/**
|
||
* Accumulate text_delta stream_events into full-so-far snapshots per content
|
||
* block. Each flush emits ONE event per touched block containing the FULL
|
||
* accumulated text from the start of the block — a client connecting
|
||
* mid-stream receives a self-contained snapshot, not a fragment.
|
||
*
|
||
* Non-text-delta events pass through unchanged. message_start records the
|
||
* active message ID for the scope; content_block_delta appends chunks;
|
||
* the snapshot event reuses the first text_delta UUID seen for that block in
|
||
* this flush so server-side idempotency remains stable across retries.
|
||
*
|
||
* Cleanup happens in writeEvent when the complete assistant message arrives
|
||
* (reliable), not here on stop events (abort/error paths skip those).
|
||
*/
|
||
export function accumulateStreamEvents(
|
||
buffer: SDKPartialAssistantMessage[],
|
||
state: StreamAccumulatorState,
|
||
): EventPayload[] {
|
||
const out: EventPayload[] = []
|
||
// chunks[] → snapshot already in `out` this flush. Keyed by the chunks
|
||
// array reference (stable per {messageId, index}) so subsequent deltas
|
||
// rewrite the same entry instead of emitting one event per delta.
|
||
const touched = new Map<string[], CoalescedStreamEvent>()
|
||
for (const msg of buffer) {
|
||
switch (msg.event.type) {
|
||
case 'message_start': {
|
||
const id = msg.event.message.id
|
||
const prevId = state.scopeToMessage.get(scopeKey(msg))
|
||
if (prevId) state.byMessage.delete(prevId)
|
||
state.scopeToMessage.set(scopeKey(msg), id)
|
||
state.byMessage.set(id, [])
|
||
out.push(msg)
|
||
break
|
||
}
|
||
case 'content_block_delta': {
|
||
if (msg.event.delta.type !== 'text_delta') {
|
||
out.push(msg)
|
||
break
|
||
}
|
||
const messageId = state.scopeToMessage.get(scopeKey(msg))
|
||
const blocks = messageId ? state.byMessage.get(messageId) : undefined
|
||
if (!blocks) {
|
||
// Delta without a preceding message_start (reconnect mid-stream,
|
||
// or message_start was in a prior buffer that got dropped). Pass
|
||
// through raw — can't produce a full-so-far snapshot without the
|
||
// prior chunks anyway.
|
||
out.push(msg)
|
||
break
|
||
}
|
||
const chunks = (blocks[msg.event.index] ??= [])
|
||
chunks.push(msg.event.delta.text)
|
||
const existing = touched.get(chunks)
|
||
if (existing) {
|
||
existing.event.delta.text = chunks.join('')
|
||
break
|
||
}
|
||
const snapshot: CoalescedStreamEvent = {
|
||
type: 'stream_event',
|
||
uuid: msg.uuid,
|
||
session_id: msg.session_id,
|
||
parent_tool_use_id: msg.parent_tool_use_id,
|
||
event: {
|
||
type: 'content_block_delta',
|
||
index: msg.event.index,
|
||
delta: { type: 'text_delta', text: chunks.join('') },
|
||
},
|
||
}
|
||
touched.set(chunks, snapshot)
|
||
out.push(snapshot)
|
||
break
|
||
}
|
||
default:
|
||
out.push(msg)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
/**
|
||
* Clear accumulator entries for a completed assistant message. Called from
|
||
* writeEvent when the SDKAssistantMessage arrives — the reliable end-of-stream
|
||
* signal that fires even when abort/interrupt/error skip SSE stop events.
|
||
*/
|
||
export function clearStreamAccumulatorForMessage(
|
||
state: StreamAccumulatorState,
|
||
assistant: {
|
||
session_id: string
|
||
parent_tool_use_id: string | null
|
||
message: { id: string }
|
||
},
|
||
): void {
|
||
state.byMessage.delete(assistant.message.id)
|
||
const scope = scopeKey(assistant)
|
||
if (state.scopeToMessage.get(scope) === assistant.message.id) {
|
||
state.scopeToMessage.delete(scope)
|
||
}
|
||
}
|
||
|
||
type RequestResult = { ok: true } | { ok: false; retryAfterMs?: number }
|
||
|
||
type WorkerEvent = {
|
||
payload: EventPayload
|
||
is_compaction?: boolean
|
||
agent_id?: string
|
||
}
|
||
|
||
export type InternalEvent = {
|
||
event_id: string
|
||
event_type: string
|
||
payload: Record<string, unknown>
|
||
event_metadata?: Record<string, unknown> | null
|
||
is_compaction: boolean
|
||
created_at: string
|
||
agent_id?: string
|
||
}
|
||
|
||
type ListInternalEventsResponse = {
|
||
data: InternalEvent[]
|
||
next_cursor?: string
|
||
}
|
||
|
||
type WorkerStateResponse = {
|
||
worker?: {
|
||
external_metadata?: Record<string, unknown>
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Manages the worker lifecycle protocol with CCR v2:
|
||
* - Epoch management: reads worker_epoch from CLAUDE_CODE_WORKER_EPOCH env var
|
||
* - Runtime state reporting: PUT /sessions/{id}/worker
|
||
* - Heartbeat: POST /sessions/{id}/worker/heartbeat for liveness detection
|
||
*
|
||
* All writes go through this.request().
|
||
*/
|
||
export class CCRClient {
|
||
private workerEpoch = 0
|
||
private readonly heartbeatIntervalMs: number
|
||
private readonly heartbeatJitterFraction: number
|
||
private heartbeatTimer: NodeJS.Timeout | null = null
|
||
private heartbeatInFlight = false
|
||
private closed = false
|
||
private consecutiveAuthFailures = 0
|
||
private currentState: SessionState | null = null
|
||
private readonly sessionBaseUrl: string
|
||
private readonly sessionId: string
|
||
private readonly http = createAxiosInstance({ keepAlive: true })
|
||
|
||
// stream_event delay buffer — accumulates content deltas for up to
|
||
// STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
|
||
// and enables text_delta coalescing). Mirrors HybridTransport's pattern.
|
||
private streamEventBuffer: SDKPartialAssistantMessage[] = []
|
||
private streamEventTimer: ReturnType<typeof setTimeout> | null = null
|
||
// Full-so-far text accumulator. Persists across flushes so each emitted
|
||
// text_delta event carries the complete text from the start of the block —
|
||
// mid-stream reconnects see a self-contained snapshot. Keyed by API message
|
||
// ID; cleared in writeEvent when the complete assistant message arrives.
|
||
private streamTextAccumulator = createStreamAccumulator()
|
||
|
||
private readonly workerState: WorkerStateUploader
|
||
private readonly eventUploader: SerialBatchEventUploader<ClientEvent>
|
||
private readonly internalEventUploader: SerialBatchEventUploader<WorkerEvent>
|
||
private readonly deliveryUploader: SerialBatchEventUploader<{
|
||
eventId: string
|
||
status: 'received' | 'processing' | 'processed'
|
||
}>
|
||
|
||
/**
|
||
* Called when the server returns 409 (a newer worker epoch superseded ours).
|
||
* Default: process.exit(1) — correct for spawn-mode children where the
|
||
* parent bridge re-spawns. In-process callers (replBridge) MUST override
|
||
* this to close gracefully instead; exit would kill the user's REPL.
|
||
*/
|
||
private readonly onEpochMismatch: () => never
|
||
|
||
/**
|
||
* Auth header source. Defaults to the process-wide session-ingress token
|
||
* (CLAUDE_CODE_SESSION_ACCESS_TOKEN env var). Callers managing multiple
|
||
* concurrent sessions with distinct JWTs MUST inject this — the env-var
|
||
* path is a process global and would stomp across sessions.
|
||
*/
|
||
private readonly getAuthHeaders: () => Record<string, string>
|
||
|
||
constructor(
|
||
transport: SSETransport,
|
||
sessionUrl: URL,
|
||
opts?: {
|
||
onEpochMismatch?: () => never
|
||
heartbeatIntervalMs?: number
|
||
heartbeatJitterFraction?: number
|
||
/**
|
||
* Per-instance auth header source. Omit to read the process-wide
|
||
* CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers — REPL,
|
||
* daemon). Required for concurrent multi-session callers.
|
||
*/
|
||
getAuthHeaders?: () => Record<string, string>
|
||
},
|
||
) {
|
||
this.onEpochMismatch =
|
||
opts?.onEpochMismatch ??
|
||
(() => {
|
||
// eslint-disable-next-line custom-rules/no-process-exit
|
||
process.exit(1)
|
||
})
|
||
this.heartbeatIntervalMs =
|
||
opts?.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS
|
||
this.heartbeatJitterFraction = opts?.heartbeatJitterFraction ?? 0
|
||
this.getAuthHeaders = opts?.getAuthHeaders ?? getSessionIngressAuthHeaders
|
||
// Session URL: https://host/v1/code/sessions/{id}
|
||
if (sessionUrl.protocol !== 'http:' && sessionUrl.protocol !== 'https:') {
|
||
throw new Error(
|
||
`CCRClient: Expected http(s) URL, got ${sessionUrl.protocol}`,
|
||
)
|
||
}
|
||
const pathname = sessionUrl.pathname.replace(/\/$/, '')
|
||
this.sessionBaseUrl = `${sessionUrl.protocol}//${sessionUrl.host}${pathname}`
|
||
// Extract session ID from the URL path (last segment)
|
||
this.sessionId = pathname.split('/').pop() || ''
|
||
|
||
this.workerState = new WorkerStateUploader({
|
||
send: body =>
|
||
this.request(
|
||
'put',
|
||
'/worker',
|
||
{ worker_epoch: this.workerEpoch, ...body },
|
||
'PUT worker',
|
||
).then(r => r.ok),
|
||
baseDelayMs: 500,
|
||
maxDelayMs: 30_000,
|
||
jitterMs: 500,
|
||
})
|
||
|
||
this.eventUploader = new SerialBatchEventUploader<ClientEvent>({
|
||
maxBatchSize: 100,
|
||
maxBatchBytes: 10 * 1024 * 1024,
|
||
// flushStreamEventBuffer() enqueues a full 100ms window of accumulated
|
||
// stream_events in one call. A burst of mixed delta types that don't
|
||
// fold into a single snapshot could exceed the old cap (50) and deadlock
|
||
// on the SerialBatchEventUploader backpressure check. Match
|
||
// HybridTransport's bound — high enough to be memory-only.
|
||
maxQueueSize: 100_000,
|
||
send: async batch => {
|
||
const result = await this.request(
|
||
'post',
|
||
'/worker/events',
|
||
{ worker_epoch: this.workerEpoch, events: batch },
|
||
'client events',
|
||
)
|
||
if (!result.ok) {
|
||
throw new RetryableError(
|
||
'client event POST failed',
|
||
result.retryAfterMs,
|
||
)
|
||
}
|
||
},
|
||
baseDelayMs: 500,
|
||
maxDelayMs: 30_000,
|
||
jitterMs: 500,
|
||
})
|
||
|
||
this.internalEventUploader = new SerialBatchEventUploader<WorkerEvent>({
|
||
maxBatchSize: 100,
|
||
maxBatchBytes: 10 * 1024 * 1024,
|
||
maxQueueSize: 200,
|
||
send: async batch => {
|
||
const result = await this.request(
|
||
'post',
|
||
'/worker/internal-events',
|
||
{ worker_epoch: this.workerEpoch, events: batch },
|
||
'internal events',
|
||
)
|
||
if (!result.ok) {
|
||
throw new RetryableError(
|
||
'internal event POST failed',
|
||
result.retryAfterMs,
|
||
)
|
||
}
|
||
},
|
||
baseDelayMs: 500,
|
||
maxDelayMs: 30_000,
|
||
jitterMs: 500,
|
||
})
|
||
|
||
this.deliveryUploader = new SerialBatchEventUploader<{
|
||
eventId: string
|
||
status: 'received' | 'processing' | 'processed'
|
||
}>({
|
||
maxBatchSize: 64,
|
||
maxQueueSize: 64,
|
||
send: async batch => {
|
||
const result = await this.request(
|
||
'post',
|
||
'/worker/events/delivery',
|
||
{
|
||
worker_epoch: this.workerEpoch,
|
||
updates: batch.map(d => ({
|
||
event_id: d.eventId,
|
||
status: d.status,
|
||
})),
|
||
},
|
||
'delivery batch',
|
||
)
|
||
if (!result.ok) {
|
||
throw new RetryableError('delivery POST failed', result.retryAfterMs)
|
||
}
|
||
},
|
||
baseDelayMs: 500,
|
||
maxDelayMs: 30_000,
|
||
jitterMs: 500,
|
||
})
|
||
|
||
// Ack each received client_event so CCR can track delivery status.
|
||
// Wired here (not in initialize()) so the callback is registered the
|
||
// moment new CCRClient() returns — remoteIO must be free to call
|
||
// transport.connect() immediately after without racing the first
|
||
// SSE catch-up frame against an unwired onEventCallback.
|
||
transport.setOnEvent((event: StreamClientEvent) => {
|
||
this.reportDelivery(event.event_id, 'received')
|
||
})
|
||
}
|
||
|
||
/**
|
||
* Initialize the session worker:
|
||
* 1. Take worker_epoch from the argument, or fall back to
|
||
* CLAUDE_CODE_WORKER_EPOCH (set by env-manager / bridge spawner)
|
||
* 2. Report state as 'idle'
|
||
* 3. Start heartbeat timer
|
||
*
|
||
* In-process callers (replBridge) pass the epoch directly — they
|
||
* registered the worker themselves and there is no parent process
|
||
* setting env vars.
|
||
*/
|
||
async initialize(epoch?: number): Promise<Record<string, unknown> | null> {
|
||
const startMs = Date.now()
|
||
if (Object.keys(this.getAuthHeaders()).length === 0) {
|
||
throw new CCRInitError('no_auth_headers')
|
||
}
|
||
if (epoch === undefined) {
|
||
const rawEpoch = process.env.CLAUDE_CODE_WORKER_EPOCH
|
||
epoch = rawEpoch ? parseInt(rawEpoch, 10) : NaN
|
||
}
|
||
if (isNaN(epoch)) {
|
||
throw new CCRInitError('missing_epoch')
|
||
}
|
||
this.workerEpoch = epoch
|
||
|
||
// Concurrent with the init PUT — neither depends on the other.
|
||
const restoredPromise = this.getWorkerState()
|
||
|
||
const result = await this.request(
|
||
'put',
|
||
'/worker',
|
||
{
|
||
worker_status: 'idle',
|
||
worker_epoch: this.workerEpoch,
|
||
// Clear stale pending_action/task_summary left by a prior
|
||
// worker crash — the in-session clears don't survive process restart.
|
||
external_metadata: {
|
||
pending_action: null,
|
||
task_summary: null,
|
||
},
|
||
},
|
||
'PUT worker (init)',
|
||
)
|
||
if (!result.ok) {
|
||
// 409 → onEpochMismatch may throw, but request() catches it and returns
|
||
// false. Without this check we'd continue to startHeartbeat(), leaking a
|
||
// 20s timer against a dead epoch. Throw so connect()'s rejection handler
|
||
// fires instead of the success path.
|
||
throw new CCRInitError('worker_register_failed')
|
||
}
|
||
this.currentState = 'idle'
|
||
this.startHeartbeat()
|
||
|
||
// sessionActivity's refcount-gated timer fires while an API call or tool
|
||
// is in-flight; without a write the container lease can expire mid-wait.
|
||
// v1 wires this in WebSocketTransport per-connection.
|
||
registerSessionActivityCallback(() => {
|
||
void this.writeEvent({ type: 'keep_alive' })
|
||
})
|
||
|
||
logForDebugging(`CCRClient: initialized, epoch=${this.workerEpoch}`)
|
||
logForDiagnosticsNoPII('info', 'cli_worker_lifecycle_initialized', {
|
||
epoch: this.workerEpoch,
|
||
duration_ms: Date.now() - startMs,
|
||
})
|
||
|
||
// Await the concurrent GET and log state_restored here, after the PUT
|
||
// has succeeded — logging inside getWorkerState() raced: if the GET
|
||
// resolved before the PUT failed, diagnostics showed both init_failed
|
||
// and state_restored for the same session.
|
||
const { metadata, durationMs } = await restoredPromise
|
||
if (!this.closed) {
|
||
logForDiagnosticsNoPII('info', 'cli_worker_state_restored', {
|
||
duration_ms: durationMs,
|
||
had_state: metadata !== null,
|
||
})
|
||
}
|
||
return metadata
|
||
}
|
||
|
||
// Control_requests are marked processed and not re-delivered on
|
||
// restart, so read back what the prior worker wrote.
|
||
private async getWorkerState(): Promise<{
|
||
metadata: Record<string, unknown> | null
|
||
durationMs: number
|
||
}> {
|
||
const startMs = Date.now()
|
||
const authHeaders = this.getAuthHeaders()
|
||
if (Object.keys(authHeaders).length === 0) {
|
||
return { metadata: null, durationMs: 0 }
|
||
}
|
||
const data = await this.getWithRetry<WorkerStateResponse>(
|
||
`${this.sessionBaseUrl}/worker`,
|
||
authHeaders,
|
||
'worker_state',
|
||
)
|
||
return {
|
||
metadata: data?.worker?.external_metadata ?? null,
|
||
durationMs: Date.now() - startMs,
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Send an authenticated HTTP request to CCR. Handles auth headers,
|
||
* 409 epoch mismatch, and error logging. Returns { ok: true } on 2xx.
|
||
* On 429, reads Retry-After (integer seconds) so the uploader can honor
|
||
* the server's backoff hint instead of blindly exponentiating.
|
||
*/
|
||
private async request(
|
||
method: 'post' | 'put',
|
||
path: string,
|
||
body: unknown,
|
||
label: string,
|
||
{ timeout = 10_000 }: { timeout?: number } = {},
|
||
): Promise<RequestResult> {
|
||
const authHeaders = this.getAuthHeaders()
|
||
if (Object.keys(authHeaders).length === 0) return { ok: false }
|
||
|
||
try {
|
||
const response = await this.http[method](
|
||
`${this.sessionBaseUrl}${path}`,
|
||
body,
|
||
{
|
||
headers: {
|
||
...authHeaders,
|
||
'Content-Type': 'application/json',
|
||
'anthropic-version': '2023-06-01',
|
||
'User-Agent': getClaudeCodeUserAgent(),
|
||
},
|
||
validateStatus: alwaysValidStatus,
|
||
timeout,
|
||
},
|
||
)
|
||
|
||
if (response.status >= 200 && response.status < 300) {
|
||
this.consecutiveAuthFailures = 0
|
||
return { ok: true }
|
||
}
|
||
if (response.status === 409) {
|
||
this.handleEpochMismatch()
|
||
}
|
||
if (response.status === 401 || response.status === 403) {
|
||
// A 401 with an expired JWT is deterministic — no retry will
|
||
// ever succeed. Check the token's own exp before burning
|
||
// wall-clock on the threshold loop.
|
||
const tok = getSessionIngressAuthToken()
|
||
const exp = tok ? decodeJwtExpiry(tok) : null
|
||
if (exp !== null && exp * 1000 < Date.now()) {
|
||
logForDebugging(
|
||
`CCRClient: session_token expired (exp=${new Date(exp * 1000).toISOString()}) — no refresh was delivered, exiting`,
|
||
{ level: 'error' },
|
||
)
|
||
logForDiagnosticsNoPII('error', 'cli_worker_token_expired_no_refresh')
|
||
this.onEpochMismatch()
|
||
}
|
||
// Token looks valid but server says 401 — possible server-side
|
||
// blip (userauth down, KMS hiccup). Count toward threshold.
|
||
this.consecutiveAuthFailures++
|
||
if (this.consecutiveAuthFailures >= MAX_CONSECUTIVE_AUTH_FAILURES) {
|
||
logForDebugging(
|
||
`CCRClient: ${this.consecutiveAuthFailures} consecutive auth failures with a valid-looking token — server-side auth unrecoverable, exiting`,
|
||
{ level: 'error' },
|
||
)
|
||
logForDiagnosticsNoPII('error', 'cli_worker_auth_failures_exhausted')
|
||
this.onEpochMismatch()
|
||
}
|
||
}
|
||
logForDebugging(`CCRClient: ${label} returned ${response.status}`, {
|
||
level: 'warn',
|
||
})
|
||
logForDiagnosticsNoPII('warn', 'cli_worker_request_failed', {
|
||
method,
|
||
path,
|
||
status: response.status,
|
||
})
|
||
if (response.status === 429) {
|
||
const raw = response.headers?.['retry-after']
|
||
const seconds = typeof raw === 'string' ? parseInt(raw, 10) : NaN
|
||
if (!isNaN(seconds) && seconds >= 0) {
|
||
return { ok: false, retryAfterMs: seconds * 1000 }
|
||
}
|
||
}
|
||
return { ok: false }
|
||
} catch (error) {
|
||
logForDebugging(`CCRClient: ${label} failed: ${errorMessage(error)}`, {
|
||
level: 'warn',
|
||
})
|
||
logForDiagnosticsNoPII('warn', 'cli_worker_request_error', {
|
||
method,
|
||
path,
|
||
error_code: getErrnoCode(error),
|
||
})
|
||
return { ok: false }
|
||
}
|
||
}
|
||
|
||
/** Report worker state to CCR via PUT /sessions/{id}/worker. */
|
||
reportState(state: SessionState, details?: RequiresActionDetails): void {
|
||
if (state === this.currentState && !details) return
|
||
this.currentState = state
|
||
this.workerState.enqueue({
|
||
worker_status: state,
|
||
requires_action_details: details
|
||
? {
|
||
tool_name: details.tool_name,
|
||
action_description: details.action_description,
|
||
request_id: details.request_id,
|
||
}
|
||
: null,
|
||
})
|
||
}
|
||
|
||
/** Report external metadata to CCR via PUT /worker. */
|
||
reportMetadata(metadata: Record<string, unknown>): void {
|
||
this.workerState.enqueue({ external_metadata: metadata })
|
||
}
|
||
|
||
/**
|
||
* Handle epoch mismatch (409 Conflict). A newer CC instance has replaced
|
||
* this one — exit immediately.
|
||
*/
|
||
private handleEpochMismatch(): never {
|
||
logForDebugging('CCRClient: Epoch mismatch (409), shutting down', {
|
||
level: 'error',
|
||
})
|
||
logForDiagnosticsNoPII('error', 'cli_worker_epoch_mismatch')
|
||
this.onEpochMismatch()
|
||
}
|
||
|
||
/** Start periodic heartbeat. */
|
||
private startHeartbeat(): void {
|
||
this.stopHeartbeat()
|
||
const schedule = (): void => {
|
||
const jitter =
|
||
this.heartbeatIntervalMs *
|
||
this.heartbeatJitterFraction *
|
||
(2 * Math.random() - 1)
|
||
this.heartbeatTimer = setTimeout(tick, this.heartbeatIntervalMs + jitter)
|
||
}
|
||
const tick = (): void => {
|
||
void this.sendHeartbeat()
|
||
// stopHeartbeat nulls the timer; check after the fire-and-forget send
|
||
// but before rescheduling so close() during sendHeartbeat is honored.
|
||
if (this.heartbeatTimer === null) return
|
||
schedule()
|
||
}
|
||
schedule()
|
||
}
|
||
|
||
/** Stop heartbeat timer. */
|
||
private stopHeartbeat(): void {
|
||
if (this.heartbeatTimer) {
|
||
clearTimeout(this.heartbeatTimer)
|
||
this.heartbeatTimer = null
|
||
}
|
||
}
|
||
|
||
/** Send a heartbeat via POST /sessions/{id}/worker/heartbeat. */
|
||
private async sendHeartbeat(): Promise<void> {
|
||
if (this.heartbeatInFlight) return
|
||
this.heartbeatInFlight = true
|
||
try {
|
||
const result = await this.request(
|
||
'post',
|
||
'/worker/heartbeat',
|
||
{ session_id: this.sessionId, worker_epoch: this.workerEpoch },
|
||
'Heartbeat',
|
||
{ timeout: 5_000 },
|
||
)
|
||
if (result.ok) {
|
||
logForDebugging('CCRClient: Heartbeat sent')
|
||
}
|
||
} finally {
|
||
this.heartbeatInFlight = false
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Write a StdoutMessage as a client event via POST /sessions/{id}/worker/events.
|
||
* These events are visible to frontend clients via the SSE stream.
|
||
* Injects a UUID if missing to ensure server-side idempotency on retry.
|
||
*
|
||
* stream_event messages are held in a 100ms delay buffer and accumulated
|
||
* (text_deltas for the same content block emit a full-so-far snapshot per
|
||
* flush). A non-stream_event write flushes the buffer first so downstream
|
||
* ordering is preserved.
|
||
*/
|
||
async writeEvent(message: StdoutMessage): Promise<void> {
|
||
if (message.type === 'stream_event') {
|
||
this.streamEventBuffer.push(message)
|
||
if (!this.streamEventTimer) {
|
||
this.streamEventTimer = setTimeout(
|
||
() => void this.flushStreamEventBuffer(),
|
||
STREAM_EVENT_FLUSH_INTERVAL_MS,
|
||
)
|
||
}
|
||
return
|
||
}
|
||
await this.flushStreamEventBuffer()
|
||
if (message.type === 'assistant') {
|
||
clearStreamAccumulatorForMessage(this.streamTextAccumulator, message)
|
||
}
|
||
await this.eventUploader.enqueue(this.toClientEvent(message))
|
||
}
|
||
|
||
/** Wrap a StdoutMessage as a ClientEvent, injecting a UUID if missing. */
|
||
private toClientEvent(message: StdoutMessage): ClientEvent {
|
||
const msg = message as unknown as Record<string, unknown>
|
||
return {
|
||
payload: {
|
||
...msg,
|
||
uuid: typeof msg.uuid === 'string' ? msg.uuid : randomUUID(),
|
||
} as EventPayload,
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Drain the stream_event delay buffer: accumulate text_deltas into
|
||
* full-so-far snapshots, clear the timer, enqueue the resulting events.
|
||
* Called from the timer, from writeEvent on a non-stream message, and from
|
||
* flush(). close() drops the buffer — call flush() first if you need
|
||
* delivery.
|
||
*/
|
||
private async flushStreamEventBuffer(): Promise<void> {
|
||
if (this.streamEventTimer) {
|
||
clearTimeout(this.streamEventTimer)
|
||
this.streamEventTimer = null
|
||
}
|
||
if (this.streamEventBuffer.length === 0) return
|
||
const buffered = this.streamEventBuffer
|
||
this.streamEventBuffer = []
|
||
const payloads = accumulateStreamEvents(
|
||
buffered,
|
||
this.streamTextAccumulator,
|
||
)
|
||
await this.eventUploader.enqueue(
|
||
payloads.map(payload => ({ payload, ephemeral: true })),
|
||
)
|
||
}
|
||
|
||
/**
|
||
* Write an internal worker event via POST /sessions/{id}/worker/internal-events.
|
||
* These events are NOT visible to frontend clients — they store worker-internal
|
||
* state (transcript messages, compaction markers) needed for session resume.
|
||
*/
|
||
async writeInternalEvent(
|
||
eventType: string,
|
||
payload: Record<string, unknown>,
|
||
{
|
||
isCompaction = false,
|
||
agentId,
|
||
}: {
|
||
isCompaction?: boolean
|
||
agentId?: string
|
||
} = {},
|
||
): Promise<void> {
|
||
const event: WorkerEvent = {
|
||
payload: {
|
||
type: eventType,
|
||
...payload,
|
||
uuid: typeof payload.uuid === 'string' ? payload.uuid : randomUUID(),
|
||
} as EventPayload,
|
||
...(isCompaction && { is_compaction: true }),
|
||
...(agentId && { agent_id: agentId }),
|
||
}
|
||
await this.internalEventUploader.enqueue(event)
|
||
}
|
||
|
||
/**
|
||
* Flush pending internal events. Call between turns and on shutdown
|
||
* to ensure transcript entries are persisted.
|
||
*/
|
||
flushInternalEvents(): Promise<void> {
|
||
return this.internalEventUploader.flush()
|
||
}
|
||
|
||
/**
|
||
* Flush pending client events (writeEvent queue). Call before close()
|
||
* when the caller needs delivery confirmation — close() abandons the
|
||
* queue. Resolves once the uploader drains or rejects; returns
|
||
* regardless of whether individual POSTs succeeded (check server state
|
||
* separately if that matters).
|
||
*/
|
||
async flush(): Promise<void> {
|
||
await this.flushStreamEventBuffer()
|
||
return this.eventUploader.flush()
|
||
}
|
||
|
||
/**
|
||
* Read foreground agent internal events from
|
||
* GET /sessions/{id}/worker/internal-events.
|
||
* Returns transcript entries from the last compaction boundary, or null on failure.
|
||
* Used for session resume.
|
||
*/
|
||
async readInternalEvents(): Promise<InternalEvent[] | null> {
|
||
return this.paginatedGet('/worker/internal-events', {}, 'internal_events')
|
||
}
|
||
|
||
/**
|
||
* Read all subagent internal events from
|
||
* GET /sessions/{id}/worker/internal-events?subagents=true.
|
||
* Returns a merged stream across all non-foreground agents, each from its
|
||
* compaction point. Used for session resume.
|
||
*/
|
||
async readSubagentInternalEvents(): Promise<InternalEvent[] | null> {
|
||
return this.paginatedGet(
|
||
'/worker/internal-events',
|
||
{ subagents: 'true' },
|
||
'subagent_events',
|
||
)
|
||
}
|
||
|
||
/**
|
||
* Paginated GET with retry. Fetches all pages from a list endpoint,
|
||
* retrying each page on failure with exponential backoff + jitter.
|
||
*/
|
||
private async paginatedGet(
|
||
path: string,
|
||
params: Record<string, string>,
|
||
context: string,
|
||
): Promise<InternalEvent[] | null> {
|
||
const authHeaders = this.getAuthHeaders()
|
||
if (Object.keys(authHeaders).length === 0) return null
|
||
|
||
const allEvents: InternalEvent[] = []
|
||
let cursor: string | undefined
|
||
|
||
do {
|
||
const url = new URL(`${this.sessionBaseUrl}${path}`)
|
||
for (const [k, v] of Object.entries(params)) {
|
||
url.searchParams.set(k, v)
|
||
}
|
||
if (cursor) {
|
||
url.searchParams.set('cursor', cursor)
|
||
}
|
||
|
||
const page = await this.getWithRetry<ListInternalEventsResponse>(
|
||
url.toString(),
|
||
authHeaders,
|
||
context,
|
||
)
|
||
if (!page) return null
|
||
|
||
allEvents.push(...(page.data ?? []))
|
||
cursor = page.next_cursor
|
||
} while (cursor)
|
||
|
||
logForDebugging(
|
||
`CCRClient: Read ${allEvents.length} internal events from ${path}${params.subagents ? ' (subagents)' : ''}`,
|
||
)
|
||
return allEvents
|
||
}
|
||
|
||
/**
|
||
* Single GET request with retry. Returns the parsed response body
|
||
* on success, null if all retries are exhausted.
|
||
*/
|
||
private async getWithRetry<T>(
|
||
url: string,
|
||
authHeaders: Record<string, string>,
|
||
context: string,
|
||
): Promise<T | null> {
|
||
for (let attempt = 1; attempt <= 10; attempt++) {
|
||
let response
|
||
try {
|
||
response = await this.http.get<T>(url, {
|
||
headers: {
|
||
...authHeaders,
|
||
'anthropic-version': '2023-06-01',
|
||
'User-Agent': getClaudeCodeUserAgent(),
|
||
},
|
||
validateStatus: alwaysValidStatus,
|
||
timeout: 30_000,
|
||
})
|
||
} catch (error) {
|
||
logForDebugging(
|
||
`CCRClient: GET ${url} failed (attempt ${attempt}/10): ${errorMessage(error)}`,
|
||
{ level: 'warn' },
|
||
)
|
||
if (attempt < 10) {
|
||
const delay =
|
||
Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
|
||
await sleep(delay)
|
||
}
|
||
continue
|
||
}
|
||
|
||
if (response.status >= 200 && response.status < 300) {
|
||
return response.data
|
||
}
|
||
if (response.status === 409) {
|
||
this.handleEpochMismatch()
|
||
}
|
||
logForDebugging(
|
||
`CCRClient: GET ${url} returned ${response.status} (attempt ${attempt}/10)`,
|
||
{ level: 'warn' },
|
||
)
|
||
|
||
if (attempt < 10) {
|
||
const delay =
|
||
Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
|
||
await sleep(delay)
|
||
}
|
||
}
|
||
|
||
logForDebugging('CCRClient: GET retries exhausted', { level: 'error' })
|
||
logForDiagnosticsNoPII('error', 'cli_worker_get_retries_exhausted', {
|
||
context,
|
||
})
|
||
return null
|
||
}
|
||
|
||
/**
|
||
* Report delivery status for a client-to-worker event.
|
||
* POST /v1/code/sessions/{id}/worker/events/delivery (batch endpoint)
|
||
*/
|
||
reportDelivery(
|
||
eventId: string,
|
||
status: 'received' | 'processing' | 'processed',
|
||
): void {
|
||
void this.deliveryUploader.enqueue({ eventId, status })
|
||
}
|
||
|
||
/** Get the current epoch (for external use). */
|
||
getWorkerEpoch(): number {
|
||
return this.workerEpoch
|
||
}
|
||
|
||
/** Internal-event queue depth — shutdown-snapshot backpressure signal. */
|
||
get internalEventsPending(): number {
|
||
return this.internalEventUploader.pendingCount
|
||
}
|
||
|
||
/** Clean up uploaders and timers. */
|
||
close(): void {
|
||
this.closed = true
|
||
this.stopHeartbeat()
|
||
unregisterSessionActivityCallback()
|
||
if (this.streamEventTimer) {
|
||
clearTimeout(this.streamEventTimer)
|
||
this.streamEventTimer = null
|
||
}
|
||
this.streamEventBuffer = []
|
||
this.streamTextAccumulator.byMessage.clear()
|
||
this.streamTextAccumulator.scopeToMessage.clear()
|
||
this.workerState.close()
|
||
this.eventUploader.close()
|
||
this.internalEventUploader.close()
|
||
this.deliveryUploader.close()
|
||
}
|
||
}
|