dagpipe

Summary

Directed acyclic graph execution framework for deterministic state propagation.

dagpipe executes pipelines composed of nodes connected in a directed acyclic graph (DAG). Each node receives an immutable State and optionally produces derived states for downstream nodes.

Installation

Install using pip:

pip install dagpipe

Quick Start

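from dagpipe import Engine, Graph, Node, Payload, Schema, State


class HelloNode(Node):
    id = "hello"

    def resolve(self, state):
        # Derive a child state; the input state is never modified.
        yield self.fork(state, payload_update={"msg": "hello"})


class MyState(State):
    # fork() validates payload updates against this schema, so the schema
    # must allow the "msg" path; an empty Schema({}) may reject the update.
    # (The SchemaNode format for declaring fields is not covered on this page.)
    schema = Schema({})


# Build the graph and run it
graph = Graph()
graph.add_root(HelloNode())
engine = Engine(graph)

results = engine.run(MyState(payload=Payload({})))  # list of terminal states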

Public API

This package re-exports the core pipeline components. Consumers should import from this namespace for standard usage.

Execution Core

  • Engine: Responsible for orchestrating node execution and state propagation.
  • Graph: Defines the execution topology and node relationships.
  • Node: Base class for defining execution logic and transformations.

State & Data

  • State: Represents an immutable execution snapshot at a point in time.
  • Payload: Immutable hierarchical container for execution data.
  • Schema: Defines and validates the allowed structure of payloads.
  • SchemaError: Raised when data violates the declared schema.

Declarative Pipelines

  • Pipeline: High-level wrapper for an engine, state type, and initial payload.
  • load_pipeline: Factory function to create a pipeline from YAML.

Classes

Engine

Engine(nodes_or_graph: Union[Sequence[Node], Graph])

Execution engine responsible for running pipeline logic.

Notes

Responsibilities:

- Accepts either a linear sequence of `Node` objects or a `Graph`
  defining execution topology.
- Propagates immutable `State` objects through `Node` objects and
  collects terminal states.

Guarantees:

- Never mutates `State`, `Node`, or `Graph` instances.
- `State` objects are never modified in place; each branch produces
  independent instances.
- Execution order is deterministic and follows graph or pipeline topology.
- Thread-safe for concurrent execution.

Initialize the execution engine.

Parameters:

Name Type Description Default
nodes_or_graph Sequence[Node] | Graph

Pipeline definition. May be a linear sequence or a DAG.

required

Raises:

Type Description
TypeError

If input is not a Sequence[Node] or Graph, or contains invalid node types.

Notes

Responsibilities:

- Detects execution mode (linear or DAG) and validates node
  types and structure.
Attributes
nodes property
nodes: tuple[Node, ...]

Return nodes managed by this engine.

Returns:

Type Description
tuple[Node, ...]

tuple[Node, ...]: Ordered sequence in linear mode or all nodes in graph mode.

Functions
__repr__
__repr__() -> str

Return the canonical string representation of the object.

Returns:

Name Type Description
str str

Representation that uniquely identifies the object and its configuration.

run
run(root: State) -> List[State]

Execute the pipeline starting from a root State.

Parameters:

Name Type Description Default
root State

Initial execution state.

required

Returns:

Type Description
List[State]

list[State]: Terminal execution states produced by the pipeline.

Raises:

Type Description
TypeError

If root is not a State instance.

RuntimeError

If the engine execution mode is invalid.

Notes

Responsibilities:

- Selects execution mode, propagates state through nodes, creates
  new instances for branches, and collects terminal states.
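The propagation described for `run()` can be modelled with a small standalone sketch. It does not use dagpipe: nodes are plain functions, the graph is a dict of child names, and the helper names (`run_linear`, `run_dag`) are hypothetical. The depth-first traversal, and the choice that a node reachable through several parents runs once per incoming path, are assumptions rather than the engine's documented algorithm.

```python
from typing import Callable, Dict, Iterable, List, Sequence, Tuple

# Hypothetical stand-ins: a "state" is any immutable value and a "node" is a
# plain function from one state to an iterable of derived states.
NodeFn = Callable[[object], Iterable[object]]


def run_linear(nodes: Sequence[NodeFn], root: object) -> List[object]:
    """Feed every state produced by one node into the next node, in order."""
    frontier: List[object] = [root]
    for node in nodes:
        # Each node may drop (0), transform (1) or branch (n) every state.
        frontier = [out for state in frontier for out in node(state)]
    return frontier


def run_dag(
    roots: Sequence[str],
    children: Dict[str, Tuple[str, ...]],
    nodes: Dict[str, NodeFn],
    root: object,
) -> List[object]:
    """Depth-first propagation; states emitted by leaf nodes are terminal."""
    terminal: List[object] = []

    def visit(name: str, state: object) -> None:
        for out in nodes[name](state):
            kids = children.get(name, ())
            if not kids:
                terminal.append(out)  # leaf node: collect the result
            for kid in kids:
                # The same immutable state is handed to every child.
                visit(kid, out)

    for name in roots:  # roots and children are visited in declaration order
        visit(name, root)
    return terminal
```

Because states are never mutated, handing one derived state to several children needs no copying; this is what makes a fixed visiting order sufficient for deterministic results.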

Graph

Graph()

Directed acyclic graph defining the execution topology of Node objects.

Notes

Responsibilities:

- Stores node connectivity and validates that the topology remains acyclic.
- Structure determines how `State` flows between nodes during execution.

Guarantees:

- Topology is acyclic. Node relationships remain consistent.
- Thread-safe for concurrent reads after construction.

Create an empty Graph.

Initializes node registry and edge mappings.
Functions
__repr__
__repr__() -> str

Return debug representation.

Returns

str

add_edge
add_edge(src: Node, dst: Node) -> None

Add a directed edge from src to dst.

Parameters:

Name Type Description Default
src Node

Source node.

required
dst Node

Destination node.

required

Raises:

Type Description
TypeError

If src or dst is not a Node.

ValueError

If the edge would create a cycle or if src and dst are the same node.

Notes
  • Validates node types.
  • Prevents cycles.
  • Registers nodes if not present.
  • Updates parent and child mappings.
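The cycle check in `add_edge` can be done with a reachability test: adding `src -> dst` closes a cycle exactly when `src` is already reachable from `dst`. The sketch below is a standalone class, `MiniGraph` (a hypothetical name, not dagpipe's `Graph`), that accepts any hashable value as a node and also shows the parent/child bookkeeping and how `roots()` follows from it. Ignoring duplicate edges is an assumption of the sketch.

```python
from typing import Dict, Hashable, List, Tuple


class MiniGraph:
    """Hypothetical stand-in for Graph: nodes are any hashable values."""

    def __init__(self) -> None:
        self._nodes: List[Hashable] = []  # registration order
        self._children: Dict[Hashable, List[Hashable]] = {}
        self._parents: Dict[Hashable, List[Hashable]] = {}

    def _register(self, node: Hashable) -> None:
        if node not in self._children:
            self._nodes.append(node)
            self._children[node] = []
            self._parents[node] = []

    def _reachable(self, start: Hashable, target: Hashable) -> bool:
        """Iterative depth-first search along child edges."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node not in seen:
                seen.add(node)
                stack.extend(self._children.get(node, ()))
        return False

    def add_root(self, node: Hashable) -> None:
        self._register(node)

    def add_edge(self, src: Hashable, dst: Hashable) -> None:
        if src == dst:
            raise ValueError("src and dst are the same node")
        # src -> dst closes a cycle iff src is already reachable from dst.
        if self._reachable(dst, src):
            raise ValueError("edge would create a cycle")
        self._register(src)
        self._register(dst)
        if dst not in self._children[src]:  # ignore duplicate edges
            self._children[src].append(dst)
            self._parents[dst].append(src)

    def children(self, node: Hashable) -> Tuple[Hashable, ...]:
        return tuple(self._children.get(node, ()))

    def parents(self, node: Hashable) -> Tuple[Hashable, ...]:
        return tuple(self._parents.get(node, ()))

    def roots(self) -> Tuple[Hashable, ...]:
        # Roots are simply registered nodes without incoming edges.
        return tuple(n for n in self._nodes if not self._parents[n])
```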
add_root
add_root(node: Node) -> None

Add a root node with no parents.

Parameters:

Name Type Description Default
node Node

Node to add as a root.

required

Raises:

Type Description
TypeError

If node is not a Node instance.

children
children(node: Node) -> Tuple[Node, ...]

Return child nodes of a node.

Parameters:

Name Type Description Default
node Node

Node to query.

required

Returns:

Type Description
Tuple[Node, ...]

Tuple[Node, ...]: Outgoing neighbors.

nodes
nodes() -> Tuple[Node, ...]

Return all nodes in the graph.

Returns:

Type Description
Tuple[Node, ...]

Tuple[Node, ...]: All registered nodes.

parents
parents(node: Node) -> Tuple[Node, ...]

Return parent nodes of a node.

Parameters:

Name Type Description Default
node Node

Node to query.

required

Returns:

Type Description
Tuple[Node, ...]

Tuple[Node, ...]: Incoming neighbors.

roots
roots() -> Tuple[Node, ...]

Return root nodes (nodes with no incoming edges).

Returns:

Type Description
Tuple[Node, ...]

Tuple[Node, ...]: Entry point nodes.

Node

Bases: ABC

Base class for all dagpipe execution nodes.

Attributes:

Name Type Description
id str

Unique identifier of the node (snake_case dotted format).

name str

Human-readable display name.

Notes

Responsibilities:

- Represents a deterministic unit of execution in the pipeline graph.
- Consumes one `State` and produces zero, one, or many derived states.
- Defines execution logic and enables branching, filtering, and transformation.

Guarantees:

- Nodes must never mutate the input `State`.
- Instances are singletons per subclass and reused across executions.
Functions
__hash__
__hash__() -> int

Return stable hash based on node ID.

Returns

int

__new__
__new__() -> Node

Create or reuse singleton instance of the Node subclass.

Returns:

Name Type Description
Node Node

Singleton instance of the subclass.
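One common way to get one shared instance per subclass is to cache instances in `__new__`, keyed by the concrete class. The sketch below is standalone and hypothetical (`SingletonNode` is not dagpipe's `Node`); it also hashes by ID, mirroring the documented `__hash__`. Note that any `__init__` on such a class would still run on every call, so per-instance setup belongs elsewhere.

```python
from typing import Dict


class SingletonNode:
    """Hypothetical stand-in showing one shared instance per subclass."""

    _instances: Dict[type, "SingletonNode"] = {}
    id = "base"

    def __new__(cls) -> "SingletonNode":
        # Key the cache by the concrete subclass so each subclass gets
        # its own instance, created on first use and reused afterwards.
        if cls not in SingletonNode._instances:
            SingletonNode._instances[cls] = super().__new__(cls)
        return SingletonNode._instances[cls]

    def __hash__(self) -> int:
        # Derived only from the ID, so nodes with equal IDs hash equally.
        return hash(self.id)
```

For example, with `class A(SingletonNode): id = "a"`, every call to `A()` returns the same object.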

__repr__
__repr__() -> str

Return debug representation.

Returns

str

__str__
__str__() -> str

Return display representation.

Returns

str

clean_id_and_name classmethod
clean_id_and_name() -> None

Normalize and validate node ID and display name.

Raises:

Type Description
TypeError

If ID is not a string.

ValueError

If ID format is invalid.

Notes

Guarantees:

- Generates ID from module and class name if missing.
- Validates ID format.
- Generates human-readable name if missing.
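A minimal sketch of the ID checks, assuming "dotted snake_case" means dot-separated segments of lowercase letters, digits and underscores, each starting with a letter. The fallback ID format (module path plus the snake_cased class name) and the helper names `camel_to_snake`, `default_node_id` and `check_node_id` are hypothetical; this page does not specify how dagpipe builds the generated ID.

```python
import re

# Assumed format: lowercase segments joined by dots,
# e.g. "entity.resolve.numeric_merchant".
_ID_RE = re.compile(r"^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)*$")


def camel_to_snake(name: str) -> str:
    """'NumericMerchant' -> 'numeric_merchant'."""
    # Insert "_" before every uppercase letter except the first character.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def default_node_id(module: str, class_name: str) -> str:
    """Hypothetical fallback ID built from module path and class name."""
    return f"{module}.{camel_to_snake(class_name)}"


def check_node_id(node_id: object) -> str:
    """Raise TypeError for non-strings and ValueError for bad formats."""
    if not isinstance(node_id, str):
        raise TypeError(f"node id must be str, got {type(node_id).__name__}")
    if not _ID_RE.match(node_id):
        raise ValueError(f"invalid node id: {node_id!r}")
    return node_id
```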
fork
fork(state: State, *, payload_update=None, confidence_delta=0.0, metadata_update=None) -> State

Create a child State attributed to this node.

Parameters:

Name Type Description Default
state State

Parent execution state.

required
payload_update Mapping[str, Any]

Dot-path payload updates.

None
confidence_delta float

Confidence adjustment.

0.0
metadata_update Mapping[str, Any]

Metadata updates.

None

Returns:

Name Type Description
State State

New child execution state.

Notes

Responsibilities:

- Convenience wrapper around `State.fork()` that automatically
  records this node's ID in state history.
node_id_to_name staticmethod
node_id_to_name(node_id: str) -> str

Convert a dotted snake_case node ID into a human-readable name.

Parameters:

Name Type Description Default
node_id str

Unique node identifier (e.g., 'entity.resolve.numeric_merchant').

required

Returns:

Name Type Description
str str

Human-readable display name (e.g., 'Entity › Resolve › Numeric Merchant').
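The documented example conversion can be reproduced by splitting on dots, then on underscores, and capitalizing each word. This is a standalone sketch of one possible implementation, not dagpipe's source.

```python
def node_id_to_name(node_id: str) -> str:
    """Turn a dotted snake_case ID into a display name."""
    # Each dot-separated segment becomes a group of capitalized words;
    # groups are joined with " › ".
    return " › ".join(
        " ".join(word.capitalize() for word in segment.split("_"))
        for segment in node_id.split(".")
    )
```

Applied to `'entity.resolve.numeric_merchant'`, it yields `'Entity › Resolve › Numeric Merchant'`.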

resolve abstractmethod
resolve(state: State) -> Iterable[State]

Execute node logic.

Parameters:

Name Type Description Default
state State

Input execution state.

required

Yields:

Name Type Description
State Iterable[State]

Derived execution state(s).

Notes

Responsibilities:

- Subclasses implement specific resolution behavior.
- Must not mutate input state.
- Should use `fork()` to create child states.
- May yield zero states to terminate a branch.
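The three shapes a `resolve()` generator can take (filter, transform, branch) are sketched below without dagpipe. `ToyState` and the local `fork` helper are hypothetical stand-ins for `State` and `Node.fork()`; the point is only that each function yields zero, one, or several new immutable states and never touches its input.

```python
from dataclasses import dataclass, replace
from typing import Iterator, Tuple


@dataclass(frozen=True)
class ToyState:
    """Hypothetical immutable stand-in for dagpipe's State."""
    value: int
    history: Tuple[str, ...] = ()


def fork(state: ToyState, node_id: str, value: int) -> ToyState:
    # Returns a new instance; the input state is left untouched.
    return replace(state, value=value, history=state.history + (node_id,))


def keep_positive(state: ToyState) -> Iterator[ToyState]:
    """Filter: yield nothing to terminate the branch."""
    if state.value > 0:
        yield fork(state, "keep_positive", state.value)


def square(state: ToyState) -> Iterator[ToyState]:
    """Transform: exactly one derived state."""
    yield fork(state, "square", state.value ** 2)


def plus_minus(state: ToyState) -> Iterator[ToyState]:
    """Branch: several independent derived states."""
    for delta in (-1, 1):
        yield fork(state, "plus_minus", state.value + delta)
```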
run
run(state: State) -> tuple[State, ...]

Execute this node on a State.

Parameters:

Name Type Description Default
state State

Input execution state.

required

Returns:

Type Description
tuple[State, ...]

tuple[State, ...]: Derived execution states.

Raises:

Type Description
TypeError

If resolve() yields a non-State object.
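A sketch of the behavior documented for `run()`: drain the `resolve()` generator eagerly into a tuple and reject anything that is not a state. It is standalone; `ToyState` and `run_node` are hypothetical names, and passing `resolve` as a function stands in for calling the method on a node.

```python
from typing import Callable, Iterable, Tuple


class ToyState:
    """Hypothetical stand-in for dagpipe's State."""


def run_node(
    resolve: Callable[[ToyState], Iterable[object]], state: ToyState
) -> Tuple[ToyState, ...]:
    outputs = tuple(resolve(state))  # materialize the generator once
    for out in outputs:
        if not isinstance(out, ToyState):
            raise TypeError(f"resolve() yielded {type(out).__name__}, expected State")
    return outputs
```

Returning a tuple rather than the raw generator means the results can be iterated more than once and shared safely between children.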

Payload dataclass

Payload(_data: Mapping[str, Any])

Immutable hierarchical container with dot-path access.

Attributes:

Name Type Description
_data Mapping[str, Any]

Immutable hierarchical data structure.

Notes

Responsibilities:

- Stores execution data used by `State`.
- Supports efficient atomic updates without modifying existing instances.
- `Payload` instances are fully thread-safe due to immutability.
Functions
as_dict
as_dict() -> Mapping[str, Any]

Return underlying mapping.

Returns:

Type Description
Mapping[str, Any]

Mapping[str, Any]: Read-only view of the underlying data.

get
get(path: str, default: Any = None) -> Any

Retrieve value using dot-path.

Parameters:

Name Type Description Default
path str

Dot-separated path to the value.

required
default Any

Default value if path doesn't exist.

None

Returns:

Name Type Description
Any Any

The retrieved value or default.

has
has(path: str) -> bool

Return True if path exists.

Parameters:

Name Type Description Default
path str

Dot-separated path to check.

required

Returns:

Name Type Description
bool bool

Existence of the path.
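Dot-path lookup over nested mappings can be sketched as a walk over `path.split(".")`. This standalone version is not dagpipe's code: the module-level `get`/`has` functions and the `_MISSING` sentinel are hypothetical. The sentinel lets `has()` report a stored `None` as present, which is an assumption about the intended semantics.

```python
from typing import Any, Mapping

_MISSING = object()  # sentinel distinguishing "absent" from a stored None


def _lookup(data: Mapping[str, Any], path: str) -> Any:
    node: Any = data
    for key in path.split("."):
        # Stop as soon as a segment is missing or we hit a non-mapping leaf.
        if not isinstance(node, Mapping) or key not in node:
            return _MISSING
        node = node[key]
    return node


def get(data: Mapping[str, Any], path: str, default: Any = None) -> Any:
    value = _lookup(data, path)
    return default if value is _MISSING else value


def has(data: Mapping[str, Any], path: str) -> bool:
    return _lookup(data, path) is not _MISSING
```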

iter_paths classmethod
iter_paths(data: Mapping[str, Any], prefix: str = '') -> Iterable[str]

Recursively yield dot-paths for all leaf nodes.

Parameters:

Name Type Description Default
data Mapping[str, Any]

The mapping to iterate over.

required
prefix str

Current path prefix.

''

Returns:

Type Description
Iterable[str]

Iterable[str]: Generator yielding dot-paths.
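Recursively yielding leaf paths is a short generator. In this standalone sketch (not dagpipe's implementation), an empty nested mapping is treated as a leaf, which is an assumption.

```python
from typing import Any, Iterator, Mapping


def iter_paths(data: Mapping[str, Any], prefix: str = "") -> Iterator[str]:
    for key, value in data.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, Mapping) and value:
            # Non-empty nested mapping: descend with the extended prefix.
            yield from iter_paths(value, path)
        else:
            yield path  # leaf value
```

For `{"a": {"b": 1, "c": 2}, "d": 3}` this yields `"a.b"`, `"a.c"`, `"d"`.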

keys
keys() -> Iterable[str]

Return top-level keys.

Returns:

Type Description
Iterable[str]

Iterable[str]: Iterator over top-level keys.

update
update(updates: Mapping[str, Any]) -> Payload

Create a new Payload with dot-path updates applied.

Parameters:

Name Type Description Default
updates Mapping[str, Any]

Dot-path to value mapping.

required

Returns:

Name Type Description
Payload Payload

New immutable payload instance with updates.

Notes

Guarantees:

- Preserves existing data by copying only modified branches.
- Returns a new immutable `Payload`.

Pipeline dataclass

Pipeline(engine: Engine, state_cls: Type[State], initial_payload: Payload)

Executable pipeline created from YAML configuration.

Attributes:

Name Type Description
engine Engine

Execution engine responsible for running the pipeline.

state_cls Type[State]

Dynamically created State subclass with configured schema.

initial_payload Payload

Default payload used when execution begins.

Notes

Responsibilities:

- Encapsulates engine, state type, and initial payload.
- Provides a simplified interface for executing configured pipelines.
- Safe for concurrent execution if underlying nodes are thread-safe.
Functions
run
run(payload_override: Optional[Mapping[str, Any]] = None) -> list[State]

Execute the pipeline.

Parameters:

Name Type Description Default
payload_override Mapping[str, Any]

Payload values overriding initial payload.

None

Returns:

Type Description
list[State]

list[State]: Terminal execution states.

Notes

Responsibilities:

- Merges override payload with initial payload.
- Creates root `State` and executes engine.

Schema dataclass

Schema(tree: Mapping[str, SchemaNode])

Immutable hierarchical schema defining allowed payload structure.

Attributes:

Name Type Description
tree Mapping[str, SchemaNode]

Hierarchical schema definition.

Notes

Responsibilities:

- Validates `State` payloads and updates.
- Reusable across all `State` instances.
- Fully thread-safe due to immutability.
Functions
validate_payload
validate_payload(payload: Payload) -> None

Validate complete payload structure.

Parameters:

Name Type Description Default
payload Payload

Payload to validate.

required

Raises:

Type Description
SchemaError

If payload violates schema.

validate_update
validate_update(updates: Mapping[str, Any]) -> None

Validate payload update paths.

Parameters:

Name Type Description Default
updates Mapping[str, Any]

Dot-path updates to validate.

required

Raises:

Type Description
SchemaError

If any path is invalid according to the schema.
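This page does not document the `SchemaNode` format, so the sketch below assumes a hypothetical tree of nested dicts whose leaves are Python types. It shows the general idea behind `validate_update`: walk each dot-path through the tree, fail on unknown segments, and type-check the value at the end. `SchemaError` here is a local stand-in, not an import from dagpipe.

```python
from typing import Any, Mapping


class SchemaError(Exception):
    """Local stand-in for dagpipe.SchemaError."""


def validate_update(tree: Mapping[str, Any], updates: Mapping[str, Any]) -> None:
    """Check dot-paths against a nested dict whose leaves are types."""
    for path, value in updates.items():
        node: Any = tree
        for key in path.split("."):
            if not isinstance(node, Mapping) or key not in node:
                raise SchemaError(f"unknown path: {path!r}")
            node = node[key]
        if isinstance(node, Mapping):
            # Path points at a subtree: require a mapping value.
            if not isinstance(value, Mapping):
                raise SchemaError(f"{path!r} expects a mapping")
        elif not isinstance(value, node):
            raise SchemaError(f"{path!r} expects {node.__name__}, got {type(value).__name__}")
```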

SchemaError

Bases: Exception

Raised when payload data violates the declared schema.

Indicates invalid structure, invalid path, or invalid type.

State dataclass

State(payload: Payload, confidence: float = 1.0, parent: Optional[State] = None, depth: int = 0, history: Tuple[str, ...] = tuple(), metadata: Dict[str, Any] = dict())

Immutable execution state propagated through a dagpipe pipeline.

Attributes:

Name Type Description
payload Payload

Execution data container.

schema ClassVar[Schema]

Payload validation schema.

confidence float

Execution confidence score.

parent Optional[State]

Parent state reference.

depth int

Execution depth.

history Tuple[str, ...]

Ordered node execution lineage.

metadata Dict[str, Any]

Execution metadata.

Notes

Responsibilities:

- Represents a complete execution snapshot at a specific point in
  pipeline traversal.
- Fundamental unit of execution in `dagpipe`.
- Fully thread-safe due to immutability.
Functions
__repr__
__repr__() -> str

Concise debug representation.

Avoids printing full data for large states.

fork
fork(*, payload_update: Optional[Mapping[str, Any]] = None, confidence_delta: float = 0.0, node_id: Optional[str] = None, metadata_update: Optional[Mapping[str, Any]] = None) -> State

Create a new child State derived from this state.

Parameters:

Name Type Description Default
payload_update Mapping[str, Any]

Dot-path updates applied to the payload.

None
confidence_delta float

Adjustment applied to current confidence.

0.0
node_id str

Identifier of the node creating this state.

None
metadata_update Mapping[str, Any]

Updates merged into state metadata.

None

Returns:

Name Type Description
State State

A new immutable State instance.

Notes

Guarantees:

- This is the only supported mechanism for deriving updated execution data.
- Validates payload updates, preserves lineage, increments depth,
  and appends to history.
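The fork guarantees map naturally onto a frozen dataclass and `dataclasses.replace`. The standalone sketch below is not dagpipe's `State`: `ToyState` is hypothetical, its payload is a flat mapping (no dot-paths), it performs no schema validation, and it appends to `history` only when a `node_id` is given, all of which are simplifying assumptions.

```python
from dataclasses import dataclass, field, replace
from types import MappingProxyType
from typing import Any, Mapping, Optional, Tuple


@dataclass(frozen=True)
class ToyState:
    """Hypothetical frozen stand-in for dagpipe's State (flat payload)."""
    payload: Mapping[str, Any]
    confidence: float = 1.0
    parent: Optional["ToyState"] = None
    depth: int = 0
    history: Tuple[str, ...] = ()
    metadata: Mapping[str, Any] = field(default_factory=lambda: MappingProxyType({}))

    def fork(
        self,
        *,
        payload_update: Optional[Mapping[str, Any]] = None,
        confidence_delta: float = 0.0,
        node_id: Optional[str] = None,
        metadata_update: Optional[Mapping[str, Any]] = None,
    ) -> "ToyState":
        # replace() builds a new instance; self is never modified.
        return replace(
            self,
            payload=MappingProxyType({**self.payload, **(payload_update or {})}),
            confidence=self.confidence + confidence_delta,
            parent=self,  # keep the link used for lineage
            depth=self.depth + 1,
            history=self.history + ((node_id,) if node_id else ()),
            metadata=MappingProxyType({**self.metadata, **(metadata_update or {})}),
        )
```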
get
get(key: str, default: Any = None) -> Any

Retrieve payload value.

Parameters:

Name Type Description Default
key str

Dot-path key.

required
default Any

Fallback value.

None

Returns:

Name Type Description
Any Any

Stored value or default.

has
has(key: str) -> bool

Check whether payload contains key.

Parameters:

Name Type Description Default
key str

Dot-path key.

required

Returns:

Name Type Description
bool bool

Existence of the key.

lineage
lineage() -> Tuple[State, ...]

Return lineage from root to this State.

Returns:

Type Description
Tuple[State, ...]

Tuple[State, ...]: Ordered execution lineage (root first).
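Because each state keeps a reference to its parent, lineage is a walk up the parent pointers followed by a reversal. `ToyState` below is a hypothetical standalone stand-in that keeps only the fields needed for the walk.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class ToyState:
    """Hypothetical stand-in: only the parent link matters here."""
    name: str
    parent: Optional["ToyState"] = None

    def lineage(self) -> Tuple["ToyState", ...]:
        chain: List["ToyState"] = []
        node: Optional["ToyState"] = self
        while node is not None:  # walk parent pointers up to the root
            chain.append(node)
            node = node.parent
        return tuple(reversed(chain))  # root first
```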

Functions

load_pipeline

load_pipeline(path: str) -> Pipeline

Load pipeline from YAML file.

Parameters:

Name Type Description Default
path str

Path to YAML configuration file.

required

Returns:

Name Type Description
Pipeline Pipeline

Executable pipeline instance.

Notes

Responsibilities:

- Loads YAML configuration and builds schema.
- Creates `State` subclass and loads `Node` instances.
- Builds `Graph` topology and initializes `Engine`.