INSTALLATION

pip install highway-dsl

QUICK START

from highway_dsl import WorkflowBuilder

workflow = (WorkflowBuilder("my_workflow", version="2.0.0")
    .task("task_1", "module.function", args=["arg1"])
    .task("task_2", "module.function2", dependencies=["task_1"])
    .build()
)

# Serialize to YAML
yaml_output = workflow.to_yaml()

# Serialize to JSON
json_output = workflow.to_json()

SPECIFICATION OVERVIEW

Highway DSL v2.0.0 is a Long-Term Support (LTS) specification with guaranteed backward compatibility for the 2.x series. The specification defines:

WORKFLOW STRUCTURE

  • name: string (required)
  • version: string (default: "2.0.0")
  • description: string (optional)
  • start_task: string (required)
  • tasks: dict[str, Operator]
  • variables: dict[str, Any]

TYPE SYSTEM

  • Pydantic v2 data validation
  • Full Python type hints
  • Strict schema validation
  • Round-trip serialization
  • YAML/JSON interchange

TEMPLATE SYNTAX

  • Jinja2-style: {{variable}}
  • Result references: {{task.result}}
  • Item iteration: {{item.field}}
  • Nested access: {{obj.nested.field}}

VERSIONING POLICY

v2.0.0 LTS (Current)

• No breaking changes in 2.x series

• Backward-compatible additions only

• Semantic versioning: 2.MINOR.PATCH

• Production stability guarantee

CORE CONCEPTS

OPERATOR

Base unit of workflow definition. All operators inherit from BaseOperator with common fields:

task_id: Unique identifier within workflow

operator_type: Discriminator for operator polymorphism

dependencies: List of task_ids that must complete first

metadata: Arbitrary key-value pairs for extensibility
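
A minimal sketch of these common fields as a Pydantic model (illustrative only; the field names follow the specification above, but this is not the library's actual class definition):

from typing import Any, Dict, List
from pydantic import BaseModel, Field

class CommonOperatorFields(BaseModel):
    # Fields shared by every operator type in the specification
    task_id: str                                             # unique within the workflow
    operator_type: str                                       # discriminator, e.g. "task" or "join"
    dependencies: List[str] = Field(default_factory=list)    # upstream task_ids
    metadata: Dict[str, Any] = Field(default_factory=dict)   # arbitrary extension data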

DEPENDENCY GRAPH

Workflows are directed acyclic graphs (DAGs) where:

• Nodes = Operators (tasks, conditions, loops, etc.)

• Edges = Dependencies (execution order constraints)

• Cycles are prohibited (validation enforced)

• Topological sort determines execution order
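
One way to derive that order is Kahn's algorithm; the sketch below assumes the graph is given as a mapping of task_id to its dependency list, and treats any leftover node as evidence of a prohibited cycle:

from collections import deque
from typing import Dict, List

def topological_order(deps: Dict[str, List[str]]) -> List[str]:
    # deps maps task_id -> list of task_ids it depends on
    indegree = {task: len(upstream) for task, upstream in deps.items()}
    dependents: Dict[str, List[str]] = {task: [] for task in deps}
    for task, upstream in deps.items():
        for dep in upstream:
            dependents[dep].append(task)
    ready = deque(task for task, n in indegree.items() if n == 0)
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for follower in dependents[task]:
            indegree[follower] -= 1
            if indegree[follower] == 0:
                ready.append(follower)
    if len(order) != len(deps):
        raise ValueError("cycle detected: workflow is not a DAG")
    return order

# ['task_1', 'task_2', 'join_1']
print(topological_order({"task_1": [], "task_2": ["task_1"], "join_1": ["task_1", "task_2"]}))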

EXECUTION SEMANTICS

The Highway DSL specification defines declarative workflow structure, not imperative execution logic. Execution engines must implement:

• Dependency resolution and topological ordering

• Template variable interpolation at runtime (see the sketch after this list)

• Operator-specific execution handlers

• State management and persistence
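
Interpolation behavior is engine-defined; a minimal sketch that resolves the dotted references from the TEMPLATE SYNTAX section against a nested context dict (real engines may support richer expressions):

import re
from typing import Any, Dict

_REF = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")

def interpolate(template: str, context: Dict[str, Any]) -> str:
    # Replace each {{a.b.c}} with the value found by walking nested dicts
    def resolve(match):
        value: Any = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)
    return _REF.sub(resolve, template)

print(interpolate("score is {{validate.score}}", {"validate": {"score": 0.97}}))  # score is 0.97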

OPERATOR SPECIFICATION

Highway DSL v2.0.0 defines 10 operator types for comprehensive workflow modeling:

OPERATOR              TYPE            DESCRIPTION                                     REQUIRED FIELDS
TaskOperator          task            Basic execution unit that calls a function     function, args/kwargs
ConditionOperator     condition       Binary branch based on condition expression    condition, if_true, if_false
ParallelOperator      parallel        Concurrent execution of multiple branches      branches (dict of branch_name: [operators])
ForEachOperator       foreach         Iterate over collection with loop body         items, loop_body
WhileOperator         while           Conditional loop with condition check          condition, loop_body
WaitOperator          wait            Delay execution for specified duration         wait_for (ISO 8601 duration)
EmitEventOperator     emit_event      Publish event to event bus for coordination    event_name
WaitForEventOperator  wait_for_event  Block until event is received                  event_name, timeout (optional)
SwitchOperator        switch          Multi-way branch with case matching            switch_on, cases, default (optional)
JoinOperator          join            Explicit coordination with multiple join modes join_on, join_mode (ALL_OF/ANY_OF/ALL_SUCCESS/ONE_SUCCESS)

TaskOperator - Function Execution

from highway_dsl import TaskOperator, RetryPolicy, TimeoutPolicy
from datetime import timedelta

task = TaskOperator(
    task_id="process_data",
    function="data.processor.transform",
    args=["{{input_file}}"],
    kwargs={"mode": "batch"},
    result_key="processed_data",
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=timedelta(seconds=30),
        backoff_factor=2.0
    ),
    timeout_policy=TimeoutPolicy(
        timeout=timedelta(minutes=10),
        kill_on_timeout=True
    )
)
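
How an engine honors retry_policy is implementation-defined; a common reading (an assumption, not mandated by the specification) is exponential backoff, where retry n waits delay * backoff_factor ** n:

import time
from datetime import timedelta

def run_with_retries(fn, max_retries=3, delay=timedelta(seconds=30), backoff_factor=2.0):
    # Attempt 0 is the initial call; each subsequent retry backs off geometrically
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(delay.total_seconds() * backoff_factor ** attempt)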

ConditionOperator - Branching Logic

builder.condition(
    "data_quality_check",
    condition="{{quality_score}} > 0.95",
    if_true=lambda b: b.task("advanced_process", "proc.advanced"),
    if_false=lambda b: b.task("basic_process", "proc.basic")
)

ParallelOperator - Concurrent Execution

builder.parallel(
    "process_all_data",
    branches={
        "images": lambda b: b.task("process_img", "img.process"),
        "videos": lambda b: b.task("process_vid", "vid.process"),
        "audio": lambda b: b.task("process_aud", "aud.process")
    }
)

ForEachOperator - Collection Iteration

builder.foreach(
    "process_items",
    items="{{data.records}}",
    loop_body=lambda b: (
        b.task("validate", "validate.record", args=["{{item}}"])
         .task("transform", "transform.record", args=["{{item}}"])
         .task("store", "store.record", args=["{{item}}"])
    )
)

WhileOperator - Conditional Loop

builder.while_loop(
    "retry_until_success",
    condition="{{result.status}} == 'failed'",
    loop_body=lambda b: (
        b.task("attempt", "processor.attempt")
         .task("check", "processor.check_result", result_key="result")
    )
)

EmitEvent / WaitForEvent - Event Coordination

from datetime import timedelta

# Branch 1: emit an event after processing completes
(builder.task("process", "processor.run")
        .emit_event("emit_ready", event_name="processing_complete"))

# Branch 2: block until the event arrives, then continue
(builder.wait_for_event(
            "wait_ready",
            event_name="processing_complete",
            timeout=timedelta(hours=1))
        .task("downstream", "processor.downstream"))

SwitchOperator - Multi-Way Branch

builder.switch(
    "route_by_priority",
    switch_on="{{task.priority}}",
    cases={
        "critical": "handle_critical",
        "high": "handle_high",
        "medium": "handle_medium",
        "low": "handle_low"
    },
    default="handle_default"
)

JoinOperator - Explicit Coordination

from highway_dsl import JoinMode

# Wait for all tasks to complete (regardless of success/failure)
builder.join(
    "wait_all",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ALL_OF
)

# Wait for all tasks to succeed (fail if any fails)
builder.join(
    "wait_all_success",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ALL_SUCCESS
)

# Wait for any one task to complete
builder.join(
    "wait_any",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ANY_OF
)

# Wait for first successful completion
builder.join(
    "wait_first_success",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ONE_SUCCESS
)
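
The four modes reduce to simple predicates over upstream task states. The sketch below is one reading of the semantics described above (not library code), with states given as task_id -> "success" | "failed" | "pending":

def join_satisfied(mode: str, states: dict) -> bool:
    finished = [s for s in states.values() if s != "pending"]
    if mode == "ALL_OF":        # all finished, regardless of outcome
        return len(finished) == len(states)
    if mode == "ALL_SUCCESS":   # every task finished successfully
        return all(s == "success" for s in states.values())
    if mode == "ANY_OF":        # first completion, success or failure
        return len(finished) >= 1
    if mode == "ONE_SUCCESS":   # first successful completion
        return any(s == "success" for s in states.values())
    raise ValueError(f"unknown join mode: {mode}")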

COMPLETE WORKFLOW EXAMPLES

Example 1: Data Processing Pipeline

from highway_dsl import WorkflowBuilder, JoinMode
from datetime import timedelta

workflow = (
    WorkflowBuilder("data_pipeline", version="2.0.0")

    # Extract data from multiple sources in parallel
    .parallel(
        "extract_data",
        branches={
            "database": lambda b: b.task("extract_db", "extract.from_db"),
            "api": lambda b: b.task("extract_api", "extract.from_api"),
            "files": lambda b: b.task("extract_files", "extract.from_files")
        }
    )

    # Wait for all extractions to complete successfully
    .join(
        "wait_extractions",
        join_on=["extract_db", "extract_api", "extract_files"],
        join_mode=JoinMode.ALL_SUCCESS
    )

    # Merge and transform
    .task("merge", "transform.merge_sources", result_key="merged_data")
    .task("validate", "transform.validate", args=["{{merged_data}}"])

    # Branch based on data quality
    .condition(
        "quality_check",
        condition="{{validate.score}} >= 0.95",
        if_true=lambda b: b.task("load_prod", "load.to_production"),
        if_false=lambda b: b.task("load_staging", "load.to_staging")
    )

    .build()
)
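
The built workflow serializes exactly as in the quick start; the abridged document shown under YAML Serialization Output below was produced this way:

print(workflow.to_yaml())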

Example 2: Event-Driven Workflow

from datetime import timedelta

workflow = (
    WorkflowBuilder("event_driven", version="2.0.0")

    .parallel(
        "process_branches",
        branches={
            "producer": lambda b: (
                b.task("generate", "producer.generate_data")
                 .emit_event("notify", event_name="data_ready")
            ),
            "consumer": lambda b: (
                b.wait_for_event("wait", event_name="data_ready",
                                 timeout=timedelta(minutes=5))
                 .task("process", "consumer.process_data")
            )
        }
    )

    .build()
)

YAML Serialization Output (abridged)

name: data_pipeline
version: "2.0.0"
description: ""
start_task: extract_data
tasks:
  extract_data:
    task_id: extract_data
    operator_type: parallel
    branches:
      database:
        - task_id: extract_db
          operator_type: task
          function: extract.from_db
      api:
        - task_id: extract_api
          operator_type: task
          function: extract.from_api
      files:
        - task_id: extract_files
          operator_type: task
          function: extract.from_files
  wait_extractions:
    task_id: wait_extractions
    operator_type: join
    join_on: [extract_db, extract_api, extract_files]
    join_mode: ALL_SUCCESS
    dependencies: [extract_data]
  merge:
    task_id: merge
    operator_type: task
    function: transform.merge_sources
    result_key: merged_data
    dependencies: [wait_extractions]
variables: {}
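
Because the output is plain YAML 1.2, it can be inspected with pyyaml (already a dependency). A quick structural check, assuming the document above is stored in yaml_output:

import yaml

doc = yaml.safe_load(yaml_output)
assert doc["start_task"] in doc["tasks"]                      # start task must exist
assert all("task_id" in op for op in doc["tasks"].values())   # every operator is tagged
print(sorted(doc["tasks"]))   # ['extract_data', 'merge', 'wait_extractions']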

TECHNICAL SPECIFICATIONS

SERIALIZATION

  • Format: YAML 1.2 / JSON
  • Encoding: UTF-8
  • Schema: Pydantic v2
  • Validation: Strict mode
  • Round-trip: Lossless

DEPENDENCIES

  • Python: >= 3.8
  • pydantic: >= 2.0
  • pyyaml: >= 6.0
  • python-dateutil: >= 2.8

VALIDATION RULES

  • No cyclic dependencies (DAG)
  • Unique task_id per workflow
  • Valid dependency references
  • Type-safe field validation
  • ISO 8601 duration format
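
A loader can enforce the reference rules in a single pass (task_id uniqueness is already guaranteed by the dict keys, and cycle detection is sketched under DEPENDENCY GRAPH). A sketch, assuming tasks maps task_id to a dict with a dependencies list:

from typing import Dict, List

def validate_references(tasks: Dict[str, dict]) -> List[str]:
    # Collect readable errors instead of failing on the first problem
    errors = []
    for task_id, op in tasks.items():
        for dep in op.get("dependencies", []):
            if dep not in tasks:
                errors.append(f"{task_id}: unknown dependency {dep!r}")
    return errors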

DURATION FORMAT (ISO 8601)

from highway_dsl import Duration

# String format: "P[n]Y[n]M[n]DT[n]H[n]M[n]S"
Duration("PT30S")      # 30 seconds
Duration("PT5M")       # 5 minutes
Duration("PT2H")       # 2 hours
Duration("P1D")        # 1 day
Duration("PT1H30M")    # 1 hour 30 minutes

# Programmatic (timedelta)
from datetime import timedelta
Duration(timedelta(seconds=30))
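
Engines that need to turn these strings back into timedelta values can use a small parser; the sketch below is illustrative and covers only the day/time designators shown above (not calendar years or months):

import re
from datetime import timedelta

_DUR = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)

def parse_duration(text: str) -> timedelta:
    match = _DUR.match(text)
    if not match or text == "P":
        raise ValueError(f"unsupported ISO 8601 duration: {text!r}")
    parts = {name: int(value) for name, value in match.groupdict().items() if value}
    return timedelta(**parts)

print(parse_duration("PT1H30M"))   # 1:30:00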

IDEMPOTENCY

IDEMPOTENCY KEY

All operators support an optional idempotency_key field for deterministic re-execution:

task.idempotency_key = "unique-key-{date}-{id}"

Execution engines should use idempotency keys to prevent duplicate execution of tasks when workflows are retried or resumed.
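
One way an engine can honor the key (a sketch with an in-memory store; a durable engine would persist this mapping alongside workflow state):

results_by_key: dict = {}   # idempotency_key -> stored result

def run_once(idempotency_key: str, fn):
    # Skip execution if this key already produced a result (e.g. after a resume)
    if idempotency_key in results_by_key:
        return results_by_key[idempotency_key]
    result = fn()
    results_by_key[idempotency_key] = result
    return result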

CALLBACK HOOKS

# Define callback tasks that execute on success/failure
builder.task("risky_task", "processor.risky")
       .on_success("notify_success")
       .on_failure("notify_failure")

# Define the callback handlers
builder.task("notify_success", "notify.send_success")
builder.task("notify_failure", "notify.send_failure")

SCHEDULING METADATA

from datetime import datetime

workflow = (
    WorkflowBuilder("scheduled_workflow")
    .set_schedule("0 */6 * * *")        # Cron: every 6 hours
    .set_start_date(datetime(2025, 1, 1))
    .set_catchup(False)                    # Don't backfill
    .add_tags("production", "critical")

    # ... define tasks ...

    .build()
)
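
These fields are metadata only; computing fire times is the engine's job. For example, with the third-party croniter package (an assumption, not a highway-dsl dependency):

from datetime import datetime
from croniter import croniter

next_run = croniter("0 */6 * * *", datetime(2025, 1, 1)).get_next(datetime)
print(next_run)   # 2025-01-01 06:00:00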

IMPLEMENTATION NOTES

FOR EXECUTION ENGINE DEVELOPERS: Highway DSL is a specification language, not an execution runtime. To implement a Highway DSL-compliant execution engine, you must provide the following; a minimal loop combining these pieces is sketched after the lists:

PARSER

  • YAML/JSON deserialization
  • Schema validation
  • Operator type dispatch
  • DAG construction

EXECUTOR

  • Topological sort
  • Dependency resolution
  • Template interpolation
  • Operator handlers
  • State persistence

RUNTIME

  • Function registry/loader
  • Result storage
  • Event bus (for events)
  • Retry/timeout handling
  • Checkpoint/resume logic
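
Putting the three layers together, a minimal non-durable execution loop might look like the sketch below. It reuses topological_order and interpolate from the earlier sketches, takes a registry mapping function paths to callables, and handles only the plain task operator; a real engine dispatches all 10 types and persists state:

def execute(workflow: dict, registry: dict, variables: dict) -> dict:
    tasks = workflow["tasks"]
    deps = {tid: op.get("dependencies", []) for tid, op in tasks.items()}
    results: dict = {}
    context = dict(variables)
    for task_id in topological_order(deps):
        op = tasks[task_id]
        if op["operator_type"] != "task":
            continue   # condition/parallel/foreach/... handlers omitted
        fn = registry[op["function"]]
        args = [interpolate(a, context) if isinstance(a, str) else a
                for a in op.get("args", [])]
        results[task_id] = fn(*args)
        if op.get("result_key"):
            context[op["result_key"]] = results[task_id]
    return results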

REFERENCE IMPLEMENTATION

Highway Workflow Engine provides a production-grade reference implementation of the Highway DSL specification. It demonstrates:

• PostgreSQL-backed durable execution

• Crash-resistant state management

• Atomic transaction orchestration

• Complete operator support (all 10 types)

• Event coordination and join semantics
