/* NOTE(review): this view of the file looks extraction-damaged: statements are collapsed onto a few long lines, and generic type arguments and JSX markup seem to be missing (for example "Record = {", "pending: ," and "return ( );"). The comments added below describe only what is visible here; confirm against the original file before changing code. */
import * as React from "react"; import { useParams, useNavigate } from "react-router-dom"; import { Box, Container, Paper, Typography, Button, Chip, CircularProgress, Alert, Stepper, Step, StepLabel, StepIcon, LinearProgress, IconButton, } from "@mui/material"; import ArrowBackIcon from "@mui/icons-material/ArrowBack"; import ReplayIcon from "@mui/icons-material/Replay"; import CheckCircleIcon from "@mui/icons-material/CheckCircle"; import ErrorIcon from "@mui/icons-material/Error"; import WarningAmberIcon from "@mui/icons-material/WarningAmber"; import PlayArrowIcon from "@mui/icons-material/PlayArrow"; import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline"; import FiberManualRecordIcon from "@mui/icons-material/FiberManualRecord"; import { useFetchRequest, useUpdateFetchRequest, useFetchRequestAmbiguities, useResolveAmbiguity, } from "./features/fetch-requests"; import type { FetchRequestStatus, SSEEvent, ProgressMessage, } from "./features/fetch-requests"; import { RETRY_MAX } from "./features/fetch-requests"; import { useConfig } from "../react-openapi"; /** Lookup from fetch-request status key (pending, processing, paused, raw_expenses_done, enriched_done, completed, failed) to a color name. No use of this table is visible in this view; presumably it feeds a color prop (TODO confirm). */ const statusColors: Record = { pending: "default", processing: "info", paused: "warning", raw_expenses_done: "primary", enriched_done: "warning", completed: "success", failed: "error", }; /** Lookup from the same status keys to an icon; the element values are not visible in this view. */ const statusIcons: Record = { pending: , processing: , paused: , raw_expenses_done: , enriched_done: , completed: , failed: , }; /** Estimates overall pipeline progress as a number from 0 to 100, based on the request status, the live parsed-transaction count and the set of step keys seen on the event stream. Status "pending" gives 0 and "completed" gives 100. Otherwise the furthest step seen wins: "complete" 90, "enrich" 85, "txn_dicts/completed" 80, "txn_dicts/started" with a positive liveCount interpolates from 35 up to 80 (liveCount capped at 300), "txn_blocks" 35, "raw_lines" 25, "load_content" 15. With no step seen, "processing" and "paused" give 10 and every other status gives 0. NOTE(review): the interpolated value can be fractional (liveCount = 1 gives 35.15) and the component prints progressPercent directly before a percent sign, so consider rounding. NOTE(review): unlike computeActiveStep there is no fallback for the statuses "raw_expenses_done" and "enriched_done", so with no events seen they report 0. */ function computeProgressPercent( status: FetchRequestStatus, liveCount: number, seenSteps: Set, ): number { if (status === "pending") return 0; if (status === "completed") return 100; if (seenSteps.has("complete")) return 90; if (seenSteps.has("enrich")) return 85; if (seenSteps.has("txn_dicts/completed")) return 80; if (seenSteps.has("txn_dicts/started") && liveCount > 0) { return Math.min(80, 35 + Math.min(liveCount, 300) / 300 * 45); } if (seenSteps.has("txn_blocks")) return 35; if (seenSteps.has("raw_lines")) return 25; if (seenSteps.has("load_content")) return 15; return 
status === "processing" || status === "paused" ? 10 : 0; } /** Labels of the five pipeline steps in display order. The array index is the step index that computeActiveStep returns and that stepMessages is keyed by. */ const stepLabels = ["Load Content", "Extract", "Raw Expense", "Enrich", "Save"]; /** Returns the index into stepLabels of the step currently in progress. Status "completed" returns stepLabels.length, so every index compares as already done. Otherwise: 4 once a "complete" event was seen; 3 for an "enrich" event or status "enriched_done"; 2 for a "txn_dicts" event or status "raw_expenses_done"; 1 for a "txn_blocks" event; 0 for a "load_content" event or status "processing" or "paused"; and -1 when nothing has started. */ function computeActiveStep(status: FetchRequestStatus, seenSteps: Set): number { if (status === "completed") return stepLabels.length; if (seenSteps.has("complete")) return 4; if (seenSteps.has("enrich") || status === "enriched_done") return 3; if (seenSteps.has("txn_dicts") || status === "raw_expenses_done") return 2; if (seenSteps.has("txn_blocks")) return 1; if (seenSteps.has("load_content") || status === "processing" || status === "paused") return 0; return -1; } /** Renders a progress payload as short display text, using the first field that is present, in this order: lines, blocks, count together with unit, count alone (shown as "N items"), then raw_ocr_line wrapped in double quotes and cut to 60 characters with an ellipsis appended when it is longer. Returns an empty string when none of these apply. */ function formatProgressMessage(msg: ProgressMessage): string { if (msg.lines !== undefined) return `${msg.lines} lines`; if (msg.blocks !== undefined) return `${msg.blocks} blocks`; if (msg.count !== undefined && msg.unit) return `${msg.count} ${msg.unit}`; if (msg.count !== undefined) return `${msg.count} items`; if (msg.raw_ocr_line) return `"${msg.raw_ocr_line.slice(0, 60)}${msg.raw_ocr_line.length > 60 ? 
"…" : ""}"`; return ""; } /** Chooses the icon for an SSE event status ("started", "completed", "skipped", "paused" or "progress"). The returned elements are not visible in this view, and the switch has no default branch. */ function sseIcon(status: SSEEvent["status"]) { switch (status) { case "started": return ; case "completed": return ; case "skipped": return ; case "paused": return ; case "progress": return ( ); } } /** Returns true when candidate.balance equals prevBalance plus candidate.amount or prevBalance minus candidate.amount, either exactly or within an absolute difference below 0.01. NOTE(review): for finite numbers the two exact comparisons are already covered by the two tolerance checks. No caller is visible in this view. */ function isMathValid(candidate: { amount: number; balance: number }, prevBalance: number) { return ( candidate.balance === prevBalance + candidate.amount || candidate.balance === prevBalance - candidate.amount || Math.abs(candidate.balance - (prevBalance + candidate.amount)) < 0.01 || Math.abs(candidate.balance - (prevBalance - candidate.amount)) < 0.01 ); } /** Detail page for a single fetch request. Reads the id route parameter, loads the request and its ambiguities through the feature hooks, listens to the request's server-sent event stream, and derives progress, stepper and ambiguity state for rendering. The markup itself is not visible in this view. NOTE(review): each event row prints new Date().toLocaleTimeString(), which is the time of the current render rather than the time the event arrived, so all rows show the same time. */ export default function FetchRequestDetail() { const { id } = useParams<{ id: string }>(); const navigate = useNavigate(); const config = useConfig(); const { data: fetchRequest, isLoading, error: fetchError, refetch: refetchRequest } = useFetchRequest(id!); const updateMutation = useUpdateFetchRequest(); const resolveMutation = useResolveAmbiguity(); const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!); const [sseEvents, setSseEvents] = React.useState([]); const [sseConnected, setSseConnected] = React.useState(false); const [liveParsedCount, setLiveParsedCount] = React.useState(0); const [stepStats, setStepStats] = React.useState>({}); const sseRef = React.useRef(null); const feedRef = React.useRef(null); /** Short caption per step index: the raw line count (index 0, from stepStats or else source.raw_lines.length), the block count (1), the dict count (2, from stepStats or else liveParsedCount), "done" for index 3 once the status is enriched_done or completed, and for index 4 the localized completed_at time or "done" once completed. A count of zero produces no caption. */ const stepMessages = React.useMemo(() => { const msgs: Record = {}; const source = (fetchRequest as any)?.source; const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0); if (rawLineCount) msgs[0] = `${rawLineCount} raw lines`; const blockCount = stepStats.txn_blocks ?? 0; if (blockCount) msgs[1] = `${blockCount} blocks`; const dictCount = stepStats.txn_dicts ?? liveParsedCount ?? 0; if (dictCount) msgs[2] = `${dictCount} dicts`; if (["enriched_done", "completed"].includes((fetchRequest as any)?.status)) msgs[3] = "done"; if ((fetchRequest as any)?.status === "completed") msgs[4] = (fetchRequest as any)?.completed_at ? 
new Date((fetchRequest as any).completed_at).toLocaleString() : "done"; return msgs; }, [fetchRequest, stepStats, liveParsedCount]); /** Opens an EventSource on {baseUrl}/fetch-requests/{id}/events once both id and config.baseUrl are set, and closes it in the cleanup. Each message is parsed as JSON and appended to sseEvents. A txn_dicts progress event carrying a count updates liveParsedCount; completed events for raw_lines, txn_blocks and txn_dicts are merged into stepStats; a paused event refetches the request and its ambiguities; a completed event or a resume_extract step refetches the request. Payloads that fail to parse are ignored. NOTE(review): refetchRequest and refetchAmbiguities are used inside the effect but are not listed in its dependency array. */ React.useEffect(() => { if (!id || !config?.baseUrl) return; const url = `${config.baseUrl}/fetch-requests/${id}/events`; const es = new EventSource(url); sseRef.current = es; es.onopen = () => setSseConnected(true); es.onerror = () => setSseConnected(false); es.onmessage = (event) => { try { const parsed: SSEEvent = JSON.parse(event.data); setSseEvents((prev) => [...prev, parsed]); if (parsed.step === "txn_dicts" && parsed.status === "progress" && parsed.message.count !== undefined) { setLiveParsedCount(parsed.message.count); } if (parsed.status === "completed") { const stats: Record = {}; if (parsed.step === "raw_lines" && parsed.message.lines !== undefined) stats.raw_lines = parsed.message.lines; if (parsed.step === "txn_blocks" && parsed.message.blocks !== undefined) stats.txn_blocks = parsed.message.blocks; if (parsed.step === "txn_dicts" && parsed.message.count !== undefined) stats.txn_dicts = parsed.message.count; if (Object.keys(stats).length) { setStepStats((prev) => ({ ...prev, ...stats })); } } if (parsed.status === "paused") { refetchRequest(); refetchAmbiguities(); } if (parsed.status === "completed" || parsed.step === "resume_extract") { refetchRequest(); } } catch { // ignore malformed events } }; return () => { es.close(); sseRef.current = null; }; }, [id, config?.baseUrl]); /** Scrolls the element behind feedRef to its bottom whenever sseEvents changes. */ React.useEffect(() => { if (feedRef.current) { feedRef.current.scrollTop = feedRef.current.scrollHeight; } }, [sseEvents]); /** Events to render: of the txn_dicts progress events only the most recent one is kept, and a "started" event is dropped once a later event for the same step is completed, skipped or paused. NOTE(review): the slice().some() scan for each started event is quadratic in the number of events. */ const displayEvents = React.useMemo(() => { let lastProgressIdx = -1; for (let i = sseEvents.length - 1; i >= 0; i--) { if (sseEvents[i].step === "txn_dicts" && sseEvents[i].status === "progress") { lastProgressIdx = i; break; } } const terminalStatuses = new Set(["completed", "skipped", "paused"]); return sseEvents.filter((e, i) => { if (e.step === "txn_dicts" && e.status === "progress") return i === lastProgressIdx; if 
(e.status === "started") { return !sseEvents.slice(i + 1).some( (later) => later.step === e.step && terminalStatuses.has(later.status), ); } return true; }); }, [sseEvents]); /** Set of step keys seen so far: every event adds its step name, plus "step/completed", "step/started" or "step/progress" when it has that status. */ const seenSteps = React.useMemo(() => { const steps = new Set(); for (const evt of sseEvents) { steps.add(evt.step); if (evt.status === "completed") steps.add(`${evt.step}/completed`); if (evt.status === "started") steps.add(`${evt.step}/started`); if (evt.status === "progress") steps.add(`${evt.step}/progress`); } return steps; }, [sseEvents]); /** Progress percentage from computeProgressPercent; the status falls back to "pending" while the request has not loaded. */ const progressPercent = React.useMemo( () => computeProgressPercent( (fetchRequest as any)?.status as FetchRequestStatus ?? "pending", liveParsedCount, seenSteps, ), [fetchRequest, liveParsedCount, seenSteps], ); /** Transaction count to display: the live SSE count when positive, otherwise source.txn_dict_count or source.txn_dicts_count when positive, otherwise the length of a non-empty source.txn_dicts array, otherwise 0. */ const displayParsedCount = React.useMemo(() => { if (liveParsedCount > 0) return liveParsedCount; const source = (fetchRequest as any)?.source; const persistedCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0; if (persistedCount > 0) return persistedCount; const dicts = source?.txn_dicts; if (Array.isArray(dicts) && dicts.length > 0) return dicts.length; return 0; }, [liveParsedCount, fetchRequest]); /** Asks the update mutation to set the request status back to "pending"; does nothing without an id, and a rejection is caught and ignored here. */ const handleRetry = async () => { if (!id) return; try { await updateMutation.mutateAsync({ id, data: { status: "pending" } }); } catch { // handled by react query } }; /** Submits the chosen candidate (amount and balance) for the given ambiguity, then refetches the ambiguity list. NOTE(review): unlike handleRetry there is no try/catch, so a rejected mutateAsync propagates to the caller. */ const handleResolve = async (ambiguity: any, candidate: { amount: number; balance: number }) => { await resolveMutation.mutateAsync({ ambiguityId: ambiguity.id, payload: { chosen: { amount: candidate.amount, balance: candidate.balance } }, }); refetchAmbiguities(); }; if (isLoading) { return ( ); } if (fetchError || !fetchRequest) { return ( Failed to load fetch request ); } const req = fetchRequest as any; const activeStep = computeActiveStep(req.status as FetchRequestStatus, seenSteps); const retryCount = req.retry_count ?? 0; const isRetryExhausted = retryCount >= RETRY_MAX; const pendingAmbiguities = ambiguities?.filter((a: any) => a.status === "pending") ?? 
[]; const resolvedAmbiguities = ambiguities?.filter((a: any) => a.status === "resolved") ?? []; const hasAmbiguities = ambiguities && ambiguities.length > 0; const allResolved = hasAmbiguities && pendingAmbiguities.length === 0; const ambiguitiesLoading = !ambiguities; return ( {req.account_name} Date Range {(req as any).start_date ? new Date((req as any).start_date).toLocaleDateString() : "?"} → {(req as any).end_date ? new Date((req as any).end_date).toLocaleDateString() : "?"} Created {new Date(req.created_at).toLocaleString()} {req.completed_at && ( Completed {new Date(req.completed_at).toLocaleString()} )} Overall Progress {["processing", "paused"].includes(req.status) && displayParsedCount > 0 && ( Validated: {displayParsedCount} transactions )} {progressPercent}% Retries: {retryCount}/{RETRY_MAX} {req.status === "failed" && !isRetryExhausted && ( )} {req.status === "failed" && req.error_message && ( {req.error_message} )} {isRetryExhausted && req.status === "failed" && ( Max retries reached — no further retry attempts will be made. )} Pipeline Progress {stepLabels.map((label, index) => { const isCompleted = index < activeStep; const isActive = index === activeStep; const isPaused = req.status === "paused" && isActive; const isFailed = req.status === "failed" && isActive; let icon: React.ReactNode; if (isCompleted) { icon = ; } else if (isFailed) { icon = ; } else if (isPaused) { icon = ; } else if (isActive) { icon = ; } else { icon = {index + 1}; } const stepMsg = stepMessages[index]; return ( {icon}} > {label} {stepMsg && ( {stepMsg} )} ); })} Progress Events {sseConnected ? "Connected" : "Disconnected"} {displayEvents.length === 0 ? ( Waiting for events... ) : ( displayEvents.map((evt, i) => ( {sseIcon(evt.status)} {evt.step.replace(/_/g, " ")} {evt.message && formatProgressMessage(evt.message) && ( {formatProgressMessage(evt.message)} )} {new Date().toLocaleTimeString()} )) )} {hasAmbiguities && ( Ambiguity Resolution {allResolved ? 
( All ambiguities resolved — pipeline will resume on next poll cycle ) : ( Pipeline paused — resolve ambiguities to continue )} {ambiguities.map((ambiguity: any) => { const isResolved = ambiguity.status === "resolved"; return ( {ambiguity.line} OCR Amount ₹{ambiguity.ocr_amount} OCR Balance ₹{ambiguity.ocr_balance} Previous Balance ₹{ambiguity.prev_balance} {isResolved ? ( }> Resolved: ₹{ambiguity.chosen?.amount} / ₹{ambiguity.chosen?.balance} ) : ( {ambiguity.candidates.map((candidate: any, ci: number) => { const isCredit = candidate.amount > 0; const isDebit = candidate.amount < 0; const cColor = isCredit ? "success.main" : isDebit ? "error.main" : undefined; return ( ); })} )} ); })} )} ); }