메인 콘텐츠로 건너뛰기
이것은 인터랙티브 노트북입니다. 로컬에서 실행하거나 아래 링크를 사용할 수 있습니다:

멀티 에이전트 시스템을 위한 Structured Outputs

OpenAI는 사용자가 강한 어조의 프롬프트를 사용하지 않고도 모델이 항상 제공된 JSON 스키마를 준수하는 응답을 생성할 수 있도록 Structured Outputs를 출시했습니다. Structured Outputs를 사용하면 형식이 잘못된 응답을 검증하거나 재시도할 필요가 없습니다. 새로운 파라미터인 strict: true를 사용하여 응답이 제공된 스키마를 준수하도록 보장할 수 있습니다. 멀티 에이전트 시스템에서 Structured Outputs를 사용하면 에이전트 간에 일관되고 처리하기 쉬운 데이터를 보장하여 커뮤니케이션을 향상시킵니다. 또한 명시적인 거부를 허용하여 안전성을 개선하고, 재시도나 검증의 필요성을 제거하여 성능을 높입니다. 이는 상호작용을 단순화하고 전체 시스템 효율성을 높여줍니다. 이 튜토리얼에서는 멀티 에이전트 시스템에서 Structured Outputs를 활용하고 이를 Weave로 추적하는 방법을 보여줍니다.
출처: 이 쿡북은 OpenAI의 Structured Outputs 샘플 코드를 바탕으로 하며, Weave를 사용한 시각화 개선을 위해 일부 수정되었습니다.

종속성 설치

이 튜토리얼에는 다음 라이브러리가 필요합니다:
  • 멀티 에이전트 시스템을 만들기 위한 OpenAI.
  • LLM 워크플로우를 추적하고 프롬프트 전략을 평가하기 위한 Weave.
!pip install -qU openai weave wandb
# OpenAI의 버그 수정을 위한 임시 해결책:
# TypeError: Client.__init__() got an unexpected keyword argument 'proxies'
# https://community.openai.com/t/error-with-openai-1-56-0-client-init-got-an-unexpected-keyword-argument-proxies/1040332/15 참조
!pip install "httpx<0.28"
wandb.login()으로 쉽게 로그인할 수 있도록 환경 변수에 WANDB_API_KEY를 설정합니다 (이 값은 Colab의 Secret으로 제공되어야 합니다). 로그를 기록할 W&B 프로젝트의 이름을 name_of_wandb_project에 설정합니다. 참고: name_of_wandb_project는 트레이스를 기록할 팀을 지정하기 위해 {team_name}/{project_name} 형식이 될 수도 있습니다. 그 다음 weave.init()을 호출하여 Weave 클라이언트를 가져옵니다. OpenAI API를 사용할 것이므로 OpenAI API 키도 필요합니다. OpenAI 플랫폼에서 가입하여 고유한 API 키를 받을 수 있습니다 (이 역시 Colab의 Secret으로 제공되어야 합니다).
import base64
import json
import os
from io import BytesIO, StringIO

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import wandb
from google.colab import userdata
from openai import OpenAI

import weave

os.environ["WANDB_API_KEY"] = userdata.get("WANDB_API_KEY")
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")

wandb.login()
name_of_wandb_project = "multi-agent-structured-output"
weave.init(name_of_wandb_project)

client = OpenAI()
MODEL = "gpt-4o-2024-08-06"

에이전트 설정

우리가 다룰 유스 케이스는 데이터 분석 작업입니다. 먼저 4개의 에이전트 시스템을 설정해 보겠습니다:
  • Triaging 에이전트: 호출할 에이전트를 결정
  • Data pre-processing 에이전트: 데이터를 정제하는 등 분석을 위해 데이터를 준비
  • Data Analysis 에이전트: 데이터 분석 수행
  • Data Visualization 에이전트: 분석 결과를 시각화하여 인사이트 추출 각 에이전트에 대한 시스템 프롬프트를 정의하는 것부터 시작하겠습니다.
