QUICK START
from highway_dsl import WorkflowBuilder

workflow = (
    WorkflowBuilder("my_workflow", version="2.0.0")
    .task("task_1", "module.function", args=["arg1"])
    .task("task_2", "module.function2", dependencies=["task_1"])
    .build()
)

# Serialize to YAML
yaml_output = workflow.to_yaml()

# Serialize to JSON
json_output = workflow.to_json()
SPECIFICATION OVERVIEW
Highway DSL v2.0.0 is a Long-Term Support (LTS) specification with guaranteed backward compatibility for the 2.x series. The specification defines:
WORKFLOW STRUCTURE
- name: string (required)
- version: string (default: "2.0.0")
- description: string (optional)
- start_task: string (required)
- tasks: dict[str, Operator]
- variables: dict[str, Any]
TYPE SYSTEM
- Pydantic v2 data validation
- Full Python type hints
- Strict schema validation
- Round-trip serialization (demonstrated below)
- YAML/JSON interchange
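The round-trip and interchange guarantees can be checked with nothing beyond the serialization calls from the quick start; yaml comes from pyyaml (a listed dependency), and the workflow shape follows the quick-start example:

import json
import yaml
from highway_dsl import WorkflowBuilder

workflow = (
    WorkflowBuilder("roundtrip_demo", version="2.0.0")
    .task("only_task", "module.function")
    .build()
)

from_yaml = yaml.safe_load(workflow.to_yaml())
from_json = json.loads(workflow.to_json())

# Both serializations carry the same document per the interchange guarantee
assert from_yaml["name"] == from_json["name"] == "roundtrip_demo"
assert "only_task" in from_yaml["tasks"]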
TEMPLATE SYNTAX
- Jinja2-style: {{variable}}
- Result references: {{task.result}}
- Item iteration: {{item.field}}
- Nested access: {{obj.nested.field}}
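All four forms are dotted lookups into a runtime context. A minimal, illustrative resolver — not the library's own implementation (an engine could equally delegate to Jinja2):

import re

def render(template: str, context: dict) -> str:
    """Substitute each {{dotted.path}} with its value from `context`."""
    def resolve(match: re.Match) -> str:
        value = context
        for part in match.group(1).strip().split("."):
            value = value[part]          # one dict hop per path segment
        return str(value)
    return re.sub(r"\{\{(.*?)\}\}", resolve, template)

context = {
    "variable": "hello",
    "task": {"result": 42},
    "obj": {"nested": {"field": "deep"}},
}
assert render("{{variable}}", context) == "hello"
assert render("{{task.result}}", context) == "42"
assert render("{{obj.nested.field}}", context) == "deep"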
VERSIONING POLICY
v2.0.0 LTS (Current)
• No breaking changes in 2.x series
• Backward-compatible additions only
• Semantic versioning: 2.MINOR.PATCH
• Production stability guarantee
CORE CONCEPTS
OPERATOR
Operators are the base unit of workflow definition. All operators inherit from BaseOperator, which provides these common fields (a Pydantic sketch follows the list):
• task_id: Unique identifier within workflow
• operator_type: Discriminator for operator polymorphism
• dependencies: List of task_ids that must complete first
• metadata: Arbitrary key-value pairs for extensibility
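A sketch of how those common fields look as Pydantic v2 models; typing.Literal pins the discriminator in each concrete subclass. Field defaults and class internals here are assumptions, not the library's actual source:

from typing import Any, Dict, List, Literal
from pydantic import BaseModel, Field

class BaseOperator(BaseModel):
    """Common fields shared by every operator type (sketch, not the real class)."""
    task_id: str                                    # unique within the workflow
    operator_type: str                              # discriminator for polymorphism
    dependencies: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)

class TaskOperator(BaseOperator):
    """A concrete operator pins the discriminator to a literal value."""
    operator_type: Literal["task"] = "task"
    function: str
    args: List[Any] = Field(default_factory=list)
    kwargs: Dict[str, Any] = Field(default_factory=dict)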
DEPENDENCY GRAPH
Workflows are directed acyclic graphs (DAGs) where:
• Nodes = Operators (tasks, conditions, loops, etc.)
• Edges = Dependencies (execution order constraints)
• Cycles are prohibited (validation enforced)
• Topological sort determines execution order (sketched below)
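Kahn's algorithm delivers the last two bullets together: it produces an execution order and, if it cannot consume every node, a cycle must exist. A standalone sketch, independent of the library's internals:

from collections import deque
from typing import Dict, List

def topological_order(deps: Dict[str, List[str]]) -> List[str]:
    """Order {task_id: [dependency ids]} for execution; raise on cycles."""
    indegree = {task: len(d) for task, d in deps.items()}   # unmet dependencies
    waiting: Dict[str, List[str]] = {task: [] for task in deps}
    for task, d in deps.items():
        for dep in d:
            waiting[dep].append(task)                       # reverse edge

    ready = deque(task for task, n in indegree.items() if n == 0)
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for dependent in waiting[task]:
            indegree[dependent] -= 1
            if indegree[dependent] == 0:
                ready.append(dependent)

    if len(order) != len(deps):
        raise ValueError("cycle detected: workflow is not a DAG")
    return order

# task_2/task_3 fan out from task_1; task_4 joins them
print(topological_order({
    "task_1": [],
    "task_2": ["task_1"],
    "task_3": ["task_1"],
    "task_4": ["task_2", "task_3"],
}))  # ['task_1', 'task_2', 'task_3', 'task_4']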
EXECUTION SEMANTICS
The Highway DSL specification defines declarative workflow structure, not imperative execution logic. Execution engines must implement the following (a minimal loop is sketched after the list):
• Dependency resolution and topological ordering
• Template variable interpolation at runtime
• Operator-specific execution handlers
• State management and persistence
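Tying those requirements together, the smallest conceivable engine is a loop over the topological order that renders templates and dispatches by operator type. This sketch reuses topological_order and render from the sketches above; the handlers mapping and dict-based workflow shape are assumptions for illustration, and state persistence is omitted:

from typing import Any, Callable, Dict

def execute(workflow: Dict[str, Any],
            handlers: Dict[str, Callable[..., Any]]) -> Dict[str, Any]:
    """Minimal engine sketch: order tasks, render templates, run handlers."""
    tasks = workflow["tasks"]
    deps = {tid: t.get("dependencies", []) for tid, t in tasks.items()}
    context: Dict[str, Any] = dict(workflow.get("variables", {}))

    for task_id in topological_order(deps):        # sketched under DEPENDENCY GRAPH
        task = tasks[task_id]
        handler = handlers[task["operator_type"]]  # operator-specific handler
        args = [render(a, context) if isinstance(a, str) else a
                for a in task.get("args", [])]     # runtime interpolation
        result = handler(task, *args)
        if task.get("result_key"):
            context[task["result_key"]] = result   # expose result to later tasks
    return context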
OPERATOR SPECIFICATION
Highway DSL v2.0.0 defines 10 operator types for comprehensive workflow modeling:
| OPERATOR | TYPE | DESCRIPTION | REQUIRED FIELDS |
|---|---|---|---|
| TaskOperator | task | Basic execution unit that calls a function | function, args/kwargs |
| ConditionOperator | condition | Binary branch based on condition expression | condition, if_true, if_false |
| ParallelOperator | parallel | Concurrent execution of multiple branches | branches (dict of branch_name: [operators]) |
| ForEachOperator | foreach | Iterate over collection with loop body | items, loop_body |
| WhileOperator | while | Conditional loop with condition check | condition, loop_body |
| WaitOperator | wait | Delay execution for specified duration | wait_for (ISO 8601 duration) |
| EmitEventOperator | emit_event | Publish event to event bus for coordination | event_name |
| WaitForEventOperator | wait_for_event | Block until event is received | event_name, timeout (optional) |
| SwitchOperator | switch | Multi-way branch with case matching | switch_on, cases, default (optional) |
| JoinOperator | join | Explicit coordination with multiple join modes | join_on, join_mode (ALL_OF/ANY_OF/ALL_SUCCESS/ONE_SUCCESS) |
TaskOperator - Function Execution
from highway_dsl import TaskOperator, RetryPolicy, TimeoutPolicy
from datetime import timedelta

task = TaskOperator(
    task_id="process_data",
    function="data.processor.transform",
    args=["{{input_file}}"],
    kwargs={"mode": "batch"},
    result_key="processed_data",
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=timedelta(seconds=30),
        backoff_factor=2.0,
    ),
    timeout_policy=TimeoutPolicy(
        timeout=timedelta(minutes=10),
        kill_on_timeout=True,
    ),
)
ConditionOperator - Branching Logic
builder.condition(
"data_quality_check",
condition="{{quality_score}} > 0.95",
if_true=lambda b: b.task("advanced_process", "proc.advanced"),
if_false=lambda b: b.task("basic_process", "proc.basic")
)
ParallelOperator - Concurrent Execution
builder.parallel(
"process_all_data",
branches={
"images": lambda b: b.task("process_img", "img.process"),
"videos": lambda b: b.task("process_vid", "vid.process"),
"audio": lambda b: b.task("process_aud", "aud.process")
}
)
ForEachOperator - Collection Iteration
builder.foreach(
"process_items",
items="{{data.records}}",
loop_body=lambda b: (
b.task("validate", "validate.record", args=["{{item}}"])
.task("transform", "transform.record", args=["{{item}}"])
.task("store", "store.record", args=["{{item}}"])
)
)
WhileOperator - Conditional Loop
builder.while_loop(
"retry_until_success",
condition="{{result.status}} == 'failed'",
loop_body=lambda b: (
b.task("attempt", "processor.attempt")
.task("check", "processor.check_result", result_key="result")
)
)
EmitEvent / WaitForEvent - Event Coordination
from datetime import timedelta

# Branch 1: Emit event after processing
(
    builder.task("process", "processor.run")
    .emit_event("emit_ready", event_name="processing_complete")
)

# Branch 2: Wait for event before continuing
(
    builder.wait_for_event(
        "wait_ready",
        event_name="processing_complete",
        timeout=timedelta(hours=1),
    )
    .task("downstream", "processor.downstream")
)
SwitchOperator - Multi-Way Branch
builder.switch(
"route_by_priority",
switch_on="{{task.priority}}",
cases={
"critical": "handle_critical",
"high": "handle_high",
"medium": "handle_medium",
"low": "handle_low"
},
default="handle_default"
)
JoinOperator - Explicit Coordination
from highway_dsl import JoinMode

# Wait for all tasks to complete (regardless of success/failure)
builder.join(
    "wait_all",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ALL_OF,
)

# Wait for all tasks to succeed (fail if any fails)
builder.join(
    "wait_all_success",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ALL_SUCCESS,
)

# Wait for any one task to complete
builder.join(
    "wait_any",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ANY_OF,
)

# Wait for first successful completion
builder.join(
    "wait_first_success",
    join_on=["task_1", "task_2", "task_3"],
    join_mode=JoinMode.ONE_SUCCESS,
)
COMPLETE WORKFLOW EXAMPLES
Example 1: Data Processing Pipeline
from highway_dsl import WorkflowBuilder
from datetime import timedelta

workflow = (
    WorkflowBuilder("data_pipeline", version="2.0.0")
    # Extract data from multiple sources in parallel
    .parallel(
        "extract_data",
        branches={
            "database": lambda b: b.task("extract_db", "extract.from_db"),
            "api": lambda b: b.task("extract_api", "extract.from_api"),
            "files": lambda b: b.task("extract_files", "extract.from_files"),
        },
    )
    # Wait for all extractions to complete successfully
    .join(
        "wait_extractions",
        join_on=["extract_db", "extract_api", "extract_files"],
        join_mode="ALL_SUCCESS",
    )
    # Merge and transform
    .task("merge", "transform.merge_sources", result_key="merged_data")
    .task("validate", "transform.validate", args=["{{merged_data}}"])
    # Branch based on data quality
    .condition(
        "quality_check",
        condition="{{validate.score}} >= 0.95",
        if_true=lambda b: b.task("load_prod", "load.to_production"),
        if_false=lambda b: b.task("load_staging", "load.to_staging"),
    )
    .build()
)
Example 2: Event-Driven Workflow
workflow = (
WorkflowBuilder("event_driven", version="2.0.0")
.parallel(
"process_branches",
branches={
"producer": lambda b: (
b.task("generate", "producer.generate_data")
.emit_event("notify", event_name="data_ready")
),
"consumer": lambda b: (
b.wait_for_event("wait", event_name="data_ready",
timeout=timedelta(minutes=5))
.task("process", "consumer.process_data")
)
}
)
.build()
)
YAML Serialization Output
name: data_pipeline
version: "2.0.0"
description: ""
start_task: extract_data
tasks:
  extract_data:
    task_id: extract_data
    operator_type: parallel
    branches:
      database:
        - task_id: extract_db
          operator_type: task
          function: extract.from_db
      api:
        - task_id: extract_api
          operator_type: task
          function: extract.from_api
      files:
        - task_id: extract_files
          operator_type: task
          function: extract.from_files
  wait_extractions:
    task_id: wait_extractions
    operator_type: join
    join_on: [extract_db, extract_api, extract_files]
    join_mode: ALL_SUCCESS
    dependencies: [extract_data]
  merge:
    task_id: merge
    operator_type: task
    function: transform.merge_sources
    result_key: merged_data
    dependencies: [wait_extractions]
  # ... validate, quality_check, and load tasks elided for brevity
variables: {}
TECHNICAL SPECIFICATIONS
SERIALIZATION
- Format: YAML 1.2 / JSON
- Encoding: UTF-8
- Schema: Pydantic v2
- Validation: Strict mode
- Round-trip: Lossless
DEPENDENCIES
- Python: >= 3.8
- pydantic: >= 2.0
- pyyaml: >= 6.0
- python-dateutil: >= 2.8
VALIDATION RULES
- No cyclic dependencies (DAG)
- Unique task_id per workflow
- Valid dependency references (see the sketch after this list)
- Type-safe field validation
- ISO 8601 duration format
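Cycle detection falls out of the topological sort sketched earlier; the remaining reference rules are a single pass over the task map. An illustrative checker (the function name and error messages are not from the library):

from typing import Dict, List

def validate_references(deps: Dict[str, List[str]], start_task: str) -> None:
    """Check the reference rules; task_id uniqueness is implied by dict keys."""
    if start_task not in deps:
        raise ValueError(f"start_task {start_task!r} is not a defined task")
    for task_id, task_deps in deps.items():
        for dep in task_deps:
            if dep not in deps:
                raise ValueError(f"{task_id!r} depends on unknown task {dep!r}")
            if dep == task_id:
                raise ValueError(f"{task_id!r} depends on itself")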
DURATION FORMAT (ISO 8601)
from highway_dsl import Duration

# String format: "P[n]Y[n]M[n]DT[n]H[n]M[n]S"
Duration("PT30S")    # 30 seconds
Duration("PT5M")     # 5 minutes
Duration("PT2H")     # 2 hours
Duration("P1D")      # 1 day
Duration("PT1H30M")  # 1 hour 30 minutes

# Programmatic (timedelta)
from datetime import timedelta

Duration(timedelta(seconds=30))
IDEMPOTENCY
IDEMPOTENCY KEY
All operators support an optional idempotency_key field for deterministic re-execution:
task.idempotency_key = "unique-key-{date}-{id}"
Execution engines should use idempotency keys to prevent duplicate execution of tasks when workflows are retried or resumed.
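One way an engine can honor the key, sketched as an in-memory cache; a durable engine would persist completed keys alongside workflow state (all names here are illustrative):

from typing import Any, Callable, Dict, Optional

_completed: Dict[str, Any] = {}  # idempotency_key -> stored result

def run_once(idempotency_key: Optional[str], fn: Callable[[], Any]) -> Any:
    """Skip re-execution when the key has already completed."""
    if idempotency_key is not None and idempotency_key in _completed:
        return _completed[idempotency_key]    # replay the stored result
    result = fn()
    if idempotency_key is not None:
        _completed[idempotency_key] = result  # record completion
    return result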
CALLBACK HOOKS
# Define callback tasks that execute on success/failure
(
    builder.task("risky_task", "processor.risky")
    .on_success("notify_success")
    .on_failure("notify_failure")
)

# Define the callback handlers
builder.task("notify_success", "notify.send_success")
builder.task("notify_failure", "notify.send_failure")
SCHEDULING METADATA
from datetime import datetime

workflow = (
    WorkflowBuilder("scheduled_workflow")
    .set_schedule("0 */6 * * *")  # Cron: every 6 hours
    .set_start_date(datetime(2025, 1, 1))
    .set_catchup(False)  # Don't backfill
    .add_tags("production", "critical")
    # ... define tasks ...
    .build()
)
IMPLEMENTATION NOTES
PARSER
- YAML/JSON deserialization
- Schema validation
- Operator type dispatch (sketched after this list)
- DAG construction
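Operator type dispatch at parse time reduces to a registry keyed by operator_type. A sketch that assumes the Pydantic models from the operator sketch earlier; model_validate is standard Pydantic v2 API, while the registry itself is illustrative:

import yaml

# operator_type -> model class; extend with the remaining operator models
OPERATOR_REGISTRY = {"task": TaskOperator}

def parse_workflow(text: str) -> dict:
    """Load YAML and validate each task against its operator model."""
    raw = yaml.safe_load(text)
    raw["tasks"] = {
        task_id: OPERATOR_REGISTRY[spec["operator_type"]].model_validate(spec)
        for task_id, spec in raw["tasks"].items()
    }
    return raw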
EXECUTOR
- Topological sort
- Dependency resolution
- Template interpolation
- Operator handlers
- State persistence
RUNTIME
- Function registry/loader (sketched below)
- Result storage
- Event bus (for events)
- Retry/timeout handling
- Checkpoint/resume logic
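For the function registry/loader, the usual approach is resolving dotted paths such as extract.from_db to callables via importlib. A minimal sketch, assuming the last dotted segment names an attribute of an importable module:

import importlib
from typing import Callable

def load_function(dotted_path: str) -> Callable:
    """Resolve 'package.module.func' to the callable it names."""
    module_path, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr)

# e.g. load_function("os.path.join") -> <function join>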
REFERENCE IMPLEMENTATION
Highway Workflow Engine provides a production-grade reference implementation of the Highway DSL specification. It demonstrates:
• PostgreSQL-backed durable execution
• Crash-resistant state management
• Atomic transaction orchestration
• Complete operator support (all 10 types)
• Event coordination and join semantics