Workflow Management API

This document outlines the API endpoints for managing workflows in PySpur.

Create Workflow

Description: Creates a new workflow. If no definition is provided, creates a default workflow with an input node. For chatbots, creates a workflow with required input/output fields for handling chat interactions. The workflow name will be made unique if a workflow with the same name already exists.

URL: /wf/

Method: POST

Request Payload:

class WorkflowCreateRequestSchema(BaseModel):
    name: str  # Name of the workflow
    description: str = ""  # Description of the workflow
    definition: Optional[WorkflowDefinitionSchema] = None  # Definition of the workflow

Where WorkflowDefinitionSchema contains:

class WorkflowDefinitionSchema(BaseModel):
    nodes: List[WorkflowNodeSchema]  # List of nodes in the workflow
    links: List[WorkflowLinkSchema]  # List of links between nodes
    test_inputs: List[Dict[str, Any]] = []  # Test inputs for the workflow
    spur_type: SpurType = SpurType.WORKFLOW  # Type of workflow (WORKFLOW, CHATBOT, AGENT)

Response Schema:

class WorkflowResponseSchema(BaseModel):
    id: str  # Workflow ID
    name: str  # Name of the workflow
    description: Optional[str]  # Description of the workflow
    definition: WorkflowDefinitionSchema  # Definition of the workflow
    created_at: datetime  # When the workflow was created
    updated_at: datetime  # When the workflow was last updated

Update Workflow

Description: Updates an existing workflow’s definition, name, and description. The workflow definition is required for updates. This endpoint allows for modifying the structure and behavior of a workflow.

URL: /wf/{workflow_id}/

Method: PUT

Parameters:

workflow_id: str  # ID of the workflow to update

Request Payload: Same as Create Workflow

Response Schema: Same as Create Workflow

List Workflows

Description: Lists all workflows with pagination support, ordered by creation date descending. Only valid workflows that can be properly validated are included in the response.

URL: /wf/

Method: GET

Query Parameters:

page: int  # Page number (default: 1, min: 1)
page_size: int  # Number of items per page (default: 10, min: 1, max: 100)

Response Schema:

List[WorkflowResponseSchema]

Get Workflow

Description: Retrieves a specific workflow by its ID, including its complete definition, metadata, and timestamps.

URL: /wf/{workflow_id}/

Method: GET

Parameters:

workflow_id: str  # ID of the workflow to retrieve

Response Schema: Same as Create Workflow

Reset Workflow

Description: Resets a workflow to its initial state with just an input node. This is useful when you want to start over with a workflow design without deleting and recreating it.

URL: /wf/{workflow_id}/reset/

Method: PUT

Parameters:

workflow_id: str  # ID of the workflow to reset

Response Schema: Same as Create Workflow

Delete Workflow

Description: Deletes a workflow and its associated test files. This operation is permanent and will remove all data related to the workflow, including test files stored in the file system.

URL: /wf/{workflow_id}/

Method: DELETE

Parameters:

workflow_id: str  # ID of the workflow to delete

Response: 204 No Content

Duplicate Workflow

Description: Creates a copy of an existing workflow with “(Copy)” appended to its name. This is useful for creating variations of a workflow without modifying the original.

URL: /wf/{workflow_id}/duplicate/

Method: POST

Parameters:

workflow_id: str  # ID of the workflow to duplicate

Response Schema: Same as Create Workflow

Get Workflow Output Variables

Description: Retrieves the output variables (leaf nodes) of a workflow, including their node IDs and variable names. This is useful for understanding what outputs are available from a workflow.

URL: /wf/{workflow_id}/output_variables/

Method: GET

Parameters:

workflow_id: str  # ID of the workflow

Response Schema:

List[Dict[str, str]]  # List of output variables with node IDs and variable names

Each dictionary in the list contains:

{
    "node_id": str,  # ID of the node
    "variable_name": str,  # Name of the output variable
    "prefixed_variable": str  # Variable name prefixed with node ID (node_id-variable_name)
}

Upload Test Files

Description: Uploads test files for a specific node in a workflow and returns their paths. The files are stored in a workflow-specific directory and can be used as inputs for testing the workflow.

URL: /wf/upload_test_files/

Method: POST

Form Data:

workflow_id: str  # ID of the workflow
files: List[UploadFile]  # List of files to upload
node_id: str  # ID of the node to associate files with

Response Schema:

Dict[str, List[str]]  # Dictionary mapping node ID to list of file paths

Example:

{
    "node_id": ["test_files/workflow_id/timestamp_filename.ext", ...]
}