triaging_system_prompt = """당신은 Triaging Agent입니다. 당신의 역할은 사용자의 쿼리를 평가하고 관련 에이전트에게 라우팅하는 것입니다. 사용 가능한 에이전트는 다음과 같습니다:
- Data Processing Agent: 데이터를 정제, 변환 및 집계합니다.
- Analysis Agent: 통계, 상관관계 및 회귀 분석을 수행합니다.
- Visualization Agent: 막대 그래프, 선 그래프 및 파이 차트를 생성합니다.

send_query_to_agents 툴을 사용하여 사용자의 쿼리를 관련 에이전트에게 전달하세요. 또한 필요한 경우 speak_to_user 툴을 사용하여 사용자로부터 더 많은 정보를 얻으세요."""

processing_system_prompt = """당신은 Data Processing Agent입니다. 당신의 역할은 다음 툴을 사용하여 데이터를 정제, 변환 및 집계하는 것입니다:
- clean_data
- transform_data
- aggregate_data"""

analysis_system_prompt = """당신은 Analysis Agent입니다. 당신의 역할은 다음 툴을 사용하여 통계, 상관관계 및 회귀 분석을 수행하는 것입니다:
- stat_analysis
- correlation_analysis
- regression_analysis"""

visualization_system_prompt = """당신은 Visualization Agent입니다. 당신의 역할은 다음 툴을 사용하여 막대 그래프, 선 그래프 및 파이 차트를 생성하는 것입니다:
- create_bar_chart
- create_line_chart
- create_pie_chart"""
그런 다음 각 에이전트를 위한 툴을 정의합니다. Triaging 에이전트를 제외한 각 에이전트에게는 해당 역할에 특화된 툴이 제공됩니다: Data pre-processing 에이전트 : 1. 데이터 정제, 2. 데이터 변환, 3. 데이터 집계 Data analysis 에이전트 : 1. 통계 분석, 2. 상관관계 분석, 3. 회귀 분석 Data visualization 에이전트 : 1. 막대 그래프 생성, 2. 선 그래프 생성, 3. 파이 차트 생성
triage_tools = [
    {
        "type": "function",
        "function": {
            "name": "send_query_to_agents",
            "description": "에이전트의 역량에 따라 관련 에이전트에게 사용자 쿼리를 전송합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "agents": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "쿼리를 보낼 에이전트 이름의 배열입니다.",
                    },
                    "query": {
                        "type": "string",
                        "description": "보낼 사용자 쿼리입니다.",
                    },
                },
                "required": ["agents", "query"],
            },
        },
        "strict": True,
    }
]

preprocess_tools = [
    {
        "type": "function",
        "function": {
            "name": "clean_data",
            "description": "중복을 제거하고 누락된 값을 처리하여 제공된 데이터를 정제합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "정제할 데이터셋입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    }
                },
                "required": ["data"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "transform_data",
            "description": "지정된 규칙에 따라 데이터를 변환합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "변환할 데이터입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    },
                    "rules": {
                        "type": "string",
                        "description": "구조화된 형식으로 지정된 적용할 변환 규칙입니다.",
                    },
                },
                "required": ["data", "rules"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "aggregate_data",
            "description": "지정된 컬럼 및 연산에 따라 데이터를 집계합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "집계할 데이터입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    },
                    "group_by": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "그룹화할 컬럼들입니다.",
                    },
                    "operations": {
                        "type": "string",
                        "description": "구조화된 형식으로 지정된 수행할 집계 연산입니다.",
                    },
                },
                "required": ["data", "group_by", "operations"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
]

