30.04.2025-v2
MFO Slop Client - Detailed Documentation
version: 2.0

This document provides a comprehensive guide to the `mfo-client` library, detailing its capabilities, internal mechanisms, and usage patterns with examples, incorporating information from the SLOP server API and taskflow definitions. It is intended for developers looking to interact with the SLOP server API and leverage its task orchestration features.
1. Introduction

`mfo-client` is a Go library designed as a client for the SLOP server, a platform focused on agentic workflows, task automation, and centralized interaction management. The client facilitates interaction with the server's REST API and integrates a powerful task workflow engine based on `go-taskflow`. This allows developers to define complex, potentially multi-agent workflows in a declarative YAML format, execute them efficiently via the `mfo-client`, and manage related server-side resources such as jobs, events, memory, and prompts.

The library's design emphasizes flexibility and human readability, particularly through its use of YAML for taskflow definitions and configuration. This suits systems whose configurations must be easily understood and modified by humans, and supports dynamic adjustments in multi-agent orchestration scenarios. Clients delegate external interactions and persistence entirely to the SLOP server, promoting stateless client design.
2. Core Concepts

Understanding the following core components is crucial for using `mfo-client` effectively.
2.1. API Client (`internal/client`, `internal/api`)

The heart of the library's interaction with the SLOP server is the API client. It encapsulates the logic for making HTTP requests to the server's various REST endpoints defined by the SLOP protocol.

- Initialization: The client is typically initialized with the server's base URL and authentication credentials (a JWT or API token obtained via `POST /auth/login`). See `internal/client/client.go` for initialization patterns.
- Endpoint Abstraction: Specific API functionalities are abstracted into dedicated modules within `internal/api/`. These cover the key SLOP server capabilities:
  - `chat.go`: interacting with conversational endpoints (`/api/chat`).
  - `tool.go`: executing registered server-side tools (`/api/tools/:tool_id`).
  - `memory.go`: managing persistent key-value storage (`/api/memory`).
  - `resource.go`: managing files and other resources (`/api/resources`).
  - `job.go`: managing asynchronous background jobs (`/api/jobs`).
  - `notification.go`: managing user notifications (`/api/notifications`).
  - `event.go` / config: handling event triggers and configurations (`/api/config/event-triggers`).
- Request/Response Handling: The client manages request serialization, response deserialization (often into structs defined in `internal/models`), authentication headers, and error handling, adhering to the SLOP server rules.
2.2. Authentication (`internal/auth`)

Secure communication with the SLOP server's private `/api/*` endpoints is handled by the authentication module. It supports JWT tokens (obtained via `/auth/login`) and API tokens (managed via `/api/tokens`), configured during client initialization and automatically included in the `Authorization: Bearer <token>` header of outgoing requests.
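The snippet below is a minimal sketch of that login step using only Go's standard library. The request and response field names (`username`, `password`, `token`) are assumptions, since the exact shapes are defined in `mfo_api_docu.mdc`; only the `/auth/login` endpoint and bearer-token scheme come from this document.

```go
// Minimal sketch: obtaining a JWT from POST /auth/login.
// The "username", "password", and "token" field names are assumptions.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func login(baseURL, username, password string) (string, error) {
	reqBody, err := json.Marshal(map[string]string{
		"username": username, // assumed field name
		"password": password, // assumed field name
	})
	if err != nil {
		return "", err
	}
	resp, err := http.Post(baseURL+"/auth/login", "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("login failed: %s", resp.Status)
	}
	var out struct {
		Token string `json:"token"` // assumed response field
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	return out.Token, nil
}

func main() {
	token, err := login("http://localhost:8080", "admin", "secret")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("bearer token:", token) // pass to the client's auth provider
}
```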
2.3. Taskflow Engine (`internal/taskflow`, `go-taskflow`)

`mfo-client` leverages the `go-taskflow` library (github.com/noneback/go-taskflow) for defining, building, and executing task workflows locally within the client application, orchestrating calls to the SLOP server.

- YAML Definition: Workflows are defined declaratively in YAML files, following the structure outlined in `yaml-taskflow_directive.mdc`. This format describes tasks, their types (mapping to Go functions), configurations, dependencies, conditional logic, and subflows.
- SLOP API Task Types: Specific task types are defined (or should be defined) in the `mfo-client`'s Go code to correspond to SLOP API calls (e.g., `mfo_api_chat_send`, `mfo_api_resource_get`, `mfo_api_memory_store`). Examples are provided in `slop_yaml_api_task_definitions.mdc`. A generic `slop_tool_call` type might also exist or be implemented to call arbitrary `/api/tools/:tool_id` endpoints.
- Task Registry (`internal/taskflow/registry.go`): A central registry maps task type strings (from the YAML `type` field) to Go factory functions (`taskflow.TaskFuncFactory`). These factories create the actual executable task functions and often require access to the initialized `slopClient` instance (likely passed via context or closure) to make API calls.
- Builder (`internal/taskflow/builder.go`): The builder takes a parsed `FlowDefinition` (from YAML) and the `TaskRegistry` to construct an executable `taskflow.TaskFlow` graph. It resolves dependencies and links tasks.
- Executor (`go-taskflow.Executor`, potentially wrapped in `internal/taskflow/executor.go`): The executor runs the `TaskFlow` graph. It manages task scheduling, respects dependencies, handles concurrency, and executes tasks (Go functions making SLOP API calls), potentially in parallel.
- Context & Data Passing (`internal/taskflow/context.go`): An execution context is passed through task execution. It typically holds the `slopClient` instance and a map for storing task outputs. Tasks access outputs from previous tasks using the `${producing_task_id.output_key}` syntax within their `config` in the YAML, which the Go task implementation resolves by querying the context map.
- Persistence (`internal/taskflow/persistence`): While the SLOP server handles primary persistence (memory, resources, jobs), this client-side package may provide caching or state management for the workflow execution itself, if needed.
2.4. Data Models (`internal/models`)

This directory contains the Go struct definitions representing data exchanged with the SLOP API (requests/responses) and used within the taskflow engine (e.g., `TaskflowDefinition`, `TaskDefinition`). These align with the structures expected or returned by the SLOP server endpoints.
2.5. Prompt Management (`internal/prompt`, `internal/prompts`)

For agentic workflows involving LLMs via the SLOP server's `/api/chat` endpoint, `mfo-client` includes utilities for managing the prompts used in `ChatRequest` bodies:

- Parsing & Storage: Loading and managing prompt templates, potentially from Markdown files.
- Templating: Injecting variables (including data passed via `${task_id.output_key}`) into prompt templates before sending them in an API call.
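As a minimal illustration of the templating step, the sketch below renders a prompt with Go's standard `text/template`. The `{{.Field}}` syntax and field names here are illustrative assumptions; the actual `internal/prompt` package may use `${task_id.output_key}` placeholders instead.

```go
// Minimal prompt-templating sketch using text/template.
package main

import (
	"fmt"
	"strings"
	"text/template"
)

func main() {
	const promptTmpl = "Summarize the following email for {{.Recipient}}:\n\n{{.Content}}"
	tmpl := template.Must(template.New("prompt").Parse(promptTmpl))

	var sb strings.Builder
	// These values would normally come from resolved taskflow outputs.
	err := tmpl.Execute(&sb, map[string]string{
		"Recipient": "ops-team",
		"Content":   "Quarterly invoice attached...",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(sb.String()) // rendered prompt, ready for a ChatRequest message
}
```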
3. Usage and Examples

3.1. API Client Usage (Go)

The core pattern involves initializing the client and calling its methods corresponding to SLOP API endpoints.
```go
// Example (conceptual - based on the library structure and API docs)
package main

import (
	"context"
	"log"

	"gitlab.com/webigniter/slop-client/internal/auth"
	"gitlab.com/webigniter/slop-client/internal/client"
	"gitlab.com/webigniter/slop-client/internal/models"
)

func main() {
	// Assume credentials are obtained securely
	serverURL := "http://localhost:8080" // Replace with the actual SLOP server URL
	apiKey := "YOUR_API_KEY_OR_JWT"

	// Configure authentication
	authProvider := auth.NewStaticTokenProvider(apiKey)

	// Initialize the client
	slopClient, err := client.NewClient(serverURL, nil, authProvider)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}

	// Example: retrieve a resource
	resourceID := "some-resource-id"
	resource, err := slopClient.API.GetResource(context.Background(), resourceID)
	if err != nil {
		log.Fatalf("Failed to get resource: %v", err)
	}
	log.Printf("Resource content: %s", resource.Content)

	// Example: store data in memory
	memReq := models.StoreMemoryRequest{
		Key:        "my_data_key",
		Value:      map[string]interface{}{"info": "some data"},
		ProviderID: "default_memory", // As required by the API docs
	}
	_, err = slopClient.API.StoreMemory(context.Background(), memReq)
	if err != nil {
		log.Fatalf("Failed to store memory: %v", err)
	}
	log.Println("Data stored in memory.")
}
```
3.2. Taskflow Engine Usage (YAML + Go)

The primary way to orchestrate SLOP API calls is to define workflows in YAML and execute them with the `mfo-client` taskflow engine.

Step 1: Define Workflow in YAML

Create a YAML file (e.g., `process_email_workflow.yml`) using SLOP API task types.
name: "ProcessIncomingEmail"
description: "Analyzes an email, stores analysis, and tags the thread."
version: "1.0"
tasks:
- id: "get_email_content"
name: "Get Email Resource"
type: "mfo_api_resource_get" # Maps to a Go factory
config:
resource_id: "${INPUT.resource_id}" # Assume resource_id is provided as input to the flow
successors:
- "analyze_email"
- id: "analyze_email"
name: "Analyze Email with LLM"
type: "mfo_api_chat_send"
config:
request_body:
messages:
- role: "system"
content: "Analyze the following email content and extract the sender, subject, and a brief summary. Respond in JSON format {"sender": "...", "subject": "...", "summary": "..."}."
- role: "user"
content: "${get_email_content.resource.content}" # Get content from previous task
provider_id: "openai" # Example provider
dependencies: ["get_email_content"]
successors:
- "store_analysis"
- id: "store_analysis"
name: "Store Analysis in Memory"
type: "mfo_api_memory_store"
config:
key: "email_analysis_${INPUT.resource_id}"
# Assuming the chat response content is valid JSON string
value: "${analyze_email.response.content}"
provider_id: "default_memory"
dependencies: ["analyze_email"]
successors:
- "tag_email_thread" # Assuming we need a separate step to get thread_id
# Placeholder: Add task to extract thread_id if not available in INPUT or resource metadata
# - id: "extract_thread_id"
# type: "custom_regex_extractor" # Example custom task
# config:
# input_string: "${get_email_content.resource.metadata.email_thread_id}" # Example path
# regex: "..."
# dependencies: ["get_email_content"]
# successors: ["tag_email_thread"]
- id: "tag_email_thread"
name: "Tag Email Thread"
type: "mfo_api_tool_execute" # Using the generic tool executor type
config:
tool_id: "unipile_email_tag_thread" # Specific SLOP tool
arguments:
tag_name: "ai_analyzed"
# Assuming thread_id comes from input or another task like extract_thread_id
thread_id: "${INPUT.thread_id}"
dependencies: ["store_analysis"] # Depends on analysis being stored, and thread_id being available
Step 2: Register Task Types (Go)

In your `mfo-client` application, register a Go factory function for each `type` used in the YAML. These factories often need the `slopClient`.
```go
package main

import (
	"context"
	"fmt"

	"gitlab.com/webigniter/slop-client/internal/client"
	"gitlab.com/webigniter/slop-client/internal/models"
	"gitlab.com/webigniter/slop-client/internal/taskflow"
	// ... other necessary imports
)

// FlowContext holds the client, initial inputs, and task outputs.
type FlowContext struct {
	SlopClient *client.Client
	Outputs    map[string]map[string]interface{}
	Inputs     map[string]interface{} // For initial inputs like resource_id
	// Add a mutex if concurrent tasks write to Outputs
}

// --- Task Factories ---

func slopApiResourceGetFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
	return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
		return func(ctxInterface interface{}) {
			ctx := ctxInterface.(*FlowContext)
			taskID := "get_email_content" // Assuming the task ID is known or passed in

			// Resolve config parameters (handle ${...} syntax)
			resourceIDRaw, _ := taskflow.ResolveConfigValue(config["resource_id"], ctx.Outputs, ctx.Inputs)
			resourceID := resourceIDRaw.(string)

			resource, err := slopClient.API.GetResource(context.Background(), resourceID)
			if err != nil {
				// Handle the error appropriately (log, set error state in context)
				fmt.Printf("Task %s failed: %v\n", taskID, err)
				return
			}

			// Store the output for downstream tasks
			taskflow.SetOutput(ctx.Outputs, taskID, "resource", resource)
			fmt.Printf("Task %s completed.\n", taskID)
		}, nil
	}
}

func slopApiChatSendFactory(slopClient *client.Client) taskflow.TaskFuncFactory {
	return func(config map[string]interface{}) (taskflow.TaskFunc, error) {
		return func(ctxInterface interface{}) {
			ctx := ctxInterface.(*FlowContext)
			taskID := "analyze_email"

			// Resolve config (complex: needs deep resolution for request_body)
			resolvedConfig, err := taskflow.ResolveConfigMap(config, ctx.Outputs, ctx.Inputs)
			if err != nil { /* handle error */ return }

			var chatReq models.ChatRequest
			// Use a helper to map resolvedConfig["request_body"] onto the chatReq struct, e.g.:
			// mapstructure.Decode(resolvedConfig["request_body"], &chatReq)
			_ = resolvedConfig // keep referenced until the decode above is implemented

			chatRespMap, err := slopClient.API.SendChat(context.Background(), chatReq) // Assuming SendChat exists
			if err != nil { /* handle error */ return }

			taskflow.SetOutput(ctx.Outputs, taskID, "response", chatRespMap)
			fmt.Printf("Task %s completed.\n", taskID)
		}, nil
	}
}

// ... Implement factories for mfo_api_memory_store, mfo_api_tool_execute, etc.
// Each factory parses its specific 'config', resolves ${...} values,
// calls the appropriate slopClient.API method, and stores results in ctx.Outputs.

func main() {
	// ... initialize slopClient ...

	registry := taskflow.NewRegistry()
	// Pass slopClient to the factories
	registry.RegisterTask("mfo_api_resource_get", slopApiResourceGetFactory(slopClient))
	registry.RegisterTask("mfo_api_chat_send", slopApiChatSendFactory(slopClient))
	// ... register other factories ...

	// ... next steps: load YAML, build, execute ...
}
```
Step 3: Load, Build, and Execute (Go)

Load the YAML, build the `TaskFlow`, prepare the initial context, and run it.
```go
// Additional imports needed in the file's top-level import block:
//
//	import (
//		"log"
//		"os"
//		"runtime"
//
//		gotaskflow "github.com/noneback/go-taskflow"
//		"gopkg.in/yaml.v3"
//	)

// ... (inside main, after registry setup)

// Load the YAML definition
yamlData, err := os.ReadFile("process_email_workflow.yml")
// ... handle error ...

var flowDef *taskflow.FlowDefinition
err = yaml.Unmarshal(yamlData, &flowDef)
// ... handle error ...

// Create the builder and build the TaskFlow graph
builder := taskflow.NewBuilder(registry)
taskFlow, err := builder.BuildFlow(flowDef)
// ... handle error ...

// Create the executor
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))

// Prepare the initial context
initialContext := &FlowContext{
	SlopClient: slopClient,
	Outputs:    make(map[string]map[string]interface{}),
	Inputs: map[string]interface{}{ // Example inputs
		"resource_id": "email-resource-123",
		"thread_id":   "unipile-thread-abc",
	},
}

// Execute the flow and wait for completion
log.Println("Executing taskflow...")
executor.Run(taskFlow, gotaskflow.WithContext(initialContext)).Wait()
log.Println("Taskflow execution complete.")

// Access final outputs if needed: initialContext.Outputs

// Optional: dump the graph (Graphviz DOT format)
// taskFlow.Dump(os.Stdout)
```
(Note: helper functions such as `taskflow.ResolveConfigValue`, `taskflow.ResolveConfigMap`, and `taskflow.SetOutput` are assumed to exist within the `internal/taskflow` package to handle data passing.)
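For readers implementing these helpers themselves, here is a minimal sketch of what `ResolveConfigValue` and `SetOutput` could look like. The signatures match their usage in the examples above but are otherwise assumptions, and nested-path traversal (e.g. `resource.content`) is deliberately omitted.

```go
package taskflow

import (
	"fmt"
	"regexp"
	"strings"
)

// Matches exactly one "${producer.key}" reference; "key" may itself contain
// dots (e.g. resource.content), which a full implementation would traverse.
var refPattern = regexp.MustCompile(`^\$\{([^.}]+)\.([^}]+)\}$`)

// ResolveConfigValue substitutes a "${task_id.output_key}" or "${INPUT.key}"
// reference with the stored value; anything else passes through as a literal.
func ResolveConfigValue(raw interface{}, outputs map[string]map[string]interface{}, inputs map[string]interface{}) (interface{}, error) {
	s, ok := raw.(string)
	if !ok {
		return raw, nil // non-string literals pass through unchanged
	}
	m := refPattern.FindStringSubmatch(strings.TrimSpace(s))
	if m == nil {
		return s, nil // plain literal string
	}
	producer, key := m[1], m[2]
	if producer == "INPUT" {
		if v, ok := inputs[key]; ok {
			return v, nil
		}
		return nil, fmt.Errorf("input %q not found", key)
	}
	if v, ok := outputs[producer][key]; ok {
		return v, nil
	}
	return nil, fmt.Errorf("output %q of task %q not found", key, producer)
}

// SetOutput records a task output under outputs[taskID][outputKey].
func SetOutput(outputs map[string]map[string]interface{}, taskID, outputKey string, value interface{}) {
	if outputs[taskID] == nil {
		outputs[taskID] = make(map[string]interface{})
	}
	outputs[taskID][outputKey] = value
}
```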
3.3. Command-Line Utilities

- `taskflow-yaml-checker`: Validates YAML workflow definitions.
- `taskflow-visualizer`: Generates a visual graph (requires Graphviz `dot`).
3.4. Services (Webhook Trigger)

Incoming webhooks (`POST /webhooks/:provider/:webhook_id`) are handled by the SLOP server. They typically trigger server-side logic, which may involve starting a predefined SLOP job (`POST /api/jobs`) that executes a specific `process_id` (which could be a taskflow defined in YAML and run by an `mfo-client` instance or a similar executor within the server environment).

The `services/unipile_mailer_webhook` example in `mfo-client` likely shows how a client-side application could listen for webhooks and then use the taskflow engine, but according to the SLOP server rules, this responsibility is ideally centralized on the server.
4. Handling Notifications and Webhooks within Taskflows

Taskflows executed by `mfo-client` can interact with SLOP's notification and event systems:

- Sending User Notifications: A task (e.g., an `mfo_api_notification_create` type mapping to `POST /api/notifications`) can be added to the YAML workflow to create a notification for a user upon reaching a certain stage:

  ```yaml
  - id: "notify_user_completion"
    type: "mfo_api_notification_create" # Assumes this type exists
    config: # Request body for POST /api/notifications
      user_id: "${INPUT.user_id}" # Or retrieved from context
      title: "Email Processing Complete"
      message: "Analysis for resource ${INPUT.resource_id} stored."
      level: "info"
    dependencies: ["store_analysis"]
  ```
- Dynamically Configuring Server-Side Triggers (e.g., for Webhooks): The SLOP server allows configuring triggers (`/api/config/event-triggers`) that link server events (like `job.completed` or `resource.created`) to actions (like starting another job or calling an external webhook URL). A taskflow can dynamically create or update these triggers. The task below configures the SLOP server: "when this job completes, send a POST request to the specified URL". The webhook call itself is handled by the server later, not directly by the taskflow.

  ```yaml
  - id: "setup_completion_webhook"
    type: "mfo_api_event_trigger_create" # Assumes this type exists, mapping to POST /api/config/event-triggers
    config: # Request body for the trigger endpoint
      event_type: "job.completed" # Or another relevant event
      filter: "job.id == ${SYSTEM.JOB_ID}" # Filter for the current job completion
      action_type: "webhook"
      action_config:
        url: "https://my-external-service.com/webhook-receiver"
        method: "POST"
        headers: { "Content-Type": "application/json" }
        body_template: '{"jobId": "${event.job.id}", "status": "${event.job.status}", "result": "${retrieve_job_result.output}"}'
    dependencies: ["start_main_processing"] # Run early in the flow
    # Note: requires a task 'retrieve_job_result' if the result is needed in the webhook body
  ```
- Calling External Webhooks Directly from a Taskflow: If the SLOP server doesn't provide a suitable trigger or tool, you might need to:
  - Create a Custom Go Task Type: Implement a task factory in `mfo-client` (e.g., `httpClientPostFactory`) that uses Go's `net/http` client to make the webhook call directly, and register this type.
  - Use a Generic SLOP Tool: If the SLOP server exposes a generic tool (e.g., `system_http_request`) via `/api/tools/system_http_request`, use the `mfo_api_tool_execute` task type in your YAML to call it.
5. Advanced Topics & Considerations¶
- Custom Task Implementation: Define custom Go task factories for logic not covered by standard SLOP API calls (e.g., complex data transformation, custom branching logic).
- Error Handling: Implement error handling within task functions (checking API call errors) and potentially define conditional paths or specific error-handling tasks in the YAML workflow.
- State Management: Rely primarily on the SLOP server's Memory (
/api/memory
) and Resource (/api/resources
) endpoints for state persistence between taskflow runs or across different workflows. - Configuration: Manage SLOP server URLs, API keys, and taskflow configurations securely.
- Multi-Agent Orchestration: Use taskflows to define the sequence of actions for an agent. Different agents might be represented by different taskflows or custom tasks within a larger flow. State sharing happens via the SLOP server's memory/resource APIs.
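Here is a minimal sketch of the error-handling idea from the list above: record failures in the shared context so that downstream tasks, or the caller after `executor.Run(...).Wait()`, can react. The `Errors` field and `FailTask` helper are illustrative additions, not part of `mfo-client`.

```go
package main

import (
	"errors"
	"fmt"
)

// FlowContext extends the earlier example's context with an Errors map;
// both the field and the FailTask helper are illustrative.
type FlowContext struct {
	Outputs map[string]map[string]interface{}
	Errors  map[string]error // taskID -> error encountered
}

// FailTask records a task failure so downstream tasks (or the caller,
// after the flow finishes) can decide how to react.
func (c *FlowContext) FailTask(taskID string, err error) {
	if c.Errors == nil {
		c.Errors = make(map[string]error)
	}
	c.Errors[taskID] = err
}

func main() {
	ctx := &FlowContext{}
	ctx.FailTask("analyze_email", errors.New("chat provider timed out"))
	for id, err := range ctx.Errors {
		fmt.Printf("task %s failed: %v\n", id, err)
	}
}
```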
6. Conclusion

`mfo-client` provides a powerful way to orchestrate interactions with a SLOP server using declarative YAML workflows executed via the `go-taskflow` engine. By defining tasks that map to SLOP API calls and leveraging the server's centralized capabilities for persistence, tool execution, and event handling, developers can build complex, robust, and maintainable automation and agentic applications.
7. References

- go-taskflow library: https://github.com/noneback/go-taskflow
- SLOP Server API Documentation: `mfo_api_docu.mdc` (provided)
- SLOP Server Rules: `slop_server_rules.mdc` (provided)
- SLOP YAML Taskflow Directives: `yaml-taskflow_directive.mdc` (provided)
- SLOP YAML API Task Definitions: `slop_yaml_api_task_definitions.mdc` (provided)
- YAML Specification: https://yaml.org/spec/
- Graphviz: https://graphviz.org/
2.3.1. Detailed YAML Taskflow Structure

SLOP Client YAML Taskflow Directives (Updated)

This document defines the schema and conventions for defining taskflows in YAML files within the SLOP Client project, using the `internal/taskflow` package, which leverages `github.com/noneback/go-taskflow`.
Core Principles

- YAML as Definition: The YAML file is the primary definition of the workflow, including task sequence, dependencies, parameters, subflows, and conditional logic.
- Explicit Data Flow: Data dependencies between tasks are explicitly defined using a specific output referencing syntax (`${producing_task_id.output_key}`).
- Generic Tool Execution: A generic task type (`slop_tool_call` or `mfo_api_tool_execute`) allows executing arbitrary SLOP Server tools defined in the YAML.
- Custom Logic Tasks: Specific Go task factories are used for logic that doesn't map directly to a single SLOP Server tool call (e.g., data parsing, complex branching logic, custom condition evaluation).
- Shared Output Context: A runtime context stores outputs from completed tasks, enabling subsequent tasks and conditions to access them.
- Modularity: Subflows allow grouping related tasks, and conditions enable dynamic branching based on runtime data.
YAML File Structure

A taskflow YAML file must have the following top-level structure:

```yaml
name: "UniqueFlowName" # Required: identifier for the taskflow.
description: "Human-readable description." # Optional: purpose of the flow.
version: "1.0" # Optional: version of the flow definition.
config: # Optional: global configuration for the flow execution.
  max_concurrency: 5 # Optional: hint for executor concurrency.
  timeout: "5m" # Optional: overall flow timeout hint (implementation status may vary).
tasks: # Required: list of individual task definitions.
  - id: "task_id_1"
    # ... task fields ...
  - id: "task_id_2"
    # ... task fields ...
subflows: # Optional: list of subflow definitions (groups of tasks).
  - id: "subflow_id_1"
    # ... subflow fields ...
conditions: # Optional: list of condition definitions (branching points).
  - id: "condition_id_1"
    # ... condition fields ...
```
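For reference, the following Go structs are one plausible mapping of this schema for `yaml.Unmarshal`. They are a sketch: the real definitions live in `internal/models` / `internal/taskflow` and may differ in detail.

```go
// Illustrative Go structs mirroring the YAML schema above.
package models

type FlowDefinition struct {
	Name        string                `yaml:"name"`
	Description string                `yaml:"description,omitempty"`
	Version     string                `yaml:"version,omitempty"`
	Config      *FlowConfig           `yaml:"config,omitempty"`
	Tasks       []TaskDefinition      `yaml:"tasks"`
	Subflows    []SubflowDefinition   `yaml:"subflows,omitempty"`
	Conditions  []ConditionDefinition `yaml:"conditions,omitempty"`
}

type FlowConfig struct {
	MaxConcurrency int    `yaml:"max_concurrency,omitempty"`
	Timeout        string `yaml:"timeout,omitempty"`
}

type TaskDefinition struct {
	ID           string                 `yaml:"id"`
	Name         string                 `yaml:"name,omitempty"`
	Description  string                 `yaml:"description,omitempty"`
	Type         string                 `yaml:"type"`
	Config       map[string]interface{} `yaml:"config,omitempty"`
	Dependencies []string               `yaml:"dependencies,omitempty"`
	Successors   []string               `yaml:"successors,omitempty"`
	Priority     string                 `yaml:"priority,omitempty"`
}

type SubflowDefinition struct {
	ID           string           `yaml:"id"`
	Name         string           `yaml:"name,omitempty"`
	Tasks        []TaskDefinition `yaml:"tasks"`
	Dependencies []string         `yaml:"dependencies,omitempty"`
	Successors   []string         `yaml:"successors,omitempty"`
	Priority     string           `yaml:"priority,omitempty"`
}

type ConditionDefinition struct {
	ID           string                 `yaml:"id"`
	Name         string                 `yaml:"name,omitempty"`
	Type         string                 `yaml:"type"`
	Config       map[string]interface{} `yaml:"config,omitempty"`
	Dependencies []string               `yaml:"dependencies,omitempty"`
	Successors   []string               `yaml:"successors,omitempty"`
}
```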
Task Definition (`tasks[]`)

Each item in the `tasks` list defines a single unit of work.

```yaml
- id: "unique_task_id" # Required: unique identifier for this task within the flow (or subflow).
  name: "Descriptive Task Name" # Optional: human-readable name.
  description: "What this task does." # Optional: longer description.
  type: "task_type_name" # Required: maps to a registered TaskFactory in Go.
  config: # Optional: configuration passed to the TaskFactory/TaskFunc.
    key1: value1
    key2: "${previous_task_id.output_key}" # Example data reference
    # Structure depends on the 'type'
  dependencies: # Optional: task/subflow/condition IDs that must complete before this task starts.
    - "dependency_id_1"
    - "dependency_id_2"
  successors: # Optional: task/subflow/condition IDs that depend on this task's completion.
    - "successor_id_1"
  priority: "normal" # Optional: execution priority ("low", "normal", "high"). Defaults to "normal".
```
Generic Task Type: `slop_tool_call` / `mfo_api_tool_execute`

This special task `type` executes a specified SLOP Server tool.

```yaml
- id: "call_some_tool"
  type: "mfo_api_tool_execute" # Or slop_tool_call
  config:
    tool_id: "specific_slop_tool_endpoint" # Required: the SLOP Server tool ID (e.g., "unipile_email_tag_thread").
    arguments: # Required: arguments for the tool.
      arg_name_1: "literal_value"
      arg_name_2: "${task_id_producing_value.output_key}" # Reference an output
      # ... other arguments required by the tool
  dependencies: ["task_id_producing_value"]
  outputs: # Document expected outputs for clarity
    - call_some_tool.result # Example output key
```
Data Passing Convention

- Output Storage: Tasks write their results to a shared execution context (e.g., `map[taskID][outputKey] = value`). Output keys should be documented or standardized for each task type.
- Output Referencing: Within a task's or condition's `config` map, values matching the pattern `${producing_task_id.output_key}` are dynamically replaced at runtime with the corresponding value from the shared context.
- Resolution: The Go task or condition implementation is responsible for parsing the config, detecting the `${...}` syntax, and retrieving the value from the context.
- Literals: Any config value not matching the `${...}` pattern is treated as a literal value (string, number, boolean, etc.).
Example Data Flow

```yaml
tasks:
  - id: "extract_data"
    type: "custom_extractor" # Assumes this task outputs {"user_id": 123, "order_id": "abc"}
    config:
      source: "input_payload.json"
    successors: ["process_user", "process_order"]
    outputs:
      - extract_data.user_id
      - extract_data.order_id

  - id: "process_user"
    type: "mfo_api_tool_execute"
    config:
      tool_id: "user_service_update_profile"
      arguments:
        user_identifier: "${extract_data.user_id}" # Get user_id from extract_data's output
        new_status: "processed"
    dependencies: ["extract_data"]

  - id: "process_order"
    type: "mfo_api_tool_execute"
    config:
      tool_id: "order_service_fetch_details"
      arguments:
        order_ref: "${extract_data.order_id}" # Get order_id from extract_data's output
    dependencies: ["extract_data"]
```
Subflow Definition (`subflows[]`)

Defines a group of tasks that can be treated as a single node in the main flow's dependency graph, promoting modularity and simplifying complex workflows.

```yaml
- id: "unique_subflow_id" # Required: unique identifier for this subflow within the entire workflow.
  name: "Descriptive Subflow Name" # Optional: human-readable name.
  description: "Purpose of this subflow." # Optional: longer description.
  tasks: # Required: task definitions internal to this subflow.
    - id: "internal_task_1" # IDs only need to be unique within this subflow.
      type: "some_task_type"
      config:
        input: "${external_task.output}" # Can access outputs from outside the subflow
      dependencies: []
      successors: ["internal_task_2"]
      # ... other task fields
    - id: "internal_task_2"
      type: "another_task_type"
      dependencies: ["internal_task_1"]
      # ... other task fields
      outputs:
        - internal_task_2.result # Output can be accessed externally
  dependencies: # Optional: dependencies on nodes outside the subflow.
    - "external_dependency_id"
  successors: # Optional: external nodes that depend on this subflow's completion.
    - "external_successor_id"
  priority: "normal" # Optional: priority for the entire subflow.
```
Key Points for Subflows:

- Encapsulation: Groups related tasks into a logical unit.
- Scoping: Task IDs within a subflow are local to that subflow.
- Dependencies: Subflows can depend on external nodes, and external nodes can depend on the completion of the entire subflow.
- Execution: A subflow starts after its external dependencies are met and completes only when all its internal tasks finish successfully.
- Data Access:
  - Internal tasks access each other's outputs using `${internal_task_id.output_key}`.
  - Internal tasks can access outputs from tasks outside the subflow using `${external_task_id.output_key}`.
  - External tasks can access outputs from tasks inside the subflow using `${internal_task_id.output_key}` (assuming the taskflow engine makes these accessible, potentially prefixed like `subflow_id.internal_task_id`; verification needed).
Condition Definition (`conditions[]`)

Defines a branching point in the flow based on a condition evaluated at runtime using data from the execution context.

```yaml
- id: "unique_condition_id"
  name: "Descriptive Condition Name"
  description: "Decision logic description."
  type: "condition_evaluator_type" # Required: maps to a registered ConditionFactory in Go.
  config: # Optional: configuration for the condition evaluator.
    input_value: "${previous_task.some_result}" # Pass data needed for evaluation
    threshold: 100
  dependencies: # Optional: nodes that must complete before evaluation.
    - "previous_task"
  successors: # Required: node IDs representing the branches. Order matters.
    - "branch_0_task_id" # Executes if the ConditionFunc returns 0
    - "branch_1_task_id" # Executes if the ConditionFunc returns 1
    # ... up to N branches
  # default_branch: "branch_id" # Optional: behavior depends on go-taskflow support and implementation.
```
Key Points for Conditions:

- Branching: Enables dynamic workflow paths based on runtime data.
- Evaluation: A Go function (`ConditionFunc`), determined by the `type`, evaluates the condition after its `dependencies` are met.
- Go Function: The `ConditionFunc` receives the execution context, accesses its `config` (resolving `${...}` references), and must return a `uint` (non-negative integer).
- Successor Selection: The returned `uint` acts as a zero-based index into the `successors` list. Only the node at that index is executed.
- Out-of-Bounds/Default: The behavior when the returned index is invalid, or when a `default_branch` is specified, needs clarification based on the specific `go-taskflow` version and SLOP client implementation.
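To make the successor-selection contract concrete, here is a minimal, self-contained sketch of a threshold-style evaluator. The `ConditionFunc` type shown is a local illustration modeled on the description above, not the verified `internal/taskflow` or `go-taskflow` API.

```go
package main

import "fmt"

// ConditionFunc mirrors the contract described above: it returns a
// zero-based index into the condition's successors list.
type ConditionFunc func() uint

// makeThresholdCondition builds a ConditionFunc from an already-resolved
// config, i.e. after ${...} references have been substituted from the context.
func makeThresholdCondition(config map[string]interface{}) ConditionFunc {
	return func() uint {
		value, _ := config["input_value"].(int)
		threshold, _ := config["threshold"].(int)
		if value > threshold {
			return 0 // successors[0]: "above threshold" branch
		}
		return 1 // successors[1]: "at or below threshold" branch
	}
}

func main() {
	cond := makeThresholdCondition(map[string]interface{}{
		"input_value": 150, // would come from ${previous_task.some_result}
		"threshold":   100,
	})
	fmt.Println("selected branch index:", cond()) // prints 0
}
```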
Parallelism and Complex Patterns (e.g., Map/Reduce)

- The `${task_id.output_key}` syntax is fundamental for defining dependencies.
- For map/reduce-like patterns:
  - Multiple parallel tasks (the map phase) can run concurrently if they don't depend on each other directly.
  - A subsequent aggregation task (the reduce phase) lists all parallel map tasks in its `dependencies`.
  - The aggregation task's Go implementation must gather the outputs of all preceding map tasks from the execution context (e.g., by knowing the map task IDs or by querying the context for outputs matching a pattern); see the sketch below.

This requires careful design of the custom aggregation task and potentially extending the context interaction logic.
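As a concrete illustration of the reduce step, the sketch below gathers one named output from each of a configured list of map-task IDs. The helper and its calling convention are illustrative design choices, not an existing `mfo-client` feature.

```go
package main

import "fmt"

// aggregateOutputs collects one named output from each of the given
// map-task IDs out of the shared outputs context.
func aggregateOutputs(outputs map[string]map[string]interface{}, mapTaskIDs []string, key string) []interface{} {
	results := make([]interface{}, 0, len(mapTaskIDs))
	for _, id := range mapTaskIDs {
		if v, ok := outputs[id][key]; ok { // tasks with missing outputs are skipped
			results = append(results, v)
		}
	}
	return results
}

func main() {
	outputs := map[string]map[string]interface{}{
		"map_task_1": {"result": 10},
		"map_task_2": {"result": 32},
	}
	fmt.Println(aggregateOutputs(outputs, []string{"map_task_1", "map_task_2"}, "result")) // [10 32]
}
```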
4. Webhook Integration

This section details how the SLOP server and client interact to enable workflows triggered by external events via webhooks.

Documenting Webhook Registration and Triggering in SLOP

This section details how the SLOP server handles incoming webhooks and how to configure the system to trigger specific taskflows (jobs) based on these external events.

Overview: From Webhook to Job

The process involves several steps connecting an external event to a SLOP taskflow execution:

- External Event Source: An external service (e.g., GitHub, Stripe, a custom application) sends an HTTP POST request (a webhook) to a specific public URL on the SLOP server when a particular event occurs in that service.
- SLOP Public Webhook Endpoint: The SLOP server exposes a generic public endpoint: `POST /webhooks/:provider/:webhook_id`. The `:provider` and `:webhook_id` path parameters identify the source and the specific configuration.
- Webhook Handling & Internal Event: The SLOP server receives the incoming request. The internal handler associated with `:provider` and `:webhook_id` parses the request. It's assumed this handler then publishes the relevant event data onto an internal event channel within the SLOP system. The naming convention for this channel might be something like `webhook:<provider>:<webhook_id>` or depend on the specific provider implementation.
- Event Trigger Configuration: You configure an `EventJobTrigger` within the SLOP server using the private API endpoints under `/api/config/event-triggers`. This configuration acts as a listener.
- Trigger Matching: The `EventJobTrigger` listens on a specific `event_channel`. When an event is published on a channel matching the trigger's configuration, the trigger activates.
- Job Creation: Upon activation, the trigger automatically creates and queues a new background job (`POST /api/jobs`). The job executes the taskflow defined by the `job_process_id` specified in the trigger configuration. The payload from the original webhook event is typically passed as input data to the newly created job.
Configuring Event Triggers (Server-Side Registration)

The core of registering a webhook handler is creating an `EventJobTrigger` configuration. This tells the SLOP server which taskflow (job process) to run when an event arrives on a specific channel (originating from a webhook).

API Endpoints

Management of these triggers is done via the following private API endpoints (requiring authentication):

- `POST /api/config/event-triggers`: Creates a new event trigger configuration.
- `GET /api/config/event-triggers`: Lists existing trigger configurations (supports filtering by `channel` and `enabled` status).
- `GET /api/config/event-triggers/:triggerID`: Retrieves details of a specific trigger by its numeric ID.
- `PUT /api/config/event-triggers/:triggerID`: Updates an existing trigger configuration.
- `DELETE /api/config/event-triggers/:triggerID`: Deletes a trigger configuration.
`EventJobTrigger` Object Structure

When creating (POST) or updating (PUT) a trigger, provide an object with the following key fields (based on the API documentation):

- `event_channel` (string, required): The name of the internal event channel the trigger should listen to. This must match the channel where the corresponding webhook handler publishes events (e.g., `webhook:github:push`, `webhook:unipile:new_email`).
- `job_process_id` (string, required): The identifier of the taskflow (the YAML definition's `name`) to execute when the trigger activates.
- `name` (string, optional): A descriptive name for the trigger configuration.
- `description` (string, optional): A more detailed description.
- `enabled` (boolean, optional, defaults to true): Whether the trigger is active.
- `metadata` (object, optional): Custom key-value pairs for additional information or configuration.

Note: The server assigns a unique numeric `id` upon creation.
Creating a Trigger via API Call

You would typically use an HTTP client (such as the `mfo-client` library or `curl`), authenticated with the SLOP server, to make a POST request to `/api/config/event-triggers` with a JSON body representing the `EventJobTrigger` object.

Example Request Body (JSON):

```json
{
  "name": "Trigger Email Processing on New Unipile Email",
  "event_channel": "webhook:unipile:new_email",
  "job_process_id": "ProcessIncomingEmail",
  "enabled": true,
  "metadata": {
    "priority": "normal"
  }
}
```
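For illustration, the same request can be made with Go's standard `net/http` package. Only the endpoint path and the bearer-token scheme come from this document; the server URL is a placeholder and error handling is kept minimal.

```go
// Sketch: creating the trigger above via POST /api/config/event-triggers.
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	body := []byte(`{
	  "name": "Trigger Email Processing on New Unipile Email",
	  "event_channel": "webhook:unipile:new_email",
	  "job_process_id": "ProcessIncomingEmail",
	  "enabled": true
	}`)
	req, err := http.NewRequest(http.MethodPost, "https://slop.internal.net/api/config/event-triggers", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer YOUR_API_TOKEN")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// On success, the response should contain the created trigger, including its numeric id.
	fmt.Println("status:", resp.Status)
}
```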
Creating a Trigger via Taskflow (Conceptual)

While direct API calls work, you could also manage these triggers using a dedicated task type within a SLOP taskflow itself. This allows infrastructure-as-code style management of your triggers.

Conceptual YAML Task Definition:

```yaml
- id: "register_email_webhook_trigger"
  name: "Register Unipile Email Trigger"
  type: "mfo_api_config_create_event_trigger" # Hypothetical task type
  config:
    # Fields matching the EventJobTrigger object
    name: "Trigger Email Processing on New Unipile Email"
    event_channel: "webhook:unipile:new_email"
    job_process_id: "ProcessIncomingEmail" # Name of the target taskflow YAML
    enabled: true
    metadata:
      managed_by: "taskflow"
  dependencies: []
  outputs:
    - register_email_webhook_trigger.trigger # The created trigger object from the API response
```

(The actual implementation of the `mfo_api_config_create_event_trigger` task factory in Go would handle calling the `POST /api/config/event-triggers` endpoint using the provided config.)
Documenting Client-Side Webhook Setup for SLOP

This section explains how to configure an external service (the "client" in this context) to send webhooks to your SLOP server instance. This is necessary for triggering automated workflows based on events happening outside of SLOP.

Identifying the Target SLOP Webhook URL

The external service needs to send its webhook notifications (typically HTTP POST requests) to a specific public URL provided by the SLOP server. The general format for this URL is:

```
<SLOP_SERVER_BASE_URL>/webhooks/:provider/:webhook_id
```

Components:

- `<SLOP_SERVER_BASE_URL>`: The main address where your SLOP server is publicly accessible (e.g., `https://slop.yourcompany.com`).
- `/webhooks/`: A fixed path segment indicating the webhook receiver endpoint.
- `:provider`: A path parameter identifying the type or source of the external service sending the webhook. Examples include `github`, `stripe`, `unipile`, or `your_custom_app`. The SLOP server must have a handler configured internally that recognizes this provider ID.
- `:webhook_id`: A path parameter providing a unique identifier for this specific webhook configuration. It lets you distinguish between different events or configurations from the same provider; for example, `github/repo_push` and `github/issue_created` could be distinct webhook IDs.
Determining `:provider` and `:webhook_id`:

The specific values you use for `:provider` and `:webhook_id` are crucial. They must correspond directly to the `event_channel` configured in the `EventJobTrigger` on the SLOP server side. Typically, the `event_channel` follows the pattern `webhook:<provider>:<webhook_id>`. You must ensure the URL parameters match the channel the trigger is listening on.

Example URL:

If your SLOP server is at `https://slop.internal.net`, you want to receive notifications from Unipile for new emails, and your corresponding `EventJobTrigger` listens on the channel `webhook:unipile:new_email`, the URL to configure in Unipile would be:

```
https://slop.internal.net/webhooks/unipile/new_email
```
Configuring the External Service

While the exact steps vary greatly depending on the external service, the general process involves:

- Access Webhook Settings: Log in to the external service and navigate to its settings or developer section where webhooks can be configured (often called "Webhooks", "Notifications", or "API Integrations").
- Add New Webhook: Create a new webhook configuration.
- Payload URL: Enter the full SLOP webhook URL constructed as described above into the "Payload URL", "Endpoint URL", or similar field.
- Content Type: Ensure the content type is set to `application/json` if possible, as this is the most common format expected by webhook handlers.
- Select Events: Choose the specific event(s) within the external service that should trigger this webhook notification (e.g., "Push events", "New email received", "Payment succeeded").
- Security (Secret Token - Recommended):
  - Most services allow you to define a secret token or key for the webhook.
  - Generate a strong, random secret and enter it into the external service's configuration.
  - Crucially: the corresponding webhook handler within the SLOP server (associated with the `:provider` and `:webhook_id`) must be configured with the exact same secret. This allows the SLOP server to verify that incoming requests genuinely originated from the configured external service by checking a signature header (e.g., `X-Hub-Signature-256` for GitHub, `Stripe-Signature` for Stripe) sent with the webhook request.
  - Consult the SLOP server's provider-specific webhook handler documentation or implementation details on how to configure this secret verification.
- Activate: Save and enable the webhook configuration in the external service.
Payload Considerations

The external service sends data about the event in the body of the HTTP POST request, typically as a JSON payload whose structure is defined by the external service.

The SLOP server's webhook handler parses this payload. When an `EventJobTrigger` is activated, the payload data is usually passed as the input (`input` field) to the background job (`/api/jobs`) that gets created. Your target taskflow (identified by `job_process_id`) should be designed to expect and process this input data accordingly.
Testing

After configuring the webhook in the external service and setting up the corresponding `EventJobTrigger` in SLOP:

- Trigger the relevant event in the external service (e.g., push code to GitHub, receive an email in Unipile).
- Monitor the SLOP server:
  - Check whether a new job was created (`GET /api/jobs`).
  - Verify that the job executed the correct taskflow and processed the data as expected.
  - Examine server logs for any errors related to webhook reception or job triggering.
Documenting Server-Side Webhook Integration in SLOP

This section describes the internal process within the SLOP server when it receives an incoming webhook request from an external service, bridging the gap between the public endpoint and the job triggering mechanism.

1. Request Reception

An external service sends an HTTP POST request to the configured public endpoint:

```
POST /webhooks/:provider/:webhook_id
```

The SLOP server's web framework (e.g., Fiber) routes this request based on the path.
2. Handler Identification

The server uses the path parameters `:provider` and `:webhook_id` to identify the specific handler responsible for processing this type of webhook. This likely involves:

- A routing mechanism that maps `/:provider/:webhook_id` patterns to specific Go functions or methods.
- A central webhook manager (`internal/webhook/manager.go`?) that looks up registered handlers based on the provider and webhook ID.

If no handler matches the provided `:provider` and `:webhook_id`, the server should return an appropriate error, such as `404 Not Found`.
3. Security Verification (Optional but Crucial)

If a secret token was configured during the client-side setup, this is the point where the server verifies the request's authenticity:

- Extract Signature: The handler extracts the signature header sent by the external service (e.g., `X-Hub-Signature-256`, `Stripe-Signature`).
- Retrieve Secret: The handler retrieves the expected secret token associated with this specific webhook configuration (`:provider`, `:webhook_id`) from its secure storage.
- Compute Expected Signature: The handler computes the expected signature over the received request body using the stored secret and the algorithm specified by the external service (e.g., HMAC-SHA256).
- Compare Signatures: The handler compares the signature extracted from the header with the computed expected signature.
- Reject if Invalid: If the signatures do not match, the request is considered invalid or tampered with. The handler should reject it immediately, typically with a `401 Unauthorized` or `403 Forbidden` status code, and log the security event. This prevents processing potentially malicious requests.
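As an illustration of this verification step, here is a self-contained sketch of GitHub-style `X-Hub-Signature-256` verification (HMAC-SHA256 over the raw body, hex-encoded, with a `sha256=` prefix). Other providers differ in header name and encoding, and this is the general technique, not SLOP's actual handler code.

```go
// Sketch of HMAC-SHA256 webhook signature verification (GitHub-style).
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

func verifySignature(secret, body []byte, header string) bool {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	expected := "sha256=" + hex.EncodeToString(mac.Sum(nil))
	// hmac.Equal performs a constant-time comparison, preventing timing attacks.
	return hmac.Equal([]byte(expected), []byte(strings.TrimSpace(header)))
}

func main() {
	secret := []byte("webhook-secret")
	body := []byte(`{"event":"new_email"}`)

	// Simulate the header an external service would send.
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	header := "sha256=" + hex.EncodeToString(mac.Sum(nil))

	fmt.Println(verifySignature(secret, body, header)) // true
}
```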
4. Payload Parsing

Assuming the request is authentic (or no security is configured), the handler parses the request body:

- Content Type: It typically expects the body to be in JSON format (`application/json`).
- Deserialization: The JSON payload is deserialized into a Go struct or map. The specific structure depends entirely on the data sent by the external service for the given event type.
5. Internal Event Publication

After successfully parsing the payload, the core integration step occurs: the handler publishes an internal event within the SLOP system.

- Event Channel: The event is published onto a specific channel name. This channel name must correspond to the `event_channel` that `EventJobTrigger` configurations listen on. The convention is likely `webhook:<provider>:<webhook_id>` (e.g., `webhook:unipile:new_email`).
- Event Payload: The payload of the internal event typically contains:
  - The parsed data from the original webhook request body.
  - Potentially, additional metadata added by the handler (e.g., the timestamp of reception).
- Event Bus: This publication likely happens via an internal event bus or messaging system within the SLOP server.
6. Trigger Activation and Job Creation

- Listening Triggers: The `EventJobTrigger` system (`internal/taskflow/trigger.go`?) continuously monitors the internal event bus.
- Matching: When an event is published on a channel (e.g., `webhook:unipile:new_email`), the system checks whether any enabled `EventJobTrigger` configurations are listening on that exact `event_channel`.
- Job Request: If a match is found, the trigger activates and initiates the creation of a new background job, effectively making a request similar to `POST /api/jobs`.
- Job Parameters: The job creation request includes:
  - `job_process_id`: Taken directly from the matched `EventJobTrigger` configuration. This specifies which taskflow YAML definition to run.
  - `name`: A descriptive name for the job, potentially generated from the trigger name or event type.
  - `input`: The payload of the internal event (which contains the original webhook data), passed as the input data for the job.
  - Other parameters, such as `user_id` (if applicable, based on the trigger configuration or webhook context) and `priority`, might also be set.
7. Job Execution

The newly created job is queued and eventually picked up by a worker. The worker loads the specified taskflow (`job_process_id`), initializes its execution context, and makes the job's `input` data (the original webhook payload) available within that context, typically via a special key like `${INPUT.<field_name>}` in the taskflow's YAML definitions. The taskflow then executes, processing the data received from the external event via the webhook.