10 Commits

3 changed files with 252 additions and 55 deletions

View File

@@ -15,6 +15,7 @@ import {
StepIcon, StepIcon,
LinearProgress, LinearProgress,
IconButton, IconButton,
Snackbar,
} from "@mui/material"; } from "@mui/material";
import ArrowBackIcon from "@mui/icons-material/ArrowBack"; import ArrowBackIcon from "@mui/icons-material/ArrowBack";
import ReplayIcon from "@mui/icons-material/Replay"; import ReplayIcon from "@mui/icons-material/Replay";
@@ -22,6 +23,8 @@ import CheckCircleIcon from "@mui/icons-material/CheckCircle";
import ErrorIcon from "@mui/icons-material/Error"; import ErrorIcon from "@mui/icons-material/Error";
import WarningAmberIcon from "@mui/icons-material/WarningAmber"; import WarningAmberIcon from "@mui/icons-material/WarningAmber";
import PlayArrowIcon from "@mui/icons-material/PlayArrow"; import PlayArrowIcon from "@mui/icons-material/PlayArrow";
import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline";
import FiberManualRecordIcon from "@mui/icons-material/FiberManualRecord";
import { import {
useFetchRequest, useFetchRequest,
useUpdateFetchRequest, useUpdateFetchRequest,
@@ -31,6 +34,7 @@ import {
import type { import type {
FetchRequestStatus, FetchRequestStatus,
SSEEvent, SSEEvent,
ProgressMessage,
} from "./features/fetch-requests"; } from "./features/fetch-requests";
import { RETRY_MAX } from "./features/fetch-requests"; import { RETRY_MAX } from "./features/fetch-requests";
import { useConfig } from "../react-openapi"; import { useConfig } from "../react-openapi";
@@ -55,26 +59,78 @@ const statusIcons: Record<FetchRequestStatus, React.ReactNode> = {
failed: <ErrorIcon sx={{ fontSize: 16 }} />, failed: <ErrorIcon sx={{ fontSize: 16 }} />,
}; };
const stepLabels = ["Load Content", "Extract", "Enrich", "Save"]; function computeProgressPercent(
status: FetchRequestStatus,
liveCount: number,
seenSteps: Set<string>,
stepStats: Record<string, number>,
txnBlockCount: number,
txnDictCount: number,
): number {
if (status === "pending") return 0;
if (status === "completed") return 100;
function statusToActiveStep(status: FetchRequestStatus): number { let pct = 0;
switch (status) {
case "pending": return -1; if (seenSteps.has("raw_lines") || seenSteps.has("txn_blocks")) pct += 10;
case "processing": return 0;
case "paused": return 1; if (txnBlockCount > 0) {
case "raw_expenses_done": return 2; const current = Math.max(liveCount, stepStats.txn_dicts ?? 0);
case "enriched_done": return 3; pct += Math.min(1, current / txnBlockCount) * 20;
case "completed": return 4;
case "failed": return 0;
default: return -1;
} }
if (txnDictCount > 0) {
pct += Math.min(1, (stepStats.enrich_count ?? 0) / txnDictCount) * 50;
pct += Math.min(1, (stepStats.save_count ?? 0) / txnDictCount) * 20;
}
return Math.round(Math.min(100, pct));
}
const stepLabels = ["Extract", "Raw Expense", "Enrich", "Save"];
function computeActiveStep(status: FetchRequestStatus, seenSteps: Set<string>): number {
if (status === "completed") return stepLabels.length;
if (seenSteps.has("save_expenses/completed") || seenSteps.has("complete/completed")) return stepLabels.length;
if (seenSteps.has("save_expenses") || seenSteps.has("complete")) return 3;
if (seenSteps.has("enrich/completed")) return 3;
if (seenSteps.has("enrich")) return 2;
if (seenSteps.has("txn_dicts/completed") || status === "raw_expenses_done") return 2;
if (seenSteps.has("txn_dicts")) return 1;
if (seenSteps.has("txn_blocks/completed")) return 1;
if (seenSteps.has("raw_lines") || seenSteps.has("txn_blocks")) return 0;
if (status === "processing" || status === "paused") return 0;
return -1;
}
function formatProgressMessage(msg: ProgressMessage): string {
if (msg.lines !== undefined) return `${msg.lines} lines`;
if (msg.blocks !== undefined) return `${msg.blocks} blocks`;
if (msg.count !== undefined && msg.unit) return `${msg.count} ${msg.unit}`;
if (msg.count !== undefined) return `${msg.count} items`;
if (msg.raw_ocr_line) return `"${msg.raw_ocr_line.slice(0, 60)}${msg.raw_ocr_line.length > 60 ? "…" : ""}"`;
if (msg.error) return msg.error.slice(0, 80);
return "";
} }
function sseIcon(status: SSEEvent["status"]) { function sseIcon(status: SSEEvent["status"]) {
switch (status) { switch (status) {
case "started": return <CircularProgress size={14} />; case "started": return <CircularProgress size={14} />;
case "completed": return <CheckCircleIcon sx={{ fontSize: 16, color: "success.main" }} />; case "completed": return <CheckCircleIcon sx={{ fontSize: 16, color: "success.main" }} />;
case "failed": return <ErrorIcon sx={{ fontSize: 16, color: "error.main" }} />;
case "skipped": return <RemoveCircleOutlineIcon sx={{ fontSize: 16, color: "text.disabled" }} />;
case "paused": return <WarningAmberIcon sx={{ fontSize: 16, color: "warning.main" }} />; case "paused": return <WarningAmberIcon sx={{ fontSize: 16, color: "warning.main" }} />;
case "progress": return (
<FiberManualRecordIcon
sx={{ fontSize: 14, color: "info.main" }}
/>
);
} }
} }
@@ -97,37 +153,45 @@ export default function FetchRequestDetail() {
const resolveMutation = useResolveAmbiguity(); const resolveMutation = useResolveAmbiguity();
const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!); const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!);
const [sseEvents, setSseEvents] = React.useState<SSEEvent[]>([]);
const [sseConnected, setSseConnected] = React.useState(false);
const [liveParsedCount, setLiveParsedCount] = React.useState<number | undefined>(undefined);
const [stepStats, setStepStats] = React.useState<Record<string, number>>({});
const [failNotif, setFailNotif] = React.useState<string | null>(null);
const sseRef = React.useRef<EventSource | null>(null);
const feedRef = React.useRef<HTMLDivElement>(null);
const txnBlockCount = React.useMemo(() => {
const blocks = (fetchRequest as any)?.source?.txn_blocks;
if (!blocks) return 0;
return Object.values(blocks).reduce(
(sum: number, list: any) => sum + (Array.isArray(list) ? list.length : 0),
0,
);
}, [fetchRequest]);
const stepMessages = React.useMemo(() => { const stepMessages = React.useMemo(() => {
const msgs: Record<number, string> = {}; const msgs: Record<number, string> = {};
const source = (fetchRequest as any)?.source; const source = (fetchRequest as any)?.source;
if (source?.raw_lines?.length) const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0);
msgs[0] = `${source.raw_lines.length} raw lines`; if (rawLineCount) msgs[0] = `${rawLineCount}`;
const blocks = source?.txn_blocks ?? {}; const sourceDictCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
const dicts = source?.txn_dicts ?? []; const dictLive = liveParsedCount ?? stepStats.txn_dicts ?? 0;
const blockCount = typeof blocks === "object" const dictCurrent = Math.max(dictLive, sourceDictCount);
? Object.keys(blocks).length : Array.isArray(blocks) ? blocks.length : 0; if (dictCurrent && txnBlockCount) msgs[1] = `${dictCurrent}/${txnBlockCount}`;
if (blockCount || dicts.length) { else if (dictCurrent) msgs[1] = `${dictCurrent}`;
const parts: string[] = [];
if (blockCount) parts.push(`${blockCount} blocks`);
if (dicts.length) parts.push(`${dicts.length} dicts`);
msgs[1] = parts.join(" · ");
}
if (["enriched_done", "completed"].includes((fetchRequest as any)?.status)) const txnDictDenom = stepStats.txn_dicts ?? sourceDictCount;
msgs[2] = "done"; if (stepStats.enrich_count && txnDictDenom) msgs[2] = `${stepStats.enrich_count}/${txnDictDenom}`;
if ((fetchRequest as any)?.status === "completed") else if (stepStats.enrich_count) msgs[2] = `${stepStats.enrich_count}`;
msgs[3] = (fetchRequest as any)?.completed_at
? new Date((fetchRequest as any).completed_at).toLocaleString() : "done"; if (stepStats.save_count && txnDictDenom) msgs[3] = `${stepStats.save_count}/${txnDictDenom}`;
else if (stepStats.save_count) msgs[3] = `${stepStats.save_count}`;
return msgs; return msgs;
}, [fetchRequest]); }, [fetchRequest, stepStats, liveParsedCount, txnBlockCount]);
const [sseEvents, setSseEvents] = React.useState<SSEEvent[]>([]);
const [sseConnected, setSseConnected] = React.useState(false);
const sseRef = React.useRef<EventSource | null>(null);
const feedRef = React.useRef<HTMLDivElement>(null);
React.useEffect(() => { React.useEffect(() => {
if (!id || !config?.baseUrl) return; if (!id || !config?.baseUrl) return;
@@ -141,10 +205,36 @@ export default function FetchRequestDetail() {
try { try {
const parsed: SSEEvent = JSON.parse(event.data); const parsed: SSEEvent = JSON.parse(event.data);
setSseEvents((prev) => [...prev, parsed]); setSseEvents((prev) => [...prev, parsed]);
if (parsed.status === "progress" && parsed.message.count !== undefined) {
if (parsed.step === "txn_dicts") setLiveParsedCount(parsed.message.count);
if (parsed.step === "enrich") setStepStats((prev) => ({ ...prev, enrich_count: parsed.message.count! }));
if (parsed.step === "save_expenses") setStepStats((prev) => ({ ...prev, save_count: parsed.message.count! }));
}
if (parsed.status === "completed" && parsed.message.count !== undefined) {
const stats: Record<string, number> = {};
if (parsed.step === "raw_lines" && parsed.message.lines !== undefined) stats.raw_lines = parsed.message.lines;
if (parsed.step === "txn_blocks" && parsed.message.blocks !== undefined) stats.txn_blocks = parsed.message.blocks;
if (parsed.step === "txn_dicts") stats.txn_dicts = parsed.message.count;
if (parsed.step === "enrich") stats.enrich_count = parsed.message.count;
if (parsed.step === "save_expenses") stats.save_count = parsed.message.count;
if (Object.keys(stats).length) {
setStepStats((prev) => ({ ...prev, ...stats }));
}
}
if (parsed.status === "paused") { if (parsed.status === "paused") {
refetchRequest(); refetchRequest();
refetchAmbiguities(); refetchAmbiguities();
} }
if (parsed.status === "failed") {
setFailNotif(parsed.message.error || "Fetch request failed");
refetchRequest();
}
if (parsed.status === "completed" || parsed.step === "resume_extract") {
refetchRequest();
}
} catch { } catch {
// ignore malformed events // ignore malformed events
} }
@@ -162,6 +252,68 @@ export default function FetchRequestDetail() {
} }
}, [sseEvents]); }, [sseEvents]);
const displayEvents = React.useMemo(() => {
const progressSteps = new Set(["txn_dicts", "enrich", "save_expenses"]);
const lastProgressIdx: Record<string, number> = {};
for (let i = sseEvents.length - 1; i >= 0; i--) {
const e = sseEvents[i];
if (progressSteps.has(e.step) && e.status === "progress" && lastProgressIdx[e.step] === undefined) {
lastProgressIdx[e.step] = i;
}
}
const terminalStatuses = new Set(["completed", "skipped", "paused", "failed"]);
return sseEvents.filter((e, i) => {
if (progressSteps.has(e.step) && e.status === "progress") return i === lastProgressIdx[e.step];
if (e.status === "started") {
return !sseEvents.slice(i + 1).some(
(later) => later.step === e.step && terminalStatuses.has(later.status),
);
}
return true;
});
}, [sseEvents]);
const seenSteps = React.useMemo(() => {
const steps = new Set<string>();
for (const evt of sseEvents) {
steps.add(evt.step);
if (evt.status === "completed") steps.add(`${evt.step}/completed`);
if (evt.status === "failed") steps.add(`${evt.step}/failed`);
if (evt.status === "started") steps.add(`${evt.step}/started`);
if (evt.status === "progress") steps.add(`${evt.step}/progress`);
}
return steps;
}, [sseEvents]);
const displayParsedCount = React.useMemo(() => {
if (liveParsedCount && liveParsedCount > 0) return liveParsedCount;
const source = (fetchRequest as any)?.source;
const persistedCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
if (persistedCount > 0) return persistedCount;
const dicts = source?.txn_dicts;
if (Array.isArray(dicts) && dicts.length > 0) return dicts.length;
return 0;
}, [liveParsedCount, fetchRequest]);
const txnDictCount = React.useMemo(() => {
const source = (fetchRequest as any)?.source;
if (stepStats.txn_dicts && stepStats.txn_dicts > 0) return stepStats.txn_dicts;
return source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
}, [fetchRequest, stepStats]);
const progressPercent = React.useMemo(
() => computeProgressPercent(
(fetchRequest as any)?.status as FetchRequestStatus ?? "pending",
displayParsedCount,
seenSteps,
stepStats,
txnBlockCount,
txnDictCount,
),
[fetchRequest, displayParsedCount, seenSteps, stepStats, txnBlockCount, txnDictCount],
);
const handleRetry = async () => { const handleRetry = async () => {
if (!id) return; if (!id) return;
try { try {
@@ -199,7 +351,7 @@ export default function FetchRequestDetail() {
} }
const req = fetchRequest as any; const req = fetchRequest as any;
const activeStep = statusToActiveStep(req.status); const activeStep = computeActiveStep(req.status as FetchRequestStatus, seenSteps);
const retryCount = req.retry_count ?? 0; const retryCount = req.retry_count ?? 0;
const isRetryExhausted = retryCount >= RETRY_MAX; const isRetryExhausted = retryCount >= RETRY_MAX;
const pendingAmbiguities = ambiguities?.filter((a: any) => a.status === "pending") ?? []; const pendingAmbiguities = ambiguities?.filter((a: any) => a.status === "pending") ?? [];
@@ -249,6 +401,28 @@ export default function FetchRequestDetail() {
)} )}
</Box> </Box>
<Box sx={{ mb: 2 }}>
<Box sx={{ display: "flex", alignItems: "center", gap: 2, mb: 0.5 }}>
<Typography variant="caption" color="text.secondary">
Overall Progress
</Typography>
{["processing", "paused"].includes(req.status) && displayParsedCount > 0 && (
<Typography variant="caption" fontWeight={600} color="info.main">
Validated: {displayParsedCount} transactions
</Typography>
)}
</Box>
<LinearProgress
variant="determinate"
value={progressPercent}
color={req.status === "failed" ? "error" : req.status === "completed" ? "success" : "primary"}
sx={{ borderRadius: 1, height: 8, transition: "width 0.3s ease" }}
/>
<Typography variant="caption" color="text.secondary" sx={{ mt: 0.25, display: "block" }}>
{progressPercent}%
</Typography>
</Box>
<Box sx={{ display: "flex", alignItems: "center", gap: 2 }}> <Box sx={{ display: "flex", alignItems: "center", gap: 2 }}>
<Box sx={{ flex: 1, maxWidth: 300 }}> <Box sx={{ flex: 1, maxWidth: 300 }}>
<Typography variant="caption" color="text.secondary"> <Typography variant="caption" color="text.secondary">
@@ -360,12 +534,12 @@ export default function FetchRequestDetail() {
gap: 1, gap: 1,
}} }}
> >
{sseEvents.length === 0 ? ( {displayEvents.length === 0 ? (
<Typography variant="body2" color="text.disabled" sx={{ textAlign: "center", py: 2 }}> <Typography variant="body2" color="text.disabled" sx={{ textAlign: "center", py: 2 }}>
Waiting for events... Waiting for events...
</Typography> </Typography>
) : ( ) : (
sseEvents.map((evt, i) => ( displayEvents.map((evt, i) => (
<Box <Box
key={i} key={i}
sx={{ sx={{
@@ -382,9 +556,9 @@ export default function FetchRequestDetail() {
<Typography variant="body2" fontWeight={600}> <Typography variant="body2" fontWeight={600}>
{evt.step.replace(/_/g, " ")} {evt.step.replace(/_/g, " ")}
</Typography> </Typography>
{evt.message && ( {evt.message && formatProgressMessage(evt.message) && (
<Typography variant="caption" color="text.secondary"> <Typography variant="caption" color="text.secondary">
{evt.message} {formatProgressMessage(evt.message)}
</Typography> </Typography>
)} )}
</Box> </Box>
@@ -397,32 +571,22 @@ export default function FetchRequestDetail() {
</Box> </Box>
</Paper> </Paper>
{(hasAmbiguities || req.status === "paused") && ( {hasAmbiguities && (
<Paper sx={{ p: 3, borderRadius: 4, mb: 3 }} variant="outlined"> <Paper sx={{ p: 3, borderRadius: 4, mb: 3 }} variant="outlined">
<Typography variant="subtitle1" fontWeight={600} gutterBottom> <Typography variant="subtitle1" fontWeight={600} gutterBottom>
Ambiguity Resolution Ambiguity Resolution
</Typography> </Typography>
{ambiguitiesLoading ? ( {allResolved ? (
<Box sx={{ display: "flex", alignItems: "center", gap: 1, py: 2 }}>
<CircularProgress size={16} />
<Typography variant="body2" color="text.secondary">Loading ambiguities...</Typography>
</Box>
) : allResolved ? (
<Alert severity="success" sx={{ mb: 2, borderRadius: 2 }}> <Alert severity="success" sx={{ mb: 2, borderRadius: 2 }}>
All ambiguities resolved pipeline will resume on next poll cycle All ambiguities resolved pipeline will resume on next poll cycle
</Alert> </Alert>
) : !hasAmbiguities ? (
<Alert severity="info" sx={{ mb: 2, borderRadius: 2 }}>
Pipeline paused no ambiguities found
</Alert>
) : ( ) : (
<Alert severity="warning" sx={{ mb: 2, borderRadius: 2 }}> <Alert severity="warning" sx={{ mb: 2, borderRadius: 2 }}>
Pipeline paused resolve ambiguities to continue Pipeline paused resolve ambiguities to continue
</Alert> </Alert>
)} )}
{hasAmbiguities && (
<Box sx={{ display: "flex", flexDirection: "column", gap: 2 }}> <Box sx={{ display: "flex", flexDirection: "column", gap: 2 }}>
{ambiguities.map((ambiguity: any) => { {ambiguities.map((ambiguity: any) => {
const isResolved = ambiguity.status === "resolved"; const isResolved = ambiguity.status === "resolved";
@@ -494,9 +658,18 @@ export default function FetchRequestDetail() {
); );
})} })}
</Box> </Box>
)}
</Paper> </Paper>
)} )}
<Snackbar
open={!!failNotif}
autoHideDuration={6000}
onClose={() => setFailNotif(null)}
anchorOrigin={{ vertical: "bottom", horizontal: "center" }}
>
<Alert severity="error" onClose={() => setFailNotif(null)} sx={{ borderRadius: 2 }}>
{failNotif}
</Alert>
</Snackbar>
</Container> </Container>
); );
} }

View File

@@ -13,6 +13,8 @@ export interface FileSource {
raw_lines?: string[]; raw_lines?: string[];
txn_blocks?: Record<string, any>; txn_blocks?: Record<string, any>;
txn_dicts?: Record<string, any>[]; txn_dicts?: Record<string, any>[];
txn_dict_count?: number;
txn_dicts_count?: number;
} }
export interface EmailSource { export interface EmailSource {
@@ -20,6 +22,8 @@ export interface EmailSource {
from_email?: string; from_email?: string;
subject?: string; subject?: string;
raw_terms?: string[]; raw_terms?: string[];
txn_dict_count?: number;
txn_dicts_count?: number;
} }
export interface FetchRequestCreate { export interface FetchRequestCreate {
@@ -80,10 +84,27 @@ export interface ResolveAmbiguityPayload {
}; };
} }
export type SSEEventStep =
| "load_content" | "raw_lines" | "txn_blocks" | "txn_dicts"
| "resume_extract" | "extract" | "paused" | "complete" | "enrich"
| "save_expenses" | "pipeline";
export type SSEEventStatus =
| "started" | "completed" | "skipped" | "paused" | "progress" | "failed";
export interface ProgressMessage {
lines?: number;
blocks?: number;
count?: number;
unit?: string;
raw_ocr_line?: string;
error?: string;
}
export interface SSEEvent { export interface SSEEvent {
step: string; step: SSEEventStep;
status: "started" | "completed" | "paused"; status: SSEEventStatus;
message: string; message: ProgressMessage;
} }
export interface FetchRequestFilters { export interface FetchRequestFilters {

View File

@@ -11,6 +11,9 @@ export type {
AmbiguityCandidate, AmbiguityCandidate,
ResolveAmbiguityPayload, ResolveAmbiguityPayload,
SSEEvent, SSEEvent,
SSEEventStep,
SSEEventStatus,
ProgressMessage,
} from "./fetch-requests.models"; } from "./fetch-requests.models";
export { RETRY_MAX } from "./fetch-requests.models"; export { RETRY_MAX } from "./fetch-requests.models";
export { export {