dagpipe

Directed acyclic graph execution framework for deterministic state propagation.


Summary

dagpipe executes pipelines composed of nodes connected in a directed acyclic graph (DAG). Each node receives an immutable state and optionally produces derived states for downstream nodes.
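
The propagation model described above can be sketched in plain Python without dagpipe itself. This is an illustrative toy, not dagpipe's implementation: `ToyState`, `propagate`, and the node functions are hypothetical names, and the traversal order (depth-first, children in declared order) is an assumption made for the example.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Callable, Dict, Iterable, Iterator, List, Mapping


@dataclass(frozen=True)
class ToyState:
    """Hypothetical immutable state: the payload is a read-only mapping."""
    payload: Mapping[str, object] = field(
        default_factory=lambda: MappingProxyType({})
    )

    def fork(self, **updates: object) -> "ToyState":
        # Build a new state from the old payload plus updates;
        # the original state is never modified.
        return ToyState(MappingProxyType({**self.payload, **updates}))


NodeFn = Callable[[ToyState], Iterable[ToyState]]


def propagate(
    name: str,
    state: ToyState,
    nodes: Dict[str, NodeFn],
    edges: Dict[str, List[str]],
) -> Iterator[ToyState]:
    """Run node `name`, pass each derived state to its children,
    and yield the states produced by nodes that have no children."""
    children = edges.get(name, [])
    for derived in nodes[name](state):
        if not children:
            yield derived
        for child in children:
            yield from propagate(child, derived, nodes, edges)


# Three toy nodes: one sets a message, one branches, one transforms.
def hello(state: ToyState) -> Iterator[ToyState]:
    yield state.fork(msg="hello")


def branch(state: ToyState) -> Iterator[ToyState]:
    yield state.fork(lang="en")
    yield state.fork(lang="fr")


def shout(state: ToyState) -> Iterator[ToyState]:
    yield state.fork(msg=str(state.payload["msg"]).upper())


nodes = {"hello": hello, "branch": branch, "shout": shout}
edges = {"hello": ["branch"], "branch": ["shout"]}  # hello -> branch -> shout

results = list(propagate("hello", ToyState(), nodes, edges))
```

Because each node only ever forks its input, the two states produced by `branch` flow through `shout` independently, and the same inputs always yield the same results in the same order. In this sketch a node reachable by several paths would run once per incoming state; whether dagpipe behaves the same way is not stated here.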


Installation

Install using pip:

pip install dagpipe

Quick Start

from dagpipe import State, Payload, Schema, Graph, Engine
from dagpipe.node import Node

# A node that forks the incoming state with an updated payload.
class HelloNode(Node):
    id = "hello"

    def resolve(self, state):
        yield self.fork(state, payload_update={"msg": "hello"})

# A custom state type with an empty schema.
class MyState(State):
    schema = Schema({})

# Build the graph with HelloNode as its root, then run it.
graph = Graph()
graph.add_root(HelloNode())
engine = Engine(graph)

results = engine.run(MyState(payload=Payload({})))

Public API

This package re-exports the core pipeline components. Consumers should import from this namespace for standard usage.

Execution Core:
- Engine
- Graph
- Node

State & Data:
- State / MyState (custom base)
- Payload
- Schema / SchemaError

Declarative Pipelines:
- Pipeline
- load_pipeline


Classes

Functions