Workflow Runs API

This document outlines the API endpoints for running and managing workflow executions in PySpur.

Run Workflow (Blocking)

Description: Executes a workflow synchronously and returns the outputs. This is a blocking call that waits for the workflow to complete before returning a response. If the workflow contains a human intervention node, it may pause execution and return a pause exception.

URL: /wf/{workflow_id}/run/

Method: POST

Parameters:

workflow_id: str  # ID of the workflow to run

Request Payload:

class StartRunRequestSchema(BaseModel):
    initial_inputs: Optional[Dict[str, Dict[str, Any]]] = None  # Initial inputs for the workflow
    parent_run_id: Optional[str] = None  # ID of the parent run (for nested workflows)
    files: Optional[Dict[str, List[str]]] = None  # Files to use in the workflow

Response Schema:

Dict[str, Any]  # Dictionary of node outputs

Start Run (Non-Blocking)

Description: Starts a workflow execution asynchronously and returns immediately with the run details. The workflow continues execution in the background. This is useful for long-running workflows where you don’t want to wait for completion.

URL: /wf/{workflow_id}/start_run/

Method: POST

Parameters:

workflow_id: str  # ID of the workflow to run

Request Payload: Same as Run Workflow (Blocking)

Response Schema:

class RunResponseSchema(BaseModel):
    id: str  # Run ID
    workflow_id: str  # ID of the workflow
    workflow_version_id: Optional[str]  # ID of the workflow version
    workflow_version: Optional[WorkflowVersionResponseSchema]  # Details of the workflow version
    status: RunStatus  # Current status of the run
    start_time: datetime  # When the run started
    end_time: Optional[datetime]  # When the run ended (if completed)
    initial_inputs: Optional[Dict[str, Dict[str, Any]]]  # Initial inputs to the workflow
    outputs: Optional[Dict[str, Dict[str, Any]]]  # Outputs from the workflow
    tasks: List[TaskResponseSchema]  # List of tasks in the run
    parent_run_id: Optional[str]  # ID of the parent run (if applicable)
    run_type: str  # Type of run (e.g., "interactive")
    output_file_id: Optional[str]  # ID of the output file
    input_dataset_id: Optional[str]  # ID of the input dataset
    message: Optional[str]  # Additional information about the run
    duration: Optional[float]  # Duration of the run in seconds
    percentage_complete: float  # Percentage of tasks completed

Run Partial Workflow

Description: Executes a partial workflow starting from a specific node, using precomputed outputs for upstream nodes. This is useful for testing specific parts of a workflow without running the entire workflow.

URL: /wf/{workflow_id}/run_partial/

Method: POST

Parameters:

workflow_id: str  # ID of the workflow to run

Request Payload:

class PartialRunRequestSchema(BaseModel):
    node_id: str  # ID of the node to start execution from
    initial_inputs: Optional[Dict[str, Dict[str, Any]]] = None  # Initial inputs for the workflow
    partial_outputs: Optional[Dict[str, Dict[str, Any]]] = None  # Precomputed outputs for upstream nodes

Response Schema:

Dict[str, Any]  # Dictionary of node outputs

Start Batch Run

Description: Starts a batch execution of a workflow over a dataset. The workflow is run once for each row in the dataset, with dataset columns mapped to workflow inputs. Results are written to an output file.

URL: /wf/{workflow_id}/start_batch_run/

Method: POST

Parameters:

workflow_id: str  # ID of the workflow to run

Request Payload:

class BatchRunRequestSchema(BaseModel):
    dataset_id: str  # ID of the dataset to use
    mini_batch_size: int = 10  # Number of rows to process in each mini-batch

Response Schema: Same as Start Run (Non-Blocking)

List Runs

Description: Lists all runs for a specific workflow with pagination support, ordered by start time descending. This endpoint also updates run status based on task status.

URL: /wf/{workflow_id}/runs/

Method: GET

Parameters:

workflow_id: str  # ID of the workflow
page: int  # Page number (default: 1, min: 1)
page_size: int  # Number of items per page (default: 10, min: 1, max: 100)

Response Schema:

List[RunResponseSchema]  # List of run details

List Paused Workflows

Description: Lists all workflows that are currently in a paused state, with pagination support. This endpoint is useful for monitoring workflows that require human intervention.

URL: /wf/paused_workflows/

Method: GET

Query Parameters:

page: int  # Page number (default: 1, min: 1)
page_size: int  # Number of items per page (default: 10, min: 1, max: 100)

Response Schema:

List[PausedWorkflowResponseSchema]

Where PausedWorkflowResponseSchema contains:

class PausedWorkflowResponseSchema(BaseModel):
    run: RunResponseSchema  # Information about the workflow run
    current_pause: PauseHistoryResponseSchema  # Details about the current pause state
    workflow: WorkflowDefinitionSchema  # The workflow definition

Get Pause History

Description: Retrieves the pause history for a specific workflow run, showing when and why the workflow was paused, and any actions taken to resume it.

URL: /wf/pause_history/{run_id}/

Method: GET

Parameters:

run_id: str  # ID of the workflow run

Response Schema:

List[PauseHistoryResponseSchema]

Where PauseHistoryResponseSchema contains:

class PauseHistoryResponseSchema(BaseModel):
    id: str  # Synthetic ID for API compatibility
    run_id: str  # ID of the run
    node_id: str  # ID of the node where the pause occurred
    pause_message: Optional[str]  # Message explaining the pause reason
    pause_time: datetime  # When the workflow was paused
    resume_time: Optional[datetime]  # When the workflow was resumed (if applicable)
    resume_user_id: Optional[str]  # ID of the user who resumed the workflow
    resume_action: Optional[PauseAction]  # Action taken (APPROVE/DECLINE/OVERRIDE)
    input_data: Optional[Dict[str, Any]]  # Input data at the time of pause
    comments: Optional[str]  # Additional comments about the pause/resume

Process Pause Action

Description: Processes an action on a paused workflow, allowing for approval, decline, or override of a workflow that has been paused for human intervention. The workflow will resume execution based on the action taken.

URL: /wf/process_pause_action/{run_id}/

Method: POST

Parameters:

run_id: str  # ID of the workflow run

Request Payload:

class ResumeRunRequestSchema(BaseModel):
    inputs: Dict[str, Any]  # Human-provided inputs for the paused node
    user_id: str  # ID of the user resuming the workflow
    action: PauseAction  # Action taken (APPROVE/DECLINE/OVERRIDE)
    comments: Optional[str] = None  # Optional comments about the decision

Response Schema: Same as Start Run (Non-Blocking)

Cancel Workflow

Description: Cancels a workflow that is currently paused or running. This will mark the run as CANCELED in the database and update all pending, running, and paused tasks to CANCELED as well.

URL: /wf/cancel_workflow/{run_id}/

Method: POST

Parameters:

run_id: str  # ID of the run to cancel

Response Schema: Same as Start Run (Non-Blocking) with a message indicating the workflow has been canceled successfully.