analysis_tools = [
    {
        "type": "function",
        "function": {
            "name": "stat_analysis",
            "description": "주어진 데이터셋에 대해 통계 분석을 수행합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "분석할 데이터셋입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    }
                },
                "required": ["data"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "correlation_analysis",
            "description": "데이터셋의 변수 간 상관 계수를 계산합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "분석할 데이터셋입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    },
                    "variables": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "상관관계를 계산할 변수 목록입니다.",
                    },
                },
                "required": ["data", "variables"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "regression_analysis",
            "description": "데이터셋에 대해 회귀 분석을 수행합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "분석할 데이터셋입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    },
                    "dependent_var": {
                        "type": "string",
                        "description": "회귀 분석의 종속 변수입니다.",
                    },
                    "independent_vars": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "독립 변수 목록입니다.",
                    },
                },
                "required": ["data", "dependent_var", "independent_vars"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
]

visualization_tools = [
    {
        "type": "function",
        "function": {
            "name": "create_bar_chart",
            "description": "제공된 데이터로 막대 그래프를 생성합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "막대 그래프용 데이터입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    },
                    "x": {"type": "string", "description": "x축 컬럼입니다."},
                    "y": {"type": "string", "description": "y축 컬럼입니다."},
                },
                "required": ["data", "x", "y"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "create_line_chart",
            "description": "제공된 데이터로 선 그래프를 생성합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "선 그래프용 데이터입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    },
                    "x": {"type": "string", "description": "x축 컬럼입니다."},
                    "y": {"type": "string", "description": "y축 컬럼입니다."},
                },
                "required": ["data", "x", "y"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "create_pie_chart",
            "description": "제공된 데이터로 파이 차트를 생성합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "파이 차트용 데이터입니다. JSON 또는 CSV와 같은 적절한 형식이어야 합니다.",
                    },
                    "labels": {
                        "type": "string",
                        "description": "레이블용 컬럼입니다.",
                    },
                    "values": {
                        "type": "string",
                        "description": "값용 컬럼입니다.",
                    },
                },
                "required": ["data", "labels", "values"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
]

Weave를 사용하여 멀티 에이전트 추적 활성화

다음 로직을 처리하는 코드를 작성해야 합니다:
  • 멀티 에이전트 시스템에 사용자 쿼리 전달
  • 멀티 에이전트 시스템의 내부 작동 처리
  • 툴 호출 실행
# 쿼리 예시

