MFO MindFlight Orchestrator - Detailed Documentation

version: 1.0

This document is a guide to the mfo-orchestrator library. It describes the library's capabilities, internal mechanisms, and usage patterns with examples, drawing on the SLOP server API and the taskflow definitions. It is intended for developers who want to interact with the SLOP server API and use its task orchestration features.

1. Introduction

mfo-orchestrator is a Go library designed as a client for the SLOP server, a platform focused on agentic workflows, task automation, and centralized interaction management. The client facilitates interaction with the server's REST API and integrates a powerful task workflow engine based on go-taskflow. This allows developers to define complex, potentially multi-agent workflows using a declarative YAML format, execute them efficiently via the mfo-orchestrator, and manage related server-side resources like jobs, events, memory, and prompts.

The library design emphasizes flexibility and human readability, particularly through its use of YAML for taskflow definitions and configuration. This aligns with preferences for systems where configurations are easily understood and modifiable by humans, potentially supporting dynamic adjustments in multi-agent orchestration scenarios. Clients delegate external interactions and persistence entirely to the SLOP server, promoting stateless client design.

2. Core Concepts

Understanding the following core components is crucial for effectively using mfo-orchestrator.

2.1. API Client (internal/client, internal/api)

The heart of the library's interaction with the SLOP server is the API client. It encapsulates the logic for making HTTP requests to the server's various REST endpoints defined by the SLOP protocol.

  • Initialization: The client is typically initialized with the server's base URL and authentication credentials (JWT or API Token obtained via POST /auth/login). See internal/client/client.go for initialization patterns.
  • Endpoint Abstraction: Specific API functionalities are abstracted into dedicated modules within internal/api/. These cover key SLOP server capabilities:
    • chat.go: Interacting with conversational endpoints (/api/chat).
    • tool.go: Executing registered server-side tools (/api/tools/:tool_id).
    • memory.go: Managing persistent key-value storage (/api/memory).
    • resource.go: Managing files and other resources (/api/resources).
    • job.go: Managing asynchronous background jobs (/api/jobs).
    • notification.go: Managing user notifications (/api/notifications).
    • event.go / Config: Handling event triggers and configurations (/api/config/event-triggers).
  • Request/Response Handling: The client manages request serialization, response deserialization (often into structs defined in internal/models), authentication headers, and error handling, adhering to the SLOP server rules.

2.2. Authentication (internal/auth)

Secure communication with the SLOP server's private /api/* endpoints is handled by the authentication module. It supports JWT tokens (obtained via /auth/login) or API tokens (managed via /api/tokens), configured during client initialization and automatically included in the Authorization: Bearer <token> header of outgoing requests.
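
One common way to add a bearer token to every outgoing request in Go is a wrapping http.RoundTripper. The sketch below uses only the standard library. The names bearerTransport, newAuthedClient, and observedAuthHeader are hypothetical and are not taken from internal/auth, whose actual implementation may differ.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// bearerTransport is a hypothetical http.RoundTripper that attaches the
// token to every outgoing request.
type bearerTransport struct {
	token string
	base  http.RoundTripper
}

func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// RoundTrippers must not mutate the caller's request, so work on a clone.
	clone := req.Clone(req.Context())
	clone.Header.Set("Authorization", "Bearer "+t.token)
	return t.base.RoundTrip(clone)
}

// newAuthedClient returns an HTTP client that authenticates with token.
func newAuthedClient(token string) *http.Client {
	return &http.Client{
		Transport: &bearerTransport{token: token, base: http.DefaultTransport},
	}
}

// observedAuthHeader sends one request to a throwaway local server and
// returns the Authorization header that server received.
func observedAuthHeader(token string) (string, error) {
	seen := make(chan string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seen <- r.Header.Get("Authorization")
	}))
	defer srv.Close()

	resp, err := newAuthedClient(token).Get(srv.URL + "/api/memory")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	return <-seen, nil
}

func main() {
	header, err := observedAuthHeader("example-token")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(header) // Bearer example-token
}
```

Keeping the header logic in the transport means individual endpoint wrappers do not have to handle credentials themselves.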

2.3. Taskflow Engine (internal/taskflow, go-taskflow)

mfo-orchestrator leverages the go-taskflow library (github.com/noneback/go-taskflow) for defining, building, and executing task workflows locally within the client application, orchestrating calls to the SLOP server.

  • YAML Definition: Workflows are defined declaratively in YAML files, following the structure outlined in yaml-taskflow_directive.mdc. This format describes tasks, their types (mapping to Go functions), configurations, dependencies, conditional logic, and subflows.
  • SLOP API Task Types: Specific task types are defined (or should be defined) in the mfo-orchestrator's Go code to correspond to SLOP API calls (e.g., slop_api_chat_send, slop_api_resource_get, slop_api_memory_store). Examples are provided in slop_yaml_api_task_definitions.mdc. A generic slop_tool_call type might also exist or be implemented to call arbitrary /api/tools/:tool_id endpoints.
  • Task Registry (internal/taskflow/registry.go): A central registry maps task type strings (from YAML type field) to Go factory functions (taskflow.TaskFuncFactory). These factories create the actual executable task functions, often requiring access to the initialized slopClient instance (likely passed via context or closure) to make API calls.
  • Builder (internal/taskflow/builder.go): The builder takes a parsed FlowDefinition (from YAML) and the TaskRegistry to construct an executable taskflow.TaskFlow graph. It resolves dependencies and links tasks.
  • Executor (go-taskflow.Executor, potentially wrapped in internal/taskflow/executor.go): The executor runs the TaskFlow graph. It manages task scheduling, respects dependencies, handles concurrency, and executes tasks (Go functions making SLOP API calls) potentially in parallel.
  • Context & Data Passing (internal/taskflow/context.go): An execution context is passed through the task execution. This context typically holds the slopClient instance and a map for storing task outputs. Tasks access outputs from previous tasks using the ${producing_task_id.output_key} syntax within their config in the YAML, which the Go task implementation resolves by querying the context map.
  • Persistence (internal/taskflow/persistence): While the SLOP server handles primary persistence (memory, resources, jobs), this package within the client might relate to client-side caching or state management for the workflow execution itself, if needed.
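
To make the registry idea concrete, here is a minimal, self-contained sketch of a type-string-to-factory map. TaskFunc, TaskFuncFactory, and Registry are simplified stand-ins defined in the example itself; the real types in internal/taskflow/registry.go may have different signatures.

```go
package main

import (
	"fmt"
	"sort"
)

// TaskFunc and TaskFuncFactory are simplified stand-ins, not the library's types.
type TaskFunc func() error
type TaskFuncFactory func(config map[string]interface{}) (TaskFunc, error)

// Registry maps the YAML `type` string to the factory that builds the task.
type Registry struct {
	factories map[string]TaskFuncFactory
}

func NewRegistry() *Registry {
	return &Registry{factories: make(map[string]TaskFuncFactory)}
}

// Register rejects duplicates so that two factories cannot claim one type.
func (r *Registry) Register(taskType string, f TaskFuncFactory) error {
	if _, exists := r.factories[taskType]; exists {
		return fmt.Errorf("task type %q is already registered", taskType)
	}
	r.factories[taskType] = f
	return nil
}

// Build looks up the factory for taskType and applies the task's config.
func (r *Registry) Build(taskType string, config map[string]interface{}) (TaskFunc, error) {
	f, ok := r.factories[taskType]
	if !ok {
		return nil, fmt.Errorf("unknown task type %q (registered: %v)", taskType, r.Types())
	}
	return f(config)
}

// Types lists the registered type names in sorted order.
func (r *Registry) Types() []string {
	names := make([]string, 0, len(r.factories))
	for name := range r.factories {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	reg := NewRegistry()
	_ = reg.Register("print_message", func(config map[string]interface{}) (TaskFunc, error) {
		msg, ok := config["message"].(string)
		if !ok {
			return nil, fmt.Errorf("print_message: config.message must be a string")
		}
		return func() error { fmt.Println(msg); return nil }, nil
	})

	task, err := reg.Build("print_message", map[string]interface{}{"message": "hello"})
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	_ = task()

	// A type used in YAML but never registered is reported at build time.
	_, err = reg.Build("slop_api_chat_send", nil)
	fmt.Println(err)
}
```

Failing at build time for an unregistered type surfaces YAML typos before any task runs.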

2.4. Data Models (internal/models)

This directory contains the Go struct definitions representing data exchanged with the SLOP API (requests/responses) and used within the taskflow engine (e.g., TaskflowDefinition, TaskDefinition). These align with the structures expected or returned by the SLOP server endpoints.

2.5. Prompt Management (internal/prompt, internal/prompts)

For agentic workflows involving LLMs via the SLOP server's /api/chat endpoint, mfo-orchestrator includes utilities for managing prompts used in ChatRequest bodies:

  • Parsing & Storage: Loading and managing prompt templates, potentially from Markdown files.
  • Templating: Injecting variables (including data passed via ${task_id.output_key}) into prompt templates before sending them in an API call.
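
As an illustration of the templating step, the sketch below renders a prompt with Go's standard text/template package. This assumes prompt templates use Go template syntax, which matches the {{ .error }} placeholder shown in the error-handling templates but is not confirmed for all prompts. The function renderPrompt and the variable name email_content are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// renderPrompt fills a prompt template with the given variables.
// missingkey=error makes a missing variable fail instead of rendering "<no value>".
func renderPrompt(tmpl string, vars map[string]interface{}) (string, error) {
	t, err := template.New("prompt").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", fmt.Errorf("parse prompt template: %w", err)
	}
	var sb strings.Builder
	if err := t.Execute(&sb, vars); err != nil {
		return "", fmt.Errorf("render prompt template: %w", err)
	}
	return sb.String(), nil
}

func main() {
	prompt, err := renderPrompt(
		"Summarize this email:\n{{ .email_content }}",
		map[string]interface{}{"email_content": "Hello, please find the invoice attached."},
	)
	if err != nil {
		fmt.Println("render failed:", err)
		return
	}
	fmt.Println(prompt)

	// A template variable with no value is reported as an error.
	_, err = renderPrompt("{{ .missing }}", map[string]interface{}{})
	fmt.Println(err != nil) // true
}
```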

2.6. Error Handling and Retry Logic

MFO MindFlight Orchestrator supports robust error handling in workflows via a declarative retry mechanism. This is particularly useful for tasks involving LLMs or external APIs where transient errors or output format issues are common.

2.6.1. Retry Configuration in Workflows

Tasks can specify error_handling logic in their YAML definition, referencing error-handling templates defined in the prompt YAML:

- id: "generate_drafts"
  type: "slop_api_chat_send"
  config:
    include_tools: false
    prompt_template_id: "pujol_email_draft"
    prompt_context:
      email_content: "fetch_resource_content.resource.Content"
      email_analysis_structured: "analyze_email.parsed_content"
    error_handling:
      on_error:
        - type: "retry_with_prompt"
          append_to_system_prompt: "retry_with_prompt"   # References a template in the prompt YAML
          max_retries: 2

2.6.2. Error Handling Templates

Error handling templates are defined in the prompt YAML file under error_handling_templates:

templates:
  pujol_email_draft:
    # ... system_prompt, user_prompt, etc. ...
    error_handling_templates:
      retry_with_prompt: |
        ERREUR DÉTECTÉE: {{ .error }}
        Veuillez corriger le format de la réponse en vous assurant que:
        1. Le champ 'to' est un tableau d'objets avec 'identifier' (obligatoire) et 'display_name' (optionnel)
        2. Le corps ne contient pas de balises <body>
        3. La réponse est générée via l'appel à l'outil 'generate_single_draft_content'

2.6.3. Execution Flow

  1. Task Execution: The workflow engine runs the task (e.g., generate_drafts).
  2. Error Detection: If the LLM output is invalid (e.g., missing required fields), the engine detects the error.
  3. Retry with Augmented Prompt:
    • The engine checks the error_handling.on_error config.
    • It finds type: retry_with_prompt and append_to_system_prompt: retry_with_prompt.
    • It looks up the retry_with_prompt template in the prompt YAML.
    • The error message is injected into the template (as {{ .error }}).
    • The system prompt for the retry is augmented with this error-handling message.
  4. Retry Count: The engine tracks the number of retries and stops after max_retries.
  5. Result: If the LLM produces valid output, the workflow continues. If not, the error is surfaced after the final retry.

2.6.4. Separation of Concerns

  • Workflow YAML: Only references the error-handling template by name
  • Prompt YAML: Contains the actual error-handling message under error_handling_templates
  • Advantage: This keeps business logic (when/how to retry) in the workflow and error message content in the prompt file, which makes both easier to maintain and localize

2.6.5. Variable Mapping

  • The variable error is conventionally used and injected by the workflow engine
  • The error message (e.g., "Missing required field 'to'") is passed as {{ .error }} to the error-handling template
  • This is a convention: the engine ensures it always provides the error variable to the template

2.6.6. Validation

The YAML validator checks that any append_to_system_prompt reference in the workflow matches a template in the prompt YAML, preventing typos and missing error-handling templates.
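
A reference check of this kind can be sketched in a few lines. The types below are minimal stand-ins for the parsed workflow and prompt YAML, and checkErrorTemplateRefs is a hypothetical name. The sketch assumes the lookup is scoped to the task's own prompt_template_id, which matches the nesting shown in 2.6.2 but is not confirmed by the validator's source.

```go
package main

import "fmt"

// ErrorAction and Task are minimal stand-ins for the parsed workflow YAML.
type ErrorAction struct {
	Type                 string
	AppendToSystemPrompt string
}

type Task struct {
	ID               string
	PromptTemplateID string
	OnError          []ErrorAction
}

// PromptFile maps a prompt template ID to the names defined under its
// error_handling_templates key.
type PromptFile map[string][]string

// checkErrorTemplateRefs returns one error per dangling template reference.
func checkErrorTemplateRefs(tasks []Task, prompts PromptFile) []error {
	var problems []error
	for _, t := range tasks {
		available := make(map[string]bool)
		for _, name := range prompts[t.PromptTemplateID] {
			available[name] = true
		}
		for _, action := range t.OnError {
			if action.Type != "retry_with_prompt" {
				continue // other action types carry no template reference here
			}
			if !available[action.AppendToSystemPrompt] {
				problems = append(problems, fmt.Errorf(
					"task %q: append_to_system_prompt %q not found in error_handling_templates of prompt %q",
					t.ID, action.AppendToSystemPrompt, t.PromptTemplateID))
			}
		}
	}
	return problems
}

func main() {
	prompts := PromptFile{"pujol_email_draft": {"retry_with_prompt"}}
	tasks := []Task{
		{ID: "generate_drafts", PromptTemplateID: "pujol_email_draft",
			OnError: []ErrorAction{{Type: "retry_with_prompt", AppendToSystemPrompt: "retry_with_prompt"}}},
		{ID: "typo_task", PromptTemplateID: "pujol_email_draft",
			OnError: []ErrorAction{{Type: "retry_with_prompt", AppendToSystemPrompt: "retry_with_promt"}}},
	}
	for _, err := range checkErrorTemplateRefs(tasks, prompts) {
		fmt.Println(err)
	}
}
```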

3. Usage and Examples

3.1. API Client Usage (Go)

The core pattern involves initializing the client and calling its methods corresponding to SLOP API endpoints.

// Example (Conceptual - based on structure and API docs)
package main

import (
    "context"
    "log"
    "gitlab.com/webigniter/slop-client/internal/client"
    "gitlab.com/webigniter/slop-client/internal/auth"
    "gitlab.com/webigniter/slop-client/internal/models"
)

func main() {
    // Assume credentials obtained securely
    serverURL := "http://localhost:8080" // Replace with actual Slop server URL
    apiKey := "YOUR_API_KEY_OR_JWT"

    // Configure authentication
    authProvider := auth.NewStaticTokenProvider(apiKey)

    // Initialize the client
    slopClient, err := client.NewClient(serverURL, nil, authProvider)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }

    // Example: Retrieve a resource
    resourceID := "some-resource-id"
    resource, err := slopClient.API.GetResource(context.Background(), resourceID)
    if err != nil {
        log.Fatalf("Failed to get resource: %v", err)
    }
    log.Printf("Resource content: %s", resource.Content)

    // Example: Store data in memory
    memReq := models.StoreMemoryRequest{
        Key:        "my_data_key",
        Value:      map[string]interface{}{"info": "some data"},
        ProviderID: "default_memory", // As required by API docs
    }
    _, err = slopClient.API.StoreMemory(context.Background(), memReq)
    if err != nil {
        log.Fatalf("Failed to store memory: %v", err)
    }
    log.Println("Data stored in memory.")
}

3.2. Taskflow Engine Usage (YAML + Go)

The primary way to orchestrate SLOP API calls is by defining workflows in YAML and executing them using the mfo-orchestrator's taskflow engine.

Step 1: Define Workflow in YAML

Create a YAML file (e.g., process_email_workflow.yml) using SLOP API task types.

name: "ProcessIncomingEmail"
description: "Analyzes an email, stores analysis, and tags the thread."
version: "1.0"

tasks:
  - id: "get_email_content"
    name: "Get Email Resource"
    type: "slop_api_resource_get" # Maps to a Go factory
    config:
      resource_id: "${INPUT.resource_id}" # Assume resource_id is provided as input to the flow
    successors:
      - "analyze_email"

  - id: "analyze_email"
    name: "Analyze Email with LLM"
    type: "slop_api_chat_send"
    config:
      request_body:
        messages:
          - role: "system"
            content: 'Analyze the following email content and extract the sender, subject, and a brief summary. Respond in JSON format {"sender": "...", "subject": "...", "summary": "..."}.'
          - role: "user"
            content: "${get_email_content.resource.content}" # Get content from previous task
        provider_id: "openai" # Example provider
    dependencies: ["get_email_content"]
    successors:
      - "store_analysis"

  - id: "store_analysis"
    name: "Store Analysis in Memory"
    type: "slop_api_memory_store"
    config:
      key: "email_analysis_${INPUT.resource_id}"
      # Assuming the chat response content is valid JSON string
      value: "${analyze_email.response.content}" 
      provider_id: "default_memory"
    dependencies: ["analyze_email"]
    successors:
      - "tag_email_thread" # Assuming we need a separate step to get thread_id

  # Placeholder: Add task to extract thread_id if not available in INPUT or resource metadata
  # - id: "extract_thread_id"
  #   type: "custom_regex_extractor" # Example custom task
  #   config:
  #     input_string: "${get_email_content.resource.metadata.email_thread_id}" # Example path
  #     regex: "..."
  #   dependencies: ["get_email_content"]
  #   successors: ["tag_email_thread"]

  - id: "tag_email_thread"
    name: "Tag Email Thread"
    type: "slop_api_tool_execute" # Using the generic tool executor type
    config:
      tool_id: "unipile_email_tag_thread" # Specific SLOP tool
      arguments:
        tag_name: "ai_analyzed"
        # Assuming thread_id comes from input or another task like extract_thread_id
        thread_id: "${INPUT.thread_id}" 
    dependencies: ["store_analysis"] # Depends on analysis being stored, and thread_id being available

Step 2: Register Task Types (Go)

In your mfo-orchestrator application, register Go factory functions for each type used in the YAML. These factories often need the slopClient.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "gitlab.com/webigniter/slop-client/internal/client"
    "gitlab.com/webigniter/slop-client/internal/models"
    "gitlab.com/webigniter/slop-client/internal/taskflow"
    // ... other necessary imports
)

// Define a context structure to hold client and outputs
type FlowContext struct {
    SlopClient *client.Client
    Outputs    map[string]map[string]interface{}
    Inputs     map[string]interface{} // For initial inputs like resource_id
    // Add mutex if concurrent tasks write to Outputs
}

// --- Task Factories --- 

func slopApiResourceGetFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
    return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
        return func(ctxInterface interface{}) {
            ctx := ctxInterface.(*FlowContext)
            taskID := "get_email_content" // Assuming task ID is known or passed

            // Resolve config parameters (handle ${...} syntax)
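            resourceIDRaw, _ := taskflow.ResolveConfigValue(config["resource_id"], ctx.Outputs, ctx.Inputs)
            resourceID, ok := resourceIDRaw.(string)
            if !ok {
                // Avoid a panic if resource_id is missing or did not resolve to a string
                fmt.Printf("Task %s failed: resource_id is not a string\n", taskID)
                return
            }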

            resource, err := slopClient.API.GetResource(context.Background(), resourceID)
            if err != nil {
                // Handle error appropriately (log, set error state in context)
                fmt.Printf("Task %s failed: %v\n", taskID, err)
                return
            }
            // Store output
            taskflow.SetOutput(ctx.Outputs, taskID, "resource", resource)
            fmt.Printf("Task %s completed.\n", taskID)
        }, nil
    }
}

func slopApiChatSendFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
    return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
        return func(ctxInterface interface{}) {
            ctx := ctxInterface.(*FlowContext)
            taskID := "analyze_email"

            // Resolve config (complex: needs deep resolution for request_body)
            resolvedConfig, err := taskflow.ResolveConfigMap(config, ctx.Outputs, ctx.Inputs)
            if err != nil { /* handle error */ return }

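            // Map resolvedConfig["request_body"] onto the ChatRequest struct via a JSON
            // round trip (assumes resolvedConfig is a map and ChatRequest has JSON tags)
            var chatReq models.ChatRequest
            raw, err := json.Marshal(resolvedConfig["request_body"])
            if err != nil { /* handle error */ return }
            if err := json.Unmarshal(raw, &chatReq); err != nil { /* handle error */ return }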

            chatRespMap, err := slopClient.API.SendChat(context.Background(), chatReq) // Assuming SendChat exists
            if err != nil { /* handle error */ return }

            taskflow.SetOutput(ctx.Outputs, taskID, "response", chatRespMap)
            fmt.Printf("Task %s completed.\n", taskID)
        }, nil
    }
}

// ... Implement factories for slop_api_memory_store, slop_api_tool_execute etc. ...
// These factories will parse their specific 'config', resolve ${...} values,
// call the appropriate slopClient.API method, and store results in ctx.Outputs.

func main() {
    // ... Initialize slopClient ...

    registry := taskflow.NewRegistry()
    // Pass slopClient to factories
    registry.RegisterTask("slop_api_resource_get", slopApiResourceGetFactory(slopClient))
    registry.RegisterTask("slop_api_chat_send", slopApiChatSendFactory(slopClient))
    // ... register other factories ...

    // ... next steps: load YAML, build, execute ...
}

Step 3: Load, Build, and Execute (Go)

Load the YAML, create the TaskFlow, prepare the initial context, and run it.

// Additional imports for this step (merge into the import block at the top of the file)
import (
    "log"
    "os"
    "runtime"
    gotaskflow "github.com/noneback/go-taskflow"
    "gopkg.in/yaml.v3"
    // ... other imports
)

// ... (inside main, after registry setup)

    // Load YAML
    yamlData, err := os.ReadFile("process_email_workflow.yml")
    // ... handle error ...

    var flowDef *taskflow.FlowDefinition
    err = yaml.Unmarshal(yamlData, &flowDef)
    // ... handle error ...

    // Create Builder
    builder := taskflow.NewBuilder(registry)

    // Build TaskFlow
    taskFlow, err := builder.BuildFlow(flowDef)
    // ... handle error ...

    // Create Executor
    executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))

    // Prepare Initial Context
    initialContext := &FlowContext{
        SlopClient: slopClient,
        Outputs:    make(map[string]map[string]interface{}),
        Inputs:     map[string]interface{}{ // Example inputs
            "resource_id": "email-resource-123",
            "thread_id":   "unipile-thread-abc",
        },
    }

    // Execute the flow
    log.Println("Executing taskflow...")
    executor.Run(taskFlow, gotaskflow.WithContext(initialContext)).Wait()

    log.Println("Taskflow execution complete.")
    // Access final outputs if needed: initialContext.Outputs

    // Optional: Dump graph
    // taskFlow.Dump(os.Stdout)

(Note: Helper functions like taskflow.ResolveConfigValue, taskflow.ResolveConfigMap, taskflow.SetOutput are assumed to exist within the internal/taskflow package to handle data passing.)
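
Since the executor may run tasks in parallel, any SetOutput-style helper needs to guard the shared map. The following is a minimal sketch of one possible shape for such a store; OutputStore and its methods are hypothetical names, not the actual helpers in internal/taskflow.

```go
package main

import (
	"fmt"
	"sync"
)

// OutputStore is a hypothetical concurrency-safe replacement for a bare
// map[taskID]map[outputKey]value shared between tasks.
type OutputStore struct {
	mu   sync.RWMutex
	data map[string]map[string]interface{}
}

func NewOutputStore() *OutputStore {
	return &OutputStore{data: make(map[string]map[string]interface{})}
}

// Set records one output value for a task, creating the inner map on first use.
func (s *OutputStore) Set(taskID, key string, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[taskID] == nil {
		s.data[taskID] = make(map[string]interface{})
	}
	s.data[taskID][key] = value
}

// Get returns the stored value and whether it was present.
func (s *OutputStore) Get(taskID, key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[taskID][key]
	return v, ok
}

func main() {
	store := NewOutputStore()
	var wg sync.WaitGroup
	// Simulate several tasks finishing at the same time.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			store.Set(fmt.Sprintf("task_%d", n), "result", n*n)
		}(i)
	}
	wg.Wait()

	v, ok := store.Get("task_3", "result")
	fmt.Println(v, ok) // 9 true
}
```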

3.3. Variable Resolution, Scoping, and Best Practices

MFO MindFlight Orchestrator workflows use a powerful and predictable variable resolution system to pass data between triggers, taskflows, and tasks. Understanding this system is crucial for authoring robust, maintainable workflows.

3.3.1. Trigger Types and Input Mapping

  • Trigger Types:
    • webhook: Starts a workflow on an incoming HTTP request.
    • schedule: Starts a workflow on a time schedule (cron). Uses config.cron_expression (e.g., "0 0 * * *") to schedule periodic execution. No payload; input_mapping can provide static values. Example:
      triggers:
        - id: nightly_report
          type: schedule
          enabled: true
          config:
            cron_expression: "0 0 * * *"  # Every midnight
          target_taskflow: generate_report
          input_mapping:
            report_type: nightly

    • user_input: Allows a user or UI to trigger a workflow via API (POST /api/trigger/:trigger_id). Accepts a JSON payload, which is mapped to the taskflow input using input_mapping. Example:
      triggers:
        - id: manual_email_process
          type: user_input
          enabled: true
          target_taskflow: main_email_processing
          input_mapping:
            email_id: "TRIGGER.payload.email_id"
            user_note: "TRIGGER.payload.note"

      To trigger: POST to /api/trigger/manual_email_process with { "email_id": "...", "note": "..." }
    • event: Starts a workflow on an internal event (future/advanced).

  • Input Mapping:
    • Each trigger can define an input_mapping that maps fields from the trigger payload (or static values for schedule) to the INPUT context of the target taskflow.
    • Example (for webhook or user_input):
      triggers:
        - id: webhook_trigger
          type: webhook
          input_mapping:
            user_id: "TRIGGER.payload.user_id"
            data: "TRIGGER.payload.data"

    • For schedule triggers, only static values are supported in input_mapping.

Note:

  • All triggers can use input_mapping to map incoming payload or static values to the taskflow input context.
  • user_input triggers are accessible via POST /api/trigger/:trigger_id and are suitable for manual or UI-driven workflow starts.
  • See the SLOP documentation and model comments for more details and examples.

3.3.2. Variable Resolution System and Order

Variables in task configs and prompt contexts are resolved in the following order:

  1. Trigger Payload: TRIGGER.payload.field (from the incoming event)
  2. Workflow Input: INPUT.variable_name (from trigger input_mapping)
  3. Task Outputs: TASKID.output_key (from previous tasks in the same taskflow)
  4. If not found: An error is raised and the task fails

This ensures deterministic and predictable data flow.
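
A resolver following this scheme could look like the sketch below. It dispatches on the reference prefix (TRIGGER.payload, INPUT, or a task ID) and fails with a message naming the unresolved variable. RunContext, Resolve, and walk are hypothetical names, and the nested-path handling is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// RunContext holds the three variable sources for one workflow execution.
type RunContext struct {
	TriggerPayload map[string]interface{}
	Inputs         map[string]interface{}
	Outputs        map[string]map[string]interface{}
}

// Resolve looks up a dotted reference such as "TRIGGER.payload.user_id",
// "INPUT.user_id", or "process_data.result".
func (c *RunContext) Resolve(ref string) (interface{}, error) {
	parts := strings.Split(ref, ".")
	switch {
	case len(parts) >= 3 && parts[0] == "TRIGGER" && parts[1] == "payload":
		return walk(c.TriggerPayload, parts[2:], ref)
	case len(parts) >= 2 && parts[0] == "INPUT":
		return walk(c.Inputs, parts[1:], ref)
	case len(parts) >= 2:
		taskOutputs, ok := c.Outputs[parts[0]]
		if !ok {
			return nil, fmt.Errorf("cannot resolve %q: no outputs recorded for task %q", ref, parts[0])
		}
		return walk(taskOutputs, parts[1:], ref)
	}
	return nil, fmt.Errorf("cannot resolve %q: not a recognized reference", ref)
}

// walk follows path through nested maps, reporting the first missing key.
func walk(root map[string]interface{}, path []string, ref string) (interface{}, error) {
	var cur interface{} = root
	for _, key := range path {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("cannot resolve %q: cannot look up %q in a non-object value", ref, key)
		}
		cur, ok = m[key]
		if !ok {
			return nil, fmt.Errorf("cannot resolve %q: missing key %q", ref, key)
		}
	}
	return cur, nil
}

func main() {
	ctx := &RunContext{
		TriggerPayload: map[string]interface{}{"user_id": "u-42"},
		Inputs:         map[string]interface{}{"user_id": "u-42"},
		Outputs: map[string]map[string]interface{}{
			"process_data": {"result": "done"},
		},
	}
	for _, ref := range []string{"TRIGGER.payload.user_id", "INPUT.user_id", "process_data.result", "INPUT.missing"} {
		v, err := ctx.Resolve(ref)
		fmt.Println(ref, "=>", v, err)
	}
}
```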

3.3.3. Variable Scoping and Isolation

  • Each workflow execution (taskflow instance) gets its own isolated context.
  • Variables are scoped to their respective sources:
    • TRIGGER.payload.*: from the trigger event
    • INPUT.*: from the mapped input
    • TASKID.*: from outputs of specific tasks
  • No global state is shared between executions, preventing cross-run conflicts.

3.3.4. Naming Conventions and Best Practices

  • Use lowercase with underscores for variable names (e.g., user_id)
  • Prefix related variables with a common namespace (e.g., user_data.name)
  • Avoid using reserved prefixes (TRIGGER, INPUT, TASKID) for your own variable names
  • Use descriptive names that indicate the data type or purpose

3.3.5. Error Handling in Variable Resolution

  • If a variable cannot be resolved, the system returns a clear error message indicating which variable failed
  • Missing variables in the input mapping or task config will cause the task to fail
  • This helps catch misconfigurations early and ensures workflow reliability

3.3.6. Example: Variable Resolution in a Workflow

triggers:
  - id: webhook_trigger
    type: webhook
    input_mapping:
      user_id: "TRIGGER.payload.user_id"
      data: "TRIGGER.payload.data"
    target_taskflow: "process_data_flow"

taskflows:
  - taskflow_id: "process_data_flow"
    tasks:
      - id: process_data
        type: "custom_processor"
        config:
          user_id: "INPUT.user_id"
          processed_data: "INPUT.data"
        outputs:
          - process_data.result
      - id: store_result
        type: "storage_task"
        config:
          result: "process_data.result"
        dependencies: ["process_data"]

Summary:

  • Variables are resolved in a strict order: trigger payload → input mapping → task outputs
  • Each workflow run is isolated
  • Good naming and error handling practices are enforced

3.4. Command-Line Utilities

  • taskflow-yaml-checker: Validates YAML workflow definitions.
    ./taskflow-yaml-checker -input ./path/to/your/workflow.yml
    
  • taskflow-visualizer: Generates a visual graph (requires Graphviz dot).
    ./taskflow-visualizer -input ./path/to/your/workflow.yml -output workflow.svg
    

3.5. Services (Webhook Trigger)

Incoming webhooks (POST /webhooks/:provider/:webhook_id) are handled by the SLOP server. They typically trigger server-side logic, which might start a predefined SLOP Job (POST /api/jobs) that executes a specific process_id. That process could be a taskflow defined in YAML and run by an mfo-orchestrator instance or a similar executor within the server environment.

The services/unipile_mailer_webhook example in mfo-orchestrator likely shows how a client-side application could listen for webhooks and then use the taskflow engine, but according to SLOP server rules, this responsibility is ideally centralized on the server.

4. Handling Notifications and Webhooks within Taskflows

Taskflows executed by mfo-orchestrator can interact with SLOP's notification and event systems:

  • Sending User Notifications: A task (e.g., slop_api_notification_create type mapping to POST /api/notifications) can be added to the YAML workflow to create a notification for a user upon reaching a certain stage.

    - id: "notify_user_completion"
      type: "slop_api_notification_create" # Assumes this type exists
      config:
        # Request body for POST /api/notifications
        user_id: "${INPUT.user_id}" # Or retrieved from context
        title: "Email Processing Complete"
        message: "Analysis for resource ${INPUT.resource_id} stored."
        level: "info"
      dependencies: ["store_analysis"]
    

  • Dynamically Configuring Server-Side Triggers (e.g., for Webhooks): The SLOP server allows configuring triggers (/api/config/event-triggers) that link server events (like job.completed, resource.created) to actions (like starting another job or calling an external webhook URL). A taskflow can dynamically create or update these triggers.

    - id: "setup_completion_webhook"
      type: "slop_api_event_trigger_create" # Assumes this type exists, mapping to POST /api/config/event-triggers
      config:
        # Request body for the trigger endpoint
        event_type: "job.completed" # Or another relevant event
        filter: "job.id == ${SYSTEM.JOB_ID}" # Filter for the current job completion
        action_type: "webhook"
        action_config:
          url: "https://my-external-service.com/webhook-receiver"
          method: "POST"
          headers: { "Content-Type": "application/json" }
          body_template: '{"jobId": "${event.job.id}", "status": "${event.job.status}", "result": "${retrieve_job_result.output}"}'
      dependencies: ["start_main_processing"] # Run early in the flow
      # Note: Requires a task 'retrieve_job_result' if result needed in webhook body
    
    This task configures the SLOP server: "When this job completes, send a POST request to the specified URL". The webhook call itself is handled by the server later, not directly by the taskflow.

  • Calling External Webhooks Directly from Taskflow: If the SLOP server doesn't provide a suitable trigger or tool, you might need to:

    1. Create a Custom Go Task Type: Implement a task factory in mfo-orchestrator (e.g., httpClientPostFactory) that uses Go's net/http client to make the webhook call directly. Register this type.
    2. Use a Generic SLOP Tool: If the SLOP server has a generic tool (e.g., system_http_request) exposed via /api/tools/system_http_request, use the slop_api_tool_execute task type in your YAML to call it.

5. Advanced Topics & Considerations

  • Custom Task Implementation: Define custom Go task factories for logic not covered by standard SLOP API calls (e.g., complex data transformation, custom branching logic).
  • Error Handling: Implement error handling within task functions (checking API call errors) and potentially define conditional paths or specific error-handling tasks in the YAML workflow.
  • State Management: Rely primarily on the SLOP server's Memory (/api/memory) and Resource (/api/resources) endpoints for state persistence between taskflow runs or across different workflows.
  • Configuration: Manage SLOP server URLs, API keys, and taskflow configurations securely.
  • Multi-Agent Orchestration: Use taskflows to define the sequence of actions for an agent. Different agents might be represented by different taskflows or custom tasks within a larger flow. State sharing happens via the SLOP server's memory/resource APIs.

6. Conclusion

mfo-orchestrator provides a powerful way to orchestrate interactions with a SLOP server using declarative YAML workflows executed via the go-taskflow engine. By defining tasks that map to SLOP API calls and leveraging the server's centralized capabilities for persistence, tool execution, and event handling, developers can build complex, robust, and maintainable automation and agentic applications.

7. References

2.3.1. Detailed YAML Taskflow Structure

SLOP Client YAML Taskflow Directives (Updated)

This document defines the schema and conventions for defining taskflows in YAML files within the SLOP Client project, utilizing the internal/taskflow package which leverages github.com/noneback/go-taskflow.

Core Principles

  1. YAML as Definition: The YAML file is the primary definition of the workflow, including task sequence, dependencies, parameters, subflows, and conditional logic.
  2. Explicit Data Flow: Data dependencies between tasks are explicitly defined using a specific output referencing syntax (${producing_task_id.output_key}).
  3. Generic Tool Execution: A generic task type (slop_tool_call or slop_api_tool_execute) allows executing arbitrary SLOP Server tools defined in the YAML.
  4. Custom Logic Tasks: Specific Go task factories are used for logic that doesn't map directly to a single SLOP Server tool call (e.g., data parsing, complex branching logic, custom condition evaluation).
  5. Shared Output Context: A runtime context stores outputs from completed tasks, enabling subsequent tasks and conditions to access them.
  6. Modularity: Subflows allow grouping related tasks, and conditions enable dynamic branching based on runtime data.

YAML File Structure

A taskflow YAML file must have the following top-level structure:

name: "UniqueFlowName"             # Required: Identifier for the taskflow.
description: "Human-readable description." # Optional: Purpose of the flow.
version: "1.0"                   # Optional: Version of the flow definition.

config:                          # Optional: Global configuration for the flow execution.
  max_concurrency: 5             # Optional: Hint for executor concurrency.
  timeout: "5m"                  # Optional: Overall flow timeout hint (implementation status may vary).

tasks:                           # Required: List of individual task definitions.
  - id: "task_id_1"
    # ... task fields ...
  - id: "task_id_2"
    # ... task fields ...

subflows:                        # Optional: List of subflow definitions (groups of tasks).
  - id: "subflow_id_1"
    # ... subflow fields ...

conditions:                      # Optional: List of condition definitions (branching points).
  - id: "condition_id_1"
    # ... condition fields ...

Task Definition (tasks[])

Each item in the tasks list defines a single unit of work.

- id: "unique_task_id"          # Required: Unique identifier for this task within the flow (or subflow).
  name: "Descriptive Task Name" # Optional: Human-readable name.
  description: "What this task does." # Optional: Longer description.
  type: "task_type_name"        # Required: Maps to a registered TaskFactory in Go.
  config:                       # Optional: Configuration passed to the TaskFactory/TaskFunc.
    key1: value1
    key2: "${previous_task_id.output_key}" # Example data reference
    # Structure depends on the 'type'
  dependencies:                 # Optional: List of task/subflow/condition IDs that must complete before this task starts.
    - "dependency_id_1"
    - "dependency_id_2"
  successors:                   # Optional: List of task/subflow/condition IDs that depend on this task's completion.
    - "successor_id_1"
  priority: "normal"            # Optional: Execution priority ("low", "normal", "high"). Defaults to "normal".
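
To make the relationship between dependencies and successors concrete, the sketch below derives one valid execution order from both lists using Kahn's algorithm. It is not how go-taskflow or mfo-orchestrator schedules work (the real executor runs independent tasks concurrently); the Task type and the Order function are hypothetical, and the sketch assumes task IDs are unique.

```go
package main

import (
	"fmt"
	"sort"
)

// Task carries only the fields needed for ordering (hypothetical type).
type Task struct {
	ID           string
	Dependencies []string
	Successors   []string
}

// Order returns one valid execution order, or an error if an ID is
// undefined or the declared edges form a cycle.
func Order(tasks []Task) ([]string, error) {
	known := map[string]bool{}
	for _, t := range tasks {
		known[t.ID] = true
	}
	// edges[from][to] is a set, so an edge declared both as a dependency
	// and as a successor is only counted once.
	edges := map[string]map[string]bool{}
	addEdge := func(from, to string) error {
		if !known[from] || !known[to] {
			return fmt.Errorf("edge %s -> %s references an undefined id", from, to)
		}
		if edges[from] == nil {
			edges[from] = map[string]bool{}
		}
		edges[from][to] = true
		return nil
	}
	for _, t := range tasks {
		for _, d := range t.Dependencies {
			if err := addEdge(d, t.ID); err != nil {
				return nil, err
			}
		}
		for _, s := range t.Successors {
			if err := addEdge(t.ID, s); err != nil {
				return nil, err
			}
		}
	}
	indegree := map[string]int{}
	for _, targets := range edges {
		for to := range targets {
			indegree[to]++
		}
	}
	var ready []string
	for _, t := range tasks {
		if indegree[t.ID] == 0 {
			ready = append(ready, t.ID)
		}
	}
	var order []string
	for len(ready) > 0 {
		sort.Strings(ready) // keep the result deterministic
		id := ready[0]
		ready = ready[1:]
		order = append(order, id)
		for to := range edges[id] {
			indegree[to]--
			if indegree[to] == 0 {
				ready = append(ready, to)
			}
		}
	}
	if len(order) != len(tasks) {
		return nil, fmt.Errorf("dependency cycle detected")
	}
	return order, nil
}

func main() {
	order, err := Order([]Task{
		{ID: "extract_data", Successors: []string{"process_user", "process_order"}},
		{ID: "process_user", Dependencies: []string{"extract_data"}},
		{ID: "process_order", Dependencies: []string{"extract_data"}},
	})
	fmt.Println(order, err) // [extract_data process_order process_user] <nil>
}
```

Declaring an edge on both sides (a successor on one task and the matching dependency on the other) is harmless here because edges are stored as a set.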

Generic Task Type: slop_tool_call / slop_api_tool_execute

This special task type executes a specified SLOP Server tool.

- id: "call_some_tool"
  type: "slop_api_tool_execute" # Or slop_tool_call
  config:
    tool_id: "specific_slop_tool_endpoint" # Required: The SLOP Server tool ID (e.g., "unipile_email_tag_thread").
    arguments:                             # Required: Arguments for the tool.
      arg_name_1: "literal_value"
      arg_name_2: "${task_id_producing_value.output_key}" # Reference output
      # ... other arguments required by the tool
  dependencies: ["task_id_producing_value"]
  outputs: # Document expected outputs for clarity
    - call_some_tool.result # Example output key

Data Passing Convention

  • Output Storage: Tasks write their results to a shared execution context (e.g., map[taskID][outputKey] = value). Output keys should be documented or standardized for each task type.
  • Output Referencing: Within a task's or condition's config map, values matching the pattern ${producing_task_id.output_key} will be dynamically replaced at runtime with the corresponding value from the shared context.
  • Resolution: The Go task or condition implementation is responsible for parsing the config, detecting the ${...} syntax, and retrieving the value from the context.
  • Literals: Any config value not matching the ${...} pattern is treated as a literal value (string, number, boolean, etc.).
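
The convention above can be sketched in Go as a small recursive resolver. This is a minimal illustration, not the library's implementation: the Outputs type, the Resolve function, and the exact regular expression are assumptions, and the sketch only substitutes values that consist entirely of a single ${task_id.output_key} reference.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches a string made up solely of one ${task_id.output_key} reference.
var refPattern = regexp.MustCompile(`^\$\{([A-Za-z0-9_-]+)\.([A-Za-z0-9_.-]+)\}$`)

// Outputs is the shared context: task ID -> output key -> value.
type Outputs map[string]map[string]any

// Resolve walks a config value and replaces references with context values.
// Anything that is not a reference is returned unchanged as a literal.
func Resolve(value any, ctx Outputs) (any, error) {
	switch v := value.(type) {
	case string:
		m := refPattern.FindStringSubmatch(v)
		if m == nil {
			return v, nil // literal string
		}
		out, ok := ctx[m[1]][m[2]]
		if !ok {
			return nil, fmt.Errorf("unresolved reference %s", v)
		}
		return out, nil
	case map[string]any:
		resolved := make(map[string]any, len(v))
		for key, item := range v {
			r, err := Resolve(item, ctx)
			if err != nil {
				return nil, err
			}
			resolved[key] = r
		}
		return resolved, nil
	case []any:
		resolved := make([]any, len(v))
		for i, item := range v {
			r, err := Resolve(item, ctx)
			if err != nil {
				return nil, err
			}
			resolved[i] = r
		}
		return resolved, nil
	default:
		return v, nil // numbers, booleans, nil
	}
}

func main() {
	ctx := Outputs{"extract_data": {"user_id": 123, "order_id": "abc"}}
	args := map[string]any{
		"user_identifier": "${extract_data.user_id}",
		"new_status":      "processed",
	}
	resolved, err := Resolve(args, ctx)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(resolved) // map[new_status:processed user_identifier:123]
}
```

Because the whole value is replaced, the resolved value keeps its original type (the integer 123 stays an integer rather than becoming the string "123").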

Example Data Flow

tasks:
  - id: "extract_data"
    type: "custom_extractor" # Assumes this task outputs {"user_id": 123, "order_id": "abc"}
    config:
      source: "input_payload.json"
    successors: ["process_user", "process_order"]
    outputs:
      - extract_data.user_id
      - extract_data.order_id

  - id: "process_user"
    type: "slop_api_tool_execute"
    config:
      tool_id: "user_service_update_profile"
      arguments:
        user_identifier: "${extract_data.user_id}" # Get user_id from extract_data output
        new_status: "processed"
    dependencies: ["extract_data"]

  - id: "process_order"
    type: "slop_api_tool_execute"
    config:
      tool_id: "order_service_fetch_details"
      arguments:
        order_ref: "${extract_data.order_id}" # Get order_id from extract_data output
    dependencies: ["extract_data"]

Subflow Definition (subflows[])

Subflows encapsulate a group of related tasks, allowing for modular, reusable workflow components. Subflows are defined in the subflows: section and use only pure YAML.

subflows:
  - id: "process_order"
    name: "Process Order"
    description: "Fetch and process order data"
    input_schema:  # (Optional) Validate input before subflow execution
      required: ["order_id"]
      properties:
        order_id:
          type: string
    tasks:
      - id: "fetch"
        type: "fetch_data"
        config:
          order_id: "${INPUT.order_id}"
      - id: "process"
        type: "process_data"
        dependencies: ["fetch"]
        config:
          data: "${fetch.result}"
    dependencies: ["external_task"]
    successors: ["next_task"]

Key Points:

  • All config values are YAML, not JSON strings.
  • Inputs and outputs are referenced using ${...} syntax.
  • If input_schema is provided, the engine validates the input map before invoking the subflow.
  • Outputs from subflow tasks are accessible to downstream tasks by their task ID and output key.

Example: Subflow with Input Validation

subflows:
  - id: "user_registration"
    input_schema:
      required: ["email", "password"]
      properties:
        email:
          type: string
        password:
          type: string
    tasks:
      - id: "validate_email"
        type: "email_validator"
        config:
          email: "${INPUT.email}"
      - id: "create_user"
        type: "user_creator"
        dependencies: ["validate_email"]
        config:
          email: "${INPUT.email}"
          password: "${INPUT.password}"
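
The input check described for input_schema might look roughly like the following. This is a sketch under assumptions: the InputSchema and Property types are hypothetical, only the "string" type appears in the examples above, and the handling of "boolean" and "number" is borrowed from JSON Schema naming for illustration.

```go
package main

import "fmt"

// Property describes one entry under input_schema.properties (hypothetical type).
type Property struct {
	Type string
}

// InputSchema mirrors the required and properties keys shown above.
type InputSchema struct {
	Required   []string
	Properties map[string]Property
}

// Validate reports the first missing required key or type mismatch.
func (s InputSchema) Validate(input map[string]any) error {
	for _, key := range s.Required {
		if _, ok := input[key]; !ok {
			return fmt.Errorf("missing required input %q", key)
		}
	}
	for key, prop := range s.Properties {
		val, ok := input[key]
		if !ok {
			continue // optional key that was not supplied
		}
		if !matchesType(val, prop.Type) {
			return fmt.Errorf("input %q: expected %s, got %T", key, prop.Type, val)
		}
	}
	return nil
}

// matchesType checks a value against a schema type name.
// Unknown type names are accepted without checking in this sketch.
func matchesType(v any, typeName string) bool {
	switch typeName {
	case "string":
		_, ok := v.(string)
		return ok
	case "boolean":
		_, ok := v.(bool)
		return ok
	case "number":
		switch v.(type) {
		case int, int64, float64:
			return true
		}
		return false
	default:
		return true
	}
}

func main() {
	schema := InputSchema{
		Required: []string{"email", "password"},
		Properties: map[string]Property{
			"email":    {Type: "string"},
			"password": {Type: "string"},
		},
	}
	fmt.Println(schema.Validate(map[string]any{"email": "a@example.com"}))
	// missing required input "password"
}
```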

Condition Definition (conditions[])

Conditions enable dynamic branching in your workflow based on runtime data. They are defined in the conditions: section of your YAML and use a pure YAML structure; never embed JSON.

conditions:
  - id: "check_threshold"
    name: "Check if value exceeds threshold"
    description: "Branch if input_value is greater than 10 and status is active"
    type: "expr"  # Uses antonmedv/expr for expression evaluation
    config:
      expression: "input_value > 10 && status == 'active'"
      vars:
        input_value: 42
        status: "active"
    dependencies: ["some_task"]
    successors: ["branch_0_task", "branch_1_task"]

Key Points:

  • expression is a string, not a JSON object.
  • vars is a YAML map, not a JSON string.
  • All config values are pure YAML.
  • The return value of the condition function (as an integer) selects the branch (0-based index) from successors.

Variable Resolution Order

  1. vars from the config (static, highest precedence)
  2. Context payload (INPUT)
  3. Outputs from previous tasks (taskID.key)
  4. If not found, the variable is undefined and may cause an error.
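
The precedence list above can be read as a chain of lookups. The sketch below is one way to express it in Go; the Scope type and Lookup method are hypothetical, and it assumes that INPUT values are addressed by their bare key and task outputs by a dotted taskID.key name.

```go
package main

import (
	"fmt"
	"strings"
)

// Scope bundles the three variable sources in precedence order (hypothetical type).
type Scope struct {
	Vars    map[string]any            // static vars from the condition config
	Input   map[string]any            // context payload (INPUT)
	Outputs map[string]map[string]any // task ID -> output key -> value
}

// Lookup resolves a variable name: vars first, then INPUT, then task outputs.
func (s Scope) Lookup(name string) (any, error) {
	if v, ok := s.Vars[name]; ok {
		return v, nil
	}
	if v, ok := s.Input[name]; ok {
		return v, nil
	}
	if taskID, key, found := strings.Cut(name, "."); found {
		if v, ok := s.Outputs[taskID][key]; ok {
			return v, nil
		}
	}
	return nil, fmt.Errorf("variable %q is undefined", name)
}

func main() {
	scope := Scope{
		Vars:    map[string]any{"max_attempts": 3},
		Input:   map[string]any{"attempts": 1, "max_attempts": 10},
		Outputs: map[string]map[string]any{"run_task": {"last_result": "error"}},
	}
	fmt.Println(scope.Lookup("max_attempts"))         // 3 <nil>
	fmt.Println(scope.Lookup("attempts"))             // 1 <nil>
	fmt.Println(scope.Lookup("run_task.last_result")) // error <nil>
}
```

Here max_attempts resolves to 3 even though INPUT also carries a value for it, because static vars take precedence.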

Example: Using Context and Task Outputs

conditions:
  - id: "should_retry"
    type: "expr"
    config:
      expression: "attempts < max_attempts && last_result == 'error'"
      vars:
        max_attempts: 3
      # 'attempts' and 'last_result' can come from previous tasks or INPUT
    dependencies: ["run_task"]
    successors: ["retry_task", "end_flow"]
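
Since the condition's integer result selects a 0-based entry in successors, branch selection can be sketched as below. The function names are hypothetical. The mapping used in shouldRetry (0 when the expression holds, 1 otherwise) is an assumption chosen to match the order of successors in the example; the text above does not specify how a boolean expression result is converted to an index.

```go
package main

import "fmt"

// SelectBranch returns the successor at the given 0-based index.
func SelectBranch(successors []string, index int) (string, error) {
	if index < 0 || index >= len(successors) {
		return "", fmt.Errorf("branch index %d out of range for %d successors", index, len(successors))
	}
	return successors[index], nil
}

// shouldRetry mirrors the should_retry expression above.
// Assumption: 0 selects "retry_task" and 1 selects "end_flow".
func shouldRetry(attempts, maxAttempts int, lastResult string) int {
	if attempts < maxAttempts && lastResult == "error" {
		return 0
	}
	return 1
}

func main() {
	successors := []string{"retry_task", "end_flow"}

	next, err := SelectBranch(successors, shouldRetry(1, 3, "error"))
	fmt.Println(next, err) // retry_task <nil>

	next, err = SelectBranch(successors, shouldRetry(3, 3, "error"))
	fmt.Println(next, err) // end_flow <nil>
}
```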

Best Practices

  • Never embed JSON: All config, schemas, and data structures must be valid YAML.
  • Use ${...} references for all dynamic data passing.
  • Document input and output keys for each task and subflow.
  • Validate inputs using input_schema for subflows when possible.
  • Keep variable names descriptive and avoid reserved prefixes unless required.

Example: Full Workflow with Conditions and Subflows

name: "Order Processing Flow"
tasks:
  - id: "start"
    type: "trigger"
    config:
      order_id: "${INPUT.order_id}"
    successors: ["order_check"]

  - id: "fail"
    type: "notify_failure"
    config:
      reason: "Order ID missing"
    dependencies: ["order_check"]

  - id: "end"
    type: "finalize"
    dependencies: ["process_order", "fail"]

conditions:
  - id: "order_check"
    type: "expr"
    config:
      expression: "order_id != ''"
    dependencies: ["start"]
    successors: ["process_order", "fail"]

subflows:
  - id: "process_order"
    input_schema:
      required: ["order_id"]
      properties:
        order_id:
          type: string
    tasks:
      - id: "fetch"
        type: "fetch_data"
        config:
          order_id: "${INPUT.order_id}"
      - id: "process"
        type: "process_data"
        dependencies: ["fetch"]
        config:
          data: "${fetch.result}"
    dependencies: ["order_check"]
    successors: ["end"]