Add Fetch Request pipeline UI with real-time SSEs #8

Merged
aetos merged 16 commits from ai/fetch-request-steps-ui into fetch-request-steps-ui 2026-05-30 15:58:49 +00:00
Showing only changes of commit a1a406756f - Show all commits

View File

@@ -71,25 +71,33 @@ function computeProgressPercent(
if (seenSteps.has("txn_dicts/completed")) return 80; if (seenSteps.has("txn_dicts/completed")) return 80;
if (seenSteps.has("txn_dicts") && liveCount > 0) { if (seenSteps.has("txn_dicts") && liveCount > 0) {
return Math.min(80, 35 + Math.min(liveCount, 300) / 300 * 45); return Math.min(80, 20 + Math.min(liveCount, 300) / 300 * 60);
} }
if (seenSteps.has("txn_blocks")) return 35; if (seenSteps.has("txn_blocks") || seenSteps.has("raw_lines")) return 20;
if (seenSteps.has("raw_lines")) return 25;
if (seenSteps.has("load_content")) return 15;
return status === "processing" || status === "paused" ? 10 : 0; return status === "processing" || status === "paused" ? 5 : 0;
} }
const stepLabels = ["Load Content", "Extract", "Raw Expense", "Enrich", "Save"]; const stepLabels = ["Extract", "Raw Expense", "Enrich", "Save"];
function computeActiveStep(status: FetchRequestStatus, seenSteps: Set<string>): number { function computeActiveStep(status: FetchRequestStatus, seenSteps: Set<string>): number {
if (status === "completed") return stepLabels.length; if (status === "completed") return stepLabels.length;
if (seenSteps.has("save_expenses") || seenSteps.has("complete")) return 4;
if (seenSteps.has("enrich") || status === "enriched_done") return 3; if (seenSteps.has("save_expenses/completed") || seenSteps.has("complete/completed")) return stepLabels.length;
if (seenSteps.has("txn_dicts") || status === "raw_expenses_done") return 2; if (seenSteps.has("save_expenses") || seenSteps.has("complete")) return 3;
if (seenSteps.has("txn_blocks")) return 1;
if (seenSteps.has("load_content") || status === "processing" || status === "paused") return 0; if (seenSteps.has("enrich/completed")) return 3;
if (seenSteps.has("enrich")) return 2;
if (seenSteps.has("txn_dicts/completed") || status === "raw_expenses_done") return 2;
if (seenSteps.has("txn_dicts")) return 1;
if (seenSteps.has("txn_blocks/completed")) return 1;
if (seenSteps.has("raw_lines") || seenSteps.has("txn_blocks")) return 0;
if (status === "processing" || status === "paused") return 0;
return -1; return -1;
} }
@@ -147,21 +155,20 @@ export default function FetchRequestDetail() {
const source = (fetchRequest as any)?.source; const source = (fetchRequest as any)?.source;
const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0); const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0);
if (rawLineCount) msgs[0] = `${rawLineCount} raw lines`;
const blockCount = stepStats.txn_blocks ?? 0; const blockCount = stepStats.txn_blocks ?? 0;
if (blockCount) msgs[1] = `${blockCount} blocks`; if (rawLineCount || blockCount) msgs[0] = blockCount ? `${blockCount} blocks` : `${rawLineCount} raw lines`;
const dictCount = stepStats.txn_dicts ?? liveParsedCount ?? 0; const sourceDictCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
if (dictCount) msgs[2] = `${dictCount} dicts`; const dictCount = stepStats.txn_dicts ?? liveParsedCount ?? sourceDictCount;
if (dictCount) msgs[1] = `${dictCount} dicts`;
if (stepStats.enrich_count) msgs[3] = `${stepStats.enrich_count} enriched`; if (stepStats.enrich_count) msgs[2] = `${stepStats.enrich_count} enriched`;
else if (["enriched_done", "completed"].includes((fetchRequest as any)?.status)) else if (["enriched_done", "completed"].includes((fetchRequest as any)?.status))
msgs[3] = "done"; msgs[2] = "done";
if (stepStats.save_count) msgs[4] = `${stepStats.save_count} saved`; if (stepStats.save_count) msgs[3] = `${stepStats.save_count} saved`;
else if ((fetchRequest as any)?.status === "completed") else if ((fetchRequest as any)?.status === "completed")
msgs[4] = (fetchRequest as any)?.completed_at msgs[3] = (fetchRequest as any)?.completed_at
? new Date((fetchRequest as any).completed_at).toLocaleString() : "done"; ? new Date((fetchRequest as any).completed_at).toLocaleString() : "done";
return msgs; return msgs;