user_query = """
아래에 데이터가 있습니다. 먼저 중복을 제거한 다음 데이터의 통계를 분석하고 선 그래프를 그려주세요.

house_size (m3), house_price ($)
90, 100
80, 90
100, 120
90, 100
"""
사용자 쿼리를 통해 clean_data, stat_analysis, create_line_chart 툴을 호출해야 함을 유추할 수 있습니다. 먼저 툴 호출 실행을 담당하는 실행 함수를 정의하는 것부터 시작하겠습니다. Python 함수에 @weave.op() 데코레이터를 추가하면 언어 모델의 입력, 출력 및 트레이스를 로그로 기록하고 디버깅할 수 있습니다. 멀티 에이전트 시스템을 만들 때 많은 함수가 나타나지만, 함수 위에 단순히 @weave.op()를 추가하는 것만으로 충분합니다.
@weave.op()
def clean_data(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")
    df_deduplicated = df.drop_duplicates()
    return df_deduplicated

@weave.op()
def stat_analysis(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")
    return df.describe()

@weave.op()
def plot_line_chart(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")

    x = df.iloc[:, 0]
    y = df.iloc[:, 1]

    coefficients = np.polyfit(x, y, 1)
    polynomial = np.poly1d(coefficients)
    y_fit = polynomial(x)

    plt.figure(figsize=(10, 6))
    plt.plot(x, y, "o", label="Data Points")
    plt.plot(x, y_fit, "-", label="Best Fit Line")
    plt.title("Line Chart with Best Fit Line")
    plt.xlabel(df.columns[0])
    plt.ylabel(df.columns[1])
    plt.legend()
    plt.grid(True)

    # 표시하기 전에 플롯을 BytesIO 버퍼에 저장
    buf = BytesIO()
    plt.savefig(buf, format="png")
    buf.seek(0)

    # 플롯 표시
    plt.show()

    # 데이터 URL을 위해 이미지를 base64로 인코딩
    image_data = buf.getvalue()
    base64_encoded_data = base64.b64encode(image_data)
    base64_string = base64_encoded_data.decode("utf-8")
    data_url = f"data:image/png;base64,{base64_string}"

    return data_url

# 툴을 실행하는 함수 정의
@weave.op()
def execute_tool(tool_calls, messages):
    for tool_call in tool_calls:
        tool_name = tool_call.function.name
        tool_arguments = json.loads(tool_call.function.arguments)

        if tool_name == "clean_data":
            # 데이터 정제 시뮬레이션
            cleaned_df = clean_data(tool_arguments["data"])
            cleaned_data = {"cleaned_data": cleaned_df.to_dict()}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(cleaned_data)}
            )
            print("Cleaned data: ", cleaned_df)
        elif tool_name == "transform_data":
            # 데이터 변환 시뮬레이션
            transformed_data = {"transformed_data": "sample_transformed_data"}
            messages.append(
                {
                    "role": "tool",
                    "name": tool_name,
                    "content": json.dumps(transformed_data),
                }
            )
        elif tool_name == "aggregate_data":
            # 데이터 집계 시뮬레이션
            aggregated_data = {"aggregated_data": "sample_aggregated_data"}
            messages.append(
                {
                    "role": "tool",
                    "name": tool_name,
                    "content": json.dumps(aggregated_data),
                }
            )
        elif tool_name == "stat_analysis":
            # 통계 분석 시뮬레이션
            stats_df = stat_analysis(tool_arguments["data"])
            stats = {"stats": stats_df.to_dict()}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(stats)}
            )
            print("Statistical Analysis: ", stats_df)
        elif tool_name == "correlation_analysis":
            # 상관관계 분석 시뮬레이션
            correlations = {"correlations": "sample_correlations"}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(correlations)}
            )
        elif tool_name == "regression_analysis":
            # 회귀 분석 시뮬레이션
            regression_results = {"regression_results": "sample_regression_results"}
            messages.append(
                {
                    "role": "tool",
                    "name": tool_name,
                    "content": json.dumps(regression_results),
                }
            )
        elif tool_name == "create_bar_chart":
            # 막대 그래프 생성 시뮬레이션
            bar_chart = {"bar_chart": "sample_bar_chart"}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(bar_chart)}
            )
        elif tool_name == "create_line_chart":
            # 선 그래프 생성 시뮬레이션
            line_chart = {"line_chart": plot_line_chart(tool_arguments["data"])}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(line_chart)}
            )
        elif tool_name == "create_pie_chart":
            # 파이 차트 생성 시뮬레이션
            pie_chart = {"pie_chart": "sample_pie_chart"}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(pie_chart)}
            )
    return messages
