/**
 * Detail page for a single fetch request: shows overall progress, the
 * pipeline stepper, a live Server-Sent-Events feed and the ambiguity
 * resolution panel.
 *
 * NOTE(review): this copy of the file appears to have lost its JSX elements
 * and its generic type arguments (e.g. `Record = {`, `pending: ,`,
 * `return ( );`). That residue is preserved exactly as found; only the intact
 * logic below has been changed. Restore the markup/generics from version
 * control before building.
 */
import * as React from "react";
import { useParams, useNavigate } from "react-router-dom";
import {
  Box,
  Container,
  Paper,
  Typography,
  Button,
  Chip,
  CircularProgress,
  Alert,
  Stepper,
  Step,
  StepLabel,
  StepIcon,
  LinearProgress,
  IconButton,
  Snackbar,
} from "@mui/material";
import ArrowBackIcon from "@mui/icons-material/ArrowBack";
import ReplayIcon from "@mui/icons-material/Replay";
import CheckCircleIcon from "@mui/icons-material/CheckCircle";
import ErrorIcon from "@mui/icons-material/Error";
import WarningAmberIcon from "@mui/icons-material/WarningAmber";
import PlayArrowIcon from "@mui/icons-material/PlayArrow";
import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline";
import FiberManualRecordIcon from "@mui/icons-material/FiberManualRecord";
import {
  useFetchRequestAmbiguities,
  useResolveAmbiguity,
} from "./features/fetch-requests";
import type {
  FetchRequestStatus,
  SSEEvent,
  ProgressMessage,
} from "./features/fetch-requests";
import { RETRY_MAX, formatApiError } from "./features/fetch-requests";
import { useResourceByName, useConfig, defaultFieldComponents } from "../react-openapi";

/** Colour name associated with each fetch-request status. */
const statusColors: Record = {
  pending: "default",
  processing: "info",
  paused: "warning",
  raw_expenses_done: "primary",
  enriched_done: "warning",
  completed: "success",
  failed: "error",
};

/** Icon associated with each fetch-request status (icon elements missing in this copy). */
const statusIcons: Record = {
  pending: ,
  processing: ,
  paused: ,
  raw_expenses_done: ,
  enriched_done: ,
  completed: ,
  failed: ,
};

/**
 * Derives an overall completion percentage (integer, 0–100).
 *
 * `pending` is always 0 and `completed` is always 100. Otherwise the value is
 * the sum of four weighted parts:
 *   - 10 once a `raw_lines` or `txn_blocks` step has been seen,
 *   - up to 20 for parsed dicts relative to `txnBlockCount`,
 *   - up to 50 for `stepStats.enrich_count` relative to `txnDictCount`,
 *   - up to 20 for `stepStats.save_count` relative to `txnDictCount`.
 *
 * @param status current fetch-request status
 * @param liveCount live parsed-transaction count
 * @param seenSteps step keys observed so far
 * @param stepStats per-step counters
 * @param txnBlockCount denominator for the parsing part (skipped when 0)
 * @param txnDictCount denominator for the enrich/save parts (skipped when 0)
 * @returns rounded percentage, capped at 100
 */
function computeProgressPercent(
  status: FetchRequestStatus,
  liveCount: number,
  seenSteps: Set,
  stepStats: Record,
  txnBlockCount: number,
  txnDictCount: number,
): number {
  if (status === "pending") return 0;
  if (status === "completed") return 100;
  let pct = 0;
  if (seenSteps.has("raw_lines") || seenSteps.has("txn_blocks")) pct += 10;
  if (txnBlockCount > 0) {
    // Use whichever is further along: the live counter or the final step stat.
    const current = Math.max(liveCount, stepStats.txn_dicts ?? 0);
    pct += Math.min(1, current / txnBlockCount) * 20;
  }
  if (txnDictCount > 0) {
    pct += Math.min(1, (stepStats.enrich_count ?? 0) / txnDictCount) * 50;
    pct += Math.min(1, (stepStats.save_count ?? 0) / txnDictCount) * 20;
  }
  return Math.round(Math.min(100, pct));
}

/** Labels of the pipeline steps, in display order. */
const stepLabels = ["Extract", "Raw Expense", "Enrich", "Save"];

