richer sses

This commit is contained in:
2026-05-30 03:02:23 +05:30
parent baffd11a49
commit cb7f20181e
3 changed files with 163 additions and 36 deletions

View File

@@ -23,6 +23,7 @@ import ErrorIcon from "@mui/icons-material/Error";
import WarningAmberIcon from "@mui/icons-material/WarningAmber"; import WarningAmberIcon from "@mui/icons-material/WarningAmber";
import PlayArrowIcon from "@mui/icons-material/PlayArrow"; import PlayArrowIcon from "@mui/icons-material/PlayArrow";
import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline"; import RemoveCircleOutlineIcon from "@mui/icons-material/RemoveCircleOutline";
import FiberManualRecordIcon from "@mui/icons-material/FiberManualRecord";
import { import {
useFetchRequest, useFetchRequest,
useUpdateFetchRequest, useUpdateFetchRequest,
@@ -32,6 +33,7 @@ import {
import type { import type {
FetchRequestStatus, FetchRequestStatus,
SSEEvent, SSEEvent,
ProgressMessage,
} from "./features/fetch-requests"; } from "./features/fetch-requests";
import { RETRY_MAX } from "./features/fetch-requests"; import { RETRY_MAX } from "./features/fetch-requests";
import { useConfig } from "../react-openapi"; import { useConfig } from "../react-openapi";
@@ -56,6 +58,29 @@ const statusIcons: Record<FetchRequestStatus, React.ReactNode> = {
failed: <ErrorIcon sx={{ fontSize: 16 }} />, failed: <ErrorIcon sx={{ fontSize: 16 }} />,
}; };
function computeProgressPercent(
status: FetchRequestStatus,
liveCount: number,
seenSteps: Set<string>,
): number {
if (status === "pending") return 0;
if (status === "completed") return 100;
if (seenSteps.has("complete")) return 90;
if (seenSteps.has("enrich")) return 85;
if (seenSteps.has("txn_dicts/completed")) return 80;
if (seenSteps.has("txn_dicts/started") && liveCount > 0) {
return Math.min(80, 35 + Math.min(liveCount, 300) / 300 * 45);
}
if (seenSteps.has("txn_blocks")) return 35;
if (seenSteps.has("raw_lines")) return 25;
if (seenSteps.has("load_content")) return 15;
return status === "processing" || status === "paused" ? 10 : 0;
}
const stepLabels = ["Load Content", "Extract", "Validate", "Enrich", "Save"]; const stepLabels = ["Load Content", "Extract", "Validate", "Enrich", "Save"];
function statusToActiveStep(status: FetchRequestStatus): number { function statusToActiveStep(status: FetchRequestStatus): number {
@@ -71,12 +96,26 @@ function statusToActiveStep(status: FetchRequestStatus): number {
} }
} }
function formatProgressMessage(msg: ProgressMessage): string {
if (msg.lines !== undefined) return `${msg.lines} lines`;
if (msg.blocks !== undefined) return `${msg.blocks} blocks`;
if (msg.count !== undefined && msg.unit) return `${msg.count} ${msg.unit}`;
if (msg.count !== undefined) return `${msg.count} items`;
if (msg.raw_ocr_line) return `"${msg.raw_ocr_line.slice(0, 60)}${msg.raw_ocr_line.length > 60 ? "…" : ""}"`;
return "";
}
function sseIcon(status: SSEEvent["status"]) { function sseIcon(status: SSEEvent["status"]) {
switch (status) { switch (status) {
case "started": return <CircularProgress size={14} />; case "started": return <CircularProgress size={14} />;
case "completed": return <CheckCircleIcon sx={{ fontSize: 16, color: "success.main" }} />; case "completed": return <CheckCircleIcon sx={{ fontSize: 16, color: "success.main" }} />;
case "skipped": return <RemoveCircleOutlineIcon sx={{ fontSize: 16, color: "text.disabled" }} />; case "skipped": return <RemoveCircleOutlineIcon sx={{ fontSize: 16, color: "text.disabled" }} />;
case "paused": return <WarningAmberIcon sx={{ fontSize: 16, color: "warning.main" }} />; case "paused": return <WarningAmberIcon sx={{ fontSize: 16, color: "warning.main" }} />;
case "progress": return (
<FiberManualRecordIcon
sx={{ fontSize: 14, color: "info.main" }}
/>
);
} }
} }
@@ -99,21 +138,25 @@ export default function FetchRequestDetail() {
const resolveMutation = useResolveAmbiguity(); const resolveMutation = useResolveAmbiguity();
const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!); const { data: ambiguities, refetch: refetchAmbiguities } = useFetchRequestAmbiguities(id!);
const [sseEvents, setSseEvents] = React.useState<SSEEvent[]>([]);
const [sseConnected, setSseConnected] = React.useState(false);
const [liveParsedCount, setLiveParsedCount] = React.useState<number>(0);
const [stepStats, setStepStats] = React.useState<Record<string, number>>({});
const sseRef = React.useRef<EventSource | null>(null);
const feedRef = React.useRef<HTMLDivElement>(null);
const stepMessages = React.useMemo(() => { const stepMessages = React.useMemo(() => {
const msgs: Record<number, string> = {}; const msgs: Record<number, string> = {};
const source = (fetchRequest as any)?.source; const source = (fetchRequest as any)?.source;
if (source?.raw_lines?.length) const rawLineCount = stepStats.raw_lines ?? (source?.raw_lines?.length ?? 0);
msgs[0] = `${source.raw_lines.length} raw lines`; if (rawLineCount) msgs[0] = `${rawLineCount} raw lines`;
const blocks = source?.txn_blocks ?? {}; const blockCount = stepStats.txn_blocks ?? 0;
const dicts = source?.txn_dicts ?? []; if (blockCount) msgs[1] = `${blockCount} blocks`;
const blockCount = typeof blocks === "object"
? Object.keys(blocks).length : Array.isArray(blocks) ? blocks.length : 0; const dictCount = stepStats.txn_dicts ?? liveParsedCount ?? 0;
if (blockCount) if (dictCount) msgs[2] = `${dictCount} dicts`;
msgs[1] = `${blockCount} blocks`;
if (dicts.length)
msgs[2] = `${dicts.length} dicts`;
if (["enriched_done", "completed"].includes((fetchRequest as any)?.status)) if (["enriched_done", "completed"].includes((fetchRequest as any)?.status))
msgs[3] = "done"; msgs[3] = "done";
@@ -122,12 +165,7 @@ export default function FetchRequestDetail() {
? new Date((fetchRequest as any).completed_at).toLocaleString() : "done"; ? new Date((fetchRequest as any).completed_at).toLocaleString() : "done";
return msgs; return msgs;
}, [fetchRequest]); }, [fetchRequest, stepStats, liveParsedCount]);
const [sseEvents, setSseEvents] = React.useState<SSEEvent[]>([]);
const [sseConnected, setSseConnected] = React.useState(false);
const sseRef = React.useRef<EventSource | null>(null);
const feedRef = React.useRef<HTMLDivElement>(null);
React.useEffect(() => { React.useEffect(() => {
if (!id || !config?.baseUrl) return; if (!id || !config?.baseUrl) return;
@@ -141,6 +179,21 @@ export default function FetchRequestDetail() {
try { try {
const parsed: SSEEvent = JSON.parse(event.data); const parsed: SSEEvent = JSON.parse(event.data);
setSseEvents((prev) => [...prev, parsed]); setSseEvents((prev) => [...prev, parsed]);
if (parsed.step === "txn_dicts" && parsed.status === "progress" && parsed.message.count !== undefined) {
setLiveParsedCount(parsed.message.count);
}
if (parsed.status === "completed") {
const stats: Record<string, number> = {};
if (parsed.step === "raw_lines" && parsed.message.lines !== undefined) stats.raw_lines = parsed.message.lines;
if (parsed.step === "txn_blocks" && parsed.message.blocks !== undefined) stats.txn_blocks = parsed.message.blocks;
if (parsed.step === "txn_dicts" && parsed.message.count !== undefined) stats.txn_dicts = parsed.message.count;
if (Object.keys(stats).length) {
setStepStats((prev) => ({ ...prev, ...stats }));
}
}
if (parsed.status === "paused") { if (parsed.status === "paused") {
refetchRequest(); refetchRequest();
refetchAmbiguities(); refetchAmbiguities();
@@ -165,6 +218,49 @@ export default function FetchRequestDetail() {
} }
}, [sseEvents]); }, [sseEvents]);
const displayEvents = React.useMemo(() => {
let lastProgressIdx = -1;
for (let i = sseEvents.length - 1; i >= 0; i--) {
if (sseEvents[i].step === "txn_dicts" && sseEvents[i].status === "progress") {
lastProgressIdx = i;
break;
}
}
return sseEvents.filter((e, i) => {
if (e.step === "txn_dicts" && e.status === "progress") return i === lastProgressIdx;
return true;
});
}, [sseEvents]);
const seenSteps = React.useMemo(() => {
const steps = new Set<string>();
for (const evt of sseEvents) {
steps.add(evt.step);
if (evt.status === "completed") steps.add(`${evt.step}/completed`);
if (evt.status === "started") steps.add(`${evt.step}/started`);
if (evt.status === "progress") steps.add(`${evt.step}/progress`);
}
return steps;
}, [sseEvents]);
const progressPercent = React.useMemo(
() => computeProgressPercent(
(fetchRequest as any)?.status as FetchRequestStatus ?? "pending",
liveParsedCount,
seenSteps,
),
[fetchRequest, liveParsedCount, seenSteps],
);
const displayParsedCount = React.useMemo(() => {
if (liveParsedCount > 0) return liveParsedCount;
const source = (fetchRequest as any)?.source;
if (source?.txn_dicts_count) return source.txn_dicts_count;
const dicts = source?.txn_dicts;
if (Array.isArray(dicts) && dicts.length > 0) return dicts.length;
return 0;
}, [liveParsedCount, fetchRequest]);
const handleRetry = async () => { const handleRetry = async () => {
if (!id) return; if (!id) return;
try { try {
@@ -252,6 +348,28 @@ export default function FetchRequestDetail() {
)} )}
</Box> </Box>
<Box sx={{ mb: 2 }}>
<Box sx={{ display: "flex", alignItems: "center", gap: 2, mb: 0.5 }}>
<Typography variant="caption" color="text.secondary">
Overall Progress
</Typography>
{["processing", "paused"].includes(req.status) && displayParsedCount > 0 && (
<Typography variant="caption" fontWeight={600} color="info.main">
Validated: {displayParsedCount} transactions
</Typography>
)}
</Box>
<LinearProgress
variant="determinate"
value={progressPercent}
color={req.status === "failed" ? "error" : req.status === "completed" ? "success" : "primary"}
sx={{ borderRadius: 1, height: 8, transition: "width 0.3s ease" }}
/>
<Typography variant="caption" color="text.secondary" sx={{ mt: 0.25, display: "block" }}>
{progressPercent}%
</Typography>
</Box>
<Box sx={{ display: "flex", alignItems: "center", gap: 2 }}> <Box sx={{ display: "flex", alignItems: "center", gap: 2 }}>
<Box sx={{ flex: 1, maxWidth: 300 }}> <Box sx={{ flex: 1, maxWidth: 300 }}>
<Typography variant="caption" color="text.secondary"> <Typography variant="caption" color="text.secondary">
@@ -363,12 +481,12 @@ export default function FetchRequestDetail() {
gap: 1, gap: 1,
}} }}
> >
{sseEvents.length === 0 ? ( {displayEvents.length === 0 ? (
<Typography variant="body2" color="text.disabled" sx={{ textAlign: "center", py: 2 }}> <Typography variant="body2" color="text.disabled" sx={{ textAlign: "center", py: 2 }}>
Waiting for events... Waiting for events...
</Typography> </Typography>
) : ( ) : (
sseEvents.map((evt, i) => ( displayEvents.map((evt, i) => (
<Box <Box
key={i} key={i}
sx={{ sx={{
@@ -385,9 +503,9 @@ export default function FetchRequestDetail() {
<Typography variant="body2" fontWeight={600}> <Typography variant="body2" fontWeight={600}>
{evt.step.replace(/_/g, " ")} {evt.step.replace(/_/g, " ")}
</Typography> </Typography>
{evt.message && ( {evt.message && formatProgressMessage(evt.message) && (
<Typography variant="caption" color="text.secondary"> <Typography variant="caption" color="text.secondary">
{evt.message} {formatProgressMessage(evt.message)}
</Typography> </Typography>
)} )}
</Box> </Box>
@@ -400,32 +518,22 @@ export default function FetchRequestDetail() {
</Box> </Box>
</Paper> </Paper>
{(hasAmbiguities || req.status === "paused") && ( {hasAmbiguities && (
<Paper sx={{ p: 3, borderRadius: 4, mb: 3 }} variant="outlined"> <Paper sx={{ p: 3, borderRadius: 4, mb: 3 }} variant="outlined">
<Typography variant="subtitle1" fontWeight={600} gutterBottom> <Typography variant="subtitle1" fontWeight={600} gutterBottom>
Ambiguity Resolution Ambiguity Resolution
</Typography> </Typography>
{ambiguitiesLoading ? ( {allResolved ? (
<Box sx={{ display: "flex", alignItems: "center", gap: 1, py: 2 }}>
<CircularProgress size={16} />
<Typography variant="body2" color="text.secondary">Loading ambiguities...</Typography>
</Box>
) : allResolved ? (
<Alert severity="success" sx={{ mb: 2, borderRadius: 2 }}> <Alert severity="success" sx={{ mb: 2, borderRadius: 2 }}>
All ambiguities resolved pipeline will resume on next poll cycle All ambiguities resolved pipeline will resume on next poll cycle
</Alert> </Alert>
) : !hasAmbiguities ? (
<Alert severity="info" sx={{ mb: 2, borderRadius: 2 }}>
Pipeline paused no ambiguities found
</Alert>
) : ( ) : (
<Alert severity="warning" sx={{ mb: 2, borderRadius: 2 }}> <Alert severity="warning" sx={{ mb: 2, borderRadius: 2 }}>
Pipeline paused resolve ambiguities to continue Pipeline paused resolve ambiguities to continue
</Alert> </Alert>
)} )}
{hasAmbiguities && (
<Box sx={{ display: "flex", flexDirection: "column", gap: 2 }}> <Box sx={{ display: "flex", flexDirection: "column", gap: 2 }}>
{ambiguities.map((ambiguity: any) => { {ambiguities.map((ambiguity: any) => {
const isResolved = ambiguity.status === "resolved"; const isResolved = ambiguity.status === "resolved";
@@ -497,7 +605,6 @@ export default function FetchRequestDetail() {
); );
})} })}
</Box> </Box>
)}
</Paper> </Paper>
)} )}
</Container> </Container>

View File

@@ -13,6 +13,7 @@ export interface FileSource {
raw_lines?: string[]; raw_lines?: string[];
txn_blocks?: Record<string, any>; txn_blocks?: Record<string, any>;
txn_dicts?: Record<string, any>[]; txn_dicts?: Record<string, any>[];
txn_dicts_count?: number;
} }
export interface EmailSource { export interface EmailSource {
@@ -20,6 +21,7 @@ export interface EmailSource {
from_email?: string; from_email?: string;
subject?: string; subject?: string;
raw_terms?: string[]; raw_terms?: string[];
txn_dicts_count?: number;
} }
export interface FetchRequestCreate { export interface FetchRequestCreate {
@@ -80,10 +82,25 @@ export interface ResolveAmbiguityPayload {
}; };
} }
export type SSEEventStep =
| "load_content" | "raw_lines" | "txn_blocks" | "txn_dicts"
| "resume_extract" | "extract" | "paused" | "complete" | "enrich";
export type SSEEventStatus =
| "started" | "completed" | "skipped" | "paused" | "progress";
export interface ProgressMessage {
lines?: number;
blocks?: number;
count?: number;
unit?: string;
raw_ocr_line?: string;
}
export interface SSEEvent { export interface SSEEvent {
step: string; step: SSEEventStep;
status: "started" | "completed" | "skipped" | "paused"; status: SSEEventStatus;
message: string; message: ProgressMessage;
} }
export interface FetchRequestFilters { export interface FetchRequestFilters {

View File

@@ -11,6 +11,9 @@ export type {
AmbiguityCandidate, AmbiguityCandidate,
ResolveAmbiguityPayload, ResolveAmbiguityPayload,
SSEEvent, SSEEvent,
SSEEventStep,
SSEEventStatus,
ProgressMessage,
} from "./fetch-requests.models"; } from "./fetch-requests.models";
export { RETRY_MAX } from "./fetch-requests.models"; export { RETRY_MAX } from "./fetch-requests.models";
export { export {