Deep Research: NDJSON streaming so the connection survives long runs

Previously the endpoint returned a single JSON object at the end. Apache+
PHP-FPM buffers the entire body until PHP exits, so a 160s azure_full run
caused the browser to drop the fetch as "Failed to fetch" while the server
was still synthesising — the response then arrived to a dead socket.

Switch to application/x-ndjson with one event per line. The endpoint emits
'progress', 'start', 'step' (running/complete/warning/error), 'subq', and a
final 'final' event carrying the full result payload. Output buffering is
explicitly disabled so each line flushes through Apache as soon as the
agent emits it.

DbnDeepResearchAgent::run() now accepts an optional ?callable $emit and
fires step:running before each step + step:complete after, plus a subq
event per sub-question retrieval round.

JS reads response.body as a stream, splits on newlines, updates the
trace panel live, and renders the final result when the final event
arrives. Status pill shows live progress detail (e.g. "Synthesising with
Azure gpt-4o — this is the slowest step…").

Engine row in the form now shows expected duration per engine
(~15-45s mini, ~60-180s full, ~30-90s GPU) so users know what they're in
for before clicking Run.
This commit is contained in:
2026-05-15 10:47:35 +02:00
parent 4cbe0a4ac4
commit a1a7f442a7
4 changed files with 296 additions and 77 deletions
+110 -22
View File
@@ -7,35 +7,72 @@ require_once __DIR__ . '/../includes/DeepResearchAgent.php';
dbnToolsRequireMethod('POST');
dbnToolsRequireAuth();
$isMultipart = stripos((string)($_SERVER['CONTENT_TYPE'] ?? ''), 'multipart/form-data') !== false;
// Stream-friendly response — defeat output buffering so the user's browser
// receives progress events while the agent runs (can take 60-180s for
// gpt-4o synthesis or multi-file ingest).
@ini_set('output_buffering', '0');
@ini_set('zlib.output_compression', '0');
@ini_set('implicit_flush', '1');
while (ob_get_level() > 0) { @ob_end_clean(); }
ob_implicit_flush(true);
if ($isMultipart) {
$payloadRaw = (string)($_POST['payload'] ?? '');
if ($payloadRaw === '') {
dbnToolsError('Multipart request is missing the "payload" JSON field.', 422, 'missing_payload');
header('Content-Type: application/x-ndjson; charset=utf-8');
header('Cache-Control: no-store');
header('X-Accel-Buffering: no');
$language = 'en';
$startTime = microtime(true);
$emit = function (string $event, array $payload = []) use ($startTime): void {
$payload['event'] = $event;
$payload['t_ms'] = (int)round((microtime(true) - $startTime) * 1000);
echo json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . "\n";
@flush();
};
try {
$isMultipart = stripos((string)($_SERVER['CONTENT_TYPE'] ?? ''), 'multipart/form-data') !== false;
if ($isMultipart) {
$payloadRaw = (string)($_POST['payload'] ?? '');
if ($payloadRaw === '') {
throw new DbnToolsHttpException('Multipart request missing payload.', 422, 'missing_payload');
}
$input = json_decode($payloadRaw, true);
if (!is_array($input)) {
throw new DbnToolsHttpException('Invalid payload JSON.', 422, 'invalid_payload_json');
}
} else {
$raw = file_get_contents('php://input');
if ($raw === false || strlen($raw) > 120000) {
throw new DbnToolsHttpException('Request body unreadable or too large.', 413, 'body_too_large');
}
$input = json_decode((string)$raw, true);
if (!is_array($input)) {
throw new DbnToolsHttpException('Request body must be valid JSON.', 400, 'invalid_json');
}
}
$input = json_decode($payloadRaw, true);
if (!is_array($input)) {
dbnToolsError('Multipart "payload" field must be valid JSON.', 422, 'invalid_payload_json');
}
} else {
$input = dbnToolsJsonInput(120000);
}
$language = dbnToolsNormalizeLanguage($input['language'] ?? 'en');
dbnToolsWithTelemetry('deep_research', $language, function () use ($input, $language) {
$seedQuery = dbnToolsString($input, 'query', 4000, false);
$pastedText = dbnToolsString($input, 'paste_text', 64000, false);
$sliceInput = $input['slices'] ?? null;
$language = dbnToolsNormalizeLanguage($input['language'] ?? 'en');
$seedQuery = trim((string)($input['query'] ?? ''));
$pastedText = trim((string)($input['paste_text'] ?? ''));
$sliceInput = $input['slices'] ?? [];
$engine = (string)($input['engine'] ?? 'azure_mini');
$controls = is_array($input['controls'] ?? null) ? $input['controls'] : [];
if (mb_strlen($seedQuery, 'UTF-8') > 4000) {
throw new DbnToolsHttpException('Query is too long.', 422, 'query_too_long');
}
if (mb_strlen($pastedText, 'UTF-8') > 64000) {
throw new DbnToolsHttpException('Pasted text is too long.', 422, 'paste_too_long');
}
$emit('progress', ['detail' => 'Reading upload(s)…']);
$uploadedFiles = [];
if (!empty($_FILES['files']) && is_array($_FILES['files']['tmp_name'] ?? null)) {
$count = count($_FILES['files']['tmp_name']);
if ($count > 5) {
dbnToolsAbort('At most 5 files can be uploaded per request.', 413, 'too_many_files');
throw new DbnToolsHttpException('At most 5 files can be uploaded per request.', 413, 'too_many_files');
}
for ($i = 0; $i < $count; $i++) {
$file = [
@@ -52,16 +89,67 @@ dbnToolsWithTelemetry('deep_research', $language, function () use ($input, $lang
'chars' => $extracted['chars'],
'truncated' => $extracted['truncated'],
];
$emit('progress', [
'detail' => sprintf('Extracted %s (%d chars%s)',
$extracted['filename'],
$extracted['chars'],
!empty($extracted['truncated']) ? ', truncated' : ''
),
]);
}
}
return (new DbnDeepResearchAgent())->run(
$emit('start', [
'engine' => $engine,
'language' => $language,
'upload_count' => count($uploadedFiles),
]);
$result = (new DbnDeepResearchAgent())->run(
$seedQuery,
$pastedText,
$uploadedFiles,
is_array($sliceInput) ? $sliceInput : [],
$engine,
$language,
$controls
$controls,
$emit
);
});
$result['ok'] = true;
$result['latency_ms'] = (int)round((microtime(true) - $startTime) * 1000);
dbnToolsLogMetadata([
'tool' => 'deep_research',
'language' => $language,
'ok' => true,
'latency_ms' => $result['latency_ms'],
'chunk_count' => (int)($result['trace_metadata']['chunk_count'] ?? 0),
'source_count' => (int)($result['trace_metadata']['source_count'] ?? 0),
'deployment' => $result['trace_metadata']['deployment'] ?? null,
]);
$emit('final', ['result' => $result]);
} catch (DbnToolsHttpException $e) {
$latency = (int)round((microtime(true) - $startTime) * 1000);
dbnToolsLogMetadata([
'tool' => 'deep_research',
'language' => $language,
'ok' => false,
'latency_ms' => $latency,
'error_code' => $e->errorCode,
]);
$emit('error', ['code' => $e->errorCode, 'message' => $e->getMessage(), 'status' => $e->status]);
} catch (Throwable $e) {
error_log('DBN deep research fatal: ' . $e->getMessage());
$latency = (int)round((microtime(true) - $startTime) * 1000);
dbnToolsLogMetadata([
'tool' => 'deep_research',
'language' => $language,
'ok' => false,
'latency_ms' => $latency,
'error_code' => 'internal_error',
]);
$emit('error', ['code' => 'internal_error', 'message' => 'The agent could not complete this request.']);
}
+121 -22
View File
@@ -232,27 +232,38 @@
return;
}
setStatus('Running deep research…', 'busy');
els.runButton.disabled = true;
els.results.innerHTML = `<div class="empty-state"><h3>Working…</h3><p>The agent is expanding the question, retrieving from the corpus, and synthesising the brief. This usually takes 615 seconds.</p></div>`;
const engine = getEngine();
const expectedDuration = engine === 'azure_full'
? '60180 seconds with Azure gpt-4o'
: (engine === 'gpu' ? '3090 seconds on GPU' : '1545 seconds with Azure gpt-4o-mini');
// Render placeholder trace with first step running
const placeholder = STEP_LABELS.map((label, i) => ({
label,
detail: i === 0 ? 'Running…' : 'Queued',
status: i === 0 ? 'running' : 'idle',
}));
renderTrace(placeholder);
setStatus(`Running deep research… (${expectedDuration})`, 'busy');
els.runButton.disabled = true;
els.results.innerHTML = `<div class="empty-state"><h3>Working…</h3><p>The agent is expanding your question and researching the corpus. Live progress in the right-hand panel. Expect ${expectedDuration}.</p></div>`;
// Initialise the trace with all 7 steps as 'idle'
const stepState = STEP_LABELS.map((label) => ({ label, detail: 'Queued', status: 'idle' }));
renderTrace(stepState);
const payload = {
query,
paste_text: '',
slices,
engine: getEngine(),
engine,
language: lang,
controls: getControls(),
};
const stepKeyToIndex = {
interpretation: 0,
expansion: 1,
slice_resolution: 2,
upload_indexing: 3,
retrieval: 4,
synthesis: 5,
confidence: 6,
};
let response;
try {
if (uploadFiles.length > 0) {
@@ -271,25 +282,113 @@
} catch (err) {
setStatus(`Network error: ${err.message || err}`, 'error');
els.runButton.disabled = false;
stepState[0] = { ...stepState[0], status: 'error', detail: String(err) };
renderTrace(stepState);
return;
}
let data = null;
try { data = await response.json(); } catch (_) {}
if (!response.ok || !data || data.ok === false) {
const msg = (data && data.error && data.error.message) || `Request failed (${response.status}).`;
setStatus(msg, 'error');
if (!response.ok || !response.body) {
setStatus(`Request failed (${response.status}).`, 'error');
els.runButton.disabled = false;
renderTrace(placeholder.map((s, i) => i === 0 ? { ...s, status: 'error', detail: msg } : s));
return;
}
lastResult = data;
setStatus(`Done in ${data.latency_ms || 0} ms · ${data.trace_metadata?.source_count || 0} sources · confidence ${data.trace_metadata?.citation_confidence || '?'}`, 'ok');
// Read the NDJSON stream
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let finalResult = null;
let errorEvent = null;
let progressDetail = '';
while (true) {
let chunk;
try {
chunk = await reader.read();
} catch (err) {
setStatus(`Stream error: ${err.message || err}`, 'error');
els.runButton.disabled = false;
return;
}
const { done, value } = chunk;
if (value) {
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop();
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
let evt;
try { evt = JSON.parse(trimmed); } catch (_) { continue; }
handleStreamEvent(evt);
}
}
if (done) break;
}
if (errorEvent) {
setStatus(`${errorEvent.code}: ${errorEvent.message}`, 'error');
els.runButton.disabled = false;
// Mark the currently-running step as error
const runningIdx = stepState.findIndex((s) => s.status === 'running');
if (runningIdx >= 0) {
stepState[runningIdx] = { ...stepState[runningIdx], status: 'error', detail: errorEvent.message };
renderTrace(stepState);
}
return;
}
if (!finalResult) {
setStatus('Stream ended without a final result.', 'error');
els.runButton.disabled = false;
return;
}
lastResult = finalResult;
const meta = finalResult.trace_metadata || {};
setStatus(
`Done in ${Math.round((finalResult.latency_ms || 0) / 1000)} s · ${meta.source_count || 0} sources · confidence ${meta.citation_confidence || '?'}`,
'ok'
);
els.runButton.disabled = false;
renderTrace(data.trace || []);
renderResults(data);
renderTrace(finalResult.trace || []);
renderResults(finalResult);
function handleStreamEvent(evt) {
if (!evt || !evt.event) return;
if (evt.event === 'progress') {
progressDetail = evt.detail || '';
if (progressDetail) setStatus(progressDetail, 'busy');
return;
}
if (evt.event === 'start') {
setStatus(`Running… engine=${evt.engine}, uploads=${evt.upload_count || 0}`, 'busy');
return;
}
if (evt.event === 'step') {
const idx = stepKeyToIndex[evt.step];
if (idx === undefined) return;
stepState[idx] = {
label: evt.label || stepState[idx].label,
detail: evt.detail || stepState[idx].detail,
status: evt.status || stepState[idx].status,
};
renderTrace(stepState);
return;
}
if (evt.event === 'subq') {
setStatus(`Retrieving sub-question ${evt.index}/${evt.total}: ${evt.question.slice(0, 80)}${evt.question.length > 80 ? '…' : ''}`, 'busy');
return;
}
if (evt.event === 'final') {
finalResult = evt.result;
return;
}
if (evt.event === 'error') {
errorEvent = evt;
return;
}
}
}
function setStatus(message, kind) {
+4 -4
View File
@@ -16,11 +16,11 @@ require_once __DIR__ . '/includes/layout.php';
<div class="control-row" id="drEngineControl">
<span class="control-label">Engine</span>
<label><input type="radio" name="drEngine" value="azure_mini" checked> Azure gpt-4o-mini &#9733; <small class="control-hint">(fast)</small></label>
<label><input type="radio" name="drEngine" value="azure_full"> Azure gpt-4o <small class="control-hint">(best)</small></label>
<label><input type="radio" name="drEngine" value="gpu"> GPU (cuttlefish) <small class="control-hint">(local)</small></label>
<label><input type="radio" name="drEngine" value="azure_mini" checked> Azure gpt-4o-mini &#9733; <small class="control-hint">(~15-45s)</small></label>
<label><input type="radio" name="drEngine" value="azure_full"> Azure gpt-4o <small class="control-hint">(best · ~60-180s)</small></label>
<label><input type="radio" name="drEngine" value="gpu"> GPU (cuttlefish) <small class="control-hint">(local · ~30-90s)</small></label>
</div>
<p class="upload-hint">Azure engines use your BNL Azure credits. GPU runs qwen2.5:14b via LiteLLM on cuttlefish.</p>
<p class="upload-hint">Azure mini is the default and finishes fastest. Azure full is the most thorough but can take 1-3 minutes. GPU keeps everything inside the BNL fleet. Live progress shown in the right-hand reasoning panel.</p>
<div class="dr-slice-section">
<p class="control-label">Corpus slices</p>
+61 -29
View File
@@ -30,7 +30,8 @@ final class DbnDeepResearchAgent
array $sliceSelection,
string $engine,
string $language,
array $controls
array $controls,
?callable $emit = null
): array {
$seedQuery = trim($seedQuery);
$pastedText = trim($pastedText);
@@ -58,31 +59,49 @@ final class DbnDeepResearchAgent
$trace = [];
$seedDescription = $this->buildSeedDescription($seedQuery, $pastedText, $uploadedFiles);
// STEP 1: Query interpretation — build research brief
$emitStep = function (string $stepId, string $label, string $detail, string $status) use (&$trace, $emit): void {
$trace[] = $this->trace($label, $detail, $status);
if ($emit) {
$emit('step', [
'step' => $stepId,
'label' => $label,
'detail' => $detail,
'status' => $status,
]);
}
};
$emitRunning = function (string $stepId, string $label, string $detail = 'Running…') use ($emit): void {
if ($emit) {
$emit('step', [
'step' => $stepId,
'label' => $label,
'detail' => $detail,
'status' => 'running',
]);
}
};
// STEP 1: Query interpretation
$emitRunning('interpretation', 'Query interpretation', 'Summarising the seed input…');
$stepStart = microtime(true);
$interpretation = $this->interpretSeed($seedDescription, $language);
$this->stepTimings['interpretation'] = $this->elapsedMs($stepStart);
$trace[] = $this->trace(
'Query interpretation',
$interpretation['detail'],
'complete'
);
$emitStep('interpretation', 'Query interpretation', $interpretation['detail'], 'complete');
// STEP 2: Query expansion
$emitRunning('expansion', 'Query expansion', 'Generating sub-questions…');
$stepStart = microtime(true);
$expansion = $this->expandQueries($seedDescription, $interpretation['brief'], $controls['sub_q_count'], $language);
$this->stepTimings['expansion'] = $this->elapsedMs($stepStart);
$subQuestions = $expansion['questions'];
$expansionStatus = $expansion['fallback'] ? 'warning' : 'complete';
$trace[] = $this->trace(
'Query expansion',
$expansion['fallback']
? 'Could not parse sub-questions; falling back to retrieving on the seed query alone.'
: sprintf('Generated %d sub-questions to research the corpus from multiple angles.', count($subQuestions)),
$expansionStatus
);
$expansionDetail = $expansion['fallback']
? 'Could not parse sub-questions; falling back to retrieving on the seed query alone.'
: sprintf('Generated %d sub-questions to research the corpus from multiple angles.', count($subQuestions));
$emitStep('expansion', 'Query expansion', $expansionDetail, $expansionStatus);
// STEP 3: Slice resolution
$emitRunning('slice_resolution', 'Slice resolution', 'Resolving slice toggles to document IDs…');
$stepStart = microtime(true);
$sliceSelectionNormalized = dbnV6NormalizeSliceSelection($sliceSelection);
if (!array_filter($sliceSelectionNormalized)) {
@@ -104,9 +123,12 @@ final class DbnDeepResearchAgent
$sliceDetail = 'Slice resolution failed; corpus search will run unconstrained.';
}
$this->stepTimings['slice_resolution'] = $this->elapsedMs($stepStart);
$trace[] = $this->trace('Slice resolution', $sliceDetail, $sliceStatus);
$emitStep('slice_resolution', 'Slice resolution', $sliceDetail, $sliceStatus);
// STEP 4: Upload indexing (in-memory, ephemeral)
$emitRunning('upload_indexing', 'Upload indexing', empty($uploadedFiles)
? 'No uploads; skipping…'
: sprintf('Chunking + embedding %d file(s) in memory…', count($uploadedFiles)));
$stepStart = microtime(true);
$uploadChunks = [];
foreach ($uploadedFiles as $idx => $file) {
@@ -141,15 +163,16 @@ final class DbnDeepResearchAgent
$uploadDetail = 'No files uploaded; agent will research the corpus only.';
}
$this->stepTimings['upload_indexing'] = $this->elapsedMs($stepStart);
$trace[] = $this->trace('Upload indexing', $uploadDetail, $uploadStatus);
$emitStep('upload_indexing', 'Upload indexing', $uploadDetail, $uploadStatus);
// STEP 5: Retrieval (per sub-question)
$stepStart = microtime(true);
$retrievalQueries = $subQuestions ?: [[
'id' => 'q1',
'question' => $seedQuery !== '' ? $seedQuery : ($interpretation['brief'] ?: 'legal research'),
'rationale' => 'Seed query (no sub-question expansion).',
]];
$emitRunning('retrieval', 'Retrieval', sprintf('Hybrid vector + keyword + rerank across %d sub-question(s)…', count($retrievalQueries)));
$stepStart = microtime(true);
try {
$rag = new ClientRagPipeline((int)$client['id'], 'http://10.0.1.10:4000', 60);
@@ -159,7 +182,15 @@ final class DbnDeepResearchAgent
$rawPool = [];
$retrievalWarnings = 0;
foreach ($retrievalQueries as $sq) {
foreach ($retrievalQueries as $idx => $sq) {
if ($emit) {
$emit('subq', [
'index' => $idx + 1,
'total' => count($retrievalQueries),
'id' => $sq['id'],
'question' => $sq['question'],
]);
}
try {
$corpusChunks = $rag->searchAll(
$sq['question'],
@@ -197,22 +228,21 @@ final class DbnDeepResearchAgent
$merged = $this->mergeAndDedupe($rawPool, self::POOL_CAP);
$this->stepTimings['retrieval'] = $this->elapsedMs($stepStart);
$retrievalStatus = $retrievalWarnings > 0 ? 'warning' : 'complete';
$trace[] = $this->trace(
'Retrieval',
sprintf(
'%d sub-question(s) × hybrid + RRF + rerank → %d raw chunks → %d unique after dedupe.',
count($retrievalQueries),
count($rawPool),
count($merged)
),
$retrievalStatus
$retrievalDetail = sprintf(
'%d sub-question(s) × hybrid + RRF + rerank → %d raw chunks → %d unique after dedupe.',
count($retrievalQueries),
count($rawPool),
count($merged)
);
$emitStep('retrieval', 'Retrieval', $retrievalDetail, $retrievalStatus);
// Cap pool to reranker top-K for synthesis
$synthesisPool = array_slice($merged, 0, $controls['reranker_top_k']);
$numberedSources = $this->numberSources($synthesisPool);
// STEP 6: Synthesis
$synthesisEngineLabel = $engine === 'azure_full' ? 'Azure gpt-4o' : ($engine === 'gpu' ? 'GPU qwen2.5:14b' : 'Azure gpt-4o-mini');
$emitRunning('synthesis', 'Synthesis', sprintf('Synthesising cited brief with %s — this is the slowest step…', $synthesisEngineLabel));
$stepStart = microtime(true);
$synthesis = $this->synthesise(
$seedDescription,
@@ -224,7 +254,8 @@ final class DbnDeepResearchAgent
$controls['temperature']
);
$this->stepTimings['synthesis'] = $this->elapsedMs($stepStart);
$trace[] = $this->trace(
$emitStep(
'synthesis',
'Synthesis',
sprintf('%s synthesised the brief using %d grounded source(s).', $synthesis['deploy_label'], count($numberedSources)),
'complete'
@@ -232,7 +263,8 @@ final class DbnDeepResearchAgent
// STEP 7: Confidence
$confidence = $this->citationConfidence($numberedSources);
$trace[] = $this->trace(
$emitStep(
'confidence',
'Citation confidence',
sprintf('%s confidence based on %d source(s) and reranker score distribution.', ucfirst($confidence), count($numberedSources)),
$confidence === 'low' ? 'warning' : 'complete'