10 Commits

3 changed files with 252 additions and 55 deletions

View File

@@ -15,6 +15,7 @@ import {
StepIcon,
LinearProgress,
IconButton,
Snackbar,
} from "@mui/material";
import ArrowBackIcon from "@mui/icons-material/ArrowBack";
import ReplayIcon from "@mui/icons-material/Replay";
@@ -22,6 +23,8 @@ import CheckCircleIcon from "@mui/icons-material/CheckCircle";
import ErrorIcon from "@mui/icons-material/Error";
import WarningAmberIcon from "@mui/icons-material/WarningAmber";
import PlayArrowIcon from "@mui/icons-material/PlayArrow";
import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline";
import FiberManualRecordIcon from "@mui/icons-material/FiberManualRecord";
import {
useFetchRequest,
useUpdateFetchRequest,
@@ -31,6 +34,7 @@ import {
import type {
FetchRequestStatus,
SSEEvent,
ProgressMessage,
} from "./features/fetch-requests";
import { RETRY_MAX } from "./features/fetch-requests";
import { useConfig } from "../react-openapi";
@@ -55,26 +59,78 @@ const statusIcons: Record<FetchRequestStatus, React.ReactNode> = {
failed: <ErrorIcon sx={{ fontSize: 16 }} />,
};
const stepLabels = ["Load Content", "Extract", "Enrich", "Save"];
function computeProgressPercent(
status: FetchRequestStatus,
liveCount: number,
seenSteps: Set<string>,
stepStats: Record<string, number>,
txnBlockCount: number,
txnDictCount: number,
): number {
if (status === "pending") return 0;
if (status === "completed") return 100;
function statusToActiveStep(status: FetchRequestStatus): number {
switch (status) {
case "pending": return -1;
case "processing": return 0;
case "paused": return 1;
case "raw_expenses_done": return 2;
case "enriched_done": return 3;
case "completed": return 4;
case "failed": return 0;
default: return -1;
let pct = 0;
if (seenSteps.has("raw_lines") || seenSteps.has("txn_blocks")) pct += 10;
if (txnBlockCount > 0) {
const current = Math.max(liveCount, stepStats.txn_dicts ?? 0);
pct += Math.min(1, current / txnBlockCount) * 20;
}
if (txnDictCount > 0) {
pct += Math.min(1, (stepStats.enrich_count ?? 0) / txnDictCount) * 50;
pct += Math.min(1, (stepStats.save_count ?? 0) / txnDictCount) * 20;
}
return Math.round(Math.min(100, pct));
}
const stepLabels = ["Extract", "Raw Expense", "Enrich", "Save"];
function computeActiveStep(status: FetchRequestStatus, seenSteps: Set<string>): number {
if (status === "completed") return stepLabels.length;
if (seenSteps.has("save_expenses/completed") || seenSteps.has("complete/completed")) return stepLabels.length;
if (seenSteps.has("save_expenses") || seenSteps.has("complete")) return 3;
if (seenSteps.has("enrich/completed")) return 3;
if (seenSteps.has("enrich")) return 2;
if (seenSteps.has("txn_dicts/completed") || status === "raw_expenses_done") return 2;
if (seenSteps.has("txn_dicts")) return 1;
if (seenSteps.has("txn_blocks/completed")) return 1;
if (seenSteps.has("raw_lines") || seenSteps.has("txn_blocks")) return 0;
if (status === "processing" || status === "paused") return 0;
return -1;
}
function formatProgressMessage(msg: ProgressMessage): string {
if (msg.lines !== undefined) return `${msg.lines} lines`;
if (msg.blocks !== undefined) return `${msg.blocks} blocks`;
if (msg.count !== undefined && msg.unit) return `${msg.count} ${msg.unit}`;
if (msg.count !== undefined) return `${msg.count} items`;
if (msg.raw_ocr_line) return `"${msg.raw_ocr_line.slice(0, 60)}${msg.raw_ocr_line.length > 60 ? "…" : ""}"`;
if (msg.error) return msg.error.slice(0, 80);
return "";
}
function sseIcon(status: SSEEvent["status"]) {
switch (status) {
case "started": return <CircularProgress size={14} />;
case "completed": return <CheckCircleIcon sx={{ fontSize: 16, color: "success.main" }} />;
case "failed": return <ErrorIcon sx={{ fontSize: 16, color: "error.main" }} />;
case "skipped": return <RemoveCircleOutlineIcon sx={{ fontSize: 16, color: "text.disabled" }} />;
case "paused": return <WarningAmberIcon sx={{ fontSize: 16, color: "warning.main" }} />;
case "progress": return (
<FiberManualRecordIcon
sx={{ fontSize: 14, color: "info.main" }}
/>
);
}
}
@@ -97,37 +153,45 @@ export default function FetchRequestDetail() {
const resolveMutation = useResolveAmbiguity();
const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!);
const [sseEvents, setSseEvents] = React.useState<SSEEvent[]>([]);
const [sseConnected, setSseConnected] = React.useState(false);
const [liveParsedCount, setLiveParsedCount] = React.useState<number | undefined>(undefined);
const [stepStats, setStepStats] = React.useState<Record<string, number>>({});
const [failNotif, setFailNotif] = React.useState<string | null>(null);
const sseRef = React.useRef<EventSource | null>(null);
const feedRef = React.useRef<HTMLDivElement>(null);
const txnBlockCount = React.useMemo(() => {
const blocks = (fetchRequest as any)?.source?.txn_blocks;
if (!blocks) return 0;
return Object.values(blocks).reduce(
(sum: number, list: any) => sum + (Array.isArray(list) ? list.length : 0),
0,
);
}, [fetchRequest]);
const stepMessages = React.useMemo(() => {
const msgs: Record<number, string> = {};
const source = (fetchRequest as any)?.source;
if (source?.raw_lines?.length)
msgs[0] = `${source.raw_lines.length} raw lines`;
const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0);
if (rawLineCount) msgs[0] = `${rawLineCount}`;
const blocks = source?.txn_blocks ?? {};
const dicts = source?.txn_dicts ?? [];
const blockCount = typeof blocks === "object"
? Object.keys(blocks).length : Array.isArray(blocks) ? blocks.length : 0;
if (blockCount || dicts.length) {
const parts: string[] = [];
if (blockCount) parts.push(`${blockCount} blocks`);
if (dicts.length) parts.push(`${dicts.length} dicts`);
msgs[1] = parts.join(" · ");
}
const sourceDictCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
const dictLive = liveParsedCount ?? stepStats.txn_dicts ?? 0;
const dictCurrent = Math.max(dictLive, sourceDictCount);
if (dictCurrent && txnBlockCount) msgs[1] = `${dictCurrent}/${txnBlockCount}`;
else if (dictCurrent) msgs[1] = `${dictCurrent}`;
if (["enriched_done", "completed"].includes((fetchRequest as any)?.status))
msgs[2] = "done";
if ((fetchRequest as any)?.status === "completed")
msgs[3] = (fetchRequest as any)?.completed_at
? new Date((fetchRequest as any).completed_at).toLocaleString() : "done";
const txnDictDenom = stepStats.txn_dicts ?? sourceDictCount;
if (stepStats.enrich_count && txnDictDenom) msgs[2] = `${stepStats.enrich_count}/${txnDictDenom}`;
else if (stepStats.enrich_count) msgs[2] = `${stepStats.enrich_count}`;
if (stepStats.save_count && txnDictDenom) msgs[3] = `${stepStats.save_count}/${txnDictDenom}`;
else if (stepStats.save_count) msgs[3] = `${stepStats.save_count}`;
return msgs;
}, [fetchRequest]);
const [sseEvents, setSseEvents] = React.useState<SSEEvent[]>([]);
const [sseConnected, setSseConnected] = React.useState(false);
const sseRef = React.useRef<EventSource | null>(null);
const feedRef = React.useRef<HTMLDivElement>(null);
}, [fetchRequest, stepStats, liveParsedCount, txnBlockCount]);
React.useEffect(() => {
if (!id || !config?.baseUrl) return;
@@ -141,10 +205,36 @@ export default function FetchRequestDetail() {
try {
const parsed: SSEEvent = JSON.parse(event.data);
setSseEvents((prev) => [...prev, parsed]);
if (parsed.status === "progress" && parsed.message.count !== undefined) {
if (parsed.step === "txn_dicts") setLiveParsedCount(parsed.message.count);
if (parsed.step === "enrich") setStepStats((prev) => ({ ...prev, enrich_count: parsed.message.count! }));
if (parsed.step === "save_expenses") setStepStats((prev) => ({ ...prev, save_count: parsed.message.count! }));
}
if (parsed.status === "completed" && parsed.message.count !== undefined) {
const stats: Record<string, number> = {};
if (parsed.step === "raw_lines" && parsed.message.lines !== undefined) stats.raw_lines = parsed.message.lines;
if (parsed.step === "txn_blocks" && parsed.message.blocks !== undefined) stats.txn_blocks = parsed.message.blocks;
if (parsed.step === "txn_dicts") stats.txn_dicts = parsed.message.count;
if (parsed.step === "enrich") stats.enrich_count = parsed.message.count;
if (parsed.step === "save_expenses") stats.save_count = parsed.message.count;
if (Object.keys(stats).length) {
setStepStats((prev) => ({ ...prev, ...stats }));
}
}
if (parsed.status === "paused") {
refetchRequest();
refetchAmbiguities();
}
if (parsed.status === "failed") {
setFailNotif(parsed.message.error || "Fetch request failed");
refetchRequest();
}
if (parsed.status === "completed" || parsed.step === "resume_extract") {
refetchRequest();
}
} catch {
// ignore malformed events
}
@@ -162,6 +252,68 @@ export default function FetchRequestDetail() {
}
}, [sseEvents]);
const displayEvents = React.useMemo(() => {
const progressSteps = new Set(["txn_dicts", "enrich", "save_expenses"]);
const lastProgressIdx: Record<string, number> = {};
for (let i = sseEvents.length - 1; i >= 0; i--) {
const e = sseEvents[i];
if (progressSteps.has(e.step) && e.status === "progress" && lastProgressIdx[e.step] === undefined) {
lastProgressIdx[e.step] = i;
}
}
const terminalStatuses = new Set(["completed", "skipped", "paused", "failed"]);
return sseEvents.filter((e, i) => {
if (progressSteps.has(e.step) && e.status === "progress") return i === lastProgressIdx[e.step];
if (e.status === "started") {
return !sseEvents.slice(i + 1).some(
(later) => later.step === e.step && terminalStatuses.has(later.status),
);
}
return true;
});
}, [sseEvents]);
const seenSteps = React.useMemo(() => {
const steps = new Set<string>();
for (const evt of sseEvents) {
steps.add(evt.step);
if (evt.status === "completed") steps.add(`${evt.step}/completed`);
if (evt.status === "failed") steps.add(`${evt.step}/failed`);
if (evt.status === "started") steps.add(`${evt.step}/started`);
if (evt.status === "progress") steps.add(`${evt.step}/progress`);
}
return steps;
}, [sseEvents]);
const displayParsedCount = React.useMemo(() => {
if (liveParsedCount && liveParsedCount > 0) return liveParsedCount;
const source = (fetchRequest as any)?.source;
const persistedCount = source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
if (persistedCount > 0) return persistedCount;
const dicts = source?.txn_dicts;
if (Array.isArray(dicts) && dicts.length > 0) return dicts.length;
return 0;
}, [liveParsedCount, fetchRequest]);
const txnDictCount = React.useMemo(() => {
const source = (fetchRequest as any)?.source;
if (stepStats.txn_dicts && stepStats.txn_dicts > 0) return stepStats.txn_dicts;
return source?.txn_dict_count ?? source?.txn_dicts_count ?? 0;
}, [fetchRequest, stepStats]);
const progressPercent = React.useMemo(
() => computeProgressPercent(
(fetchRequest as any)?.status as FetchRequestStatus ?? "pending",
displayParsedCount,
seenSteps,
stepStats,
txnBlockCount,
txnDictCount,
),
[fetchRequest, displayParsedCount, seenSteps, stepStats, txnBlockCount, txnDictCount],
);
const handleRetry = async () => {
if (!id) return;
try {
@@ -199,7 +351,7 @@ export default function FetchRequestDetail() {
}
const req = fetchRequest as any;
const activeStep = statusToActiveStep(req.status);
const activeStep = computeActiveStep(req.status as FetchRequestStatus, seenSteps);
const retryCount = req.retry_count ?? 0;
const isRetryExhausted = retryCount >= RETRY_MAX;
const pendingAmbiguities = ambiguities?.filter((a: any) => a.status === "pending") ?? [];
@@ -249,6 +401,28 @@ export default function FetchRequestDetail() {
)}
</Box>
<Box sx={{ mb: 2 }}>
<Box sx={{ display: "flex", alignItems: "center", gap: 2, mb: 0.5 }}>
<Typography variant="caption" color="text.secondary">
Overall Progress
</Typography>
{["processing", "paused"].includes(req.status) && displayParsedCount > 0 && (
<Typography variant="caption" fontWeight={600} color="info.main">
Validated: {displayParsedCount} transactions
</Typography>
)}
</Box>
<LinearProgress
variant="determinate"
value={progressPercent}
color={req.status === "failed" ? "error" : req.status === "completed" ? "success" : "primary"}
sx={{ borderRadius: 1, height: 8, transition: "width 0.3s ease" }}
/>
<Typography variant="caption" color="text.secondary" sx={{ mt: 0.25, display: "block" }}>
{progressPercent}%
</Typography>
</Box>
<Box sx={{ display: "flex", alignItems: "center", gap: 2 }}>
<Box sx={{ flex: 1, maxWidth: 300 }}>
<Typography variant="caption" color="text.secondary">
@@ -360,12 +534,12 @@ export default function FetchRequestDetail() {
gap: 1,
}}
>
{sseEvents.length === 0 ? (
{displayEvents.length === 0 ? (
<Typography variant="body2" color="text.disabled" sx={{ textAlign: "center", py: 2 }}>
Waiting for events...
</Typography>
) : (
sseEvents.map((evt, i) => (
displayEvents.map((evt, i) => (
<Box
key={i}
sx={{
@@ -382,9 +556,9 @@ export default function FetchRequestDetail() {
<Typography variant="body2" fontWeight={600}>
{evt.step.replace(/_/g, " ")}
</Typography>
{evt.message && (
{evt.message && formatProgressMessage(evt.message) && (
<Typography variant="caption" color="text.secondary">
{evt.message}
{formatProgressMessage(evt.message)}
</Typography>
)}
</Box>
@@ -397,32 +571,22 @@ export default function FetchRequestDetail() {
</Box>
</Paper>
{(hasAmbiguities || req.status === "paused") && (
{hasAmbiguities && (
<Paper sx={{ p: 3, borderRadius: 4, mb: 3 }} variant="outlined">
<Typography variant="subtitle1" fontWeight={600} gutterBottom>
Ambiguity Resolution
</Typography>
{ambiguitiesLoading ? (
<Box sx={{ display: "flex", alignItems: "center", gap: 1, py: 2 }}>
<CircularProgress size={16} />
<Typography variant="body2" color="text.secondary">Loading ambiguities...</Typography>
</Box>
) : allResolved ? (
{allResolved ? (
<Alert severity="success" sx={{ mb: 2, borderRadius: 2 }}>
All ambiguities resolved pipeline will resume on next poll cycle
</Alert>
) : !hasAmbiguities ? (
<Alert severity="info" sx={{ mb: 2, borderRadius: 2 }}>
Pipeline paused no ambiguities found
</Alert>
) : (
<Alert severity="warning" sx={{ mb: 2, borderRadius: 2 }}>
Pipeline paused resolve ambiguities to continue
</Alert>
)}
{hasAmbiguities && (
<Box sx={{ display: "flex", flexDirection: "column", gap: 2 }}>
{ambiguities.map((ambiguity: any) => {
const isResolved = ambiguity.status === "resolved";
@@ -494,9 +658,18 @@ export default function FetchRequestDetail() {
);
})}
</Box>
)}
</Paper>
)}
<Snackbar
open={!!failNotif}
autoHideDuration={6000}
onClose={() => setFailNotif(null)}
anchorOrigin={{ vertical: "bottom", horizontal: "center" }}
>
<Alert severity="error" onClose={() => setFailNotif(null)} sx={{ borderRadius: 2 }}>
{failNotif}
</Alert>
</Snackbar>
</Container>
);
}

View File

@@ -13,6 +13,8 @@ export interface FileSource {
raw_lines?: string[];
txn_blocks?: Record<string, any>;
txn_dicts?: Record<string, any>[];
txn_dict_count?: number;
txn_dicts_count?: number;
}
export interface EmailSource {
@@ -20,6 +22,8 @@ export interface EmailSource {
from_email?: string;
subject?: string;
raw_terms?: string[];
txn_dict_count?: number;
txn_dicts_count?: number;
}
export interface FetchRequestCreate {
@@ -80,10 +84,27 @@ export interface ResolveAmbiguityPayload {
};
}
export type SSEEventStep =
| "load_content" | "raw_lines" | "txn_blocks" | "txn_dicts"
| "resume_extract" | "extract" | "paused" | "complete" | "enrich"
| "save_expenses" | "pipeline";
export type SSEEventStatus =
| "started" | "completed" | "skipped" | "paused" | "progress" | "failed";
export interface ProgressMessage {
lines?: number;
blocks?: number;
count?: number;
unit?: string;
raw_ocr_line?: string;
error?: string;
}
export interface SSEEvent {
step: string;
status: "started" | "completed" | "paused";
message: string;
step: SSEEventStep;
status: SSEEventStatus;
message: ProgressMessage;
}
export interface FetchRequestFilters {

View File

@@ -11,6 +11,9 @@ export type {
AmbiguityCandidate,
ResolveAmbiguityPayload,
SSEEvent,
SSEEventStep,
SSEEventStatus,
ProgressMessage,
} from "./fetch-requests.models";
export { RETRY_MAX } from "./fetch-requests.models";
export {