Get Workflow Versions

Description: Retrieves all versions of a workflow, ordered by version number descending, with pagination support. This allows tracking the evolution of a workflow over time and reverting to previous versions if needed.

URL: /wf/{workflow_id}/versions/

Method: GET

Parameters:

workflow_id: str  # ID of the workflow
page: int  # Page number (default: 1, min: 1)
page_size: int  # Number of items per page (default: 10, min: 1, max: 100)

Response Schema:

List[WorkflowVersionResponseSchema]

Where WorkflowVersionResponseSchema contains:

class WorkflowVersionResponseSchema(BaseModel):
    version: int  # Version number
    name: str  # Name of the workflow version
    description: Optional[str]  # Description of the workflow version
    definition: Any  # Definition of the workflow version
    definition_hash: str  # Hash of the definition for tracking changes
    created_at: datetime  # When the version was created
    updated_at: datetime  # When the version was last updated

List Paused Workflows

Description: Lists all workflows that are currently in a paused state, with pagination support. This endpoint is useful for monitoring workflows that require human intervention.

URL: /wf/paused_workflows/

Method: GET

Query Parameters:

page: int  # Page number (default: 1, min: 1)
page_size: int  # Number of items per page (default: 10, min: 1, max: 100)

Response Schema:

List[PausedWorkflowResponseSchema]

Where PausedWorkflowResponseSchema contains:

class PausedWorkflowResponseSchema(BaseModel):
    run: RunResponseSchema  # Information about the workflow run
    current_pause: PauseHistoryResponseSchema  # Details about the current pause state
    workflow: WorkflowDefinitionSchema  # The workflow definition

Get Pause History

Description: Retrieves the pause history for a specific workflow run, showing when and why the workflow was paused, and any actions taken to resume it.

URL: /wf/pause_history/{run_id}/

Method: GET

Parameters:

run_id: str  # ID of the workflow run

Response Schema:

List[PauseHistoryResponseSchema]

Where PauseHistoryResponseSchema contains:

class PauseHistoryResponseSchema(BaseModel):
    id: str  # Synthetic ID for API compatibility
    run_id: str  # ID of the run
    node_id: str  # ID of the node where the pause occurred
    pause_message: Optional[str]  # Message explaining the pause reason
    pause_time: datetime  # When the workflow was paused
    resume_time: Optional[datetime]  # When the workflow was resumed (if applicable)
    resume_user_id: Optional[str]  # ID of the user who resumed the workflow
    resume_action: Optional[PauseAction]  # Action taken (APPROVE/DECLINE/OVERRIDE)
    input_data: Optional[Dict[str, Any]]  # Input data at the time of pause
    comments: Optional[str]  # Additional comments about the pause/resume

Process Pause Action

Description: Processes an action on a paused workflow, allowing for approval, decline, or override of a workflow that has been paused for human intervention. The workflow will resume execution based on the action taken.

URL: /wf/process_pause_action/{run_id}/

Method: POST

Parameters:

run_id: str  # ID of the workflow run

Request Payload:

class ResumeRunRequestSchema(BaseModel):
    inputs: Dict[str, Any]  # Human-provided inputs for the paused node
    user_id: str  # ID of the user resuming the workflow
    action: PauseAction  # Action taken (APPROVE/DECLINE/OVERRIDE)
    comments: Optional[str] = None  # Optional comments about the decision

Response Schema:

class RunResponseSchema(BaseModel):
    id: str  # Run ID
    workflow_id: str  # ID of the workflow
    workflow_version_id: Optional[str]  # ID of the workflow version
    workflow_version: Optional[WorkflowVersionResponseSchema]  # Details of the workflow version
    status: RunStatus  # Current status of the run
    start_time: datetime  # When the run started
    end_time: Optional[datetime]  # When the run ended (if completed)
    initial_inputs: Optional[Dict[str, Dict[str, Any]]]  # Initial inputs to the workflow
    outputs: Optional[Dict[str, Dict[str, Any]]]  # Outputs from the workflow
    tasks: List[TaskResponseSchema]  # List of tasks in the run
    parent_run_id: Optional[str]  # ID of the parent run (if applicable)
    run_type: str  # Type of run (e.g., "interactive")
    output_file_id: Optional[str]  # ID of the output file
    input_dataset_id: Optional[str]  # ID of the input dataset
    message: Optional[str]  # Additional information about the run
    duration: Optional[float]  # Duration of the run in seconds
    percentage_complete: float  # Percentage of tasks completed