Highway DSL - Workflow Engine Framework

By RODMEN/A

A sophisticated Domain Specific Language for defining and managing complex workflow engines and data processing pipelines in Python. Engineered by RODMEN/A for data infrastructure and distributed systems.

pip install highway-dsl

Stable Release v1.0.2 • MIT License

Enterprise-Grade Workflow Capabilities

Built for complex data processing and distributed systems

6 Operator Types
100% Python Type Safety
YAML/JSON Serialization
1000+ Workflow Nodes

About RODMEN/A

Software development company specializing in data infrastructure, distributed systems, and cloud-based solutions.

Highway DSL is developed by RODMEN/A to provide robust, scalable workflow management for complex data processing pipelines. Our expertise in distributed systems ensures Highway DSL is designed for enterprise-grade reliability and performance.

Specialized in data infrastructure solutions
Expertise in distributed systems architecture
Cloud-native workflow solutions
Reliable and scalable implementations

Enterprise Focus

Highway DSL is engineered specifically for enterprise environments requiring high availability, robust error handling, and complex dependency management.

Developed with RODMEN/A's deep expertise in distributed systems, Highway DSL ensures your workflows can scale efficiently while maintaining consistency and reliability.

Core Features

Comprehensive workflow engine capabilities

Fluent API

Intuitive, chainable methods for building complex workflows with clean, readable code. Designed for developers familiar with Python.

Rich Operators

Support for Task, Condition, Parallel, ForEach, While, and Wait operators for complex logic. Enterprise-grade workflow constructs.

Error Handling

Retry and timeout policies with configurable backoff strategies for resilient workflows that can handle real-world failure scenarios.
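
To make the idea of a retry policy with backoff concrete, here is a minimal sketch in plain Python. It does not use or reproduce Highway DSL's implementation: `run_with_retry`, `backoff_factor`, and the `sleep` parameter are names assumed for this illustration only.

```python
import time
from datetime import timedelta


def run_with_retry(func, max_retries=3, delay=timedelta(seconds=1),
                   backoff_factor=2.0, sleep=time.sleep):
    """Call func(); on failure wait, grow the wait by backoff_factor, retry.

    Hypothetical helper for illustration only; not part of highway_dsl.
    """
    wait = delay.total_seconds()
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            sleep(wait)
            wait *= backoff_factor


# Usage: a task that fails twice before succeeding.
calls = {"count": 0}


def flaky_extract():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("source unavailable")
    return "raw_data"


waits = []  # record the waits instead of actually sleeping
result = run_with_retry(flaky_extract, max_retries=3,
                        delay=timedelta(seconds=30), sleep=waits.append)
```

Passing `sleep` as a parameter keeps the sketch testable: the example records the two waits (30 and 60 seconds) rather than pausing for them.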

Loop Management

ForEach and While loops encapsulate the dependencies of their loop bodies, so tasks inside a loop no longer pick up unwanted "grandparent" dependencies.

Parallel Execution

Fan-in/fan-out parallel processing with proper synchronization and dependency management for maximum throughput.
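
The fan-out/fan-in pattern itself can be sketched with the standard library. This is an illustration of the pattern under simple assumptions (each branch is a list of callables applied in order), not a description of how Highway DSL executes branches. `run_branch` and `run_parallel` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def run_branch(steps, value):
    """Run one branch: apply its steps in order to the input value."""
    for step in steps:
        value = step(value)
    return value


def run_parallel(branches, value):
    """Fan out: start every branch at once. Fan in: wait for all results."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(run_branch, steps, value)
                   for name, steps in branches.items()}
        # .result() blocks until that branch finishes, so the returned
        # dict is only complete once every branch is done.
        return {name: future.result() for name, future in futures.items()}


branches = {
    "branch_a": [lambda x: x + 1, lambda x: x * 10],
    "branch_b": [lambda x: x - 1, lambda x: x * 100],
}
results = run_parallel(branches, 5)
```

Steps inside a branch stay sequential, while the branches run concurrently. The synchronization point is the collection of results at the end.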

Serialization

YAML and JSON export/import with round-trip validation and full interoperability for integration with other tools.
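
Round-trip validation means that exporting a workflow and importing it again yields an equal definition. The sketch below shows that check for JSON using only the standard library. The dictionary layout is a hypothetical stand-in, and the schema Highway DSL actually emits may differ. YAML is left out because it needs a third-party parser.

```python
import json

# Hypothetical dict layout for a workflow, used only for this illustration.
workflow = {
    "name": "enterprise_etl",
    "tasks": {
        "extract": {"function": "etl.extract_from_source",
                    "dependencies": []},
        "transform": {"function": "etl.transform_data",
                      "dependencies": ["extract"]},
    },
}

exported = json.dumps(workflow, indent=2, sort_keys=True)
restored = json.loads(exported)

# Round trip: the restored definition equals the original, and exporting
# it again produces exactly the same text.
round_trip_ok = restored == workflow
re_exported = json.dumps(restored, indent=2, sort_keys=True)
stable_export = re_exported == exported
```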

Advanced Examples

See Highway DSL in action with real-world workflow patterns

Parallel Processing

# Process data in parallel branches
builder.parallel(
    "process_data",
    branches={
        "branch_a": lambda b: b.task("task_a1", "processor.task_a1")
                                .task("task_a2", "processor.task_a2"),
        "branch_b": lambda b: b.task("task_b1", "processor.task_b1")
                                .task("task_b2", "processor.task_b2"),
    }
)

Loop with Fixed Dependencies

# ForEach loop with proper dependency management
builder.foreach(
    "process_items",
    items="{{data.items}}",
    loop_body=lambda fb: fb.task(
        "process_item",
        "processor.handle_item",
        args=["{{item.id}}"]
    )
)
# Loop body tasks no longer get unwanted "grandparent" dependencies.

Conditional Logic

# Branch based on runtime conditions
builder.condition(
    "data_quality_check",
    condition="{{data.quality_score}} > 0.8",
    if_true=lambda b: b.task("advanced_processing", "processor.advanced"),
    if_false=lambda b: b.task("basic_processing", "processor.basic"),
)

While Loop with Retry

# Retry processing until success
builder.while_loop(
    "retry_processing",
    condition="{{result.status}} == 'failed'",
    loop_body=lambda wb: wb.task("retry_task", "processor.process")
                           .task("check_result", "processor.validate_result")
)

DSL Specification

Comprehensive domain-specific language for workflow definition

Component | Type | Description | Usage
--- | --- | --- | ---
Workflow | Core Object | Top-level container for all workflow tasks and definitions | Contains tasks, dependencies, variables, and serialization methods
WorkflowBuilder | Fluent Interface | Chainable API for constructing workflows programmatically | Used to fluently define tasks, operators, and dependencies
TaskOperator | Execution Unit | Basic building block representing a single workflow operation | Executes functions, can have dependencies, retries, and timeouts
ConditionOperator | Control Flow | Implements conditional branching (if/else logic) | Directs workflow execution based on runtime conditions
ParallelOperator | Concurrency | Executes multiple branches simultaneously | Enables parallel processing with fan-in/fan-out patterns
ForEachOperator | Iteration | Iterates over collections with proper dependency management | Processes each item in a collection with encapsulated dependencies
WhileOperator | Looping | Executes loops based on runtime conditions | Repeats the loop body for as long as the specified condition holds
WaitOperator | Scheduling | Pauses execution until a specified time or for a specified duration | Introduces delays or schedules execution for later
RetryPolicy | Error Handling | Configurable retry strategy with backoff | Defines how failed tasks should be retried
TimeoutPolicy | Error Handling | Execution time limits with configurable behavior | Ensures tasks don't run indefinitely
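
As an illustration of the execution time limit that the TimeoutPolicy entry describes, the sketch below bounds how long a caller waits for a task. It uses only the standard library and is not Highway DSL's implementation. `run_with_timeout` and its return shape are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_timeout(func, timeout_seconds):
    """Return ("completed", value) or ("timed_out", None).

    Python threads cannot be killed, so a timed-out task keeps running
    in the background; this sketch only stops waiting for it.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        value = pool.submit(func).result(timeout=timeout_seconds)
        return "completed", value
    except FutureTimeout:
        return "timed_out", None
    finally:
        pool.shutdown(wait=False)  # do not block on the worker thread


fast = run_with_timeout(lambda: "done", timeout_seconds=1.0)
slow = run_with_timeout(lambda: time.sleep(0.3), timeout_seconds=0.05)
```

Returning a status instead of raising is one possible "configurable behavior". A real policy might instead fail the task or trigger a retry.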

Workflow Definition Syntax

# Core workflow structure
workflow = (
    WorkflowBuilder(name)
    .task(id, function, **options)
    .condition(id, condition, if_true, if_false)
    .parallel(id, branches)
    .foreach(id, items, loop_body)
    .while_loop(id, condition, loop_body)
    .wait(id, duration_or_datetime)
    .build()
)

Dependency Management

# Explicit dependencies
.task("task_a", "service.operation_a")
.task("task_b", "service.operation_b", dependencies=["task_a"])

# Implicit dependencies (fluent)
.task("task_a", "service.operation_a")
.task("task_b", "service.operation_b")  # Automatically depends on task_a
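
The difference between implicit and explicit dependencies can be shown with a toy builder. `MiniBuilder` is a hypothetical class written for this illustration and is not the `WorkflowBuilder` from highway_dsl. The execution order is derived with the standard library's `graphlib`.

```python
from graphlib import TopologicalSorter


class MiniBuilder:
    """Toy fluent builder, for illustration only."""

    def __init__(self):
        self.dependencies = {}   # task id -> list of task ids it waits for
        self._previous = None    # id of the most recently added task

    def task(self, task_id, dependencies=None):
        if dependencies is None:
            # Implicit rule: depend on the task added just before this one.
            dependencies = [self._previous] if self._previous else []
        self.dependencies[task_id] = list(dependencies)
        self._previous = task_id
        return self              # returning self is what allows chaining


builder = (
    MiniBuilder()
    .task("task_a")
    .task("task_b")                             # implicit: depends on task_a
    .task("task_c", dependencies=["task_a"])    # explicit: skips task_b
)

# static_order() yields each task only after all of its dependencies.
order = list(TopologicalSorter(builder.dependencies).static_order())
```

Here `task_b` and `task_c` both wait only for `task_a`, so an engine would be free to run them in either order or concurrently.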

Simple Yet Powerful

Define complex workflows with intuitive Python code that's both readable and maintainable.

Chain operations fluently
Define dependencies automatically
Handle complex branching logic
Serialize to multiple formats
Enterprise-grade error handling
Scale to complex distributed workflows

# Enterprise ETL Workflow
from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import timedelta

workflow = (WorkflowBuilder("enterprise_etl")
    .task("extract", "etl.extract_from_source",
        result_key="raw_data",
        retry_policy=RetryPolicy(max_retries=3, delay=timedelta(seconds=30)))
    .task("transform", "etl.transform_data",
        args=["{{raw_data}}"], result_key="transformed_data")
    .task("load", "etl.load_to_target",
        args=["{{transformed_data}}"])
    .build())
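
The example above passes data between tasks through `result_key` and `{{...}}` placeholders. How such placeholders could be resolved at run time is sketched below with a toy executor. The `resolve` and `run_tasks` helpers, the task dictionaries, and the stand-in functions are all assumptions for this illustration and say nothing about the engine that actually runs Highway DSL workflows.

```python
import re

PLACEHOLDER = re.compile(r"^\{\{(\w+)\}\}$")


def resolve(arg, context):
    """Replace a whole-string "{{name}}" argument with context[name]."""
    match = PLACEHOLDER.match(arg) if isinstance(arg, str) else None
    return context[match.group(1)] if match else arg


def run_tasks(tasks, functions):
    """Run tasks in order, storing each result under its result_key."""
    context = {}
    for task in tasks:
        args = [resolve(a, context) for a in task.get("args", [])]
        result = functions[task["function"]](*args)
        if "result_key" in task:
            context[task["result_key"]] = result
    return context


target = []  # stand-in for the load destination
tasks = [
    {"function": "etl.extract_from_source", "result_key": "raw_data"},
    {"function": "etl.transform_data", "args": ["{{raw_data}}"],
     "result_key": "transformed_data"},
    {"function": "etl.load_to_target", "args": ["{{transformed_data}}"]},
]
functions = {
    "etl.extract_from_source": lambda: [1, 2, 3],
    "etl.transform_data": lambda rows: [r * 2 for r in rows],
    "etl.load_to_target": target.extend,
}
context = run_tasks(tasks, functions)
```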

Get Started in Seconds

Install Highway DSL and start building workflows today

pip install highway-dsl

Why Highway DSL?

Enterprise Performance

Built with performance in mind, supporting parallel execution and optimized dependency management for large-scale workflows.

Reliable by Design

Stable release with comprehensive error handling, retry mechanisms, and proper dependency management for production environments.

Industrial Strength

Developed by RODMEN/A for data infrastructure and distributed systems with proven reliability in enterprise environments.

AI Agent Workflow Example

Real-world example: AI agent that builds, tests, and deploys a full software platform using TDD

Workflow Breakdown

Phase 1: Setup & Test Generation

AI analyzes requirements and sets up Git repository, then generates test suites for all microservices in parallel.

Phase 2: TDD Loop

Continuous implementation and refactoring loop that continues until all tests pass, with conditional logic for implementation vs. refactoring.

Phase 3: Deployment & Observability

Infrastructure provisioning, parallel deployments, health checks, and observability setup with automated verification.

AI-Optimized Workflow Engine

Highway DSL represents the most sophisticated workflow specification available for Python environments

Domain-Specific Optimization

Highway DSL is purpose-built for workflow definition with an intuitive API that reduces complexity while maximizing expressiveness.

Declarative Syntax

Define complex workflow relationships with simple, readable Python code that clearly expresses intent and execution order.

Advanced Dependency Management

Intelligent dependency resolution that correctly handles complex nested structures including ForEach and While loops.