# slack_integration.py
import logging
import os
import sys
from logging import getLogger
import requests
from dotenv import load_dotenv
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
# Load environment variables from .env file
load_dotenv()

# Deployment-specific settings. Each one can be overridden through the
# environment (including the .env file loaded above); the literals are the
# fallback defaults, so an unconfigured deployment behaves as before.
BOT_USER_ID = os.environ.get("SLACK_BOT_USER_ID", "U08HMJ15AHF")  # Your bot's user ID in Slack
WORKFLOW_ID = os.environ.get("PYSPUR_WORKFLOW_ID", "S58")  # Your chatbot workflow ID in PySpur
# Trailing slashes are stripped because request URLs are built as
# f"{PYSPUR_API_URL}/..." and would otherwise contain a double slash.
PYSPUR_API_URL = os.environ.get("PYSPUR_API_URL", "http://localhost:6080/api").rstrip("/")

# Configure logger: INFO and above, written to stderr with a timestamp,
# logger name and level in front of each message.
logger = getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logger.addHandler(handler)

# Initialize Slack app. Credentials come from the environment; the module
# logger is handed to the app so its output uses the handler configured above.
app = App(
    token=os.environ.get("SLACK_BOT_TOKEN"),
    signing_secret=os.environ.get("SLACK_SIGNING_SECRET"),
    logger=logger,
)
# ---------------------------------------------------------------------------
# PySpur helpers shared by both Slack event handlers
# ---------------------------------------------------------------------------

# Seconds to wait on the user/session endpoints. requests applies no timeout
# by default, so without one a stalled API would block the handler forever.
_PYSPUR_REQUEST_TIMEOUT = 10
# Blocking workflow runs only answer once the workflow has finished, so they
# get a much more generous limit.
_PYSPUR_WORKFLOW_TIMEOUT = 300

_ERROR_REPLY = "Sorry, I encountered an error processing your request."
_NO_ANSWER_REPLY = "I encountered an issue processing your request."


def _get_or_create_session_id(user_external_id, thread_ts, logger):
    """Register the Slack user and thread with PySpur and return the session ID.

    Args:
        user_external_id: Slack user ID of the message author.
        thread_ts: Timestamp of the thread root, sent as the session's
            external ID.
        logger: Logger that receives details of failed API calls.

    Returns:
        The ``id`` field of the session response, or ``None`` when either API
        call returned an unexpected status (the failure is logged here).

    Raises:
        requests.RequestException: On connection errors or timeouts.
        KeyError: If the session response has no ``id`` field.
    """
    user_response = requests.post(
        f"{PYSPUR_API_URL}/user/",
        json={
            "external_id": user_external_id,
            "user_metadata": {"platform": "slack"},
        },
        timeout=_PYSPUR_REQUEST_TIMEOUT,
    )
    if user_response.status_code not in (200, 409):  # 409 means user already exists
        logger.error(f"Failed to create user: {user_response.text}")
        return None
    # NOTE(review): on a 409 the body may be an error payload without "id",
    # leaving user_id as None - confirm against the PySpur API.
    user_id = user_response.json().get("id")

    # Create a new session or get existing one
    session_response = requests.post(
        f"{PYSPUR_API_URL}/session/",
        json={
            "workflow_id": WORKFLOW_ID,
            "user_id": user_id,
            "external_id": thread_ts,
        },
        timeout=_PYSPUR_REQUEST_TIMEOUT,
    )
    if session_response.status_code != 200:
        logger.error(f"Failed to create session: {session_response.text}")
        return None
    return session_response.json()["id"]


def _run_workflow(session_id, user_message, message_history, logger):
    """Run the chatbot workflow in blocking mode and return its reply text.

    Args:
        session_id: PySpur session ID to run the workflow under.
        user_message: Text of the message to answer.
        message_history: Earlier turns as ``{"role", "content"}`` dicts.
        logger: Logger that receives details of a failed run.

    Returns:
        The ``assistant_message`` of the response's ``output_node``, or an
        empty string when the run failed or produced no message.

    Raises:
        requests.RequestException: On connection errors or timeouts.
        ValueError: If a successful response body is not valid JSON.
    """
    response = requests.post(
        f"{PYSPUR_API_URL}/wf/{WORKFLOW_ID}/run/?run_type=blocking",
        json={
            "initial_inputs": {
                "input_node": {
                    "user_message": user_message,
                    "session_id": session_id,
                    "message_history": message_history,
                }
            }
        },
        timeout=_PYSPUR_WORKFLOW_TIMEOUT,
    )
    if not response.ok:
        # Log the API's own error instead of failing later on a missing key
        # or an unparsable body.
        logger.error(f"Workflow run failed ({response.status_code}): {response.text}")
        return ""
    # "or" guards cover both a missing key and an explicit null value.
    output_node = response.json().get("output_node") or {}
    return output_node.get("assistant_message") or ""