/**
 * Maps the request status and the observed step keys to the index of the
 * active entry in `stepLabels`.
 *
 * Checks run from the latest pipeline stage to the earliest, so the furthest
 * stage reached wins.
 *
 * @returns `stepLabels.length` when the pipeline has finished, `-1` when
 *          nothing has started, otherwise the active step index
 */
function computeActiveStep(status: FetchRequestStatus, seenSteps: Set): number {
  if (status === "completed") return stepLabels.length;
  if (seenSteps.has("save_expenses/completed") || seenSteps.has("complete/completed")) return stepLabels.length;
  if (seenSteps.has("save_expenses") || seenSteps.has("complete")) return 3;
  if (seenSteps.has("enrich/completed")) return 3;
  if (seenSteps.has("enrich")) return 2;
  if (seenSteps.has("txn_dicts/completed") || status === "raw_expenses_done") return 2;
  if (seenSteps.has("txn_dicts")) return 1;
  if (seenSteps.has("txn_blocks/completed")) return 1;
  if (seenSteps.has("raw_lines") || seenSteps.has("txn_blocks")) return 0;
  if (status === "processing" || status === "paused") return 0;
  return -1;
}

/**
 * Renders a progress message as a short string, using the first populated
 * field in this order: `lines`, `blocks`, `count` (+ `unit`), `raw_ocr_line`
 * (truncated to 60 characters), `error` (truncated to 80 characters).
 *
 * @returns the formatted text, or an empty string when no field is populated
 */
function formatProgressMessage(msg: ProgressMessage): string {
  if (msg.lines !== undefined) return `${msg.lines} lines`;
  if (msg.blocks !== undefined) return `${msg.blocks} blocks`;
  if (msg.count !== undefined && msg.unit) return `${msg.count} ${msg.unit}`;
  if (msg.count !== undefined) return `${msg.count} items`;
  if (msg.raw_ocr_line) return `"${msg.raw_ocr_line.slice(0, 60)}${msg.raw_ocr_line.length > 60 ? "…" : ""}"`;
  if (msg.error) return msg.error.slice(0, 80);
  return "";
}

/** Picks the icon for an SSE event status (icon elements missing in this copy). */
function sseIcon(status: SSEEvent["status"]) {
  switch (status) {
    case "started": return ;
    case "completed": return ;
    case "failed": return ;
    case "skipped": return ;
    case "paused": return ;
    case "progress": return ( );
  }
}

/**
 * Checks whether a candidate's balance is consistent with the previous
 * balance plus or minus its amount, either exactly or within 0.01.
 */
function isMathValid(candidate: { amount: number; balance: number }, prevBalance: number) {
  return (
    candidate.balance === prevBalance + candidate.amount ||
    candidate.balance === prevBalance - candidate.amount ||
    Math.abs(candidate.balance - (prevBalance + candidate.amount)) < 0.01 ||
    Math.abs(candidate.balance - (prevBalance - candidate.amount)) < 0.01
  );
}

/**
 * Page component for one fetch request, identified by the `id` route param.
 *
 * Loads the request and its ambiguities, subscribes to
 * `${baseUrl}/fetch-requests/${id}/events` through an EventSource, and derives
 * step counters, progress percentage and the de-duplicated event feed from the
 * received events.
 */
