MFO MindFlight Orchestrator - Detailed Documentation¶
version: 1.0
This document provides a comprehensive guide to the mfo-orchestrator library, detailing its capabilities, internal mechanisms, and usage patterns with examples, incorporating information from the SLOP server API and taskflow definitions. It is intended for developers who want to interact with the SLOP server API and leverage its task orchestration features.
1. Introduction¶
mfo-orchestrator is a Go library designed as a client for the SLOP server, a platform focused on agentic workflows, task automation, and centralized interaction management. The client facilitates interaction with the server's REST API and integrates a task workflow engine based on go-taskflow. This allows developers to define complex, potentially multi-agent workflows in a declarative YAML format, execute them efficiently via mfo-orchestrator, and manage related server-side resources such as jobs, events, memory, and prompts.
The library design emphasizes flexibility and human readability, particularly through its use of YAML for taskflow definitions and configuration. This aligns with preferences for systems where configurations are easily understood and modifiable by humans, potentially supporting dynamic adjustments in multi-agent orchestration scenarios. Clients delegate external interactions and persistence entirely to the SLOP server, promoting stateless client design.
2. Core Concepts¶
Understanding the following core components is crucial for using mfo-orchestrator effectively.
2.1. API Client (internal/client, internal/api)¶
The heart of the library's interaction with the SLOP server is the API client. It encapsulates the logic for making HTTP requests to the server's various REST endpoints defined by the SLOP protocol.
- Initialization: The client is typically initialized with the server's base URL and authentication credentials (a JWT or API token obtained via POST /auth/login). See internal/client/client.go for initialization patterns.
- Endpoint Abstraction: Specific API functionalities are abstracted into dedicated modules within internal/api/. These cover key SLOP server capabilities:
  - chat.go: Interacting with conversational endpoints (/api/chat).
  - tool.go: Executing registered server-side tools (/api/tools/:tool_id).
  - memory.go: Managing persistent key-value storage (/api/memory).
  - resource.go: Managing files and other resources (/api/resources).
  - job.go: Managing asynchronous background jobs (/api/jobs).
  - notification.go: Managing user notifications (/api/notifications).
  - event.go / config: Handling event triggers and configurations (/api/config/event-triggers).
- Request/Response Handling: The client manages request serialization, response deserialization (often into structs defined in internal/models), authentication headers, and error handling, adhering to the SLOP server rules.
2.2. Authentication (internal/auth)¶
Secure communication with the SLOP server's private /api/* endpoints is handled by the authentication module. It supports JWT tokens (obtained via /auth/login) or API tokens (managed via /api/tokens), configured during client initialization and automatically included in the Authorization: Bearer <token> header of outgoing requests.
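For illustration, the following minimal sketch shows one way such a provider could attach the bearer token to every outgoing request via an http.RoundTripper. The type and function names here are hypothetical and are not necessarily the library's actual API.

package auth

import "net/http"

// bearerTransport is a hypothetical http.RoundTripper that injects the
// Authorization header expected by the SLOP server's private endpoints.
type bearerTransport struct {
	token string
	base  http.RoundTripper
}

func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// Clone the request so the caller's request is not mutated.
	clone := req.Clone(req.Context())
	clone.Header.Set("Authorization", "Bearer "+t.token)
	return t.base.RoundTrip(clone)
}

// NewBearerClient wraps an http.Client so every call carries the token.
func NewBearerClient(token string) *http.Client {
	return &http.Client{Transport: &bearerTransport{token: token, base: http.DefaultTransport}}
}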
2.3. Taskflow Engine (internal/taskflow, go-taskflow)¶
mfo-orchestrator leverages the go-taskflow library (github.com/noneback/go-taskflow) to define, build, and execute task workflows locally within the client application, orchestrating calls to the SLOP server.
- YAML Definition: Workflows are defined declaratively in YAML files, following the structure outlined in yaml-taskflow_directive.mdc. This format describes tasks, their types (mapping to Go functions), configurations, dependencies, conditional logic, and subflows.
- SLOP API Task Types: Specific task types are defined (or should be defined) in the mfo-orchestrator's Go code to correspond to SLOP API calls (e.g., slop_api_chat_send, slop_api_resource_get, slop_api_memory_store). Examples are provided in slop_yaml_api_task_definitions.mdc. A generic slop_tool_call type might also exist or be implemented to call arbitrary /api/tools/:tool_id endpoints.
- Task Registry (internal/taskflow/registry.go): A central registry maps task type strings (from the YAML type field) to Go factory functions (taskflow.TaskFuncFactory). These factories create the actual executable task functions, often requiring access to the initialized slopClient instance (likely passed via context or closure) to make API calls.
- Builder (internal/taskflow/builder.go): The builder takes a parsed FlowDefinition (from YAML) and the TaskRegistry to construct an executable taskflow.TaskFlow graph. It resolves dependencies and links tasks.
- Executor (go-taskflow.Executor, potentially wrapped in internal/taskflow/executor.go): The executor runs the TaskFlow graph. It manages task scheduling, respects dependencies, handles concurrency, and executes tasks (Go functions making SLOP API calls), potentially in parallel.
- Context & Data Passing (internal/taskflow/context.go): An execution context is passed through task execution. It typically holds the slopClient instance and a map for storing task outputs. Tasks access outputs from previous tasks using the ${producing_task_id.output_key} syntax within their config in the YAML, which the Go task implementation resolves by querying the context map. A minimal sketch of such a context appears after this list.
- Persistence (internal/taskflow/persistence): While the SLOP server handles primary persistence (memory, resources, jobs), this package within the client might relate to client-side caching or state management for the workflow execution itself, if needed.
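For illustration, here is a minimal sketch of such an execution context, mirroring the FlowContext used in the examples in Section 3.2. The helper names are illustrative and not a guaranteed part of the library API.

package taskflow

import (
	"fmt"
	"sync"
)

// FlowContext carries shared state through a single taskflow execution.
// The layout mirrors the conceptual example in Section 3.2.
type FlowContext struct {
	mu      sync.Mutex
	Inputs  map[string]interface{}            // initial inputs (INPUT.*)
	Outputs map[string]map[string]interface{} // taskID -> outputKey -> value
}

// SetOutput records one output value produced by a task.
func (c *FlowContext) SetOutput(taskID, key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.Outputs[taskID] == nil {
		c.Outputs[taskID] = map[string]interface{}{}
	}
	c.Outputs[taskID][key] = value
}

// GetOutput retrieves a previously stored output, e.g. when resolving
// a "${producing_task_id.output_key}" reference.
func (c *FlowContext) GetOutput(taskID, key string) (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if out, ok := c.Outputs[taskID]; ok {
		if v, ok := out[key]; ok {
			return v, nil
		}
	}
	return nil, fmt.Errorf("output %s.%s not found", taskID, key)
}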
2.4. Data Models (internal/models)¶
This directory contains the Go struct definitions representing data exchanged with the SLOP API (requests/responses) and used within the taskflow engine (e.g., TaskflowDefinition, TaskDefinition). These align with the structures expected or returned by the SLOP server endpoints.
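To give a sense of their shape, here is a hedged sketch of what a flow and task definition could look like, inferred from the YAML schema described in Section 2.3.1; the actual field names in internal/models may differ.

package models

// Illustrative shapes inferred from the YAML schema in Section 2.3.1;
// the real structs in internal/models may use different names or fields.
type FlowDefinition struct {
	Name        string           `yaml:"name"`
	Description string           `yaml:"description,omitempty"`
	Version     string           `yaml:"version,omitempty"`
	Tasks       []TaskDefinition `yaml:"tasks"`
}

type TaskDefinition struct {
	ID           string                 `yaml:"id"`
	Name         string                 `yaml:"name,omitempty"`
	Type         string                 `yaml:"type"`
	Config       map[string]interface{} `yaml:"config,omitempty"`
	Dependencies []string               `yaml:"dependencies,omitempty"`
	Successors   []string               `yaml:"successors,omitempty"`
	Priority     string                 `yaml:"priority,omitempty"`
}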
2.5. Prompt Management (internal/prompt, internal/prompts)¶
For agentic workflows involving LLMs via the SLOP server's /api/chat endpoint, mfo-orchestrator includes utilities for managing the prompts used in ChatRequest bodies:
- Parsing & Storage: Loading and managing prompt templates, potentially from Markdown files.
- Templating: Injecting variables (including data passed via ${task_id.output_key}) into prompt templates before sending them in an API call.
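As a rough illustration of the templating step, the sketch below uses Go's standard text/template; the library's own template syntax and helpers may differ.

package main

import (
	"bytes"
	"fmt"
	"text/template"
)

func main() {
	// A prompt template with a variable to be filled from task outputs.
	const promptTmpl = "Summarize the following email:\n\n{{ .email_content }}"

	// Values resolved from the execution context (e.g. a previous task's output).
	vars := map[string]interface{}{
		"email_content": "Hello, the meeting is moved to Friday.",
	}

	tmpl, err := template.New("prompt").Parse(promptTmpl)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, vars); err != nil {
		panic(err)
	}
	fmt.Println(buf.String()) // rendered prompt, ready for a ChatRequest body
}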
2.6. Error Handling and Retry Logic¶
MFO MindFlight Orchestrator supports robust error handling in workflows via a declarative retry mechanism. This is particularly useful for tasks involving LLMs or external APIs where transient errors or output format issues are common.
2.6.1. Retry Configuration in Workflows¶
Tasks can specify error_handling logic in their YAML definition, referencing error-handling templates defined in the prompt YAML:
- id: "generate_drafts"
type: "slop_api_chat_send"
config:
include_tools: false
prompt_template_id: "pujol_email_draft"
prompt_context:
email_content: "fetch_resource_content.resource.Content"
email_analysis_structured: "analyze_email.parsed_content"
error_handling:
on_error:
- type: "retry_with_prompt"
append_to_system_prompt: "retry_with_prompt" # References a template in the prompt YAML
max_retries: 2
2.6.2. Error Handling Templates¶
Error handling templates are defined in the prompt YAML file under error_handling_templates:
templates:
  pujol_email_draft:
    # ... system_prompt, user_prompt, etc. ...
    error_handling_templates:
      retry_with_prompt: |
        ERROR DETECTED: {{ .error }}
        Please correct the response format, making sure that:
        1. The 'to' field is an array of objects with 'identifier' (required) and 'display_name' (optional)
        2. The body does not contain <body> tags
        3. The response is generated via a call to the 'generate_single_draft_content' tool
2.6.3. Execution Flow¶
- Task Execution: The workflow engine runs the task (e.g., generate_drafts).
- Error Detection: If the LLM output is invalid (e.g., missing required fields), the engine detects the error.
- Retry with Augmented Prompt:
  - The engine checks the error_handling.on_error config.
  - It finds type: retry_with_prompt and append_to_system_prompt: retry_with_prompt.
  - It looks up the retry_with_prompt template in the prompt YAML.
  - The error message is injected into the template (as {{ .error }}).
  - The system prompt for the retry is augmented with this error-handling message.
- Retry Count: The engine tracks the number of retries and stops after max_retries.
- Result: If the LLM produces valid output, the workflow continues. If not, the error is surfaced after the final retry.
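To make the flow concrete, here is a hedged Go sketch of the retry loop; function names like sendChat, validateOutput, and renderErrorTemplate are placeholders for illustration, not the engine's actual API.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// sendChat and validateOutput stand in for the real SLOP chat call and
// output validation; both are placeholders for illustration.
func sendChat(systemPrompt, userPrompt string) (string, error) {
	return "", errors.New("missing required field 'to'")
}
func validateOutput(out string) error { return nil }

// renderErrorTemplate injects the error message into the error-handling
// template, playing the role of the {{ .error }} substitution.
func renderErrorTemplate(tmpl string, err error) string {
	return strings.ReplaceAll(tmpl, "{{ .error }}", err.Error())
}

func runWithRetries(systemPrompt, userPrompt, retryTemplate string, maxRetries int) (string, error) {
	prompt := systemPrompt
	for attempt := 0; attempt <= maxRetries; attempt++ {
		out, err := sendChat(prompt, userPrompt)
		if err == nil {
			err = validateOutput(out)
		}
		if err == nil {
			return out, nil // valid output: workflow continues
		}
		if attempt == maxRetries {
			return "", fmt.Errorf("task failed after %d retries: %w", maxRetries, err)
		}
		// Augment the system prompt with the rendered error-handling message.
		prompt = systemPrompt + "\n\n" + renderErrorTemplate(retryTemplate, err)
	}
	return "", errors.New("unreachable")
}

func main() {
	_, err := runWithRetries("You draft emails.", "Draft a reply.", "ERROR DETECTED: {{ .error }}\nPlease correct the format.", 2)
	fmt.Println(err)
}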
2.6.4. Separation of Concerns¶
- Workflow YAML: only references the error-handling template by name.
- Prompt YAML: contains the actual error-handling message under error_handling_templates.
- Advantage: this keeps the business logic (when and how to retry) in the workflow and the error message content in the prompt file, making both easier to maintain and localize.
2.6.5. Variable Mapping¶
- The variable error is conventionally used and injected by the workflow engine.
- The error message (e.g., "Missing required field 'to'") is passed as {{ .error }} to the error-handling template.
- This is a convention: the engine ensures it always provides the error variable to the template.
2.6.6. Validation¶
The YAML validator checks that any append_to_system_prompt reference in the workflow matches a template in the prompt YAML, preventing typos and missing error-handling templates.
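A hedged sketch of what such a cross-check could look like; the maps below are simplified stand-ins for the parsed workflow and prompt YAML, not the validator's real types.

package main

import "fmt"

// validateErrorTemplates checks that every append_to_system_prompt reference
// used in the workflow exists among the prompt file's error_handling_templates.
func validateErrorTemplates(workflowRefs []string, errorTemplates map[string]string) error {
	for _, ref := range workflowRefs {
		if _, ok := errorTemplates[ref]; !ok {
			return fmt.Errorf("workflow references unknown error-handling template %q", ref)
		}
	}
	return nil
}

func main() {
	refs := []string{"retry_with_prompt"}
	templates := map[string]string{"retry_with_prompt": "ERROR DETECTED: {{ .error }} ..."}
	if err := validateErrorTemplates(refs, templates); err != nil {
		fmt.Println("validation failed:", err)
		return
	}
	fmt.Println("all error-handling template references resolved")
}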
3. Usage and Examples¶
3.1. API Client Usage (Go)¶
The core pattern involves initializing the client and calling its methods corresponding to SLOP API endpoints.
// Example (Conceptual - based on structure and API docs)
package main

import (
	"context"
	"log"

	"gitlab.com/webigniter/slop-client/internal/auth"
	"gitlab.com/webigniter/slop-client/internal/client"
	"gitlab.com/webigniter/slop-client/internal/models"
)

func main() {
	// Assume credentials obtained securely
	serverURL := "http://localhost:8080" // Replace with actual SLOP server URL
	apiKey := "YOUR_API_KEY_OR_JWT"

	// Configure authentication
	authProvider := auth.NewStaticTokenProvider(apiKey)

	// Initialize the client
	slopClient, err := client.NewClient(serverURL, nil, authProvider)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}

	// Example: Retrieve a resource
	resourceID := "some-resource-id"
	resource, err := slopClient.API.GetResource(context.Background(), resourceID)
	if err != nil {
		log.Fatalf("Failed to get resource: %v", err)
	}
	log.Printf("Resource content: %s", resource.Content)

	// Example: Store data in memory
	memReq := models.StoreMemoryRequest{
		Key:        "my_data_key",
		Value:      map[string]interface{}{"info": "some data"},
		ProviderID: "default_memory", // As required by API docs
	}
	_, err = slopClient.API.StoreMemory(context.Background(), memReq)
	if err != nil {
		log.Fatalf("Failed to store memory: %v", err)
	}
	log.Println("Data stored in memory.")
}
3.2. Taskflow Engine Usage (YAML + Go)¶
The primary way to orchestrate SLOP API calls is by defining workflows in YAML and executing them using the mfo-orchestrator's taskflow engine.
Step 1: Define Workflow in YAML
Create a YAML file (e.g., process_email_workflow.yml) using SLOP API task types.
name: "ProcessIncomingEmail"
description: "Analyzes an email, stores analysis, and tags the thread."
version: "1.0"
tasks:
- id: "get_email_content"
name: "Get Email Resource"
type: "slop_api_resource_get" # Maps to a Go factory
config:
resource_id: "${INPUT.resource_id}" # Assume resource_id is provided as input to the flow
successors:
- "analyze_email"
- id: "analyze_email"
name: "Analyze Email with LLM"
type: "slop_api_chat_send"
config:
request_body:
messages:
- role: "system"
content: "Analyze the following email content and extract the sender, subject, and a brief summary. Respond in JSON format {"sender": "...", "subject": "...", "summary": "..."}."
- role: "user"
content: "${get_email_content.resource.content}" # Get content from previous task
provider_id: "openai" # Example provider
dependencies: ["get_email_content"]
successors:
- "store_analysis"
- id: "store_analysis"
name: "Store Analysis in Memory"
type: "slop_api_memory_store"
config:
key: "email_analysis_${INPUT.resource_id}"
# Assuming the chat response content is valid JSON string
value: "${analyze_email.response.content}"
provider_id: "default_memory"
dependencies: ["analyze_email"]
successors:
- "tag_email_thread" # Assuming we need a separate step to get thread_id
# Placeholder: Add task to extract thread_id if not available in INPUT or resource metadata
# - id: "extract_thread_id"
# type: "custom_regex_extractor" # Example custom task
# config:
# input_string: "${get_email_content.resource.metadata.email_thread_id}" # Example path
# regex: "..."
# dependencies: ["get_email_content"]
# successors: ["tag_email_thread"]
- id: "tag_email_thread"
name: "Tag Email Thread"
type: "slop_api_tool_execute" # Using the generic tool executor type
config:
tool_id: "unipile_email_tag_thread" # Specific SLOP tool
arguments:
tag_name: "ai_analyzed"
# Assuming thread_id comes from input or another task like extract_thread_id
thread_id: "${INPUT.thread_id}"
dependencies: ["store_analysis"] # Depends on analysis being stored, and thread_id being available
Step 2: Register Task Types (Go)
In your mfo-orchestrator application, register Go factory functions for each type used in the YAML. These factories often need the slopClient.
package main

import (
	"context"
	"fmt"

	"gitlab.com/webigniter/slop-client/internal/client"
	"gitlab.com/webigniter/slop-client/internal/models"
	"gitlab.com/webigniter/slop-client/internal/taskflow"
	// ... other necessary imports
)

// Define a context structure to hold client and outputs
type FlowContext struct {
	SlopClient *client.Client
	Outputs    map[string]map[string]interface{}
	Inputs     map[string]interface{} // For initial inputs like resource_id
	// Add a mutex if concurrent tasks write to Outputs
}

// --- Task Factories ---

func slopApiResourceGetFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
	return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
		return func(ctxInterface interface{}) {
			ctx := ctxInterface.(*FlowContext)
			taskID := "get_email_content" // Assuming task ID is known or passed

			// Resolve config parameters (handle ${...} syntax)
			resourceIDRaw, _ := taskflow.ResolveConfigValue(config["resource_id"], ctx.Outputs, ctx.Inputs)
			resourceID := resourceIDRaw.(string)

			resource, err := slopClient.API.GetResource(context.Background(), resourceID)
			if err != nil {
				// Handle error appropriately (log, set error state in context)
				fmt.Printf("Task %s failed: %v\n", taskID, err)
				return
			}

			// Store output
			taskflow.SetOutput(ctx.Outputs, taskID, "resource", resource)
			fmt.Printf("Task %s completed.\n", taskID)
		}, nil
	}
}

func slopApiChatSendFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
	return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
		return func(ctxInterface interface{}) {
			ctx := ctxInterface.(*FlowContext)
			taskID := "analyze_email"

			// Resolve config (complex: needs deep resolution for request_body)
			resolvedConfig, err := taskflow.ResolveConfigMap(config, ctx.Outputs, ctx.Inputs)
			if err != nil { /* handle error */
				return
			}

			var chatReq models.ChatRequest
			// Use a helper to map resolvedConfig["request_body"] to the chatReq struct, e.g.:
			// mapstructure.Decode(resolvedConfig["request_body"], &chatReq)
			_ = resolvedConfig // placeholder so this conceptual sketch compiles without the decode step

			chatRespMap, err := slopClient.API.SendChat(context.Background(), chatReq) // Assuming SendChat exists
			if err != nil { /* handle error */
				return
			}

			taskflow.SetOutput(ctx.Outputs, taskID, "response", chatRespMap)
			fmt.Printf("Task %s completed.\n", taskID)
		}, nil
	}
}

// ... Implement factories for slop_api_memory_store, slop_api_tool_execute, etc. ...
// These factories will parse their specific 'config', resolve ${...} values,
// call the appropriate slopClient.API method, and store results in ctx.Outputs.

func main() {
	// ... Initialize slopClient ...
	registry := taskflow.NewRegistry()

	// Pass slopClient to the factories
	registry.RegisterTask("slop_api_resource_get", slopApiResourceGetFactory(slopClient))
	registry.RegisterTask("slop_api_chat_send", slopApiChatSendFactory(slopClient))
	// ... register other factories ...

	// ... next steps: load YAML, build, execute ...
}
Step 3: Load, Build, and Execute (Go)
Load the YAML, create the TaskFlow, prepare the initial context, and run it.
// ... (inside main, after registry setup)

import (
	"log"
	"os"
	"runtime"

	gotaskflow "github.com/noneback/go-taskflow"
	"gopkg.in/yaml.v3"
	// ... other imports
)

// Load YAML
yamlData, err := os.ReadFile("process_email_workflow.yml")
// ... handle error ...

var flowDef *taskflow.FlowDefinition
err = yaml.Unmarshal(yamlData, &flowDef)
// ... handle error ...

// Create Builder
builder := taskflow.NewBuilder(registry)

// Build TaskFlow
taskFlow, err := builder.BuildFlow(flowDef)
// ... handle error ...

// Create Executor
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))

// Prepare Initial Context
initialContext := &FlowContext{
	SlopClient: slopClient,
	Outputs:    make(map[string]map[string]interface{}),
	Inputs: map[string]interface{}{ // Example inputs
		"resource_id": "email-resource-123",
		"thread_id":   "unipile-thread-abc",
	},
}

// Execute the flow
log.Println("Executing taskflow...")
executor.Run(taskFlow, gotaskflow.WithContext(initialContext)).Wait()
log.Println("Taskflow execution complete.")

// Access final outputs if needed: initialContext.Outputs

// Optional: Dump graph
// taskFlow.Dump(os.Stdout)
(Note: Helper functions like taskflow.ResolveConfigValue, taskflow.ResolveConfigMap, and taskflow.SetOutput are assumed to exist within the internal/taskflow package to handle data passing.)
3.3. Variable Resolution, Scoping, and Best Practices¶
MFO MindFlight Orchestrator workflows use a powerful and predictable variable resolution system to pass data between triggers, taskflows, and tasks. Understanding this system is crucial for authoring robust, maintainable workflows.
3.3.1. Trigger Types and Input Mapping¶
- Trigger Types:
  - webhook: Starts a workflow on an incoming HTTP request.
  - schedule: Starts a workflow on a time schedule (cron). Uses config.cron_expression (e.g., "0 0 * * *") to schedule periodic execution. No payload; input_mapping can provide static values.
  - user_input: Allows a user or UI to trigger a workflow via the API (POST /api/trigger/:trigger_id). Accepts a JSON payload, which is mapped to the taskflow input using input_mapping. Example:

    triggers:
      - id: manual_email_process
        type: user_input
        enabled: true
        target_taskflow: main_email_processing
        input_mapping:
          email_id: "TRIGGER.payload.email_id"
          user_note: "TRIGGER.payload.note"

    To trigger: POST to /api/trigger/manual_email_process with { "email_id": "...", "note": "..." }.
  - event: Starts a workflow on an internal event (future/advanced).
- Input Mapping:
  - Each trigger can define an input_mapping that maps fields from the trigger payload (or static values for schedule) to the INPUT context of the target taskflow (see the webhook example in Section 3.3.6).
  - For schedule triggers, only static values are supported in input_mapping.

Note:
- All triggers can use input_mapping to map an incoming payload or static values to the taskflow input context.
- user_input triggers are accessible via POST /api/trigger/:trigger_id and are suitable for manual or UI-driven workflow starts.
- See the SLOP documentation and model comments for more details and examples.
3.3.2. Variable Resolution System and Order¶
Variables in task configs and prompt contexts are resolved in the following order:
1. Trigger Payload: TRIGGER.payload.field (from the incoming event)
2. Workflow Input: INPUT.variable_name (from the trigger's input_mapping)
3. Task Outputs: TASKID.output_key (from previous tasks in the same taskflow)
4. If not found: an error is raised and the task fails
This ensures deterministic and predictable data flow.
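A hedged sketch of this lookup order in Go; the real resolver lives in the engine, and the maps and function name below are illustrative only.

package main

import (
	"fmt"
	"strings"
)

// resolveVariable applies the documented lookup order:
// TRIGGER.payload.* -> INPUT.* -> TASKID.output_key -> error.
func resolveVariable(ref string, trigger, input map[string]interface{}, outputs map[string]map[string]interface{}) (interface{}, error) {
	switch {
	case strings.HasPrefix(ref, "TRIGGER.payload."):
		if v, ok := trigger[strings.TrimPrefix(ref, "TRIGGER.payload.")]; ok {
			return v, nil
		}
	case strings.HasPrefix(ref, "INPUT."):
		if v, ok := input[strings.TrimPrefix(ref, "INPUT.")]; ok {
			return v, nil
		}
	default: // TASKID.output_key
		if parts := strings.SplitN(ref, ".", 2); len(parts) == 2 {
			if out, ok := outputs[parts[0]]; ok {
				if v, ok := out[parts[1]]; ok {
					return v, nil
				}
			}
		}
	}
	return nil, fmt.Errorf("variable %q could not be resolved; task fails", ref)
}

func main() {
	trigger := map[string]interface{}{"user_id": "u-42"}
	input := map[string]interface{}{"data": "payload"}
	outputs := map[string]map[string]interface{}{"process_data": {"result": "ok"}}

	for _, ref := range []string{"TRIGGER.payload.user_id", "INPUT.data", "process_data.result", "missing.value"} {
		v, err := resolveVariable(ref, trigger, input, outputs)
		fmt.Println(ref, "=>", v, err)
	}
}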
3.3.3. Variable Scoping and Isolation¶
- Each workflow execution (taskflow instance) gets its own isolated context.
- Variables are scoped to their respective sources:
  - TRIGGER.payload.*: from the trigger event
  - INPUT.*: from the mapped input
  - TASKID.*: from outputs of specific tasks
- No global state is shared between executions, preventing cross-run conflicts.
3.3.4. Naming Conventions and Best Practices¶
- Use lowercase with underscores for variable names (e.g., user_id)
- Prefix related variables with a common namespace (e.g., user_data.name)
- Avoid using reserved prefixes (TRIGGER, INPUT, TASKID) for your own variable names
- Use descriptive names that indicate the data type or purpose
3.3.5. Error Handling in Variable Resolution¶
- If a variable cannot be resolved, the system returns a clear error message indicating which variable failed
- Missing variables in the input mapping or task config will cause the task to fail
- This helps catch misconfigurations early and ensures workflow reliability
3.3.6. Example: Variable Resolution in a Workflow¶
triggers:
  - id: webhook_trigger
    type: webhook
    input_mapping:
      user_id: "TRIGGER.payload.user_id"
      data: "TRIGGER.payload.data"
    target_taskflow: "process_data_flow"

taskflows:
  - taskflow_id: "process_data_flow"
    tasks:
      - id: process_data
        type: "custom_processor"
        config:
          user_id: "INPUT.user_id"
          processed_data: "INPUT.data"
        outputs:
          - process_data.result
      - id: store_result
        type: "storage_task"
        config:
          result: "process_data.result"
        dependencies: ["process_data"]
Summary:
- Variables are resolved in a strict order: trigger payload → input mapping → task outputs
- Each workflow run is isolated
- Good naming and error-handling practices are enforced
3.4. Command-Line Utilities¶
- taskflow-yaml-checker: Validates YAML workflow definitions.
- taskflow-visualizer: Generates a visual graph (requires the Graphviz dot tool).
3.5. Services (Webhook Trigger)¶
Incoming webhooks (POST /webhooks/:provider/:webhook_id) are handled by the SLOP server. These typically trigger server-side logic, which might involve starting a predefined SLOP Job (POST /api/jobs) that executes a specific process_id (which could be a taskflow defined in YAML and run by an mfo-orchestrator instance or a similar executor within the server environment).
The services/unipile_mailer_webhook example in mfo-orchestrator likely shows how a client-side application could listen for webhooks and then use the taskflow engine, but according to the SLOP server rules, this responsibility is ideally centralized on the server.
4. Handling Notifications and Webhooks within Taskflows¶
Taskflows executed by mfo-orchestrator can interact with SLOP's notification and event systems:
- Sending User Notifications: A task (e.g., a slop_api_notification_create type mapping to POST /api/notifications) can be added to the YAML workflow to create a notification for a user upon reaching a certain stage.

  - id: "notify_user_completion"
    type: "slop_api_notification_create" # Assumes this type exists
    config: # Request body for POST /api/notifications
      user_id: "${INPUT.user_id}" # Or retrieved from context
      title: "Email Processing Complete"
      message: "Analysis for resource ${INPUT.resource_id} stored."
      level: "info"
    dependencies: ["store_analysis"]
- Dynamically Configuring Server-Side Triggers (e.g., for Webhooks): The SLOP server allows configuring triggers (/api/config/event-triggers) that link server events (like job.completed, resource.created) to actions (like starting another job or calling an external webhook URL). A taskflow can dynamically create or update these triggers.

  - id: "setup_completion_webhook"
    type: "slop_api_event_trigger_create" # Assumes this type exists, mapping to POST /api/config/event-triggers
    config: # Request body for the trigger endpoint
      event_type: "job.completed" # Or another relevant event
      filter: "job.id == ${SYSTEM.JOB_ID}" # Filter for the current job completion
      action_type: "webhook"
      action_config:
        url: "https://my-external-service.com/webhook-receiver"
        method: "POST"
        headers: { "Content-Type": "application/json" }
        body_template: '{"jobId": "${event.job.id}", "status": "${event.job.status}", "result": "${retrieve_job_result.output}"}'
    dependencies: ["start_main_processing"] # Run early in the flow
    # Note: Requires a task 'retrieve_job_result' if the result is needed in the webhook body

  This task configures the SLOP server: "When this job completes, send a POST request to the specified URL." The webhook call itself is handled by the server later, not directly by the taskflow.
- Calling External Webhooks Directly from Taskflow: If the SLOP server doesn't provide a suitable trigger or tool, you might need to:
  - Create a Custom Go Task Type: Implement a task factory in mfo-orchestrator (e.g., httpClientPostFactory) that uses Go's net/http client to make the webhook call directly, and register this type. A sketch of such a factory follows this list.
  - Use a Generic SLOP Tool: If the SLOP server has a generic tool (e.g., system_http_request) exposed via /api/tools/system_http_request, use the slop_api_tool_execute task type in your YAML to call it.
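A hedged sketch of such a custom webhook task factory; the TaskFunc/TaskFuncFactory signatures mirror the conceptual examples in Section 3.2 and are not guaranteed to match the library exactly.

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// TaskFunc and TaskFuncFactory mirror the conceptual signatures used in
// Section 3.2; they are illustrative, not the library's exact types.
type TaskFunc func(ctx interface{})
type TaskFuncFactory func(config map[string]interface{}) (TaskFunc, error)

// httpClientPostFactory builds a task that POSTs a JSON body to the URL
// given in the task's YAML config (keys "url" and "body" are assumed here).
func httpClientPostFactory() TaskFuncFactory {
	return func(config map[string]interface{}) (TaskFunc, error) {
		url, ok := config["url"].(string)
		if !ok {
			return nil, fmt.Errorf("webhook task requires a string 'url' in config")
		}
		body, _ := config["body"].(string)
		httpClient := &http.Client{Timeout: 10 * time.Second}

		return func(ctx interface{}) {
			resp, err := httpClient.Post(url, "application/json", bytes.NewBufferString(body))
			if err != nil {
				fmt.Printf("webhook call failed: %v\n", err)
				return
			}
			defer resp.Body.Close()
			fmt.Printf("webhook call returned status %s\n", resp.Status)
		}, nil
	}
}

func main() {
	factory := httpClientPostFactory()
	task, err := factory(map[string]interface{}{
		"url":  "https://my-external-service.com/webhook-receiver",
		"body": `{"status": "done"}`,
	})
	if err != nil {
		panic(err)
	}
	task(nil) // in the real engine, the executor would invoke this with the flow context
}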
5. Advanced Topics & Considerations¶
- Custom Task Implementation: Define custom Go task factories for logic not covered by standard SLOP API calls (e.g., complex data transformation, custom branching logic).
- Error Handling: Implement error handling within task functions (checking API call errors) and potentially define conditional paths or specific error-handling tasks in the YAML workflow.
- State Management: Rely primarily on the SLOP server's Memory (/api/memory) and Resource (/api/resources) endpoints for state persistence between taskflow runs or across different workflows.
- Configuration: Manage SLOP server URLs, API keys, and taskflow configurations securely.
- Multi-Agent Orchestration: Use taskflows to define the sequence of actions for an agent. Different agents might be represented by different taskflows or custom tasks within a larger flow. State sharing happens via the SLOP server's memory/resource APIs.
6. Conclusion¶
mfo-orchestrator provides a powerful way to orchestrate interactions with a SLOP server using declarative YAML workflows executed via the go-taskflow engine. By defining tasks that map to SLOP API calls and leveraging the server's centralized capabilities for persistence, tool execution, and event handling, developers can build complex, robust, and maintainable automation and agentic applications.
7. References¶
- go-taskflow Library: https://github.com/noneback/go-taskflow
- SLOP Server API Documentation: (Provided: slop_api_docu.mdc)
- SLOP Server Rules: (Provided: slop_server_rules.mdc)
- SLOP YAML Taskflow Directives: (Provided: yaml-taskflow_directive.mdc)
- SLOP YAML API Task Definitions: (Provided: slop_yaml_api_task_definitions.mdc)
- YAML Specification: https://yaml.org/spec/
- Graphviz: https://graphviz.org/
2.3.1. Detailed YAML Taskflow Structure¶
SLOP Client YAML Taskflow Directives (Updated)¶
This section defines the schema and conventions for defining taskflows in YAML files within the SLOP Client project, utilizing the internal/taskflow package, which leverages github.com/noneback/go-taskflow.
Core Principles¶
- YAML as Definition: The YAML file is the primary definition of the workflow, including task sequence, dependencies, parameters, subflows, and conditional logic.
- Explicit Data Flow: Data dependencies between tasks are explicitly defined using a specific output referencing syntax (${producing_task_id.output_key}).
- Generic Tool Execution: A generic task type (slop_tool_call or slop_api_tool_execute) allows executing arbitrary SLOP Server tools defined in the YAML.
- Custom Logic Tasks: Specific Go task factories are used for logic that doesn't map directly to a single SLOP Server tool call (e.g., data parsing, complex branching logic, custom condition evaluation).
- Shared Output Context: A runtime context stores outputs from completed tasks, enabling subsequent tasks and conditions to access them.
- Modularity: Subflows allow grouping related tasks, and conditions enable dynamic branching based on runtime data.
YAML File Structure¶
A taskflow YAML file must have the following top-level structure:
name: "UniqueFlowName" # Required: Identifier for the taskflow.
description: "Human-readable description." # Optional: Purpose of the flow.
version: "1.0" # Optional: Version of the flow definition.
config: # Optional: Global configuration for the flow execution.
max_concurrency: 5 # Optional: Hint for executor concurrency.
timeout: "5m" # Optional: Overall flow timeout hint (implementation status may vary).
tasks: # Required: List of individual task definitions.
- id: "task_id_1"
# ... task fields ...
- id: "task_id_2"
# ... task fields ...
subflows: # Optional: List of subflow definitions (groups of tasks).
- id: "subflow_id_1"
# ... subflow fields ...
conditions: # Optional: List of condition definitions (branching points).
- id: "condition_id_1"
# ... condition fields ...
Task Definition (tasks[])¶
Each item in the tasks list defines a single unit of work.
- id: "unique_task_id" # Required: Unique identifier for this task within the flow (or subflow).
name: "Descriptive Task Name" # Optional: Human-readable name.
description: "What this task does." # Optional: Longer description.
type: "task_type_name" # Required: Maps to a registered TaskFactory in Go.
config: # Optional: Configuration passed to the TaskFactory/TaskFunc.
key1: value1
key2: "${previous_task_id.output_key}" # Example data reference
# Structure depends on the 'type'
dependencies: # Optional: List of task/subflow/condition IDs that must complete before this task starts.
- "dependency_id_1"
- "dependency_id_2"
successors: # Optional: List of task/subflow/condition IDs that depend on this task's completion.
- "successor_id_1"
priority: "normal" # Optional: Execution priority ("low", "normal", "high"). Defaults to "normal".
Generic Task Type: slop_tool_call / slop_api_tool_execute¶
This special task type executes a specified SLOP Server tool.
- id: "call_some_tool"
type: "slop_api_tool_execute" # Or slop_tool_call
config:
tool_id: "specific_slop_tool_endpoint" # Required: The SLOP Server tool ID (e.g., "unipile_email_tag_thread").
arguments: # Required: Arguments for the tool.
arg_name_1: "literal_value"
arg_name_2: ${task_id_producing_value.output_key} # Reference output
# ... other arguments required by the tool
dependencies: ["task_id_producing_value"]
outputs: # Document expected outputs for clarity
- call_some_tool.result # Example output key
Data Passing Convention¶
- Output Storage: Tasks write their results to a shared execution context (e.g., map[taskID][outputKey] = value). Output keys should be documented or standardized for each task type.
- Output Referencing: Within a task's or condition's config map, values matching the pattern ${producing_task_id.output_key} will be dynamically replaced at runtime with the corresponding value from the shared context.
- Resolution: The Go task or condition implementation is responsible for parsing the config, detecting the ${...} syntax, and retrieving the value from the context. A sketch of such a resolver follows this list.
- Literals: Any config value not matching the ${...} pattern is treated as a literal value (string, number, boolean, etc.).
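A hedged sketch of a config-value resolver implementing this convention; this regexp-based illustration is not the library's actual ResolveConfigValue.

package main

import (
	"fmt"
	"regexp"
)

// refPattern matches a full "${producing_task_id.output_key}" reference.
var refPattern = regexp.MustCompile(`^\$\{([A-Za-z0-9_]+)\.([A-Za-z0-9_.]+)\}$`)

// resolveConfigValue returns the referenced task output if the value matches
// the ${task_id.output_key} pattern; otherwise the literal value is returned.
func resolveConfigValue(value interface{}, outputs map[string]map[string]interface{}) (interface{}, error) {
	s, ok := value.(string)
	if !ok {
		return value, nil // non-string literals pass through unchanged
	}
	m := refPattern.FindStringSubmatch(s)
	if m == nil {
		return s, nil // plain string literal
	}
	taskID, key := m[1], m[2]
	if out, ok := outputs[taskID]; ok {
		if v, ok := out[key]; ok {
			return v, nil
		}
	}
	return nil, fmt.Errorf("unresolved reference %s", s)
}

func main() {
	outputs := map[string]map[string]interface{}{
		"extract_data": {"user_id": 123},
	}
	v, _ := resolveConfigValue("${extract_data.user_id}", outputs)
	fmt.Println(v) // 123
	lit, _ := resolveConfigValue("processed", outputs)
	fmt.Println(lit) // processed
}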
Example Data Flow¶
tasks:
  - id: "extract_data"
    type: "custom_extractor" # Assumes this task outputs {"user_id": 123, "order_id": "abc"}
    config:
      source: "input_payload.json"
    successors: ["process_user", "process_order"]
    outputs:
      - extract_data.user_id
      - extract_data.order_id

  - id: "process_user"
    type: "slop_api_tool_execute"
    config:
      tool_id: "user_service_update_profile"
      arguments:
        user_identifier: ${extract_data.user_id} # Get user_id from extract_data output
        new_status: "processed"
    dependencies: ["extract_data"]

  - id: "process_order"
    type: "slop_api_tool_execute"
    config:
      tool_id: "order_service_fetch_details"
      arguments:
        order_ref: ${extract_data.order_id} # Get order_id from extract_data output
    dependencies: ["extract_data"]
Subflow Definition (subflows[])¶
Subflows encapsulate a group of related tasks, allowing for modular, reusable workflow components. Subflows are defined in the subflows: section and use only pure YAML.
subflows:
  - id: "process_order"
    name: "Process Order"
    description: "Fetch and process order data"
    input_schema: # (Optional) Validate input before subflow execution
      required: ["order_id"]
      properties:
        order_id:
          type: string
    tasks:
      - id: "fetch"
        type: "fetch_data"
        config:
          order_id: "${INPUT.order_id}"
      - id: "process"
        type: "process_data"
        dependencies: ["fetch"]
        config:
          data: "${fetch.result}"
    dependencies: ["external_task"]
    successors: ["next_task"]
Key Points:
- All config values are YAML, not JSON strings.
- Inputs and outputs are referenced using ${...} syntax.
- If input_schema is provided, the engine validates the input map before invoking the subflow.
- Outputs from subflow tasks are accessible to downstream tasks by their task ID and output key.
Example: Subflow with Input Validation¶
subflows:
  - id: "user_registration"
    input_schema:
      required: ["email", "password"]
      properties:
        email:
          type: string
        password:
          type: string
    tasks:
      - id: "validate_email"
        type: "email_validator"
        config:
          email: "${INPUT.email}"
      - id: "create_user"
        type: "user_creator"
        dependencies: ["validate_email"]
        config:
          email: "${INPUT.email}"
          password: "${INPUT.password}"
Condition Definition (conditions[])¶
Conditions enable dynamic branching in your workflow based on runtime data. They are defined in the conditions: section of your YAML and use a pure YAML structure; never embed JSON.
conditions:
  - id: "check_threshold"
    name: "Check if value exceeds threshold"
    description: "Branch if input_value is greater than 10 and status is active"
    type: "expr" # Uses antonmedv/expr for expression evaluation
    config:
      expression: "input_value > 10 && status == 'active'"
      vars:
        input_value: 42
        status: "active"
    dependencies: ["some_task"]
    successors: ["branch_0_task", "branch_1_task"]
Key Points:
- expression is a string, not a JSON object.
- vars is a YAML map, not a JSON string.
- All config values are pure YAML.
- The return value of the condition function (as an integer) selects the branch (0-based index) from successors.
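For illustration, here is a hedged sketch of evaluating such an expression with the expr library (referenced as antonmedv/expr above; newer releases are published under github.com/expr-lang/expr) and mapping the boolean result to a branch index. The engine's actual evaluation code is not shown here.

package main

import (
	"fmt"

	"github.com/expr-lang/expr" // formerly github.com/antonmedv/expr
)

// evaluateCondition evaluates a boolean expression against the merged vars
// and returns 0 (first successor) for true, 1 (second successor) for false.
func evaluateCondition(expression string, vars map[string]interface{}) (int, error) {
	out, err := expr.Eval(expression, vars)
	if err != nil {
		return 0, err
	}
	ok, isBool := out.(bool)
	if !isBool {
		return 0, fmt.Errorf("condition did not evaluate to a boolean: %v", out)
	}
	if ok {
		return 0, nil // e.g. branch_0_task
	}
	return 1, nil // e.g. branch_1_task
}

func main() {
	vars := map[string]interface{}{"input_value": 42, "status": "active"}
	branch, err := evaluateCondition("input_value > 10 && status == 'active'", vars)
	if err != nil {
		panic(err)
	}
	fmt.Println("selected successor index:", branch) // 0
}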
Variable Resolution Order¶
1. vars from the config (static, highest precedence)
2. Context payload (INPUT)
3. Outputs from previous tasks (taskID.key)
4. If not found, the variable is undefined and may cause an error.
Example: Using Context and Task Outputs¶
conditions:
  - id: "should_retry"
    type: "expr"
    config:
      expression: "attempts < max_attempts && last_result == 'error'"
      vars:
        max_attempts: 3
        # 'attempts' and 'last_result' can come from previous tasks or INPUT
    dependencies: ["run_task"]
    successors: ["retry_task", "end_flow"]
Best Practices¶
- Never embed JSON: All config, schemas, and data structures must be valid YAML.
- Use ${...} references for all dynamic data passing.
- Document input and output keys for each task and subflow.
- Validate inputs using input_schema for subflows when possible.
- Keep variable names descriptive and avoid reserved prefixes unless required.
Example: Full Workflow with Conditions and Subflows¶
name: "Order Processing Flow"
tasks:
- id: "start"
type: "trigger"
config:
order_id: "${INPUT.order_id}"
successors: ["order_check"]
conditions:
- id: "order_check"
type: "expr"
config:
expression: "order_id != ''"
dependencies: ["start"]
successors: ["process_order", "fail"]
subflows:
- id: "process_order"
input_schema:
required: ["order_id"]
properties:
order_id:
type: string
tasks:
- id: "fetch"
type: "fetch_data"
config:
order_id: "${INPUT.order_id}"
- id: "process"
type: "process_data"
dependencies: ["fetch"]
config:
data: "${fetch.result}"
dependencies: ["order_check"]
successors: ["end"]
tasks:
- id: "fail"
type: "notify_failure"
config:
reason: "Order ID missing"
dependencies: ["order_check"]
- id: "end"
type: "finalize"
dependencies: ["process_order", "fail"]