# Handler for @mentions
@app.event("app_mention")
def handle_app_mention(event, say, logger):
    """Answer a message that @mentions the bot, replying in its thread.

    The mention text is taken straight from the event payload and sent to the
    workflow with an empty message history.

    Args:
        event: The ``app_mention`` event payload.
        say: Callable used to post a message back to the conversation.
        logger: Logger passed in by the framework.
    """
    logger.info(f"Received mention: {event}")
    # Reply inside the existing thread, or start one under the mention itself.
    thread_ts = event.get("thread_ts") or event.get("ts")
    try:
        session_id = _get_or_create_session_id(event.get("user"), thread_ts, logger)
        if session_id is None:
            say(text=_ERROR_REPLY, thread_ts=thread_ts)
            return

        # The event already carries the mention text. Looking it up through
        # conversations.history is unreliable: that method has no `ts`
        # argument, so the first returned message is simply the newest one in
        # the channel, which is not the mention when it was posted in a thread.
        user_message = event.get("text", "")

        assistant_message = _run_workflow(session_id, user_message, [], logger)
        # Send the response back to Slack
        say(text=assistant_message or _NO_ANSWER_REPLY, thread_ts=thread_ts)
    except Exception as e:
        # Top-level boundary for the listener: record the traceback and tell
        # the user something went wrong.
        logger.exception(f"Error processing request: {e}")
        say(text=_ERROR_REPLY, thread_ts=thread_ts)


# Handler for thread replies
@app.event("message")
def handle_thread_replies(event, say, logger):
    """Answer a human reply inside a thread, using the thread as chat history.

    Events that are not thread replies, and events carrying a ``bot_id``, are
    ignored. The last message returned for the thread is treated as the
    current user message and everything before it as history.

    NOTE(review): a thread reply that @mentions the bot is presumably also
    delivered as ``app_mention``, in which case both handlers answer it -
    confirm and de-duplicate if so.

    Args:
        event: The ``message`` event payload.
        say: Callable used to post a message back to the conversation.
        logger: Logger passed in by the framework.
    """
    # Only process thread replies (not the first message) and ignore bot messages
    is_thread_reply = "thread_ts" in event and event.get("ts") != event.get("thread_ts")
    if not is_thread_reply or event.get("bot_id"):
        return

    thread_ts = event["thread_ts"]
    channel_id = event["channel"]
    try:
        session_id = _get_or_create_session_id(event.get("user"), thread_ts, logger)
        if session_id is None:
            say(text=_ERROR_REPLY, thread_ts=thread_ts)
            return

        # Get all replies in the thread
        result = app.client.conversations_replies(channel=channel_id, ts=thread_ts)

        # Format messages as a conversation history; anything not posted by
        # the bot user counts as a "user" turn.
        chat_messages = [
            {
                "role": "assistant" if message.get("user") == BOT_USER_ID else "user",
                "content": message.get("text", ""),
            }
            for message in result["messages"]
        ]

        # Get message history and current message. Slicing an empty or
        # one-element list yields [], so no length check is needed.
        message_history = chat_messages[:-1]
        user_message = chat_messages[-1]["content"] if chat_messages else ""

        assistant_message = _run_workflow(
            session_id, user_message, message_history, logger
        )
        # Send the response back to Slack
        say(text=assistant_message or _NO_ANSWER_REPLY, thread_ts=thread_ts)
    except Exception as e:
        # Top-level boundary for the listener: record the traceback and tell
        # the user something went wrong.
        logger.exception(f"Error processing thread reply: {e}")
        say(text="Sorry, I had trouble processing your message.", thread_ts=thread_ts)
if __name__ == "__main__":
    # Script entry point: wrap the app in a Socket Mode handler, authenticated
    # with the app-level token from the environment, and start it.
    app_token = os.environ.get("SLACK_APP_TOKEN")
    handler = SocketModeHandler(app, app_token)
    handler.start()