export default function FetchRequestDetail() {
  const { id } = useParams<{ id: string }>();
  const navigate = useNavigate();
  const config = useConfig();
  const { useRead, usePatch } = useResourceByName("fetch-requests", { fieldComponents: defaultFieldComponents });
  const { data: fetchRequest, isLoading, error: fetchError, refetch: refetchRequest } = useRead(id!);
  const updateMutation = usePatch();
  const resolveMutation = useResolveAmbiguity();
  const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!);

  const [sseEvents, setSseEvents] = React.useState([]);
  const [sseConnected, setSseConnected] = React.useState(false);
  const [liveParsedCount, setLiveParsedCount] = React.useState(undefined);
  const [stepStats, setStepStats] = React.useState>({});
  const [failNotif, setFailNotif] = React.useState(null);
  const sseRef = React.useRef(null);
  const feedRef = React.useRef(null);

  // Total number of transaction blocks across every list in `source.txn_blocks`.
  const txnBlockCount = React.useMemo(() => {
    const blocks = (fetchRequest as any)?.source?.txn_blocks;
    if (!blocks) return 0;
    return Object.values(blocks).reduce(
      (sum: number, list: any) => sum + (Array.isArray(list) ? list.length : 0),
      0,
    );
  }, [fetchRequest]);

  // Short counter text shown under each stepper label, keyed by step index.
  const stepMessages = React.useMemo(() => {
    const msgs: Record = {};
    const source = (fetchRequest as any)?.source;

    const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0);
    if (rawLineCount) msgs[0] = `${rawLineCount}`;

    const sourceDictCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
    const dictLive = liveParsedCount ?? stepStats.txn_dicts ?? 0;
    const dictCurrent = Math.max(dictLive, sourceDictCount);
    if (dictCurrent && txnBlockCount) msgs[1] = `${dictCurrent}/${txnBlockCount}`;
    else if (dictCurrent) msgs[1] = `${dictCurrent}`;

    const txnDictDenom = stepStats.txn_dicts ?? sourceDictCount;
    if (stepStats.enrich_count && txnDictDenom) msgs[2] = `${stepStats.enrich_count}/${txnDictDenom}`;
    else if (stepStats.enrich_count) msgs[2] = `${stepStats.enrich_count}`;

    if (stepStats.save_count && txnDictDenom) msgs[3] = `${stepStats.save_count}/${txnDictDenom}`;
    else if (stepStats.save_count) msgs[3] = `${stepStats.save_count}`;

    return msgs;
  }, [fetchRequest, stepStats, liveParsedCount, txnBlockCount]);

  // Subscribe to the request's event stream for as long as id/baseUrl are stable.
  // NOTE(review): refetchRequest / refetchAmbiguities are captured from the
  // render in which this effect ran and are not in the dependency list —
  // confirm the hooks return stable function references.
  React.useEffect(() => {
    if (!id || !config?.baseUrl) return;
    const url = `${config.baseUrl}/fetch-requests/${id}/events`;
    const es = new EventSource(url);
    sseRef.current = es;
    es.onopen = () => setSseConnected(true);
    es.onerror = () => setSseConnected(false);
    es.onmessage = (event) => {
      try {
        const parsed: SSEEvent = JSON.parse(event.data);
        setSseEvents((prev) => [...prev, parsed]);

        // Read message fields defensively: an event without a `message` must
        // not throw here, otherwise the refetch handling below is skipped.
        const count = parsed.message?.count;
        const lines = parsed.message?.lines;
        const blocks = parsed.message?.blocks;

        if (parsed.status === "progress" && count !== undefined) {
          if (parsed.step === "txn_dicts") setLiveParsedCount(count);
          if (parsed.step === "enrich") setStepStats((prev) => ({ ...prev, enrich_count: count }));
          if (parsed.step === "save_expenses") setStepStats((prev) => ({ ...prev, save_count: count }));
        }

        if (parsed.status === "completed") {
          const stats: Record = {};
          // `raw_lines` / `txn_blocks` report through `lines` / `blocks`, so
          // they must not depend on `count` being present.
          if (parsed.step === "raw_lines" && lines !== undefined) stats.raw_lines = lines;
          if (parsed.step === "txn_blocks" && blocks !== undefined) stats.txn_blocks = blocks;
          if (count !== undefined) {
            if (parsed.step === "txn_dicts") stats.txn_dicts = count;
            if (parsed.step === "enrich") stats.enrich_count = count;
            if (parsed.step === "save_expenses") stats.save_count = count;
          }
          if (Object.keys(stats).length) {
            setStepStats((prev) => ({ ...prev, ...stats }));
          }
        }

        if (parsed.status === "paused") {
          refetchRequest();
          refetchAmbiguities();
        }
        if (parsed.status === "failed") {
          setFailNotif(parsed.message?.error || "Fetch request failed");
          refetchRequest();
        }
        if (parsed.status === "completed" || parsed.step === "resume_extract") {
          refetchRequest();
        }
      } catch {
        // ignore malformed events
      }
    };
    return () => {
      es.close();
      sseRef.current = null;
    };
  }, [id, config?.baseUrl]);

  // Keep the event feed scrolled to its newest entry.
  React.useEffect(() => {
    if (feedRef.current) {
      feedRef.current.scrollTop = feedRef.current.scrollHeight;
    }
  }, [sseEvents]);

  // Events to display: only the latest progress event per progress step, and
  // a "started" event only while no later terminal event exists for its step.
  const displayEvents = React.useMemo(() => {
    const progressSteps = new Set(["txn_dicts", "enrich", "save_expenses"]);
    const terminalStatuses = new Set(["completed", "skipped", "paused", "failed"]);
    const lastProgressIdx: Record = {};
    const lastTerminalIdx = new Map<string, number>();
    // Single backward pass: the first hit per step is its last occurrence.
    for (let i = sseEvents.length - 1; i >= 0; i--) {
      const e = sseEvents[i];
      if (progressSteps.has(e.step) && e.status === "progress" && lastProgressIdx[e.step] === undefined) {
        lastProgressIdx[e.step] = i;
      }
      if (terminalStatuses.has(e.status) && !lastTerminalIdx.has(e.step)) {
        lastTerminalIdx.set(e.step, i);
      }
    }
    return sseEvents.filter((e, i) => {
      if (progressSteps.has(e.step) && e.status === "progress") return i === lastProgressIdx[e.step];
      if (e.status === "started") {
        // Hidden once any later event for the same step is terminal.
        return (lastTerminalIdx.get(e.step) ?? -1) <= i;
      }
      return true;
    });
  }, [sseEvents]);

  // Set of "<step>" and "<step>/<status>" keys for every event received.
  const seenSteps = React.useMemo(() => {
    const steps = new Set();
    for (const evt of sseEvents) {
      steps.add(evt.step);
      if (evt.status === "completed") steps.add(`${evt.step}/completed`);
      if (evt.status === "failed") steps.add(`${evt.step}/failed`);
      if (evt.status === "started") steps.add(`${evt.step}/started`);
      if (evt.status === "progress") steps.add(`${evt.step}/progress`);
    }
    return steps;
  }, [sseEvents]);

  // Parsed-transaction count: live value first, then persisted counts, then
  // the length of the persisted dict list.
  const displayParsedCount = React.useMemo(() => {
    if (liveParsedCount && liveParsedCount > 0) return liveParsedCount;
    const source = (fetchRequest as any)?.source;
    const persistedCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
    if (persistedCount > 0) return persistedCount;
    const dicts = source?.txn_dicts;
    if (Array.isArray(dicts) && dicts.length > 0) return dicts.length;
    return 0;
  }, [liveParsedCount, fetchRequest]);

  // Denominator for the enrich/save progress parts.
  const txnDictCount = React.useMemo(() => {
    const source = (fetchRequest as any)?.source;
    if (stepStats.txn_dicts && stepStats.txn_dicts > 0) return stepStats.txn_dicts;
    return source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
  }, [fetchRequest, stepStats]);

  const progressPercent = React.useMemo(
    () =>
      computeProgressPercent(
        (fetchRequest as any)?.status as FetchRequestStatus ?? "pending",
        displayParsedCount,
        seenSteps,
        stepStats,
        txnBlockCount,
        txnDictCount,
      ),
    [fetchRequest, displayParsedCount, seenSteps, stepStats, txnBlockCount, txnDictCount],
  );

  /** Re-queues the request by patching its status back to "pending". */
  const handleRetry = async () => {
    if (!id) return;
    try {
      await updateMutation.mutateAsync({ id, data: { status: "pending" } });
    } catch (err: any) {
      setFailNotif(formatApiError(err));
    }
  };

  /**
   * Submits the chosen candidate for an ambiguity, then reloads the list.
   * Failures are surfaced through the same notification as `handleRetry`
   * instead of escaping as an unhandled promise rejection.
   */
  const handleResolve = async (ambiguity: any, candidate: { amount: number; balance: number }) => {
    try {
      await resolveMutation.mutateAsync({
        ambiguityId: ambiguity.id,
        payload: { chosen: { amount: candidate.amount, balance: candidate.balance } },
      });
      refetchAmbiguities();
    } catch (err: any) {
      setFailNotif(formatApiError(err));
    }
  };

  if (isLoading) {
    return ( );
  }

  if (fetchError || !fetchRequest) {
    return ( Failed to load fetch request );
  }

  const req = fetchRequest as any;
  const activeStep = computeActiveStep(req.status as FetchRequestStatus, seenSteps);
  const retryCount = req.retry_count ?? 0;
  const isRetryExhausted = retryCount >= RETRY_MAX;
  const pendingAmbiguities = ambiguities?.filter((a: any) => a.status === "pending") ?? [];
  const resolvedAmbiguities = ambiguities?.filter((a: any) => a.status === "resolved") ?? [];
  const hasAmbiguities = ambiguities && ambiguities.length > 0;
  const allResolved = hasAmbiguities && pendingAmbiguities.length === 0;
  const ambiguitiesLoading = !ambiguities;

  // NOTE(review): the event list below renders `new Date().toLocaleTimeString()`,
  // i.e. the time of the current render rather than a per-event time, so every
  // row shows the same value — confirm whether an event timestamp is available.
  return (
    {req.account_name}
    Date Range {(req as any).start_date ? new Date((req as any).start_date).toLocaleDateString() : "?"} → {(req as any).end_date ? new Date((req as any).end_date).toLocaleDateString() : "?"}
    Created {new Date(req.created_at).toLocaleString()}
    {req.completed_at && ( Completed {new Date(req.completed_at).toLocaleString()} )}
    Overall Progress
    {["processing", "paused"].includes(req.status) && displayParsedCount > 0 && ( Validated: {displayParsedCount} transactions )}
    {progressPercent}%
    Retries: {retryCount}/{RETRY_MAX}
    {req.status === "failed" && !isRetryExhausted && ( )}
    {req.status === "failed" && req.error_message && ( {req.error_message} )}
    {isRetryExhausted && req.status === "failed" && ( Max retries reached — no further retry attempts will be made. )}
    Pipeline Progress
    {stepLabels.map((label, index) => {
      const isCompleted = index < activeStep;
      const isActive = index === activeStep;
      const isPaused = req.status === "paused" && isActive;
      const isFailed = req.status === "failed" && isActive;
      let icon: React.ReactNode;
      if (isCompleted) { icon = ; }
      else if (isFailed) { icon = ; }
      else if (isPaused) { icon = ; }
      else if (isActive) { icon = ; }
      else { icon = {index + 1}; }
      const stepMsg = stepMessages[index];
      return ( {icon}} > {label} {stepMsg && ( {stepMsg} )} );
    })}
    Progress Events
    {sseConnected ? "Connected" : "Disconnected"}
    {displayEvents.length === 0 ? ( Waiting for events... ) : (
      displayEvents.map((evt, i) => (
        {sseIcon(evt.status)}
        {evt.step.replace(/_/g, " ")}
        {evt.message && formatProgressMessage(evt.message) && ( {formatProgressMessage(evt.message)} )}
        {new Date().toLocaleTimeString()}
      ))
    )}
    {hasAmbiguities && (
      Ambiguity Resolution
      {allResolved ? ( All ambiguities resolved — pipeline will resume on next poll cycle ) : ( Pipeline paused — resolve ambiguities to continue )}
      {ambiguities.map((ambiguity: any) => {
        const isResolved = ambiguity.status === "resolved";
        return (
          {ambiguity.line}
          OCR Amount ₹{ambiguity.ocr_amount}
          OCR Balance ₹{ambiguity.ocr_balance}
          Previous Balance ₹{ambiguity.prev_balance}
          {isResolved ? ( }> Resolved: ₹{ambiguity.chosen?.amount} / ₹{ambiguity.chosen?.balance} ) : (
            {ambiguity.candidates.map((candidate: any, ci: number) => {
              const isCredit = candidate.amount > 0;
              const isDebit = candidate.amount < 0;
              const cColor = isCredit ? "success.main" : isDebit ? "error.main" : undefined;
              return ( );
            })}
          )}
        );
      })}
    )}
    setFailNotif(null)} anchorOrigin={{ vertical: "bottom", horizontal: "center" }} >
    setFailNotif(null)} sx={{ borderRadius: 2 }}> {failNotif}
  );
}