dagpipe
Summary
Directed acyclic graph execution framework for deterministic state propagation.
dagpipe executes pipelines composed of nodes connected in a directed acyclic
graph (DAG). Each node receives an immutable State and optionally produces
derived states for downstream nodes.
Installation
Install using pip:
Quick Start
Public API
This package re-exports the core pipeline components. Consumers should import from this namespace for standard usage.
Execution Core
- Engine: Responsible for orchestrating node execution and state propagation.
- Graph: Defines the execution topology and node relationships.
- Node: Base class for defining execution logic and transformations.
State & Data
- State: Represents an immutable execution snapshot at a point in time.
- Payload: Immutable hierarchical container for execution data.
- Schema: Defines and validates the allowed structure of payloads.
- SchemaError: Raised when data violates the declared schema.
Declarative Pipelines
- Pipeline: High-level wrapper for an engine, state type, and initial payload.
- load_pipeline: Factory function to create a pipeline from YAML.
Classes
Engine
Execution engine responsible for running pipeline logic.
Initialize the execution engine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `nodes_or_graph` | `Sequence[Node] \| Graph` | Pipeline definition. May be a linear sequence or a DAG. | required |

Raises:
| Type | Description |
|---|---|
| `TypeError` | If input is not a |
Attributes
nodes
property
Return nodes managed by this engine.
Returns:
| Type | Description |
|---|---|
| `tuple[Node, ...]` | Ordered sequence in linear mode, or all nodes in graph mode. |
Functions
__repr__
Return the canonical string representation of the object.
Returns:
| Name | Type | Description |
|---|---|---|
| `str` | `str` | Representation that uniquely identifies the object and its configuration. |
run
Execute the pipeline starting from a root State.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `root` | `State` | Initial execution state. | required |

Returns:
| Type | Description |
|---|---|
| `List[State]` | Terminal execution states produced by the pipeline. |

Raises:
| Type | Description |
|---|---|
| `TypeError` | If |
| `RuntimeError` | If the engine execution mode is invalid. |
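The linear mode of `run` can be pictured as repeatedly expanding a frontier of states. The sketch below is a minimal illustration under stated assumptions, not dagpipe's implementation: plain dicts stand in for `State`, plain generator functions stand in for `Node.resolve`, and the name `run_linear` is hypothetical. It also assumes that a state for which a node yields nothing is dropped, which the documentation above does not confirm.

```python
from typing import Callable, Iterable, List, Sequence

# Hypothetical stand-in: a "node" is any callable that yields derived states.
Resolver = Callable[[dict], Iterable[dict]]


def run_linear(nodes: Sequence[Resolver], root: dict) -> List[dict]:
    """Feed every current state through each node in order; return what is left."""
    frontier = [root]
    for resolve in nodes:
        # Each node may yield zero, one, or many states per input state.
        frontier = [child for state in frontier for child in resolve(state)]
    return frontier


def split(state: dict) -> Iterable[dict]:
    # Fan out: one derived state per comma-separated part.
    for part in state["text"].split(","):
        yield {**state, "part": part}


def shout(state: dict) -> Iterable[dict]:
    # One-to-one transformation of the derived state.
    yield {**state, "part": state["part"].upper()}


root = {"text": "a,b"}
terminal = run_linear([split, shout], root)
```

Each derived state is a new dict, so `root` is left untouched, mirroring the immutable-state idea described for `State`.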
Graph
Directed Acyclic Graph defining execution topology of Node objects.
Create an empty Graph.
Initializes node registry and edge mappings.
Functions
add_edge
Add a directed edge from src to dst.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `src` | `Node` | Source node. | required |
| `dst` | `Node` | Destination node. | required |

Raises:
| Type | Description |
|---|---|
| `TypeError` | If |
| `ValueError` | If the edge would create a cycle or if |
Notes
- Validates node types.
- Prevents cycles.
- Registers nodes if not present.
- Updates parent and child mappings.
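One common way to prevent cycles when adding an edge is a reachability check: an edge from `src` to `dst` closes a cycle exactly when `dst` can already reach `src`. The sketch below illustrates that idea only; the class `SketchGraph` and its helper `_reaches` are hypothetical, plain strings stand in for `Node` objects, and the type validation step is omitted. Whether dagpipe uses this particular check is an assumption.

```python
from typing import Dict, Hashable, Set


class SketchGraph:
    """Hypothetical stand-in for a DAG registry with parent and child mappings."""

    def __init__(self) -> None:
        self.nodes: Set[Hashable] = set()
        self.children: Dict[Hashable, Set[Hashable]] = {}
        self.parents: Dict[Hashable, Set[Hashable]] = {}

    def _reaches(self, start: Hashable, target: Hashable) -> bool:
        """Depth-first search along child edges from `start` looking for `target`."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node not in seen:
                seen.add(node)
                stack.extend(self.children.get(node, ()))
        return False

    def add_edge(self, src: Hashable, dst: Hashable) -> None:
        # Reject the edge if dst already reaches src (this also covers src == dst).
        if self._reaches(dst, src):
            raise ValueError(f"edge {src!r} -> {dst!r} would create a cycle")
        self.nodes.update((src, dst))  # register nodes if not present
        self.children.setdefault(src, set()).add(dst)
        self.parents.setdefault(dst, set()).add(src)


graph = SketchGraph()
graph.add_edge("load", "parse")
graph.add_edge("parse", "score")
```

Checking before mutating means a rejected edge leaves the node registry and both mappings unchanged.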
add_root
Add a root node with no parents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node` | `Node` | Node to add as a root. | required |

Raises:
| Type | Description |
|---|---|
| `TypeError` | If node is not a Node instance. |
children
nodes
Return all nodes in the graph.
Returns:
| Type | Description |
|---|---|
| `Tuple[Node, ...]` | All registered nodes. |
parents
Node
Bases: ABC
Base class for all dagpipe execution nodes.
Attributes:
| Name | Type | Description |
|---|---|---|
| `id` | `str` | Unique identifier of the node (snake_case dotted format). |
| `name` | `str` | Human-readable display name. |
Functions
__new__
Create or reuse singleton instance of the Node subclass.
Returns:
| Name | Type | Description |
|---|---|---|
| `Node` | `Node` | Singleton instance of the subclass. |
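A per-subclass singleton through `__new__` can be sketched as follows. This is a generic Python pattern shown for illustration; the class names `SingletonBase`, `Parse`, and `Score` are hypothetical, and the registry layout is an assumption rather than dagpipe's actual mechanism.

```python
class SingletonBase:
    """Hypothetical base class: each subclass gets exactly one instance."""

    _instances: dict = {}  # maps subclass -> its single instance

    def __new__(cls, *args, **kwargs):
        # Create the instance on first use, then reuse it on every later call.
        if cls not in SingletonBase._instances:
            SingletonBase._instances[cls] = super().__new__(cls)
        return SingletonBase._instances[cls]


class Parse(SingletonBase):
    pass


class Score(SingletonBase):
    pass


first = Parse()
second = Parse()
other = Score()
```

Keying the registry by class keeps the instances of different subclasses separate while still reusing one instance per subclass.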
clean_id_and_name
classmethod
Normalize and validate node ID and display name.
Raises:
| Type | Description |
|---|---|
| `TypeError` | If ID is not a string. |
| `ValueError` | If ID format is invalid. |
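Validation of a dotted snake_case ID could look like the sketch below. The exact rules dagpipe applies are not stated above, so the regular expression, the whitespace stripping, and the function name `clean_id` are all assumptions made for illustration.

```python
import re

# Assumed format: lowercase snake_case segments joined by dots,
# for example "entity.resolve.numeric_merchant".
_ID_PATTERN = re.compile(r"[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)*")


def clean_id(node_id: object) -> str:
    """Return the normalized ID, or raise if it is not a valid dotted snake_case string."""
    if not isinstance(node_id, str):
        raise TypeError(f"node ID must be a string, got {type(node_id).__name__}")
    cleaned = node_id.strip()  # assumed normalization step
    if not _ID_PATTERN.fullmatch(cleaned):
        raise ValueError(f"invalid node ID format: {node_id!r}")
    return cleaned


valid = clean_id(" entity.resolve.numeric_merchant ")
```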
fork
Create a child State attributed to this node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state` | `State` | Parent execution state. | required |
| `payload_update` | `Mapping[str, Any]` | Dot-path payload updates. | `None` |
| `confidence_delta` | `float` | Confidence adjustment. | `0.0` |
| `metadata_update` | `Mapping[str, Any]` | Metadata updates. | `None` |

Returns:
| Name | Type | Description |
|---|---|---|
| `State` | `State` | New child execution state. |
node_id_to_name
staticmethod
Convert a dotted snake_case node ID into a human-readable name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node_id` | `str` | Unique node identifier (e.g., 'entity.resolve.numeric_merchant'). | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `str` | `str` | Human-readable display name (e.g., 'Entity › Resolve › Numeric Merchant'). |
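The conversion shown in the example above can be reproduced with a few string operations. This is a minimal sketch that matches the documented example; it is not necessarily how dagpipe implements the method, and edge cases such as digits or empty segments are not considered.

```python
def node_id_to_name(node_id: str) -> str:
    """Turn 'a.b_c' style IDs into 'A › B C' style display names."""
    segments = node_id.split(".")
    # Underscores become spaces and each word is capitalized.
    return " › ".join(segment.replace("_", " ").title() for segment in segments)


display_name = node_id_to_name("entity.resolve.numeric_merchant")
print(display_name)  # Entity › Resolve › Numeric Merchant
```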
resolve
abstractmethod
Execute node logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state` | `State` | Input execution state. | required |

Yields:
| Name | Type | Description |
|---|---|---|
| `State` | `Iterable[State]` | Derived execution state(s). |
run
Payload
dataclass
Immutable hierarchical container with dot-path access.
Attributes:
| Name | Type | Description |
|---|---|---|
| `_data` | `Mapping[str, Any]` | Immutable hierarchical data structure. |
Functions
as_dict
Return underlying mapping.
Returns:
| Type | Description |
|---|---|
| `Mapping[str, Any]` | Read-only view of the underlying data. |
get
Retrieve value using dot-path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Dot-separated path to the value. | required |
| `default` | `Any` | Default value if path doesn't exist. | `None` |

Returns:
| Name | Type | Description |
|---|---|---|
| `Any` | `Any` | The retrieved value or default. |
has
Return True if path exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Dot-separated path to check. | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `bool` | `bool` | Existence of the path. |
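The dot-path lookups described for `get` and `has` can be sketched over plain nested mappings. The helpers `get_path` and `has_path` and the `_MISSING` sentinel are hypothetical names used for illustration; the real `Payload` methods may behave differently in edge cases.

```python
from typing import Any, Mapping

_MISSING = object()  # hypothetical sentinel: tells "absent" apart from a stored None


def get_path(data: Mapping[str, Any], path: str, default: Any = None) -> Any:
    """Walk a nested mapping one dot-separated segment at a time."""
    current: Any = data
    for segment in path.split("."):
        if not isinstance(current, Mapping) or segment not in current:
            return default
        current = current[segment]
    return current


def has_path(data: Mapping[str, Any], path: str) -> bool:
    """Return True when every segment of the path resolves."""
    return get_path(data, path, _MISSING) is not _MISSING


data = {"user": {"address": {"city": "Pune"}, "age": None}}
city = get_path(data, "user.address.city")
phone = get_path(data, "user.phone", "n/a")
```

Using a sentinel lets `has_path` report a path as present even when the stored value is `None`.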
iter_paths
classmethod
Recursively yield dot-paths for all leaf nodes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Mapping[str, Any]` | The mapping to iterate over. | required |
| `prefix` | `str` | Current path prefix. | `''` |

Returns:
| Type | Description |
|---|---|
| `Iterable[str]` | Generator yielding dot-paths. |
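Recursive leaf-path enumeration can be sketched as a short generator. The version below reuses the documented parameter names but is only an illustration; treating an empty nested mapping as a leaf is an assumption.

```python
from typing import Any, Iterator, Mapping


def iter_paths(data: Mapping[str, Any], prefix: str = "") -> Iterator[str]:
    """Yield the dot-path of every leaf value in a nested mapping."""
    for key, value in data.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, Mapping) and value:
            # Descend into non-empty nested mappings, carrying the path so far.
            yield from iter_paths(value, path)
        else:
            yield path


paths = list(iter_paths({"a": {"b": 1, "c": {"d": 2}}, "e": 3}))
print(paths)  # ['a.b', 'a.c.d', 'e']
```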
keys
Return top-level keys.
Returns:
| Type | Description |
|---|---|
| `Iterable[str]` | Iterator over top-level keys. |
update
Create a new Payload with dot-path updates applied.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `updates` | `Mapping[str, Any]` | Dot-path to value mapping. | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `Payload` | `Payload` | New immutable payload instance with updates. |
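A non-mutating dot-path update can be sketched by copying only the levels along each updated path. The helper name `apply_updates` is hypothetical, and only the top level of the result is wrapped read-only here, so this is a simplified illustration rather than the behaviour of `Payload.update` itself.

```python
from types import MappingProxyType
from typing import Any, Mapping


def apply_updates(data: Mapping[str, Any], updates: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return a new mapping with dot-path updates applied; `data` is never mutated."""
    result = dict(data)  # shallow copy of the top level
    for path, value in updates.items():
        *parents, leaf = path.split(".")
        cursor = result
        for segment in parents:
            # Copy each level before descending so the original stays untouched;
            # missing or non-mapping levels are replaced by a fresh dict.
            child = cursor.get(segment)
            cursor[segment] = dict(child) if isinstance(child, Mapping) else {}
            cursor = cursor[segment]
        cursor[leaf] = value
    return MappingProxyType(result)  # read-only view of the top level


original = {"user": {"name": "a", "tags": {"x": 1}}}
updated = apply_updates(original, {"user.name": "b", "meta.source": "api"})
```

Levels that are not on an updated path, such as `tags` here, are shared between the old and new structures instead of being copied.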
Pipeline
dataclass
Executable pipeline created from YAML configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
| `engine` | `Engine` | Execution engine responsible for running the pipeline. |
| `state_cls` | `Type[State]` | Dynamically created |
| `initial_payload` | `Payload` | Default payload used when execution begins. |
Functions
run
Execute the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `payload_override` | `Mapping[str, Any]` | Payload values overriding initial payload. | `None` |

Returns:
| Type | Description |
|---|---|
| `list[State]` | Terminal execution states. |
Schema
dataclass
Immutable hierarchical schema defining allowed payload structure.
Attributes:
| Name | Type | Description |
|---|---|---|
| `tree` | `Mapping[str, SchemaNode]` | Hierarchical schema definition. |
Functions
validate_payload
Validate complete payload structure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `payload` | `Payload` | Payload to validate. | required |

Raises:
| Type | Description |
|---|---|
| `SchemaError` | If payload violates schema. |
validate_update
Validate payload update paths.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `updates` | `Mapping[str, Any]` | Dot-path updates to validate. | required |

Raises:
| Type | Description |
|---|---|
| `SchemaError` | If any path is invalid according to the schema. |
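Path validation against a hierarchical schema can be sketched as a walk over a nested mapping. The structure of `SchemaNode` is not described above, so this example assumes a plain nested dict whose leaves are Python types; `SketchSchemaError`, `schema_tree`, and the standalone `validate_update` function are hypothetical stand-ins.

```python
from typing import Any, Mapping


class SketchSchemaError(Exception):
    """Hypothetical stand-in for the documented SchemaError."""


def validate_update(tree: Mapping[str, Any], updates: Mapping[str, Any]) -> None:
    """Raise if any dot-path is unknown to the schema or carries the wrong type."""
    for path, value in updates.items():
        cursor: Any = tree
        for segment in path.split("."):
            if not isinstance(cursor, Mapping) or segment not in cursor:
                raise SketchSchemaError(f"path not allowed by schema: {path!r}")
            cursor = cursor[segment]
        # Assumed convention: a leaf holding a type constrains the value's type.
        if isinstance(cursor, type) and not isinstance(value, cursor):
            raise SketchSchemaError(f"{path!r} expects {cursor.__name__}")


schema_tree = {"user": {"name": str, "age": int}}
validate_update(schema_tree, {"user.name": "Asha"})  # passes silently
```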
SchemaError
Bases: Exception
Raised when payload data violates the declared schema.
Indicates invalid structure, invalid path, or invalid type.
State
dataclass
Immutable execution state propagated through dagpipe pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| `payload` | `Payload` | Execution data container. |
| `schema` | `ClassVar[Schema]` | Payload validation schema. |
| `confidence` | `float` | Execution confidence score. |
| `parent` | `Optional[State]` | Parent state reference. |
| `depth` | `int` | Execution depth. |
| `history` | `Tuple[str, ...]` | Ordered node execution lineage. |
| `metadata` | `Dict[str, Any]` | Execution metadata. |
Functions
__repr__
Concise debug representation.
Avoids printing full data for large states.
fork
Create a new child State derived from this state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `payload_update` | `Mapping[str, Any]` | Dot-path updates applied to the payload. | `None` |
| `confidence_delta` | `float` | Adjustment applied to current confidence. | `0.0` |
| `node_id` | `str` | Identifier of the node creating this state. | `None` |
| `metadata_update` | `Mapping[str, Any]` | Updates merged into state metadata. | `None` |

Returns:
| Name | Type | Description |
|---|---|---|
| `State` | `State` | A new immutable |
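Forking an immutable state can be sketched with a frozen dataclass. `MiniState` is a hypothetical stand-in, not dagpipe's `State`: it merges flat keys instead of dot-paths, omits metadata and schema validation, and assumes that a fork increments `depth` by one, appends `node_id` to `history`, and adds `confidence_delta` without clamping.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional, Tuple


@dataclass(frozen=True)
class MiniState:
    """Hypothetical, simplified immutable execution state."""

    payload: Mapping[str, Any]
    confidence: float = 1.0
    parent: Optional["MiniState"] = None
    depth: int = 0
    history: Tuple[str, ...] = ()

    def fork(
        self,
        payload_update: Optional[Mapping[str, Any]] = None,
        confidence_delta: float = 0.0,
        node_id: Optional[str] = None,
    ) -> "MiniState":
        # Build a brand-new state; the parent is referenced, never modified.
        return MiniState(
            payload={**self.payload, **(payload_update or {})},
            confidence=self.confidence + confidence_delta,
            parent=self,
            depth=self.depth + 1,
            history=self.history + ((node_id,) if node_id else ()),
        )


root = MiniState({"amount": 10})
child = root.fork({"currency": "INR"}, confidence_delta=-0.25, node_id="parse.amount")
```

Because the dataclass is frozen, attribute assignment on either state raises an error, so lineage can be followed through `parent` without worrying about later mutation.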
get
Retrieve payload value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Dot-path key. | required |
| `default` | `Any` | Fallback value. | `None` |

Returns:
| Name | Type | Description |
|---|---|---|
| `Any` | `Any` | Stored value or default. |
has
Check whether payload contains key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Dot-path key. | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `bool` | `bool` | Existence of the key. |
Functions
load_pipeline
Load pipeline from YAML file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to YAML configuration file. | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `Pipeline` | `Pipeline` | Executable pipeline instance. |