diff --git a/src/FetchRequestDetail.tsx b/src/FetchRequestDetail.tsx index 8592213..70c771d 100644 --- a/src/FetchRequestDetail.tsx +++ b/src/FetchRequestDetail.tsx @@ -23,6 +23,7 @@ import ErrorIcon from "@mui/icons-material/Error"; import WarningAmberIcon from "@mui/icons-material/WarningAmber"; import PlayArrowIcon from "@mui/icons-material/PlayArrow"; import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline"; +import FiberManualRecordIcon from "@mui/icons-material/FiberManualRecord"; import { useFetchRequest, useUpdateFetchRequest, @@ -32,6 +33,7 @@ import { import type { FetchRequestStatus, SSEEvent, + ProgressMessage, } from "./features/fetch-requests"; import { RETRY_MAX } from "./features/fetch-requests"; import { useConfig } from "../react-openapi"; @@ -56,6 +58,29 @@ const statusIcons: Record = { failed: , }; +function computeProgressPercent( + status: FetchRequestStatus, + liveCount: number, + seenSteps: Set, +): number { + if (status === "pending") return 0; + if (status === "completed") return 100; + + if (seenSteps.has("complete")) return 90; + if (seenSteps.has("enrich")) return 85; + if (seenSteps.has("txn_dicts/completed")) return 80; + + if (seenSteps.has("txn_dicts/started") && liveCount > 0) { + return Math.min(80, 35 + Math.min(liveCount, 300) / 300 * 45); + } + + if (seenSteps.has("txn_blocks")) return 35; + if (seenSteps.has("raw_lines")) return 25; + if (seenSteps.has("load_content")) return 15; + + return status === "processing" || status === "paused" ? 10 : 0; +} + const stepLabels = ["Load Content", "Extract", "Validate", "Enrich", "Save"]; function statusToActiveStep(status: FetchRequestStatus): number { @@ -71,12 +96,26 @@ function statusToActiveStep(status: FetchRequestStatus): number { } } +function formatProgressMessage(msg: ProgressMessage): string { + if (msg.lines !== undefined) return `${msg.lines} lines`; + if (msg.blocks !== undefined) return `${msg.blocks} blocks`; + if (msg.count !== undefined && msg.unit) return `${msg.count} ${msg.unit}`; + if (msg.count !== undefined) return `${msg.count} items`; + if (msg.raw_ocr_line) return `"${msg.raw_ocr_line.slice(0, 60)}${msg.raw_ocr_line.length > 60 ? "…" : ""}"`; + return ""; +} + function sseIcon(status: SSEEvent["status"]) { switch (status) { case "started": return ; case "completed": return ; case "skipped": return ; case "paused": return ; + case "progress": return ( + + ); } } @@ -99,21 +138,25 @@ export default function FetchRequestDetail() { const resolveMutation = useResolveAmbiguity(); const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!); + const [sseEvents, setSseEvents] = React.useState([]); + const [sseConnected, setSseConnected] = React.useState(false); + const [liveParsedCount, setLiveParsedCount] = React.useState(0); + const [stepStats, setStepStats] = React.useState>({}); + const sseRef = React.useRef(null); + const feedRef = React.useRef(null); + const stepMessages = React.useMemo(() => { const msgs: Record = {}; const source = (fetchRequest as any)?.source; - if (source?.raw_lines?.length) - msgs[0] = `${source.raw_lines.length} raw lines`; + const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0); + if (rawLineCount) msgs[0] = `${rawLineCount} raw lines`; - const blocks = source?.txn_blocks ?? {}; - const dicts = source?.txn_dicts ?? []; - const blockCount = typeof blocks === "object" - ? Object.keys(blocks).length : Array.isArray(blocks) ? blocks.length : 0; - if (blockCount) - msgs[1] = `${blockCount} blocks`; - if (dicts.length) - msgs[2] = `${dicts.length} dicts`; + const blockCount = stepStats.txn_blocks ?? 0; + if (blockCount) msgs[1] = `${blockCount} blocks`; + + const dictCount = stepStats.txn_dicts ?? liveParsedCount ?? 0; + if (dictCount) msgs[2] = `${dictCount} dicts`; if (["enriched_done", "completed"].includes((fetchRequest as any)?.status)) msgs[3] = "done"; @@ -122,12 +165,7 @@ export default function FetchRequestDetail() { ? new Date((fetchRequest as any).completed_at).toLocaleString() : "done"; return msgs; - }, [fetchRequest]); - - const [sseEvents, setSseEvents] = React.useState([]); - const [sseConnected, setSseConnected] = React.useState(false); - const sseRef = React.useRef(null); - const feedRef = React.useRef(null); + }, [fetchRequest, stepStats, liveParsedCount]); React.useEffect(() => { if (!id || !config?.baseUrl) return; @@ -141,6 +179,21 @@ export default function FetchRequestDetail() { try { const parsed: SSEEvent = JSON.parse(event.data); setSseEvents((prev) => [...prev, parsed]); + + if (parsed.step === "txn_dicts" && parsed.status === "progress" && parsed.message.count !== undefined) { + setLiveParsedCount(parsed.message.count); + } + + if (parsed.status === "completed") { + const stats: Record = {}; + if (parsed.step === "raw_lines" && parsed.message.lines !== undefined) stats.raw_lines = parsed.message.lines; + if (parsed.step === "txn_blocks" && parsed.message.blocks !== undefined) stats.txn_blocks = parsed.message.blocks; + if (parsed.step === "txn_dicts" && parsed.message.count !== undefined) stats.txn_dicts = parsed.message.count; + if (Object.keys(stats).length) { + setStepStats((prev) => ({ ...prev, ...stats })); + } + } + if (parsed.status === "paused") { refetchRequest(); refetchAmbiguities(); @@ -165,6 +218,49 @@ export default function FetchRequestDetail() { } }, [sseEvents]); + const displayEvents = React.useMemo(() => { + let lastProgressIdx = -1; + for (let i = sseEvents.length - 1; i >= 0; i--) { + if (sseEvents[i].step === "txn_dicts" && sseEvents[i].status === "progress") { + lastProgressIdx = i; + break; + } + } + return sseEvents.filter((e, i) => { + if (e.step === "txn_dicts" && e.status === "progress") return i === lastProgressIdx; + return true; + }); + }, [sseEvents]); + + const seenSteps = React.useMemo(() => { + const steps = new Set(); + for (const evt of sseEvents) { + steps.add(evt.step); + if (evt.status === "completed") steps.add(`${evt.step}/completed`); + if (evt.status === "started") steps.add(`${evt.step}/started`); + if (evt.status === "progress") steps.add(`${evt.step}/progress`); + } + return steps; + }, [sseEvents]); + + const progressPercent = React.useMemo( + () => computeProgressPercent( + (fetchRequest as any)?.status as FetchRequestStatus ?? "pending", + liveParsedCount, + seenSteps, + ), + [fetchRequest, liveParsedCount, seenSteps], + ); + + const displayParsedCount = React.useMemo(() => { + if (liveParsedCount > 0) return liveParsedCount; + const source = (fetchRequest as any)?.source; + if (source?.txn_dicts_count) return source.txn_dicts_count; + const dicts = source?.txn_dicts; + if (Array.isArray(dicts) && dicts.length > 0) return dicts.length; + return 0; + }, [liveParsedCount, fetchRequest]); + const handleRetry = async () => { if (!id) return; try { @@ -252,6 +348,28 @@ export default function FetchRequestDetail() { )} + + + + Overall Progress + + {["processing", "paused"].includes(req.status) && displayParsedCount > 0 && ( + + Validated: {displayParsedCount} transactions + + )} + + + + {progressPercent}% + + + @@ -363,12 +481,12 @@ export default function FetchRequestDetail() { gap: 1, }} > - {sseEvents.length === 0 ? ( + {displayEvents.length === 0 ? ( Waiting for events... ) : ( - sseEvents.map((evt, i) => ( + displayEvents.map((evt, i) => ( {evt.step.replace(/_/g, " ")} - {evt.message && ( + {evt.message && formatProgressMessage(evt.message) && ( - {evt.message} + {formatProgressMessage(evt.message)} )} @@ -400,32 +518,22 @@ export default function FetchRequestDetail() { - {(hasAmbiguities || req.status === "paused") && ( + {hasAmbiguities && ( Ambiguity Resolution - {ambiguitiesLoading ? ( - - - Loading ambiguities... - - ) : allResolved ? ( + {allResolved ? ( All ambiguities resolved — pipeline will resume on next poll cycle - ) : !hasAmbiguities ? ( - - Pipeline paused — no ambiguities found - ) : ( Pipeline paused — resolve ambiguities to continue )} - {hasAmbiguities && ( {ambiguities.map((ambiguity: any) => { const isResolved = ambiguity.status === "resolved"; @@ -497,7 +605,6 @@ export default function FetchRequestDetail() { ); })} - )} )} diff --git a/src/features/fetch-requests/fetch-requests.models.ts b/src/features/fetch-requests/fetch-requests.models.ts index 9159ca7..bac7c28 100644 --- a/src/features/fetch-requests/fetch-requests.models.ts +++ b/src/features/fetch-requests/fetch-requests.models.ts @@ -13,6 +13,7 @@ export interface FileSource { raw_lines?: string[]; txn_blocks?: Record; txn_dicts?: Record[]; + txn_dicts_count?: number; } export interface EmailSource { @@ -20,6 +21,7 @@ export interface EmailSource { from_email?: string; subject?: string; raw_terms?: string[]; + txn_dicts_count?: number; } export interface FetchRequestCreate { @@ -80,10 +82,25 @@ export interface ResolveAmbiguityPayload { }; } +export type SSEEventStep = + | "load_content" | "raw_lines" | "txn_blocks" | "txn_dicts" + | "resume_extract" | "extract" | "paused" | "complete" | "enrich"; + +export type SSEEventStatus = + | "started" | "completed" | "skipped" | "paused" | "progress"; + +export interface ProgressMessage { + lines?: number; + blocks?: number; + count?: number; + unit?: string; + raw_ocr_line?: string; +} + export interface SSEEvent { - step: string; - status: "started" | "completed" | "skipped" | "paused"; - message: string; + step: SSEEventStep; + status: SSEEventStatus; + message: ProgressMessage; } export interface FetchRequestFilters { diff --git a/src/features/fetch-requests/index.ts b/src/features/fetch-requests/index.ts index bde2e6c..094ee8c 100644 --- a/src/features/fetch-requests/index.ts +++ b/src/features/fetch-requests/index.ts @@ -11,6 +11,9 @@ export type { AmbiguityCandidate, ResolveAmbiguityPayload, SSEEvent, + SSEEventStep, + SSEEventStatus, + ProgressMessage, } from "./fetch-requests.models"; export { RETRY_MAX } from "./fetch-requests.models"; export {