다음으로, 각 서브 에이전트를 위한 툴 핸들러를 생성합니다. 이들은 모델에 전달되는 고유한 프롬프트와 툴 세트를 가집니다. 출력은 툴 호출을 수행하는 실행 함수로 전달됩니다.
# 각 에이전트의 처리를 담당하는 함수 정의
@weave.op()
def handle_data_processing_agent(query, conversation_messages):
    messages = [{"role": "system", "content": processing_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=preprocess_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)

@weave.op()
def handle_analysis_agent(query, conversation_messages):
    messages = [{"role": "system", "content": analysis_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=analysis_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)

@weave.op()
def handle_visualization_agent(query, conversation_messages):
    messages = [{"role": "system", "content": visualization_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=visualization_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)
마지막으로, 사용자 쿼리 처리를 관리하는 총괄 툴을 생성합니다. 이 함수는 사용자 쿼리를 받아 모델로부터 응답을 얻고, 이를 다른 에이전트에게 전달하여 실행하도록 관리합니다.
# 사용자 입력 및 triaging을 처리하는 함수
@weave.op()
def handle_user_message(user_query, conversation_messages=None):
    if conversation_messages is None:
        conversation_messages = []
    user_message = {"role": "user", "content": user_query}
    conversation_messages.append(user_message)

    messages = [{"role": "system", "content": triaging_system_prompt}]
    messages.extend(conversation_messages)

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=triage_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )

    for tool_call in response.choices[0].message.tool_calls:
        if tool_call.function.name == "send_query_to_agents":
            agents = json.loads(tool_call.function.arguments)["agents"]
            query = json.loads(tool_call.function.arguments)["query"]
            for agent in agents:
                if agent == "Data Processing Agent":
                    handle_data_processing_agent(query, conversation_messages)
                elif agent == "Analysis Agent":
                    handle_analysis_agent(query, conversation_messages)
                elif agent == "Visualization Agent":
                    handle_visualization_agent(query, conversation_messages)

    outputs = extract_tool_contents(conversation_messages)

    return outputs

functions = [
    "clean_data",
    "transform_data",
    "stat_analysis",
    "aggregate_data",
    "correlation_analysis",
    "regression_analysis",
    "create_bar_chart",
    "create_line_chart",
    "create_pie_chart",
]

@weave.op()
def extract_tool_contents(data):
    contents = {}
    contents["all"] = data
    for element in data:
        if (
            isinstance(element, dict)
            and element.get("role") == "tool"
            and element.get("name") in functions
        ):
            name = element["name"]
            content_str = element["content"]
            try:
                content_json = json.loads(content_str)
                if "chart" not in element.get("name"):
                    contents[name] = [content_json]
                else:
                    first_key = next(iter(content_json))
                    second_level = content_json[first_key]
                    if isinstance(second_level, dict):
                        second_key = next(iter(second_level))
                        contents[name] = second_level[second_key]
                    else:
                        contents[name] = second_level
            except json.JSONDecodeError:
                print(f"Error decoding JSON for {name}")
                contents[name] = None

    return contents

Weave에서 멀티 에이전트 시스템 실행 및 시각화

마지막으로, 사용자의 입력을 사용하여 기본 handle_user_message 함수를 실행하고 결과를 관찰합니다.
handle_user_message(user_query)
Weave의 URL을 클릭하면 다음과 같이 실행 과정이 추적되는 것을 볼 수 있습니다. Traces 페이지에서 입력과 출력을 확인할 수 있습니다. 명확성을 위해 각 출력을 클릭했을 때 표시되는 결과의 스크린샷이 다이어그램에 추가되었습니다. Weave는 OpenAI API와의 인테그레이션을 제공하여 비용이 자동으로 계산됩니다. 따라서 맨 오른쪽에서 비용과 지연 시간도 확인할 수 있습니다. 1-1.png 라인을 클릭하면 멀티 에이전트 시스템 내에서 실행된 중간 프로세스를 볼 수 있습니다. 예를 들어, analysis_agent의 입력과 출력을 보면 Structured Output 형식임을 알 수 있습니다. OpenAI의 Structured Output은 에이전트 간의 협업을 용이하게 하지만, 시스템이 복잡해질수록 이러한 상호작용이 일어나는 형식을 파악하기 어려워집니다. Weave를 사용하면 이러한 중간 프로세스와 그 입력 및 출력을 손에 잡힐 듯이 이해할 수 있습니다.
3.png
Weave에서 트레이싱이 어떻게 처리되는지 자세히 살펴보세요!

결론

이 튜토리얼에서는 Structured Output을 사용하여 멀티 에이전트 시스템을 편리하게 개발하는 방법과, 입력, 최종 출력 및 중간 출력 형식을 추적하기 위해 OpenAI에서 제공하는 Weave를 사용하는 방법을 배웠습니다.