77 lines
2.9 KiB
Python
77 lines
2.9 KiB
Python
import logging
|
|
from .client import OpenAIClient
|
|
from .models import OpenAIAssistant as OpenAIAssistantModel
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class OpenAIAssistant:
|
|
"""
|
|
OpenAI Assistant for handling AI interactions.
|
|
"""
|
|
def __init__(self, name):
|
|
"""
|
|
Initialize the assistant by loading its configuration from the database.
|
|
"""
|
|
try:
|
|
self.config = OpenAIAssistantModel.objects.get(name=name)
|
|
self.client = OpenAIClient(self.config.api_key).get_client()
|
|
except OpenAIAssistantModel.DoesNotExist:
|
|
raise ValueError(f"Assistant '{name}' not found in the database.")
|
|
|
|
def chat_completion(self, user_message):
|
|
"""
|
|
Call OpenAI's chat completion API.
|
|
"""
|
|
try:
|
|
response = self.client.chat.completions.create(
|
|
model="gpt-4o-mini",
|
|
messages=[
|
|
{
|
|
"role": "system",
|
|
"content": self.config.description, # Use description as the system prompt
|
|
},
|
|
{"role": "user", "content": user_message},
|
|
],
|
|
)
|
|
return response.choices[0].message.content
|
|
except Exception as e:
|
|
logger.error(f"Error in chat completion: {e}")
|
|
return f"Error in chat completion: {e}"
|
|
|
|
def agent_workflow(self, user_message):
|
|
"""
|
|
Call OpenAI's advanced agent workflow API.
|
|
"""
|
|
try:
|
|
if not self.config.assistant_id:
|
|
raise ValueError(f"Assistant '{self.config.name}' does not have an associated assistant ID.")
|
|
|
|
assistant = self.client.beta.assistants.retrieve(self.config.assistant_id)
|
|
thread = self.client.beta.threads.create()
|
|
|
|
# Create a message in the thread
|
|
self.client.beta.threads.messages.create(thread_id=thread.id, role="user", content=user_message)
|
|
|
|
# Run the assistant workflow
|
|
run = self.client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)
|
|
|
|
# Poll for the result
|
|
while run.status in ["queued", "in_progress"]:
|
|
run = self.client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
|
|
if run.status == "completed":
|
|
messages = self.client.beta.threads.messages.list(thread_id=thread.id)
|
|
return messages.data[0].content[0].text.value
|
|
|
|
return "Unexpected error: Workflow did not complete."
|
|
except Exception as e:
|
|
logger.error(f"Error in agent workflow: {e}")
|
|
return f"Error in agent workflow: {e}"
|
|
|
|
def handle_message(self, user_message):
|
|
"""
|
|
Automatically select the correct method based on assistant type.
|
|
"""
|
|
if self.config.is_special_assistant():
|
|
return self.agent_workflow(user_message)
|
|
return self.chat_completion(user_message)
|