30.04.2025-v2

MFO Slop Client - Detailed Documentation

version: 2.0

This document is a comprehensive guide to the mfo-client library. It details the library's capabilities, internal mechanisms, and usage patterns with examples, drawing on the SLOP server API and the taskflow definitions. It is intended for developers who want to interact with the SLOP server API and use its task orchestration features.

1. Introduction

mfo-client is a Go library designed as a client for the SLOP server, a platform focused on agentic workflows, task automation, and centralized interaction management. The client facilitates interaction with the server's REST API and integrates a powerful task workflow engine based on go-taskflow. This allows developers to define complex, potentially multi-agent workflows using a declarative YAML format, execute them efficiently via the mfo-client, and manage related server-side resources like jobs, events, memory, and prompts.

The library design emphasizes flexibility and human readability, particularly through its use of YAML for taskflow definitions and configuration. This aligns with preferences for systems where configurations are easily understood and modifiable by humans, potentially supporting dynamic adjustments in multi-agent orchestration scenarios. Clients delegate external interactions and persistence entirely to the SLOP server, promoting stateless client design.

2. Core Concepts

Understanding the following core components is crucial for effectively using mfo-client.

2.1. API Client (internal/client, internal/api)

The heart of the library's interaction with the SLOP server is the API client. It encapsulates the logic for making HTTP requests to the server's various REST endpoints defined by the SLOP protocol.

  • Initialization: The client is typically initialized with the server's base URL and authentication credentials (JWT or API Token obtained via POST /auth/login). See internal/client/client.go for initialization patterns.
  • Endpoint Abstraction: Specific API functionalities are abstracted into dedicated modules within internal/api/. These cover key SLOP server capabilities:
    • chat.go: Interacting with conversational endpoints (/api/chat).
    • tool.go: Executing registered server-side tools (/api/tools/:tool_id).
    • memory.go: Managing persistent key-value storage (/api/memory).
    • resource.go: Managing files and other resources (/api/resources).
    • job.go: Managing asynchronous background jobs (/api/jobs).
    • notification.go: Managing user notifications (/api/notifications).
    • event.go / Config: Handling event triggers and configurations (/api/config/event-triggers).
  • Request/Response Handling: The client manages request serialization, response deserialization (often into structs defined in internal/models), authentication headers, and error handling, adhering to the SLOP server rules.

2.2. Authentication (internal/auth)

Secure communication with the SLOP server's private /api/* endpoints is handled by the authentication module. It supports JWT tokens (obtained via /auth/login) or API tokens (managed via /api/tokens), configured during client initialization and automatically included in the Authorization: Bearer <token> header of outgoing requests.
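
The header injection described above can be sketched with a custom http.RoundTripper. This is a minimal illustration and not the actual internal/auth implementation: bearerTransport, newAuthedClient, and seenAuthHeader are hypothetical names, and the local test server only stands in for a SLOP server.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// bearerTransport is a hypothetical http.RoundTripper that adds the
// Authorization header to every request it forwards.
type bearerTransport struct {
	token string
	base  http.RoundTripper
}

func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// A RoundTripper must not modify the caller's request, so work on a clone.
	clone := req.Clone(req.Context())
	clone.Header.Set("Authorization", "Bearer "+t.token)
	return t.base.RoundTrip(clone)
}

// newAuthedClient returns an *http.Client that authenticates every request.
func newAuthedClient(token string) *http.Client {
	return &http.Client{Transport: &bearerTransport{token: token, base: http.DefaultTransport}}
}

// seenAuthHeader sends one request to a throwaway local test server and
// returns the Authorization header that server received.
func seenAuthHeader(token string) (string, error) {
	seen := make(chan string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seen <- r.Header.Get("Authorization")
	}))
	defer srv.Close()

	resp, err := newAuthedClient(token).Get(srv.URL + "/api/memory")
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return <-seen, nil
}

func main() {
	header, err := seenAuthHeader("YOUR_API_KEY_OR_JWT")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(header)
}
```

Placing the token in the transport keeps individual API calls free of authentication code, which matches the "configured once at initialization" behaviour described above.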

2.3. Taskflow Engine (internal/taskflow, go-taskflow)

mfo-client leverages the go-taskflow library (github.com/noneback/go-taskflow) for defining, building, and executing task workflows locally within the client application, orchestrating calls to the SLOP server.

  • YAML Definition: Workflows are defined declaratively in YAML files, following the structure outlined in yaml-taskflow_directive.mdc. This format describes tasks, their types (mapping to Go functions), configurations, dependencies, conditional logic, and subflows.
  • SLOP API Task Types: Specific task types are defined (or should be defined) in the mfo-client's Go code to correspond to SLOP API calls (e.g., mfo_api_chat_send, mfo_api_resource_get, mfo_api_memory_store). Examples are provided in slop_yaml_api_task_definitions.mdc. A generic slop_tool_call type might also exist or be implemented to call arbitrary /api/tools/:tool_id endpoints.
  • Task Registry (internal/taskflow/registry.go): A central registry maps task type strings (from YAML type field) to Go factory functions (taskflow.TaskFuncFactory). These factories create the actual executable task functions, often requiring access to the initialized slopClient instance (likely passed via context or closure) to make API calls.
  • Builder (internal/taskflow/builder.go): The builder takes a parsed FlowDefinition (from YAML) and the TaskRegistry to construct an executable taskflow.TaskFlow graph. It resolves dependencies and links tasks.
  • Executor (go-taskflow.Executor, potentially wrapped in internal/taskflow/executor.go): The executor runs the TaskFlow graph. It manages task scheduling, respects dependencies, handles concurrency, and executes tasks (Go functions making SLOP API calls) potentially in parallel.
  • Context & Data Passing (internal/taskflow/context.go): An execution context is passed through the task execution. This context typically holds the slopClient instance and a map for storing task outputs. Tasks access outputs from previous tasks using the ${producing_task_id.output_key} syntax within their config in the YAML, which the Go task implementation resolves by querying the context map.
  • Persistence (internal/taskflow/persistence): While the SLOP server handles primary persistence (memory, resources, jobs), this package within the client might relate to client-side caching or state management for the workflow execution itself, if needed.
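
To make the registry idea above concrete, here is a minimal, self-contained sketch of a map from task type strings to factory functions. The TaskFunc and TaskFuncFactory types below are simplified stand-ins chosen for this example; the real signatures in internal/taskflow may differ, and echoFactory is a hypothetical toy task.

```go
package main

import "fmt"

// TaskFunc and TaskFuncFactory are simplified stand-ins for the types the
// text attributes to internal/taskflow; the real signatures may differ.
type TaskFunc func(outputs map[string]any) error
type TaskFuncFactory func(config map[string]any) (TaskFunc, error)

// Registry maps the YAML "type" string to the factory that builds the task.
type Registry struct {
	factories map[string]TaskFuncFactory
}

func NewRegistry() *Registry {
	return &Registry{factories: make(map[string]TaskFuncFactory)}
}

// RegisterTask rejects duplicates so a typo cannot silently replace a factory.
func (r *Registry) RegisterTask(taskType string, f TaskFuncFactory) error {
	if _, exists := r.factories[taskType]; exists {
		return fmt.Errorf("task type %q is already registered", taskType)
	}
	r.factories[taskType] = f
	return nil
}

// Build looks up the factory for taskType and applies the task's config.
func (r *Registry) Build(taskType string, config map[string]any) (TaskFunc, error) {
	f, ok := r.factories[taskType]
	if !ok {
		return nil, fmt.Errorf("unknown task type %q", taskType)
	}
	return f(config)
}

// echoFactory is a toy factory: its task copies config["message"] to outputs.
func echoFactory(config map[string]any) (TaskFunc, error) {
	msg, ok := config["message"].(string)
	if !ok {
		return nil, fmt.Errorf("config.message must be a string")
	}
	return func(outputs map[string]any) error {
		outputs["echo"] = msg
		return nil
	}, nil
}

func main() {
	reg := NewRegistry()
	if err := reg.RegisterTask("echo", echoFactory); err != nil {
		fmt.Println(err)
		return
	}
	task, err := reg.Build("echo", map[string]any{"message": "hello"})
	if err != nil {
		fmt.Println(err)
		return
	}
	outputs := map[string]any{}
	if err := task(outputs); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(outputs["echo"])
}
```

Validating the config inside the factory means a malformed YAML task fails when the flow is built, before any task has run.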

2.4. Data Models (internal/models)

This directory contains the Go struct definitions representing data exchanged with the SLOP API (requests/responses) and used within the taskflow engine (e.g., TaskflowDefinition, TaskDefinition). These align with the structures expected or returned by the SLOP server endpoints.

2.5. Prompt Management (internal/prompt, internal/prompts)

For agentic workflows involving LLMs via the SLOP server's /api/chat endpoint, mfo-client includes utilities for managing prompts used in ChatRequest bodies:

  • Parsing & Storage: Loading and managing prompt templates, potentially from Markdown files.
  • Templating: Injecting variables (including data passed via ${task_id.output_key}) into prompt templates before sending them in an API call.
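
The templating step can be sketched with the standard library's os.Expand, which already understands the ${name} form. This is an illustration under assumptions, not the internal/prompt implementation: renderPrompt is a hypothetical helper, and outputs are simplified to strings.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// renderPrompt replaces ${task_id.output_key} placeholders in a prompt
// template with values from outputs[task_id][output_key].
// Caveat: os.Expand also treats a bare $name as a placeholder, so templates
// containing literal dollar signs would need escaping or a stricter parser.
func renderPrompt(tmpl string, outputs map[string]map[string]string) (string, error) {
	var missing []string
	rendered := os.Expand(tmpl, func(ref string) string {
		taskID, key, ok := strings.Cut(ref, ".")
		if ok {
			if v, found := outputs[taskID][key]; found {
				return v
			}
		}
		missing = append(missing, ref)
		return ""
	})
	if len(missing) > 0 {
		return "", fmt.Errorf("unresolved placeholders: %s", strings.Join(missing, ", "))
	}
	return rendered, nil
}

func main() {
	outputs := map[string]map[string]string{
		"get_email_content": {"body": "Hello, please send the invoice."},
	}
	prompt, err := renderPrompt("Summarize this email:\n${get_email_content.body}", outputs)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(prompt)
}
```

Returning an error for unresolved placeholders avoids sending a prompt with silent gaps to the chat endpoint.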

3. Usage and Examples

3.1. API Client Usage (Go)

The core pattern involves initializing the client and calling its methods corresponding to SLOP API endpoints.

// Example (Conceptual - based on structure and API docs)
package main

import (
    "context"
    "log"
    "gitlab.com/webigniter/slop-client/internal/client"
    "gitlab.com/webigniter/slop-client/internal/auth"
    "gitlab.com/webigniter/slop-client/internal/models"
)

func main() {
    // Assume credentials obtained securely
    serverURL := "http://localhost:8080" // Replace with the actual SLOP server URL
    apiKey := "YOUR_API_KEY_OR_JWT"

    // Configure authentication
    authProvider := auth.NewStaticTokenProvider(apiKey)

    // Initialize the client
    slopClient, err := client.NewClient(serverURL, nil, authProvider)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }

    // Example: Retrieve a resource
    resourceID := "some-resource-id"
    resource, err := slopClient.API.GetResource(context.Background(), resourceID)
    if err != nil {
        log.Fatalf("Failed to get resource: %v", err)
    }
    log.Printf("Resource content: %s", resource.Content)

    // Example: Store data in memory
    memReq := models.StoreMemoryRequest{
        Key:        "my_data_key",
        Value:      map[string]interface{}{"info": "some data"},
        ProviderID: "default_memory", // As required by API docs
    }
    _, err = slopClient.API.StoreMemory(context.Background(), memReq)
    if err != nil {
        log.Fatalf("Failed to store memory: %v", err)
    }
    log.Println("Data stored in memory.")
}

3.2. Taskflow Engine Usage (YAML + Go)

The primary way to orchestrate SLOP API calls is by defining workflows in YAML and executing them using the mfo-client's taskflow engine.

Step 1: Define Workflow in YAML

Create a YAML file (e.g., process_email_workflow.yml) using SLOP API task types.

name: "ProcessIncomingEmail"
description: "Analyzes an email, stores analysis, and tags the thread."
version: "1.0"

tasks:
  - id: "get_email_content"
    name: "Get Email Resource"
    type: "mfo_api_resource_get" # Maps to a Go factory
    config:
      resource_id: "${INPUT.resource_id}" # Assume resource_id is provided as input to the flow
    successors:
      - "analyze_email"

  - id: "analyze_email"
    name: "Analyze Email with LLM"
    type: "mfo_api_chat_send"
    config:
      request_body:
        messages:
          - role: "system"
            content: 'Analyze the following email content and extract the sender, subject, and a brief summary. Respond in JSON format {"sender": "...", "subject": "...", "summary": "..."}.'
          - role: "user"
            content: "${get_email_content.resource.content}" # Get content from previous task
        provider_id: "openai" # Example provider
    dependencies: ["get_email_content"]
    successors:
      - "store_analysis"

  - id: "store_analysis"
    name: "Store Analysis in Memory"
    type: "mfo_api_memory_store"
    config:
      key: "email_analysis_${INPUT.resource_id}"
      # Assuming the chat response content is valid JSON string
      value: "${analyze_email.response.content}" 
      provider_id: "default_memory"
    dependencies: ["analyze_email"]
    successors:
      - "tag_email_thread" # Assuming we need a separate step to get thread_id

  # Placeholder: Add task to extract thread_id if not available in INPUT or resource metadata
  # - id: "extract_thread_id"
  #   type: "custom_regex_extractor" # Example custom task
  #   config:
  #     input_string: "${get_email_content.resource.metadata.email_thread_id}" # Example path
  #     regex: "..."
  #   dependencies: ["get_email_content"]
  #   successors: ["tag_email_thread"]

  - id: "tag_email_thread"
    name: "Tag Email Thread"
    type: "mfo_api_tool_execute" # Using the generic tool executor type
    config:
      tool_id: "unipile_email_tag_thread" # Specific SLOP tool
      arguments:
        tag_name: "ai_analyzed"
        # Assuming thread_id comes from input or another task like extract_thread_id
        thread_id: "${INPUT.thread_id}" 
    dependencies: ["store_analysis"] # Depends on analysis being stored, and thread_id being available

Step 2: Register Task Types (Go)

In your mfo-client application, register Go factory functions for each type used in the YAML. These factories often need the slopClient.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "gitlab.com/webigniter/slop-client/internal/client"
    "gitlab.com/webigniter/slop-client/internal/models"
    "gitlab.com/webigniter/slop-client/internal/taskflow"
    // ... other necessary imports
)

// Define a context structure to hold client and outputs
type FlowContext struct {
    SlopClient *client.Client
    Outputs    map[string]map[string]interface{}
    Inputs     map[string]interface{} // For initial inputs like resource_id
    // Add mutex if concurrent tasks write to Outputs
}

// --- Task Factories --- 

func slopApiResourceGetFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
    return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
        return func(ctxInterface interface{}) {
            ctx := ctxInterface.(*FlowContext)
            taskID := "get_email_content" // Assuming task ID is known or passed

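            // Resolve config parameters (handle ${...} syntax)
            resourceIDRaw, err := taskflow.ResolveConfigValue(config["resource_id"], ctx.Outputs, ctx.Inputs)
            if err != nil {
                fmt.Printf("Task %s failed to resolve resource_id: %v\n", taskID, err)
                return
            }
            // Use a checked assertion so a non-string value does not panic.
            resourceID, ok := resourceIDRaw.(string)
            if !ok {
                fmt.Printf("Task %s: resource_id must be a string, got %T\n", taskID, resourceIDRaw)
                return
            }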

            resource, err := slopClient.API.GetResource(context.Background(), resourceID)
            if err != nil {
                // Handle error appropriately (log, set error state in context)
                fmt.Printf("Task %s failed: %v\n", taskID, err)
                return
            }
            // Store output
            taskflow.SetOutput(ctx.Outputs, taskID, "resource", resource)
            fmt.Printf("Task %s completed.\n", taskID)
        }, nil
    }
}

func slopApiChatSendFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
    return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
        return func(ctxInterface interface{}) {
            ctx := ctxInterface.(*FlowContext)
            taskID := "analyze_email"

            // Resolve config (complex: needs deep resolution for request_body)
            resolvedConfig, err := taskflow.ResolveConfigMap(config, ctx.Outputs, ctx.Inputs)
            if err != nil {
                fmt.Printf("Task %s failed to resolve config: %v\n", taskID, err)
                return
            }

            // Map resolvedConfig["request_body"] onto the ChatRequest struct via a
            // JSON round trip (mapstructure.Decode would work as well).
            var chatReq models.ChatRequest
            raw, err := json.Marshal(resolvedConfig["request_body"])
            if err == nil {
                err = json.Unmarshal(raw, &chatReq)
            }
            if err != nil {
                fmt.Printf("Task %s has an invalid request_body: %v\n", taskID, err)
                return
            }

            chatRespMap, err := slopClient.API.SendChat(context.Background(), chatReq) // Assuming SendChat exists
            if err != nil {
                fmt.Printf("Task %s failed: %v\n", taskID, err)
                return
            }

            taskflow.SetOutput(ctx.Outputs, taskID, "response", chatRespMap)
            fmt.Printf("Task %s completed.\n", taskID)
        }, nil
    }
}

// ... Implement factories for mfo_api_memory_store, mfo_api_tool_execute etc. ...
// These factories will parse their specific 'config', resolve ${...} values,
// call the appropriate slopClient.API method, and store results in ctx.Outputs.

func main() {
    // ... Initialize slopClient ...

    registry := taskflow.NewRegistry()
    // Pass slopClient to factories
    registry.RegisterTask("mfo_api_resource_get", slopApiResourceGetFactory(slopClient))
    registry.RegisterTask("mfo_api_chat_send", slopApiChatSendFactory(slopClient))
    // ... register other factories ...

    // ... next steps: load YAML, build, execute ...
}

Step 3: Load, Build, and Execute (Go)

Load the YAML, create the TaskFlow, prepare the initial context, and run it.

// Additional imports for this step; add them to the import block at the top of the file.
import (
    "log"
    "os"
    "runtime"
    gotaskflow "github.com/noneback/go-taskflow"
    "gopkg.in/yaml.v3"
    // ... other imports
)

// ... (inside main, after registry setup)

    // Load YAML
    yamlData, err := os.ReadFile("process_email_workflow.yml")
    // ... handle error ...

    var flowDef *taskflow.FlowDefinition
    err = yaml.Unmarshal(yamlData, &flowDef)
    // ... handle error ...

    // Create Builder
    builder := taskflow.NewBuilder(registry)

    // Build TaskFlow
    taskFlow, err := builder.BuildFlow(flowDef)
    // ... handle error ...

    // Create Executor
    executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))

    // Prepare Initial Context
    initialContext := &FlowContext{
        SlopClient: slopClient,
        Outputs:    make(map[string]map[string]interface{}),
        Inputs:     map[string]interface{}{ // Example inputs
            "resource_id": "email-resource-123",
            "thread_id":   "unipile-thread-abc",
        },
    }

    // Execute the flow
    log.Println("Executing taskflow...")
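    // gotaskflow.WithContext is assumed here; check the Run signature of the go-taskflow version in use.
    executor.Run(taskFlow, gotaskflow.WithContext(initialContext)).Wait()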

    log.Println("Taskflow execution complete.")
    // Access final outputs if needed: initialContext.Outputs

    // Optional: Dump graph
    // taskFlow.Dump(os.Stdout)

(Note: Helper functions like taskflow.ResolveConfigValue, taskflow.ResolveConfigMap, taskflow.SetOutput are assumed to exist within the internal/taskflow package to handle data passing.)
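
Because the executor may run tasks in parallel, the shared outputs map needs synchronization, as the comment on FlowContext hints. The following is one possible shape for such a helper, assuming nothing about the real taskflow.SetOutput: OutputStore and fillConcurrently are hypothetical names used only for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// OutputStore is a hypothetical mutex-guarded replacement for the plain
// map[string]map[string]interface{} used in FlowContext.Outputs.
type OutputStore struct {
	mu   sync.RWMutex
	data map[string]map[string]any
}

func NewOutputStore() *OutputStore {
	return &OutputStore{data: make(map[string]map[string]any)}
}

// Set records one output value for a task, creating the inner map on demand.
func (s *OutputStore) Set(taskID, key string, value any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[taskID] == nil {
		s.data[taskID] = make(map[string]any)
	}
	s.data[taskID][key] = value
}

// Get returns the stored value and whether it was present.
func (s *OutputStore) Get(taskID, key string) (any, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[taskID][key]
	return v, ok
}

// fillConcurrently simulates n parallel tasks that each record one output.
func fillConcurrently(n int) *OutputStore {
	store := NewOutputStore()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			store.Set(fmt.Sprintf("task_%d", i), "result", i*i)
		}(i)
	}
	wg.Wait()
	return store
}

func main() {
	store := fillConcurrently(8)
	v, ok := store.Get("task_3", "result")
	fmt.Println(v, ok)
}
```

A read/write mutex suits this access pattern, since each task writes its own outputs once while many later tasks may read them.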

3.3. Command-Line Utilities

  • taskflow-yaml-checker: Validates YAML workflow definitions.
    ./taskflow-yaml-checker -input ./path/to/your/workflow.yml
    
  • taskflow-visualizer: Generates a visual graph (requires Graphviz dot).
    ./taskflow-visualizer -input ./path/to/your/workflow.yml -output workflow.svg
    

3.4. Services (Webhook Trigger)

Incoming webhooks (POST /webhooks/:provider/:webhook_id) are handled by the SLOP server. They typically trigger server-side logic, which might start a predefined SLOP job (POST /api/jobs) that executes a specific process_id. That process could be a taskflow defined in YAML and run by an mfo-client instance or a similar executor within the server environment.

The services/unipile_mailer_webhook example in mfo-client likely shows how a client-side application could listen for webhooks and then use the taskflow engine, but according to SLOP server rules, this responsibility is ideally centralized on the server.

4. Handling Notifications and Webhooks within Taskflows

Taskflows executed by mfo-client can interact with SLOP's notification and event systems:

  • Sending User Notifications: A task (e.g., mfo_api_notification_create type mapping to POST /api/notifications) can be added to the YAML workflow to create a notification for a user upon reaching a certain stage.

    - id: "notify_user_completion"
      type: "mfo_api_notification_create" # Assumes this type exists
      config:
        # Request body for POST /api/notifications
        user_id: "${INPUT.user_id}" # Or retrieved from context
        title: "Email Processing Complete"
        message: "Analysis for resource ${INPUT.resource_id} stored."
        level: "info"
      dependencies: ["store_analysis"]
    

  • Dynamically Configuring Server-Side Triggers (e.g., for Webhooks): The SLOP server allows configuring triggers (/api/config/event-triggers) that link server events (like job.completed, resource.created) to actions (like starting another job or calling an external webhook URL). A taskflow can dynamically create or update these triggers.

    - id: "setup_completion_webhook"
      type: "mfo_api_event_trigger_create" # Assumes this type exists, mapping to POST /api/config/event-triggers
      config:
        # Request body for the trigger endpoint
        event_type: "job.completed" # Or another relevant event
        filter: "job.id == ${SYSTEM.JOB_ID}" # Filter for the current job completion
        action_type: "webhook"
        action_config:
          url: "https://my-external-service.com/webhook-receiver"
          method: "POST"
          headers: { "Content-Type": "application/json" }
          body_template: '{"jobId": "${event.job.id}", "status": "${event.job.status}", "result": "${retrieve_job_result.output}"}'
      dependencies: ["start_main_processing"] # Run early in the flow
      # Note: Requires a task 'retrieve_job_result' if result needed in webhook body
    
    This task configures the SLOP server: "When this job completes, send a POST request to the specified URL". The webhook call itself is handled by the server later, not directly by the taskflow.

  • Calling External Webhooks Directly from Taskflow: If the SLOP server doesn't provide a suitable trigger or tool, you might need to:

    1. Create a Custom Go Task Type: Implement a task factory in mfo-client (e.g., httpClientPostFactory) that uses Go's net/http client to make the webhook call directly. Register this type.
    2. Use a Generic SLOP Tool: If the SLOP server has a generic tool (e.g., system_http_request) exposed via /api/tools/system_http_request, use the mfo_api_tool_execute task type in your YAML to call it.

5. Advanced Topics & Considerations

  • Custom Task Implementation: Define custom Go task factories for logic not covered by standard SLOP API calls (e.g., complex data transformation, custom branching logic).
  • Error Handling: Implement error handling within task functions (checking API call errors) and potentially define conditional paths or specific error-handling tasks in the YAML workflow.
  • State Management: Rely primarily on the SLOP server's Memory (/api/memory) and Resource (/api/resources) endpoints for state persistence between taskflow runs or across different workflows.
  • Configuration: Manage SLOP server URLs, API keys, and taskflow configurations securely.
  • Multi-Agent Orchestration: Use taskflows to define the sequence of actions for an agent. Different agents might be represented by different taskflows or custom tasks within a larger flow. State sharing happens via the SLOP server's memory/resource APIs.

6. Conclusion

mfo-client provides a powerful way to orchestrate interactions with a SLOP server using declarative YAML workflows executed via the go-taskflow engine. By defining tasks that map to SLOP API calls and leveraging the server's centralized capabilities for persistence, tool execution, and event handling, developers can build complex, robust, and maintainable automation and agentic applications.

7. References

2.3.1. Detailed YAML Taskflow Structure

SLOP Client YAML Taskflow Directives (Updated)

This document defines the schema and conventions for defining taskflows in YAML files within the SLOP Client project, utilizing the internal/taskflow package which leverages github.com/noneback/go-taskflow.

Core Principles

  1. YAML as Definition: The YAML file is the primary definition of the workflow, including task sequence, dependencies, parameters, subflows, and conditional logic.
  2. Explicit Data Flow: Data dependencies between tasks are explicitly defined using a specific output referencing syntax (${producing_task_id.output_key}).
  3. Generic Tool Execution: A generic task type (slop_tool_call or mfo_api_tool_execute) allows executing arbitrary SLOP Server tools defined in the YAML.
  4. Custom Logic Tasks: Specific Go task factories are used for logic that doesn't map directly to a single SLOP Server tool call (e.g., data parsing, complex branching logic, custom condition evaluation).
  5. Shared Output Context: A runtime context stores outputs from completed tasks, enabling subsequent tasks and conditions to access them.
  6. Modularity: Subflows allow grouping related tasks, and conditions enable dynamic branching based on runtime data.

YAML File Structure

A taskflow YAML file must have the following top-level structure:

name: "UniqueFlowName"             # Required: Identifier for the taskflow.
description: "Human-readable description." # Optional: Purpose of the flow.
version: "1.0"                   # Optional: Version of the flow definition.

config:                          # Optional: Global configuration for the flow execution.
  max_concurrency: 5             # Optional: Hint for executor concurrency.
  timeout: "5m"                  # Optional: Overall flow timeout hint (implementation status may vary).

tasks:                           # Required: List of individual task definitions.
  - id: "task_id_1"
    # ... task fields ...
  - id: "task_id_2"
    # ... task fields ...

subflows:                        # Optional: List of subflow definitions (groups of tasks).
  - id: "subflow_id_1"
    # ... subflow fields ...

conditions:                      # Optional: List of condition definitions (branching points).
  - id: "condition_id_1"
    # ... condition fields ...

Task Definition (tasks[])

Each item in the tasks list defines a single unit of work.

- id: "unique_task_id"          # Required: Unique identifier for this task within the flow (or subflow).
  name: "Descriptive Task Name" # Optional: Human-readable name.
  description: "What this task does." # Optional: Longer description.
  type: "task_type_name"        # Required: Maps to a registered TaskFactory in Go.
  config:                       # Optional: Configuration passed to the TaskFactory/TaskFunc.
    key1: value1
    key2: "${previous_task_id.output_key}" # Example data reference
    # Structure depends on the 'type'
  dependencies:                 # Optional: List of task/subflow/condition IDs that must complete before this task starts.
    - "dependency_id_1"
    - "dependency_id_2"
  successors:                   # Optional: List of task/subflow/condition IDs that depend on this task's completion.
    - "successor_id_1"
  priority: "normal"            # Optional: Execution priority ("low", "normal", "high"). Defaults to "normal".
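
Since dependencies and successors describe the same edges from two sides, a builder or checker has to merge them, reject references to unknown IDs, and detect cycles. The following sketch does this with Kahn's algorithm. It is an illustration only: TaskDef and topoOrder are hypothetical, the sketch covers tasks but not subflows or conditions, and it is not the logic used by internal/taskflow/builder.go.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskDef holds only the fields needed for graph validation.
type TaskDef struct {
	ID           string
	Dependencies []string
	Successors   []string
}

// topoOrder merges dependencies and successors into one edge set, checks that
// every referenced ID exists, and returns an execution order or a cycle error.
func topoOrder(tasks []TaskDef) ([]string, error) {
	known := make(map[string]bool, len(tasks))
	for _, t := range tasks {
		if known[t.ID] {
			return nil, fmt.Errorf("duplicate task id %q", t.ID)
		}
		known[t.ID] = true
	}

	edges := make(map[[2]string]bool) // set of (from, to) pairs; removes duplicates
	addEdge := func(from, to string) error {
		for _, id := range []string{from, to} {
			if !known[id] {
				return fmt.Errorf("reference to unknown id %q", id)
			}
		}
		edges[[2]string{from, to}] = true
		return nil
	}
	for _, t := range tasks {
		for _, d := range t.Dependencies {
			if err := addEdge(d, t.ID); err != nil {
				return nil, err
			}
		}
		for _, s := range t.Successors {
			if err := addEdge(t.ID, s); err != nil {
				return nil, err
			}
		}
	}

	indegree := make(map[string]int)
	next := make(map[string][]string)
	for e := range edges {
		next[e[0]] = append(next[e[0]], e[1])
		indegree[e[1]]++
	}

	// Kahn's algorithm: repeatedly emit nodes with no unmet dependencies.
	var queue, order []string
	for _, t := range tasks {
		if indegree[t.ID] == 0 {
			queue = append(queue, t.ID)
		}
	}
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		order = append(order, id)
		for _, n := range next[id] {
			indegree[n]--
			if indegree[n] == 0 {
				queue = append(queue, n)
			}
		}
	}
	if len(order) != len(tasks) {
		return nil, errors.New("dependency cycle detected")
	}
	return order, nil
}

func main() {
	order, err := topoOrder([]TaskDef{
		{ID: "get_email_content", Successors: []string{"analyze_email"}},
		{ID: "analyze_email", Dependencies: []string{"get_email_content"}, Successors: []string{"store_analysis"}},
		{ID: "store_analysis", Dependencies: []string{"analyze_email"}},
	})
	fmt.Println(order, err)
}
```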

Generic Task Type: slop_tool_call / mfo_api_tool_execute

This special task type executes a specified SLOP Server tool.

- id: "call_some_tool"
  type: "mfo_api_tool_execute" # Or slop_tool_call
  config:
    tool_id: "specific_slop_tool_endpoint" # Required: The SLOP Server tool ID (e.g., "unipile_email_tag_thread").
    arguments:                             # Required: Arguments for the tool.
      arg_name_1: "literal_value"
      arg_name_2: "${task_id_producing_value.output_key}" # Reference output
      # ... other arguments required by the tool
  dependencies: ["task_id_producing_value"]
  outputs: # Document expected outputs for clarity
    - call_some_tool.result # Example output key

Data Passing Convention

  • Output Storage: Tasks write their results to a shared execution context (e.g., map[taskID][outputKey] = value). Output keys should be documented or standardized for each task type.
  • Output Referencing: Within a task's or condition's config map, values matching the pattern ${producing_task_id.output_key} will be dynamically replaced at runtime with the corresponding value from the shared context.
  • Resolution: The Go task or condition implementation is responsible for parsing the config, detecting the ${...} syntax, and retrieving the value from the context.
  • Literals: Any config value not matching the ${...} pattern is treated as a literal value (string, number, boolean, etc.).
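
The convention above can be sketched as a recursive resolver over a decoded config. This is a minimal illustration with assumptions spelled out: resolve and resolveString are hypothetical (they are not the taskflow.ResolveConfigValue helper), everything after the first dot is treated as one flat output key, and a value that consists of exactly one reference keeps its original type while embedded references are interpolated as text.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches ${task_id.output_key}; the key may itself contain dots.
var refPattern = regexp.MustCompile(`\$\{([A-Za-z0-9_]+)\.([A-Za-z0-9_.]+)\}`)

// resolve walks maps and slices recursively; non-string scalars are literals.
func resolve(v any, outputs map[string]map[string]any) (any, error) {
	switch x := v.(type) {
	case string:
		return resolveString(x, outputs)
	case map[string]any:
		out := make(map[string]any, len(x))
		for k, item := range x {
			r, err := resolve(item, outputs)
			if err != nil {
				return nil, fmt.Errorf("%s: %w", k, err)
			}
			out[k] = r
		}
		return out, nil
	case []any:
		out := make([]any, len(x))
		for i, item := range x {
			r, err := resolve(item, outputs)
			if err != nil {
				return nil, err
			}
			out[i] = r
		}
		return out, nil
	default:
		return v, nil
	}
}

func resolveString(s string, outputs map[string]map[string]any) (any, error) {
	lookup := func(m []string) (any, error) {
		val, ok := outputs[m[1]][m[2]]
		if !ok {
			return nil, fmt.Errorf("unresolved reference ${%s.%s}", m[1], m[2])
		}
		return val, nil
	}
	// A string that is exactly one reference keeps the referenced value's type.
	if m := refPattern.FindStringSubmatch(s); m != nil && m[0] == s {
		return lookup(m)
	}
	// Otherwise each embedded reference is replaced by its text form.
	var firstErr error
	out := refPattern.ReplaceAllStringFunc(s, func(match string) string {
		val, err := lookup(refPattern.FindStringSubmatch(match))
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			return match
		}
		return fmt.Sprint(val)
	})
	if firstErr != nil {
		return nil, firstErr
	}
	return out, nil
}

func main() {
	outputs := map[string]map[string]any{"extract_data": {"user_id": 123}}
	cfg := map[string]any{
		"tool_id":   "user_service_update_profile",
		"arguments": map[string]any{"user_identifier": "${extract_data.user_id}", "new_status": "processed"},
	}
	resolved, err := resolve(cfg, outputs)
	fmt.Println(resolved, err)
}
```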

Example Data Flow

tasks:
  - id: "extract_data"
    type: "custom_extractor" # Assumes this task outputs {"user_id": 123, "order_id": "abc"}
    config:
      source: "input_payload.json"
    successors: ["process_user", "process_order"]
    outputs:
      - extract_data.user_id
      - extract_data.order_id

  - id: "process_user"
    type: "mfo_api_tool_execute"
    config:
      tool_id: "user_service_update_profile"
      arguments:
        user_identifier: "${extract_data.user_id}" # Get user_id from extract_data output
        new_status: "processed"
    dependencies: ["extract_data"]

  - id: "process_order"
    type: "mfo_api_tool_execute"
    config:
      tool_id: "order_service_fetch_details"
      arguments:
        order_ref: "${extract_data.order_id}" # Get order_id from extract_data output
    dependencies: ["extract_data"]

Subflow Definition (subflows[])

Defines a group of tasks that can be treated as a single node in the main flow's dependency graph, promoting modularity and simplifying complex workflows.

- id: "unique_subflow_id"          # Required: Unique identifier for this subflow within the entire workflow.
  name: "Descriptive Subflow Name" # Optional: Human-readable name.
  description: "Purpose of this subflow." # Optional: Longer description.
  tasks: # Required: List of task definitions internal to this subflow.
    - id: "internal_task_1" # IDs only need to be unique within this subflow.
      type: "some_task_type"
      config:
        input: "${external_task.output}" # Can access outputs from outside the subflow
      dependencies: []
      successors: ["internal_task_2"]
      # ... other task fields
    - id: "internal_task_2"
      type: "another_task_type"
      dependencies: ["internal_task_1"]
      # ... other task fields
      outputs:
        - internal_task_2.result # Output can be accessed externally
  dependencies: # Optional: Dependencies on nodes outside the subflow.
    - "external_dependency_id"
  successors:   # Optional: External nodes that depend on this subflow's completion.
    - "external_successor_id"
  priority: "normal" # Optional: Priority for the entire subflow.

Key Points for Subflows:

  • Encapsulation: Groups related tasks into a logical unit.
  • Scoping: Task IDs within a subflow are local to that subflow.
  • Dependencies: Subflows can depend on external nodes, and external nodes can depend on the completion of the entire subflow.
  • Execution: A subflow starts after its external dependencies are met and completes only when all its internal tasks finish successfully.
  • Data Access:
    • Internal tasks access each other's outputs using ${internal_task_id.output_key}.
    • Internal tasks can access outputs from tasks outside the subflow using ${external_task_id.output_key}.
    • External tasks can access outputs from tasks inside the subflow using ${internal_task_id.output_key} (assuming the taskflow engine makes these accessible, potentially prefixed like subflow_id.internal_task_id - verification needed).

Condition Definition (conditions[])

Defines a branching point in the flow based on a condition evaluated at runtime using data from the execution context.

- id: "unique_condition_id"
  name: "Descriptive Condition Name"
  description: "Decision logic description."
  type: "condition_evaluator_type" # Required: Maps to a registered ConditionFactory in Go.
  config:                           # Optional: Configuration for the condition evaluator.
    input_value: "${previous_task.some_result}" # Pass data needed for evaluation
    threshold: 100
  dependencies:                     # Optional: Nodes that must complete before evaluation.
    - "previous_task"
  successors:                       # Required: List of node IDs representing the branches. Order matters.
    - "branch_0_task_id"          # Executes if ConditionFunc returns 0
    - "branch_1_task_id"          # Executes if ConditionFunc returns 1
    # ... up to N branches
  # default_branch: "branch_id" # Optional: Behavior depends on go-taskflow support and implementation.

Key Points for Conditions:

  • Branching: Enables dynamic workflow paths based on runtime data.
  • Evaluation: A Go function (ConditionFunc), determined by the type, evaluates the condition after its dependencies are met.
  • Go Function: The ConditionFunc receives the execution context, accesses its config (resolving ${...} references), and must return a uint (non-negative integer).
  • Successor Selection: The returned uint acts as a zero-based index into the successors list. Only the node at that index is executed.
  • Out-of-Bounds/Default: The behavior if the returned index is invalid or if a default_branch is specified needs clarification based on the specific go-taskflow version and SLOP client implementation.
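
The index-based branch selection can be illustrated as follows. The types are simplified stand-ins rather than the go-taskflow or SLOP client API: ConditionFunc here receives an already resolved config, aboveThreshold and pickBranch are hypothetical, and treating an out-of-range index as an error is only one possible policy, since the text above leaves that behaviour open.

```go
package main

import "fmt"

// ConditionFunc is a simplified stand-in: it receives the resolved config
// and returns a zero-based index into the condition's successors list.
type ConditionFunc func(config map[string]any) uint

// aboveThreshold returns 1 when input_value exceeds threshold, otherwise 0.
// Missing or non-numeric values fall back to 0 via the unchecked assertions.
func aboveThreshold(config map[string]any) uint {
	input, _ := config["input_value"].(float64)
	threshold, _ := config["threshold"].(float64)
	if input > threshold {
		return 1
	}
	return 0
}

// pickBranch maps the returned index to a successor ID and rejects
// out-of-range indexes instead of guessing a default branch.
func pickBranch(successors []string, index uint) (string, error) {
	if index >= uint(len(successors)) {
		return "", fmt.Errorf("condition returned index %d, but only %d successors are defined", index, len(successors))
	}
	return successors[index], nil
}

func main() {
	var cond ConditionFunc = aboveThreshold
	successors := []string{"branch_0_task_id", "branch_1_task_id"}

	idx := cond(map[string]any{"input_value": 150.0, "threshold": 100.0})
	branch, err := pickBranch(successors, idx)
	fmt.Println(branch, err)
}
```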

Parallelism and Complex Patterns (e.g., Map/Reduce)

  • The ${task_id.output_key} syntax is fundamental for defining dependencies.
  • For map/reduce-like patterns:
    • Multiple parallel tasks (map phase) can run concurrently if they don't depend on each other directly.
    • A subsequent aggregation task (reduce phase) would list all parallel map tasks in its dependencies.
    • The aggregation task's Go implementation needs to be designed to gather outputs from all preceding map tasks from the execution context (e.g., by knowing the map task IDs or querying the context for outputs matching a pattern).

This requires careful design of the custom aggregation task and potentially extending the context interaction logic.
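
A minimal sketch of the pattern, assuming the aggregation task knows the map task IDs in advance: parallel goroutines play the map phase and write into a mutex-guarded outputs map, and a gather step collects their results for the reduce phase. The names gather and mapReduce, the map_N ID scheme, and the "result" output key are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// gather collects one output key from each listed map task, in order, and
// fails if any task did not produce it.
func gather(outputs map[string]map[string]any, taskIDs []string, key string) ([]any, error) {
	values := make([]any, 0, len(taskIDs))
	for _, id := range taskIDs {
		v, ok := outputs[id][key]
		if !ok {
			return nil, fmt.Errorf("task %q produced no %q output", id, key)
		}
		values = append(values, v)
	}
	return values, nil
}

// mapReduce squares each input in parallel (map phase), then sums the
// gathered results (reduce phase).
func mapReduce(inputs []int) (int, error) {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		outputs = make(map[string]map[string]any)
		ids     = make([]string, len(inputs))
	)
	for i, n := range inputs {
		ids[i] = fmt.Sprintf("map_%d", i)
		wg.Add(1)
		go func(id string, n int) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			outputs[id] = map[string]any{"result": n * n}
		}(ids[i], n)
	}
	wg.Wait() // the reduce task depends on every map task

	values, err := gather(outputs, ids, "result")
	if err != nil {
		return 0, err
	}
	sum := 0
	for _, v := range values {
		n, ok := v.(int)
		if !ok {
			return 0, fmt.Errorf("unexpected output type %T", v)
		}
		sum += n
	}
	return sum, nil
}

func main() {
	sum, err := mapReduce([]int{1, 2, 3})
	fmt.Println(sum, err)
}
```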

4. Webhook Integration

This section details how the SLOP server and client interact to enable workflows triggered by external events via webhooks.

Documenting Webhook Registration and Triggering in SLOP

This section details how the SLOP server handles incoming webhooks and how to configure the system to trigger specific taskflows (jobs) based on these external events.

Overview: From Webhook to Job

The process involves several steps connecting an external event to a SLOP taskflow execution:

  1. External Event Source: An external service (e.g., GitHub, Stripe, a custom application) sends an HTTP POST request (a webhook) to a specific public URL on the SLOP server when a particular event occurs in that service.
  2. SLOP Public Webhook Endpoint: The SLOP server exposes a generic public endpoint: POST /webhooks/:provider/:webhook_id. The :provider and :webhook_id path parameters identify the source and specific configuration.
  3. Webhook Handling & Internal Event: The SLOP server receives the incoming request, and the internal handler associated with the :provider and :webhook_id parses it. The handler is then assumed to publish the relevant event data on an internal event channel within the SLOP system. The channel name likely follows a convention such as webhook:<provider>:<webhook_id>, though it may depend on the specific provider implementation.
  4. Event Trigger Configuration: You configure an EventJobTrigger within the SLOP server using the private API endpoints under /api/config/event-triggers. This configuration acts as a listener.
  5. Trigger Matching: The EventJobTrigger listens on a specific event_channel. When an event is published on a channel matching the trigger's configuration, the trigger activates.
  6. Job Creation: Upon activation, the trigger automatically creates and queues a new background job (the equivalent of a POST /api/jobs request). The job executes the taskflow identified by the job_process_id in the trigger configuration. The payload from the original webhook event is typically passed as input data to the new job.

Configuring Event Triggers (Server-Side Registration)

The core of registering a webhook handler involves creating an EventJobTrigger configuration. This tells the SLOP server which taskflow (job process) to run when an event arrives on a specific channel (originating from a webhook).

API Endpoints

Management of these triggers is done via the following private API endpoints (requiring authentication):

  • POST /api/config/event-triggers: Creates a new event trigger configuration.
  • GET /api/config/event-triggers: Lists existing trigger configurations (supports filtering by channel and enabled status).
  • GET /api/config/event-triggers/:triggerID: Retrieves details of a specific trigger by its numeric ID.
  • PUT /api/config/event-triggers/:triggerID: Updates an existing trigger configuration.
  • DELETE /api/config/event-triggers/:triggerID: Deletes a trigger configuration.

EventJobTrigger Object Structure

When creating (POST) or updating (PUT) a trigger, you need to provide an object with the following key fields (based on API documentation):

  • event_channel (string, required): The name of the internal event channel the trigger should listen to. This must match the channel where the corresponding webhook handler publishes events (e.g., webhook:github:push, webhook:unipile:new_email).
  • job_process_id (string, required): The identifier of the taskflow (YAML definition name) that should be executed when the trigger activates.
  • name (string, optional): A descriptive name for the trigger configuration.
  • description (string, optional): A more detailed description.
  • enabled (boolean, optional, defaults to true): Whether the trigger is active.
  • metadata (object, optional): Custom key-value pairs for additional information or configuration.

Note: The server assigns a unique numeric id upon creation.
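
For Go callers, the fields above could be modeled roughly as follows. This is a sketch under assumptions: the struct name mirrors the API object, but the Go types (int64 for id, a *bool for enabled so that omitting it leaves the server default in place) and the encodeTrigger helper are illustrative choices, not the mfo-client's actual model.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// EventJobTrigger mirrors the documented JSON fields (Go types are assumed).
type EventJobTrigger struct {
	ID           int64          `json:"id,omitempty"` // assigned by the server
	EventChannel string         `json:"event_channel"`
	JobProcessID string         `json:"job_process_id"`
	Name         string         `json:"name,omitempty"`
	Description  string         `json:"description,omitempty"`
	Enabled      *bool          `json:"enabled,omitempty"` // nil lets the server default to true
	Metadata     map[string]any `json:"metadata,omitempty"`
}

// encodeTrigger checks the two required fields and returns the JSON body.
func encodeTrigger(t EventJobTrigger) (string, error) {
	if t.EventChannel == "" || t.JobProcessID == "" {
		return "", errors.New("event_channel and job_process_id are required")
	}
	b, err := json.Marshal(t)
	return string(b), err
}

func main() {
	body, err := encodeTrigger(EventJobTrigger{
		EventChannel: "webhook:unipile:new_email",
		JobProcessID: "ProcessIncomingEmail",
	})
	fmt.Println(body, err)
}
```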

Creating a Trigger via API Call

To create a trigger, send an authenticated POST request to /api/config/event-triggers with a JSON body representing the EventJobTrigger object. Any HTTP client works, for example the mfo-client library or curl.

Example Request Body (JSON):

{
  "name": "Trigger Email Processing on New Unipile Email",
  "event_channel": "webhook:unipile:new_email",
  "job_process_id": "ProcessIncomingEmail",
  "enabled": true,
  "metadata": {
    "priority": "normal"
  }
}
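
A minimal sketch of building that request with Go's standard library is shown here. The Bearer authorization scheme, the placeholder token, and the helper name newCreateTriggerRequest are assumptions; the mfo-client library may wrap this call differently.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strings"
)

// newCreateTriggerRequest builds (but does not send) the POST request.
// The "Bearer" scheme is an assumption about how the token is presented.
func newCreateTriggerRequest(baseURL, token string, body []byte) (*http.Request, error) {
	endpoint := strings.TrimRight(baseURL, "/") + "/api/config/event-triggers"
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)
	return req, nil
}

func main() {
	body := []byte(`{"event_channel":"webhook:unipile:new_email","job_process_id":"ProcessIncomingEmail","enabled":true}`)
	req, err := newCreateTriggerRequest("https://slop.internal.net", "YOUR_TOKEN", body)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
	// To send it: resp, err := http.DefaultClient.Do(req)
}
```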

Creating a Trigger via Taskflow (Conceptual)

Besides direct API calls, triggers could be managed by a dedicated task type within a SLOP taskflow. This would allow infrastructure-as-code style management of your triggers.

Conceptual YAML Task Definition:

- id: "register_email_webhook_trigger"
  name: "Register Unipile Email Trigger"
  type: "mfo_api_config_create_event_trigger" # Hypothetical task type
  config:
    # Fields matching the EventJobTrigger object
    name: "Trigger Email Processing on New Unipile Email"
    event_channel: "webhook:unipile:new_email"
    job_process_id: "ProcessIncomingEmail" # Name of the target taskflow YAML
    enabled: true
    metadata:
      managed_by: "taskflow"
  dependencies: []
  outputs:
    - register_email_webhook_trigger.trigger # The created trigger object from API response

(The actual implementation of the mfo_api_config_create_event_trigger task factory in Go would handle calling the POST /api/config/event-triggers endpoint using the provided config.)

Client-Side Webhook Setup for SLOP

This section explains how to configure an external service (the "client" in this context) to send webhooks to your SLOP server instance. This is necessary for triggering automated workflows based on events happening outside of SLOP.

Identifying the Target SLOP Webhook URL

The external service needs to send its webhook notifications (typically HTTP POST requests) to a specific public URL provided by the SLOP server. The general format for this URL is:

<SLOP_SERVER_BASE_URL>/webhooks/:provider/:webhook_id

Components:

  • <SLOP_SERVER_BASE_URL>: This is the main address where your SLOP server is accessible publicly (e.g., https://slop.yourcompany.com).
  • /webhooks/: This is a fixed path segment indicating the webhook receiver endpoint.
  • :provider: This path parameter identifies the type or source of the external service sending the webhook. Examples could include github, stripe, unipile, your_custom_app. The SLOP server must have a handler configured internally that recognizes this provider ID.
  • :webhook_id: This path parameter provides a unique identifier for this specific webhook configuration. It allows you to distinguish between different events or configurations from the same provider. For example, you might have github/repo_push and github/issue_created as distinct webhook IDs.

Determining :provider and :webhook_id:

The values you use for :provider and :webhook_id must correspond directly to the event_channel configured in the EventJobTrigger on the SLOP server side. The event_channel typically follows the pattern webhook:<provider>:<webhook_id>, so the URL parameters must match the channel the trigger is listening on.

Example URL:

If your SLOP server is at https://slop.internal.net, and you want to receive notifications from Unipile for new emails, and your corresponding EventJobTrigger listens on the channel webhook:unipile:new_email, the URL to configure in Unipile would be:

https://slop.internal.net/webhooks/unipile/new_email
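
The relationship between the URL and the channel can be made explicit with two small helpers. Both function names are hypothetical, and the channel format is the webhook:<provider>:<webhook_id> convention described above, which a specific provider implementation might not follow.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// channelName applies the assumed webhook:<provider>:<webhook_id> convention.
func channelName(provider, webhookID string) string {
	return "webhook:" + provider + ":" + webhookID
}

// webhookURL builds the public URL to configure in the external service.
func webhookURL(baseURL, provider, webhookID string) string {
	return strings.TrimRight(baseURL, "/") + "/webhooks/" +
		url.PathEscape(provider) + "/" + url.PathEscape(webhookID)
}

func main() {
	fmt.Println(webhookURL("https://slop.internal.net", "unipile", "new_email"))
	// prints: https://slop.internal.net/webhooks/unipile/new_email
	fmt.Println(channelName("unipile", "new_email"))
	// prints: webhook:unipile:new_email
}
```

Deriving both values from the same pair of inputs avoids the most common misconfiguration, where the URL and the trigger's event_channel drift apart.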

Configuring the External Service

While the exact steps vary greatly depending on the external service, the general process involves:

  1. Access Webhook Settings: Log in to the external service and navigate to its settings or developer section where webhooks can be configured (often called "Webhooks", "Notifications", or "API Integrations").
  2. Add New Webhook: Create a new webhook configuration.
  3. Payload URL: Enter the full SLOP Webhook URL constructed as described above into the "Payload URL", "Endpoint URL", or similar field.
  4. Content Type: Ensure the content type is set to application/json if possible, as this is the most common format expected by webhook handlers.
  5. Select Events: Choose the specific event(s) within the external service that should trigger this webhook notification (e.g., "Push events", "New email received", "Payment succeeded").
  6. Security (Secret Token - Recommended):
    • Most services allow you to define a secret token or key for the webhook.
    • Generate a strong, random secret and enter it into the external service's configuration.
    • Crucially: The corresponding webhook handler within the SLOP server (associated with the :provider and :webhook_id) must be configured with the exact same secret. This allows the SLOP server to verify that incoming requests genuinely originated from the configured external service by checking a signature header (e.g., X-Hub-Signature-256 for GitHub, Stripe-Signature for Stripe) sent with the webhook request.
    • Consult the SLOP server's provider-specific webhook handler documentation or implementation details on how to configure this secret verification.
  7. Activate: Save and enable the webhook configuration in the external service.
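
For step 6, one way to generate a strong random secret is Go's crypto/rand. The helper name and the 32-byte length are illustrative choices, not SLOP requirements.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newWebhookSecret returns nBytes of cryptographically secure randomness,
// hex-encoded so it can be pasted into a web form or config file.
func newWebhookSecret(nBytes int) (string, error) {
	buf := make([]byte, nBytes)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	secret, err := newWebhookSecret(32)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(secret), "hex characters generated")
}
```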

Payload Considerations

The external service will send data about the event in the body of the HTTP POST request, typically as a JSON payload. The structure of this payload is defined by the external service.

The SLOP server's webhook handler parses this payload. When an EventJobTrigger is activated, the payload data is usually passed in the input field of the background job that gets created (see /api/jobs). Design your target taskflow (identified by job_process_id) to expect and process this input data.

Testing

After configuring the webhook in the external service and setting up the corresponding EventJobTrigger in SLOP:

  1. Trigger the relevant event in the external service (e.g., push code to GitHub, receive an email in Unipile).
  2. Monitor the SLOP server:
    • Check if a new job was created (GET /api/jobs).
    • Verify if the job executed the correct taskflow and processed the data as expected.
    • Examine server logs for any errors related to webhook reception or job triggering.

Server-Side Webhook Integration in SLOP

This section describes the internal process within the SLOP server when it receives an incoming webhook request from an external service, bridging the gap between the public endpoint and the job triggering mechanism.

1. Request Reception

An external service sends an HTTP POST request to the configured public endpoint:

POST /webhooks/:provider/:webhook_id

The SLOP server's web framework (e.g., Fiber) routes this request based on the path.

2. Handler Identification

The server uses the path parameters :provider and :webhook_id to identify the specific handler responsible for processing this type of webhook. This likely involves:

  • A routing mechanism that maps /:provider/:webhook_id patterns to specific Go functions or methods.
  • A central webhook manager (possibly internal/webhook/manager.go) that looks up registered handlers based on the provider and webhook ID.

If no handler is found matching the provided :provider and :webhook_id, the server should return an appropriate error, such as 404 Not Found.
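
A lookup of this kind might look like the sketch below. It uses only net/http rather than the server's actual framework, and the registry type, its "<provider>/<webhook_id>" keys, and demoRegistry are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// registry maps "<provider>/<webhook_id>" to a handler (hypothetical design).
type registry map[string]http.HandlerFunc

func (r registry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	parts := strings.Split(strings.Trim(req.URL.Path, "/"), "/")
	if len(parts) != 3 || parts[0] != "webhooks" {
		http.NotFound(w, req)
		return
	}
	h, ok := r[parts[1]+"/"+parts[2]]
	if !ok {
		http.NotFound(w, req) // no handler for this provider/webhook_id
		return
	}
	if req.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	h(w, req)
}

// demoRegistry registers one handler that simply acknowledges the request.
func demoRegistry() registry {
	return registry{
		"unipile/new_email": func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusAccepted)
		},
	}
}

// statusFor runs one in-memory request and returns the response status.
func statusFor(h http.Handler, method, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return rec.Code
}

func main() {
	reg := demoRegistry()
	fmt.Println(statusFor(reg, http.MethodPost, "/webhooks/unipile/new_email")) // 202
	fmt.Println(statusFor(reg, http.MethodPost, "/webhooks/unknown/x"))         // 404
}
```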

3. Security Verification (Optional but Strongly Recommended)

If a secret token was configured during the client-side setup, this is the point where the server verifies the request's authenticity.

  • Extract Signature: The handler extracts the signature header sent by the external service (e.g., X-Hub-Signature-256, Stripe-Signature).
  • Retrieve Secret: The handler retrieves the expected secret token associated with this specific webhook configuration (:provider, :webhook_id) from its secure storage.
  • Compute Expected Signature: The handler computes the expected signature based on the received request body and the stored secret, using the algorithm specified by the external service (e.g., HMAC-SHA256).
  • Compare Signatures: The handler compares the signature extracted from the header with the computed expected signature.
  • Reject if Invalid: If the signatures do not match, the request is considered invalid or tampered with. The handler should reject the request immediately, typically with a 401 Unauthorized or 403 Forbidden status code, and log the security event. This prevents processing potentially malicious requests.
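
The verification steps above can be sketched for the GitHub-style header format, where X-Hub-Signature-256 carries "sha256=" followed by the hex-encoded HMAC-SHA256 of the raw body. The function names are illustrative, and other providers use different schemes (Stripe-Signature, for example, includes a timestamp), so this is not a drop-in check for every provider.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

const sigPrefix = "sha256="

// sign computes the header value a GitHub-style sender would attach.
func sign(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return sigPrefix + hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the HMAC over the raw body and compares it to the
// header value in constant time.
func verify(secret, body []byte, header string) bool {
	if !strings.HasPrefix(header, sigPrefix) {
		return false
	}
	got, err := hex.DecodeString(strings.TrimPrefix(header, sigPrefix))
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	secret := []byte("example-secret")
	body := []byte(`{"event":"new_email"}`)
	header := sign(secret, body)
	fmt.Println(verify(secret, body, header))             // true
	fmt.Println(verify(secret, []byte("tampered"), header)) // false
}
```

The comparison uses hmac.Equal rather than a plain string comparison so that the check does not leak timing information. Verification must run on the raw request bytes, before any JSON parsing or re-encoding.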

4. Payload Parsing

Assuming the request is authentic (or if no security is configured), the handler proceeds to parse the request body.

  • Content Type: It typically expects the body to be in JSON format (application/json).
  • Deserialization: The JSON payload is deserialized into a Go struct or map. The specific structure depends entirely on the data sent by the external service for the given event type.
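
A generic version of this step, decoding into a map because the payload shape is provider-specific, might look like the following sketch. parsePayload is a hypothetical helper; a real handler could decode into a typed struct per provider instead.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"mime"
)

// parsePayload accepts only JSON bodies and decodes them into a generic map.
func parsePayload(contentType string, body []byte) (map[string]any, error) {
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil || mediaType != "application/json" {
		return nil, errors.New("unsupported content type")
	}
	var payload map[string]any
	if err := json.Unmarshal(body, &payload); err != nil {
		return nil, fmt.Errorf("invalid JSON payload: %w", err)
	}
	return payload, nil
}

func main() {
	payload, err := parsePayload("application/json; charset=utf-8", []byte(`{"subject":"hi"}`))
	fmt.Println(payload["subject"], err) // prints: hi <nil>
}
```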

5. Internal Event Publication

After successfully parsing the payload, the core integration step occurs: the handler publishes an internal event within the SLOP system.

  • Event Channel: The event is published onto a specific channel name. This channel name must correspond to the event_channel that EventJobTrigger configurations listen on. The convention is likely webhook:<provider>:<webhook_id> (e.g., webhook:unipile:new_email).
  • Event Payload: The payload of the internal event typically contains:
    • The parsed data from the original webhook request body.
    • Potentially, additional metadata added by the handler (e.g., timestamp of reception).
  • Event Bus: This publication likely happens via an internal event bus or messaging system within the SLOP server.
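
To make the publish step concrete, here is a minimal in-memory publish/subscribe sketch. The SLOP server's real event bus is not documented here, so the Bus and Event types and their fields are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event is a hypothetical internal event carrying the parsed webhook data.
type Event struct {
	Channel    string
	Payload    map[string]any
	ReceivedAt time.Time // metadata added by the handler
}

// Bus is a minimal in-memory publish/subscribe hub keyed by channel name.
type Bus struct {
	mu   sync.RWMutex
	subs map[string][]func(Event)
}

func NewBus() *Bus { return &Bus{subs: make(map[string][]func(Event))} }

// Subscribe registers fn for events published on channel.
func (b *Bus) Subscribe(channel string, fn func(Event)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[channel] = append(b.subs[channel], fn)
}

// Publish delivers ev to every subscriber of ev.Channel and returns
// how many subscribers were notified.
func (b *Bus) Publish(ev Event) int {
	b.mu.RLock()
	subs := append([]func(Event){}, b.subs[ev.Channel]...)
	b.mu.RUnlock()
	for _, fn := range subs {
		fn(ev)
	}
	return len(subs)
}

func main() {
	bus := NewBus()
	bus.Subscribe("webhook:unipile:new_email", func(ev Event) {
		fmt.Println("received on", ev.Channel, "subject:", ev.Payload["subject"])
	})
	n := bus.Publish(Event{
		Channel:    "webhook:unipile:new_email",
		Payload:    map[string]any{"subject": "hi"},
		ReceivedAt: time.Now(),
	})
	fmt.Println("subscribers notified:", n)
}
```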

6. Trigger Activation and Job Creation

  • Listening Triggers: The EventJobTrigger system (possibly internal/taskflow/trigger.go) continuously monitors the internal event bus.
  • Matching: When an event is published on a channel (e.g., webhook:unipile:new_email), the system checks if any enabled EventJobTrigger configurations are listening on that exact event_channel.
  • Job Request: If a match is found, the trigger activates and creates a new background job by issuing the equivalent of a POST /api/jobs request.
  • Job Parameters: The job creation request includes:
    • job_process_id: Taken directly from the matched EventJobTrigger configuration. This specifies which taskflow YAML definition to run.
    • name: A descriptive name for the job, potentially generated based on the trigger name or event type.
    • input: The payload of the internal event (which contains the original webhook data) is passed as the input data for the job.
    • Other parameters like user_id (if applicable, based on the trigger configuration or webhook context) and priority might also be set.
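
The matching and job-parameter rules above can be sketched as a pure function. The Trigger and JobRequest types and the generated job name format are assumptions; only the field names job_process_id, name, and input come from the description above.

```go
package main

import "fmt"

// Trigger holds the subset of EventJobTrigger fields needed for matching.
type Trigger struct {
	Name         string
	EventChannel string
	JobProcessID string
	Enabled      bool
}

// JobRequest is a hypothetical shape for the job creation parameters.
type JobRequest struct {
	JobProcessID string         `json:"job_process_id"`
	Name         string         `json:"name"`
	Input        map[string]any `json:"input"`
}

// jobsFor returns one job request per enabled trigger that listens on
// exactly the given channel.
func jobsFor(triggers []Trigger, channel string, payload map[string]any) []JobRequest {
	var jobs []JobRequest
	for _, t := range triggers {
		if !t.Enabled || t.EventChannel != channel {
			continue
		}
		jobs = append(jobs, JobRequest{
			JobProcessID: t.JobProcessID,
			Name:         t.Name + " (" + channel + ")", // assumed naming scheme
			Input:        payload,
		})
	}
	return jobs
}

func main() {
	triggers := []Trigger{
		{Name: "Email", EventChannel: "webhook:unipile:new_email", JobProcessID: "ProcessIncomingEmail", Enabled: true},
		{Name: "Paused", EventChannel: "webhook:unipile:new_email", JobProcessID: "Other", Enabled: false},
	}
	jobs := jobsFor(triggers, "webhook:unipile:new_email", map[string]any{"subject": "hi"})
	fmt.Printf("%d job(s): %+v\n", len(jobs), jobs)
}
```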

7. Job Execution

The newly created job is queued and eventually picked up by a worker. The worker loads the specified taskflow (job_process_id), initializes its execution context, and makes the job's input data (the original webhook payload) available within that context. The input is typically accessible through a special key such as ${INPUT.<field_name>} in the taskflow's YAML definitions. The taskflow then executes, processing the data received from the external event via the webhook.
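
As an illustration of how such placeholders might be resolved, the sketch below substitutes ${INPUT.<field_name>} references in a string from a job input map. The resolver is hypothetical: the actual substitution logic, including support for nested fields and the handling of missing keys, is not specified in this document.

```go
package main

import (
	"fmt"
	"regexp"
)

// inputRef matches ${INPUT.<field_name>} with a simple top-level field name.
var inputRef = regexp.MustCompile(`\$\{INPUT\.([A-Za-z0-9_]+)\}`)

// resolveInput replaces each placeholder with the matching input value.
// Unknown fields are left untouched (an assumed policy).
func resolveInput(template string, input map[string]any) string {
	return inputRef.ReplaceAllStringFunc(template, func(match string) string {
		field := inputRef.FindStringSubmatch(match)[1]
		if v, ok := input[field]; ok {
			return fmt.Sprint(v)
		}
		return match
	})
}

func main() {
	input := map[string]any{"sender": "alice@example.com", "subject": "Invoice"}
	fmt.Println(resolveInput("From ${INPUT.sender}: ${INPUT.subject}", input))
	// prints: From alice@example.com: Invoice
}
```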