
Step 1 & 2: Analyze Requirements & Design Unified Structure

Based on your request, I propose the following conceptual structure for a unified workflow YAML file. This structure aims to bring trigger definitions (like webhooks) and the taskflow execution logic together.

```yaml
# Top-level workflow definition
workflow_name: "UniqueWorkflowIdentifier"
description: "Human-readable description of the entire workflow."
version: "1.0"

triggers:
  - id: "trigger_id_1"            # Unique ID for this trigger within the workflow file
    type: "webhook"               # Type of trigger (e.g., webhook, schedule, manual)
    enabled: true                 # Whether this trigger is active
    config:                       # Webhook-specific configuration
      provider: "unipile"         # Corresponds to :provider in /webhooks/:provider/:webhook_id
      webhook_id: "new_email"     # Corresponds to :webhook_id
      # Optional: Define expected payload schema or key fields for validation/documentation
      # expected_payload_schema: { ... JSON Schema ... }
      # Optional: Security configuration (e.g., secret name to look up)
      # secret_ref: "unipile_webhook_secret"
    # Defines which taskflow within this file to execute when triggered
    target_taskflow: "main_email_processing"
    # Optional: Define how webhook payload maps to taskflow input
    # input_mapping:
    #   taskflow_input_field_1: "${TRIGGER.payload.field_a}"
    #   taskflow_input_field_2: "literal_value"

  - id: "trigger_id_2"
    type: "schedule"
    enabled: false
    config:
      cron_expression: "0 0 * * *"  # e.g., daily at midnight
    target_taskflow: "daily_report_generation"

taskflows:
  - taskflow_id: "main_email_processing"   # ID referenced by triggers
    description: "The main sequence for processing incoming emails."
    # Optional: Define expected input schema for this taskflow
    # input_schema: { ... JSON Schema ... }
    config:                                # Optional: Default config for the taskflow execution itself
      max_concurrency: 5
      timeout: "10m"
    tasks:
      - id: "get_email_content"
        type: "mfo_api_resource_get"
        config:
          # Access input provided by the trigger (via input_mapping or default payload)
          resource_id: "${INPUT.email_resource_id}"
        successors: ["analyze_email"]
        # ... other task fields (name, description, outputs)
      - id: "analyze_email"
        # ... task definition ...
        dependencies: ["get_email_content"]
        # ...
      # ... other tasks ...
    subflows:
      # ... subflow definitions if needed ...
    conditions:
      # ... condition definitions if needed ...

  - taskflow_id: "daily_report_generation"
    description: "Generates a daily report."
    tasks:
      # ... tasks for report generation ...

# Optional: Global definitions (e.g., secrets, common configurations)
global_configs:
  secrets:
    unipile_webhook_secret: "lookup_from_vault:unipile_secret"
```

Key Ideas in this Structure:

workflow_name: Top-level identifier.
triggers: A list defining how a workflow execution starts.
    Each trigger has a type (webhook, schedule, etc.).
    Webhook triggers define provider and webhook_id (matching the URL).
    Crucially, each trigger specifies a target_taskflow ID, linking it to a specific task sequence within the same file.
    input_mapping (optional) could explicitly define how data from the trigger event (e.g., webhook payload) becomes the ${INPUT} for the target taskflow.
taskflows: A list defining the actual execution logic (the processes).
    Each item has a taskflow_id referenced by triggers.
    Contains the familiar tasks, subflows, and conditions structure.
    Tasks can access initial data via ${INPUT.<field_name>}.

This unified structure keeps the trigger configuration alongside the workflow it initiates. The SLOP client/server would need to be adapted to parse this structure, potentially register the webhook endpoints based on the triggers section, and initiate the correct target_taskflow when a trigger fires. I will now proceed to document this schema in more detail (Step 3).

Documenting the unified YAML schema in detail via message to the user. Continuing with Step 3: Here is the detailed documentation for the proposed unified SLOP Workflow YAML schema.

SLOP Unified Workflow YAML Definition

This document describes a proposed YAML structure for defining complete SLOP workflows, including their triggers and execution logic, within a single file. This structure aims to provide a self-contained definition suitable for management and potentially generation by LLMs.

Note: This is a proposed schema. Implementation within the slop-client and slop-server would be required to parse and act upon this structure.

Top-Level Fields

The root of the YAML file contains metadata for the entire workflow definition.

```yaml
workflow_name: string            # Required
description: string              # Optional
version: string                  # Optional

triggers: [TriggerDefinition]    # Optional, but usually needed
taskflows: [TaskflowDefinition]  # Required
global_configs: object           # Optional
```

workflow_name (string, Required): A unique identifier for this workflow definition file. Used for referencing and management.
description (string, Optional): A human-readable explanation of the workflow's overall purpose.
version (string, Optional): Version string for this workflow definition (e.g., "1.0.0").
triggers (list, Optional): Defines the events or conditions that initiate the execution of specific taskflows within this file. See Trigger Definition.
taskflows (list, Required): Defines the actual sequences of operations (tasks, subflows, conditions). See Taskflow Definition.
global_configs (object, Optional): A section for defining configurations shared across the workflow, such as secret references. See Global Configurations.

Trigger Definition (triggers[])

Each item in the triggers list defines a specific mechanism that can start a taskflow.

```yaml
- id: string               # Required
  type: string             # Required (e.g., "webhook", "schedule", "manual", "event")
  enabled: boolean         # Optional (default: true)
  config: object           # Required (structure depends on 'type')
  target_taskflow: string  # Required
  input_mapping: object    # Optional
```

id (string, Required): A unique identifier for this trigger within this workflow file.
type (string, Required): Specifies the kind of trigger. Common types include:
    webhook: Triggered by an incoming HTTP request to a specific SLOP endpoint.
    schedule: Triggered based on a time schedule (e.g., cron).
    manual: Intended to be triggered explicitly via an API call or UI action (details TBD).
    event: Triggered by an internal SLOP system event (details TBD; could potentially replace the separate EventJobTrigger API).
enabled (boolean, Optional): Defaults to true. If false, this trigger is ignored.
config (object, Required): Contains configuration specific to the trigger type.
    For type: webhook:

```yaml
config:
  provider: string     # Required. Corresponds to :provider in /webhooks/:provider/:webhook_id
  webhook_id: string   # Required. Corresponds to :webhook_id
  # Optional: Define expected payload schema (JSON Schema) for validation/documentation
  expected_payload_schema: object
  # Optional: Reference to a secret for signature verification
  secret_ref: string   # e.g., "github_webhook_secret" (defined in global_configs)
```

    For type: schedule:

```yaml
config:
  cron_expression: string # Required. Standard cron format (e.g., "0 * * * *")
```

    (Define config structures for other types as needed)
target_taskflow (string, Required): The taskflow_id (from the taskflows section in this file) that should be executed when this trigger fires.
input_mapping (object, Optional): Defines how data from the trigger event is mapped to the ${INPUT} context of the target_taskflow. If omitted, the entire trigger payload might be passed as ${INPUT} by default.
    Keys are the field names within the ${INPUT} context (e.g., taskflow_input_field).
    Values can be:
        Literal values (string, number, boolean).
        Expressions referencing the trigger's payload (e.g., "${TRIGGER.payload.some_field}"). The exact syntax (${TRIGGER...}) needs standardization; a short illustration follows this list.

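For example, reusing the placeholder names from the conceptual structure above, an input_mapping that mixes a payload expression with a literal could look like this (illustrative only, since the expression syntax is not yet standardized):

```yaml
input_mapping:
  taskflow_input_field_1: "${TRIGGER.payload.field_a}"  # resolved from the trigger payload at runtime
  taskflow_input_field_2: "literal_value"               # passed through to ${INPUT} unchanged
```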
Taskflow Definition (taskflows[])

Each item in the taskflows list defines a reusable sequence of operations.

```yaml
- taskflow_id: string               # Required
  description: string               # Optional
  input_schema: object              # Optional (JSON Schema)
  config: object                    # Optional (Taskflow-level config)
  tasks: [TaskDefinition]           # Required
  subflows: [SubflowDefinition]     # Optional
  conditions: [ConditionDefinition] # Optional
```

taskflow_id (string, Required): A unique identifier for this taskflow within this workflow file. Referenced by target_taskflow in triggers.
description (string, Optional): Human-readable explanation of what this specific taskflow does.
input_schema (object, Optional): A JSON Schema defining the expected structure of the ${INPUT} data for this taskflow. Useful for validation and documentation.
config (object, Optional): Configuration specific to the execution of this taskflow (e.g., max_concurrency, timeout). These might override global settings or provide defaults.
tasks (list, Required): The list of individual task definitions. The structure of each TaskDefinition (including id, type, config, dependencies, successors, outputs, and data passing via ${task_id.output_key}) remains the same as previously documented. Tasks within this taskflow access initial data provided by the trigger via "${INPUT.<field_name>}" in their config.
subflows (list, Optional): Definitions of subflows used within this taskflow. The structure of SubflowDefinition remains the same as previously documented.
conditions (list, Optional): Definitions of conditional branching points within this taskflow. The structure of ConditionDefinition remains the same as previously documented.

Global Configurations (global_configs)

An optional section for defining shared values or configurations.

```yaml
global_configs:
  secrets:
    secret_name_1: string   # e.g., "lookup_from_vault:path/to/secret"
    secret_name_2: string   # Plain value (less secure)
  # Other potential global settings...
```

secrets (object, Optional): A map for defining references to secrets (e.g., API keys, webhook secrets). The values might be direct strings or use a specific syntax (like lookup_from_vault:...) indicating how the SLOP system should retrieve the actual secret value securely at runtime. Trigger configurations (secret_ref) and potentially task configurations could reference these keys.

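As an illustration of that lookup behaviour, the server might resolve a secret value roughly as follows. This is a hedged Go sketch: the Vault interface is an assumption, and only the lookup_from_vault: prefix from the example above is handled.

```go
package secrets

import "strings"

// Vault abstracts whatever secret backend the SLOP server is configured with (assumed interface).
type Vault interface {
	Read(path string) (string, error)
}

// Resolve interprets a global_configs.secrets value: entries prefixed with
// "lookup_from_vault:" are fetched from the vault at runtime; anything else
// is treated as a plain (less secure) literal value.
func Resolve(value string, vault Vault) (string, error) {
	const prefix = "lookup_from_vault:"
	if strings.HasPrefix(value, prefix) {
		return vault.Read(strings.TrimPrefix(value, prefix))
	}
	return value, nil
}
```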
This detailed schema provides a foundation for the unified workflow definition. I will now create some examples based on this structure (Step 4).

Creating examples of the unified workflow YAML structure to illustrate its practical application. Continuing with Step 4: Here are examples illustrating the proposed unified SLOP Workflow YAML structure.

SLOP Workflow YAML: Taskflow Directive Examples (Revised)

Recap: The previous examples have been revised based on user feedback regarding:

  1. Variable Syntax: The ${...} syntax has been replaced with CONTEXT.variable references (e.g., INPUT.email_resource_id, get_email_content.resource.content). It's crucial to understand that this is not standard YAML but a convention interpreted by the SLOP workflow engine at runtime to substitute values from the workflow context; a short before/after illustration follows.
  2. Content Generation: Specific content details (like LLM prompts or email subjects) are now abstracted. The workflow orchestrates tasks that handle content generation, often by referencing external templates (prompt_template_id) or using dedicated tasks (like generate_subject).

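For illustration, here is the same config field under the old and the revised convention (values taken from Example 1 below):

```yaml
# Old convention: explicit placeholder expression
config:
  resource_id: "${INPUT.email_resource_id}"

# Revised convention: bare CONTEXT.variable reference, resolved by the workflow engine at runtime
config:
  resource_id: "INPUT.email_resource_id"
```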
### Example 1: Email Processing (Content Generation Abstracted)

```yaml
workflow_name: "UnipileEmailProcessingWorkflow"
description: "Handles incoming emails from Unipile via webhook."
version: "1.2" # Incremented version

global_configs:
  secrets:
    unipile_secret: "lookup:vault/slop/unipile_webhook_secret"

triggers:
  - id: "unipile_new_email_webhook"
    type: "webhook"
    enabled: true
    config:
      provider: "unipile"
      webhook_id: "new_email"
      secret_ref: "unipile_secret"
      expected_payload_schema:
        type: object
        properties:
          resource_id: { type: string }
          thread_id: { type: string }
        required: [resource_id, thread_id]
    target_taskflow: "process_email_sequence"
    input_mapping:
      email_resource_id: "TRIGGER.payload.resource_id"
      email_thread_id: "TRIGGER.payload.thread_id"

taskflows:
  - taskflow_id: "process_email_sequence"
    description: "Fetches, analyzes, and tags an email."
    input_schema:
      type: object
      properties:
        email_resource_id: { type: string }
        email_thread_id: { type: string }
      required: [email_resource_id, email_thread_id]
    tasks:
      - id: "get_email_content"
        name: "Get Email Resource Content"
        type: "mfo_api_resource_get"
        config:
          resource_id: "INPUT.email_resource_id"
        successors: ["analyze_email"]
        outputs:
          - get_email_content.resource

      - id: "analyze_email"
        name: "Analyze Email with LLM (using Template)"
        type: "mfo_api_chat_send" # Or potentially a dedicated "prompt_template_task"
        config:
          prompt_template_id: "email_analysis_v1" # ID referencing an external prompt template
          prompt_context:
            email_content: "get_email_content.resource.content"
          provider_id: "default_llm"
        dependencies: ["get_email_content"]
        successors: ["tag_email_thread"]
        outputs:
          - analyze_email.analysis_result # e.g., { "sender": "...", "subject": "...", "summary": "..." }

      - id: "tag_email_thread"
        name: "Tag Email Thread via Tool"
        type: "mfo_api_tool_execute"
        config:
          tool_id: "unipile_email_tag_thread"
          arguments:
            tag_name: "ai_analyzed"
            thread_id: "INPUT.email_thread_id"
        dependencies: ["analyze_email"]

### Example 2: Scheduled Daily Report (Content Generation Abstracted)

```yaml
workflow_name: "DailyReportingWorkflow"
description: "Generates and distributes a daily sales report."
version: "1.1" # Incremented version

triggers:
  - id: "daily_midnight_schedule"
    type: "schedule"
    enabled: true
    config:
      cron_expression: "0 0 * * *"
    target_taskflow: "generate_and_send_report"

taskflows:
  - taskflow_id: "generate_and_send_report"
    description: "Generates a sales report and emails it."
    tasks:
      - id: "fetch_sales_data"
        name: "Fetch Sales Data"
        type: "mfo_api_tool_execute"
        config:
          tool_id: "internal_db_query_sales"
          arguments:
            period: "last_day"
        successors: ["format_report"]
        outputs:
          - fetch_sales_data.sales_figures

      - id: "format_report"
        name: "Format Sales Report"
        type: "custom_report_formatter"
        config:
          data: "fetch_sales_data.sales_figures"
          template: "daily_sales_template.md"
        dependencies: ["fetch_sales_data"]
        successors: ["generate_subject"]
        outputs:
          - format_report.report_content

      - id: "generate_subject"
        name: "Generate Report Subject"
        type: "simple_template_task" # Example: A task using a simple template
        config:
          template_string: "Daily Sales Report - SYSTEM.DATE"
          # Alternatively, use an LLM call:
          # type: "mfo_api_chat_send"
          # config:
          #   prompt_template_id: "daily_report_subject_v1"
          #   prompt_context: { report_date: "SYSTEM.DATE" }
          #   provider_id: "fast_llm"
        dependencies: ["format_report"]
        successors: ["email_report"]
        outputs:
          - generate_subject.subject_text

      - id: "email_report"
        name: "Email Sales Report"
        type: "mfo_api_tool_execute"
        config:
          tool_id: "send_email"
          arguments:
            to: "management@example.com"
            subject: "generate_subject.subject_text"
            body: "format_report.report_content"
        dependencies: ["generate_subject", "format_report"]

These examples demonstrate how triggers and taskflows can coexist within a single YAML file, linking specific events (like a webhook or a schedule) to the execution of defined task sequences. They also show how data flows from the trigger into the taskflow's input context and how content generation can be abstracted from the main workflow definition.

Explaining how the proposed unified YAML structure would integrate with the existing SLOP system components. Continuing with Step 5: Explaining the integration of the proposed unified YAML structure.

Integration with Existing SLOP System

The proposed unified YAML structure represents a significant shift from the current architecture, where taskflow definitions (YAML) and trigger configurations (Server API /api/config/event-triggers) are separate. Integrating this unified structure would require modifications to both the SLOP server and potentially the slop-client. Here’s how it could integrate and what changes would be needed:

YAML Parsing:
    Current: The system (likely slop-client or a server component loading taskflows for jobs) parses YAML expecting only the taskflow definition (name, tasks, subflows, conditions).
    Unified: A new or modified parser would be needed to understand the top-level workflow_name, triggers, taskflows, and global_configs structure.
Trigger Management (Server-Side Focus Recommended):
    Current: Triggers are managed explicitly via the /api/config/event-triggers endpoints on the server. Webhook requests hit /webhooks/:provider/:webhook_id, a specific handler publishes to an event channel, and a configured EventJobTrigger listening on that channel creates a job (POST /api/jobs).
    Unified: The SLOP server would likely take primary responsibility for processing the triggers section of the unified YAML.
        Loading: The server needs a mechanism to load/register these unified workflow YAML files.
        Webhook Triggers: When loading a YAML with a webhook trigger, the server would need to:
            Ensure its internal routing is configured to handle requests for the specified /webhooks/:provider/:webhook_id.
            Associate that incoming route directly with the target_taskflow defined in the same YAML file.
            Handle security verification using the secret_ref (looking up the secret via global_configs).
            This could potentially replace the need for the separate /api/config/event-triggers database table and API for webhook-based triggers defined in this unified format. The YAML itself becomes the source of truth for the trigger configuration.
        Schedule Triggers: When loading a YAML with a schedule trigger, the server would need to interact with its internal scheduling system to register a job that runs the target_taskflow according to the cron_expression.
Taskflow Initiation and Input Mapping:
    Current: A job (/api/jobs) is created with a job_process_id (taskflow name) and an input payload. The taskflow executor uses this.
    Unified: When a trigger defined in the unified YAML fires (e.g., a matching webhook arrives or a schedule hits):
        The server identifies the target_taskflow from the YAML.
        It processes the input_mapping (if present) using data from the trigger event (e.g., webhook payload, current time for a schedule) to construct the initial ${INPUT} context (see the sketch after this list).
        It then initiates the execution of that specific target_taskflow (likely still using its internal job/taskflow execution engine), providing the constructed ${INPUT} context.
Role of slop-client:
    The slop-client's core taskflow execution logic (internal/taskflow interpreting tasks, subflows, conditions, handling ${task_id.output_key}, calling SLOP API tasks) would remain largely relevant if the server still delegates the actual execution of the taskflow steps to a client-like worker.
    However, the client might become less involved in defining or triggering the workflow if the unified YAML is primarily managed and interpreted by the server.
    The client might still be used for manually running taskflows defined in these YAMLs (bypassing the triggers section) or for interacting with the server for other purposes.

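To make the input-mapping step more concrete, a server-side resolver might look roughly like the Go sketch below. ResolveInput and the handling of ${TRIGGER.payload.*} expressions are assumptions about a not-yet-standardized syntax, not existing SLOP code.

```go
package trigger

import (
	"encoding/json"
	"strings"
)

// ResolveInput builds the initial ${INPUT} context for the target taskflow
// from a trigger's input_mapping and the raw trigger payload.
func ResolveInput(inputMapping map[string]any, payload []byte) (map[string]any, error) {
	var event map[string]any
	if err := json.Unmarshal(payload, &event); err != nil {
		return nil, err
	}

	input := make(map[string]any, len(inputMapping))
	for key, raw := range inputMapping {
		expr, ok := raw.(string)
		// Non-string values and plain strings are passed through as literals.
		if !ok || !strings.HasPrefix(expr, "${TRIGGER.payload.") || !strings.HasSuffix(expr, "}") {
			input[key] = raw
			continue
		}
		// "${TRIGGER.payload.some_field}" -> look up "some_field" in the webhook payload
		// (only top-level fields handled in this sketch).
		field := strings.TrimSuffix(strings.TrimPrefix(expr, "${TRIGGER.payload."), "}")
		input[key] = event[field]
	}
	return input, nil
}
```

The server would then pass the resulting map as the ${INPUT} context when it initiates the target_taskflow.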
Summary of Impact:

Requires a new YAML parser on the server (or wherever these files are processed); a sketch of possible Go types follows this list.
Requires server-side logic to interpret the triggers section and configure internal routing/scheduling.
Potentially simplifies or replaces the existing /api/config/event-triggers system for workflows defined in the unified format.
Requires a mechanism on the server to map trigger events and input_mapping to the initiation of the correct target_taskflow with the right input context.
The core go-taskflow execution logic for tasks/subflows/conditions remains similar, but the overall orchestration layer changes.

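To illustrate the parsing requirement, the unified document could map onto Go types along these lines. This is a minimal sketch using gopkg.in/yaml.v3; the type and field names are assumptions, not existing SLOP code.

```go
package workflow

import (
	"os"

	"gopkg.in/yaml.v3"
)

// Workflow mirrors the top-level unified YAML structure.
type Workflow struct {
	WorkflowName  string         `yaml:"workflow_name"`
	Description   string         `yaml:"description,omitempty"`
	Version       string         `yaml:"version,omitempty"`
	Triggers      []Trigger      `yaml:"triggers,omitempty"`
	Taskflows     []Taskflow     `yaml:"taskflows"`
	GlobalConfigs map[string]any `yaml:"global_configs,omitempty"`
}

// Trigger corresponds to one entry in the triggers list.
type Trigger struct {
	ID             string         `yaml:"id"`
	Type           string         `yaml:"type"`              // "webhook", "schedule", ...
	Enabled        *bool          `yaml:"enabled,omitempty"` // treated as true when omitted
	Config         map[string]any `yaml:"config"`
	TargetTaskflow string         `yaml:"target_taskflow"`
	InputMapping   map[string]any `yaml:"input_mapping,omitempty"`
}

// Taskflow corresponds to one entry in the taskflows list.
type Taskflow struct {
	TaskflowID  string           `yaml:"taskflow_id"`
	Description string           `yaml:"description,omitempty"`
	InputSchema map[string]any   `yaml:"input_schema,omitempty"`
	Config      map[string]any   `yaml:"config,omitempty"`
	Tasks       []map[string]any `yaml:"tasks"`
	Subflows    []map[string]any `yaml:"subflows,omitempty"`
	Conditions  []map[string]any `yaml:"conditions,omitempty"`
}

// Load reads and parses a unified workflow YAML file.
func Load(path string) (*Workflow, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var wf Workflow
	if err := yaml.Unmarshal(data, &wf); err != nil {
		return nil, err
	}
	return &wf, nil
}
```

Task, subflow, and condition entries are left as generic maps here because their shapes are already defined by the existing taskflow schema.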
This approach centralizes the workflow definition (trigger + logic) into a single artifact, potentially making it easier to manage and understand the end-to-end process, and aligns with your goal of having a structure suitable for LLM generation. I will now prepare the final message containing all the generated documentation and examples (Step 6).

Delivering final comprehensive documentation of the unified YAML workflow structure via messages. Okay, I have completed the design, documentation, and explanation for the proposed unified SLOP workflow YAML structure, delivering the content via messages due to the sandbox limitations. To summarize and provide the complete deliverable in one place, here is the consolidated information:

1. Conceptual Unified YAML Structure:

```yaml
# Top-level workflow definition
workflow_name: "UniqueWorkflowIdentifier"
description: "Human-readable description of the entire workflow."
version: "1.0"

triggers:
  - id: "trigger_id_1"            # Unique ID for this trigger within the workflow file
    type: "webhook"               # Type of trigger (e.g., webhook, schedule, manual)
    enabled: true                 # Whether this trigger is active
    config:                       # Webhook-specific configuration
      provider: "unipile"         # Corresponds to :provider in /webhooks/:provider/:webhook_id
      webhook_id: "new_email"     # Corresponds to :webhook_id
      secret_ref: "unipile_webhook_secret"
    target_taskflow: "main_email_processing"
    input_mapping:
      taskflow_input_field_1: "${TRIGGER.payload.field_a}"

  - id: "trigger_id_2"
    type: "schedule"
    config:
      cron_expression: "0 0 * * *"
    target_taskflow: "daily_report_generation"

taskflows:
  - taskflow_id: "main_email_processing"   # ID referenced by triggers
    description: "The main sequence for processing incoming emails."
    config: { max_concurrency: 5 }
    tasks:
      - id: "get_email_content"
        type: "mfo_api_resource_get"
        config: { resource_id: "${INPUT.taskflow_input_field_1}" }
        # ... other task fields ...
      # ... other tasks ...
    subflows: [ { ... } ]
    conditions: [ { ... } ]

  - taskflow_id: "daily_report_generation"
    # ... taskflow definition ...

global_configs:
  secrets:
    unipile_webhook_secret: "lookup:vault/slop/unipile_secret"
```

2. Detailed Unified YAML Schema Documentation:

Top-Level: workflow_name (req), description, version, triggers (list), taskflows (list, req), global_configs.
Trigger Definition (triggers[]): id (req), type (req, e.g., "webhook", "schedule"), enabled, config (req, type-specific), target_taskflow (req, links to taskflow_id), input_mapping (optional, maps trigger data to ${INPUT}).
    Webhook Config: provider (req), webhook_id (req), expected_payload_schema, secret_ref.
    Schedule Config: cron_expression (req).
Taskflow Definition (taskflows[]): taskflow_id (req), description, input_schema, config (taskflow exec config), tasks (list, req), subflows (list), conditions (list). The tasks, subflows, and conditions structures remain as previously defined, using ${task_id.output_key} for internal data passing and ${INPUT.<field_name>} for accessing data provided by the trigger.
Global Configurations (global_configs): Optional section for shared values like secrets (referenced by secret_ref).

(Refer to the previous message for the full detailed breakdown of each field.)

3. Unified Workflow YAML Examples:

Example 1 (Webhook): Showed a workflow_name: UnipileEmailProcessingWorkflow with a webhook trigger for unipile/new_email targeting a taskflow_id: process_email_sequence. Included global_configs for the secret, input_mapping, and the taskflow definition accessing ${INPUT.email_resource_id} and ${INPUT.email_thread_id}.
Example 2 (Schedule): Showed a workflow_name: DailyReportingWorkflow with a schedule trigger (cron_expression: "0 0 * * *") targeting taskflow_id: generate_and_send_report. The taskflow fetched data, formatted it, and emailed it.

(Refer to the previous message for the full YAML code of these examples.)

4. Integration Explanation:

This unified structure requires changes to the SLOP server to parse the new format.
The server would manage triggers defined in the YAML, potentially replacing the need for the separate /api/config/event-triggers API for these workflows.
The server would handle routing webhooks or schedules to the correct target_taskflow defined in the same file.
The server would manage input_mapping to provide the correct ${INPUT} context to the taskflow.
The core task execution logic within slop-client (or a server-side worker) remains relevant for running the actual tasks, subflows, and conditions.

This completes the task based on your refined requirements and the adapted plan. I have provided the design, detailed schema, examples, and integration context for the unified workflow YAML structure via these messages. Please let me know if you have any further questions or need additional details on specific parts!

SLOP Workflow YAML: Taskflow Directive Examples

This section provides detailed examples focusing on the taskflows section within the unified SLOP Workflow YAML structure. It illustrates how to define individual tasks, manage dependencies, pass data between tasks, and use the generic slop_tool_call type, based on the schema outlined previously.

Core Taskflow Structure Recap

Within the main workflow YAML, the taskflows section contains a list of taskflow definitions:

```yaml
workflow_name: "ExampleWorkflow"
# ... triggers section (if any) ...

taskflows:
  - taskflow_id: "main_process"
    description: "The primary sequence of operations."
    tasks: 
      # Task definitions go here
    subflows: 
      # Subflow definitions go here (optional)
    conditions:
      # Condition definitions go here (optional)

# ... other taskflows ...
```

Example 1: Simple Sequential Tasks with Data Passing

This example shows two tasks where the second task uses an output from the first.

```yaml
workflow_name: "SequentialDataPassingExample"
version: "1.0"

taskflows:
  - taskflow_id: "sequential_flow"
    description: "Demonstrates passing data between two sequential tasks."
    tasks:
      - id: "extract_initial_data"
        name: "Extract User Info"
        type: "custom_data_extractor" # Assumes a custom task type exists
        config:
          source_file: "/path/to/input.json"
        # This task is assumed to produce an output map like: {"user_id": "usr_123", "email": "test@example.com"}
        outputs: # Documenting expected outputs is good practice
          - extract_initial_data.user_id
          - extract_initial_data.email
        successors: ["send_welcome_email"]

      - id: "send_welcome_email"
        name: "Send Welcome Email via Tool"
        type: "slop_tool_call" # Using the generic tool caller
        config:
          tool_id: "email_sender_tool" # The ID of the SLOP Server tool
          arguments:
            recipient_email: "${extract_initial_data.email}" # Reference output from the previous task
            subject: "Welcome!"
            body_template: "welcome_template.txt"
            user_identifier: "${extract_initial_data.user_id}" # Reference another output
        dependencies: ["extract_initial_data"] # Explicitly depends on the first task

Explanation:

  1. extract_initial_data: A custom task reads data and makes user_id and email available in the context.
  2. send_welcome_email: This task uses the generic slop_tool_call type.
    • tool_id: Specifies which SLOP Server tool to execute.
    • arguments: Provides the necessary inputs for the email_sender_tool.
    • ${extract_initial_data.email} and ${extract_initial_data.user_id}: These are placeholders. The taskflow engine replaces them at runtime with the actual email and user_id values produced by the extract_initial_data task. A substitution sketch follows this explanation.
    • dependencies: Ensures this task only runs after extract_initial_data completes successfully.

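As a rough sketch of how such placeholder substitution could be implemented (ResolvePlaceholders and the flat string-keyed context are illustrative assumptions, not the actual engine code):

```go
package taskflow

import (
	"fmt"
	"regexp"
)

// placeholderPattern matches expressions of the form ${task_id.output_key}
// (and ${INPUT.field}) inside task config values.
var placeholderPattern = regexp.MustCompile(`\$\{([^}]+)\}`)

// ResolvePlaceholders replaces every ${...} expression in a config value with
// the corresponding entry from the workflow context, e.g.
// context["extract_initial_data.email"] = "test@example.com".
func ResolvePlaceholders(value string, context map[string]string) (string, error) {
	var missing error
	resolved := placeholderPattern.ReplaceAllStringFunc(value, func(match string) string {
		key := match[2 : len(match)-1] // strip the leading "${" and trailing "}"
		if v, ok := context[key]; ok {
			return v
		}
		missing = fmt.Errorf("unresolved placeholder %q", match)
		return match
	})
	return resolved, missing
}
```

Applied to this example, context["extract_initial_data.email"] and context["extract_initial_data.user_id"] would be populated from the outputs of extract_initial_data before send_welcome_email runs.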
Example 2: Parallel Tasks and Aggregation (Conceptual)

This example outlines a pattern where two tasks run in parallel, and a third task aggregates their results.

```yaml
workflow_name: "ParallelProcessingExample"
version: "1.0"

taskflows:
  - taskflow_id: "parallel_flow"
    description: "Demonstrates parallel execution and result aggregation."
    tasks:
      - id: "start_node" # Dummy node to start parallel branches
        type: "no_op" # A task that does nothing, just for structure
        successors: ["process_region_a", "process_region_b"]

      - id: "process_region_a"
        name: "Process Data for Region A"
        type: "slop_tool_call"
        config:
          tool_id: "region_processor"
          arguments:
            region: "A"
        # Assumed output: {"processed_count": 150}
        outputs: ["process_region_a.processed_count"]
        dependencies: ["start_node"]
        successors: ["aggregate_results"]

      - id: "process_region_b"
        name: "Process Data for Region B"
        type: "slop_tool_call"
        config:
          tool_id: "region_processor"
          arguments:
            region: "B"
        # Assumed output: {"processed_count": 210}
        outputs: ["process_region_b.processed_count"]
        dependencies: ["start_node"]
        successors: ["aggregate_results"]

      - id: "aggregate_results"
        name: "Aggregate Regional Results"
        type: "custom_aggregator" # Assumes a custom task type exists
        config:
          # This custom task's implementation needs to know how to find 
          # and sum the outputs from the preceding parallel tasks.
          # Option 1: Explicitly list inputs (less flexible if branches change)
          # input_counts: [
          #   "${process_region_a.processed_count}",
          #   "${process_region_b.processed_count}"
          # ]
          # Option 2: Task logic queries context for outputs from dependencies
          aggregation_source_tasks: ["process_region_a", "process_region_b"]
        # Assumed output: {"total_count": 360}
        outputs: ["aggregate_results.total_count"]
        dependencies: ["process_region_a", "process_region_b"] # Depends on both parallel tasks

Explanation:

  1. start_node: Acts as a starting point for the parallel branches.
  2. process_region_a and process_region_b: These tasks run in parallel (as they both depend only on start_node). They use the same slop_tool_call but with different arguments.
  3. aggregate_results: This task depends on both parallel tasks completing. It uses a custom_aggregator type. The implementation of this custom task needs to be designed to gather the processed_count outputs from its dependencies (process_region_a and process_region_b) and sum them. A possible shape is sketched below.

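One possible shape for that aggregator, assuming a generic task interface with read access to the shared workflow context (TaskContext and AggregateCounts are illustrative names, not existing SLOP types):

```go
package tasks

import "fmt"

// TaskContext is a simplified stand-in for the workflow context available to a task:
// outputs of completed tasks keyed as "<task_id>.<output_key>".
type TaskContext map[string]any

// AggregateCounts implements the idea behind the custom_aggregator task: it reads
// the processed_count output of each task listed in aggregation_source_tasks and sums them.
func AggregateCounts(ctx TaskContext, sourceTasks []string) (int, error) {
	total := 0
	for _, taskID := range sourceTasks {
		key := taskID + ".processed_count"
		raw, ok := ctx[key]
		if !ok {
			return 0, fmt.Errorf("missing output %q in context", key)
		}
		count, ok := raw.(int)
		if !ok {
			return 0, fmt.Errorf("output %q is not an integer", key)
		}
		total += count
	}
	// The result would be published back to the context as aggregate_results.total_count.
	return total, nil
}
```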
Example 3: Conditional Branching

This example shows a simple conditional branch.

```yaml
workflow_name: "ConditionalBranchingExample"
version: "1.0"

taskflows:
  - taskflow_id: "conditional_flow"
    description: "Demonstrates conditional execution path."
    tasks:
      - id: "check_order_value"
        name: "Check Order Value"
        type: "slop_tool_call"
        config:
          tool_id: "order_service_get_value"
          arguments:
            order_id: "${INPUT.order_id}" # Assuming order_id comes from trigger input
        # Assumed output: {"value": 150.75}
        outputs: ["check_order_value.value"]
        successors: ["value_condition"]

      - id: "process_high_value_order"
        name: "Process High Value Order"
        type: "slop_tool_call"
        config:
          tool_id: "high_value_processor"
          arguments:
            order_id: "${INPUT.order_id}"
            value: "${check_order_value.value}"
        dependencies: ["value_condition"] # Depends on the condition outcome

      - id: "process_standard_order"
        name: "Process Standard Order"
        type: "slop_tool_call"
        config:
          tool_id: "standard_processor"
          arguments:
            order_id: "${INPUT.order_id}"
        dependencies: ["value_condition"] # Depends on the condition outcome

    conditions:
      - id: "value_condition"
        name: "Is Order Value > 100?"
        type: "simple_threshold_evaluator" # Assumes a custom condition evaluator type
        config:
          input_value: "${check_order_value.value}" # Get value from the previous task
          threshold: 100
          operator: ">" # Specify comparison operator
        dependencies: ["check_order_value"]
        # Successors list maps evaluator result index to next task ID
        successors:
          # Index 0: Condition is false (value <= 100)
          - "process_standard_order" 
          # Index 1: Condition is true (value > 100)
          - "process_high_value_order"

Explanation:

  1. check_order_value: Fetches the order value.
  2. value_condition: This is a condition node.
    • type: Specifies the Go logic (simple_threshold_evaluator) used to evaluate the condition.
    • config: Provides the input value (${check_order_value.value}), the threshold (100), and the comparison operator (>).
    • The evaluator function is expected to return an integer index corresponding to the outcome (see the sketch after this explanation).
    • successors: This list maps the returned index to the next task to execute. If the evaluator returns 0 (false, value <= 100), process_standard_order runs. If it returns 1 (true, value > 100), process_high_value_order runs.
  3. process_high_value_order and process_standard_order: Only one of these will execute, depending on the outcome of value_condition.

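A minimal version of such an evaluator could look like the Go sketch below. The function shape and operator set are assumptions; how condition evaluators are registered with the go-taskflow engine is left out.

```go
package conditions

import "fmt"

// EvaluateThreshold implements the idea behind simple_threshold_evaluator:
// it compares a value against a threshold and returns the successor index,
// 0 for false and 1 for true, matching the successors list in the YAML.
func EvaluateThreshold(value, threshold float64, operator string) (int, error) {
	var result bool
	switch operator {
	case ">":
		result = value > threshold
	case ">=":
		result = value >= threshold
	case "<":
		result = value < threshold
	case "<=":
		result = value <= threshold
	default:
		return 0, fmt.Errorf("unsupported operator %q", operator)
	}
	if result {
		return 1, nil // e.g. route to process_high_value_order
	}
	return 0, nil // e.g. route to process_standard_order
}
```

With the assumed output above, EvaluateThreshold(150.75, 100, ">") returns 1, selecting process_high_value_order.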
These examples illustrate the flexibility of the taskflow YAML structure for defining various workflow patterns using task dependencies, data passing, generic tool calls, and conditional logic.