AgentChat), 핵심 멀티 에이전트 기능 (Core), 외부 서비스와의 인테그레이션 (Extensions)을 위한 컴포넌트를 제공합니다. 또한 AutoGen은 코드 없이 에이전트 프로토타이핑을 할 수 있는 Studio를 제공합니다. 자세한 내용은 공식 AutoGen 문서를 방문하세요.
이 가이드는 AutoGen에 대한 기본적인 이해가 있다고 가정합니다.
autogen_agentchat, autogen_core, autogen_ext 내의 상호작용을 자동으로 추적할 수 있습니다. 이 가이드에서는 AutoGen과 함께 Weave를 사용하는 다양한 예시를 살펴보겠습니다.
필수 조건
시작하기 전에 AutoGen과 Weave가 설치되어 있는지 확인하세요. 또한 사용하려는 LLM 제공업체(예: OpenAI, Anthropic)의 SDK가 필요합니다.잘못된 코드 신고
복사
AI에게 묻기
pip install autogen_agentchat "autogen_ext[openai,anthropic]" weave
잘못된 코드 신고
복사
AI에게 묻기
import os
os.environ["OPENAI_API_KEY"] = "<your-openai-api-key>"
os.environ["ANTHROPIC_API_KEY"] = "<your-anthropic-api-key>"
기본 설정
스크립트 시작 부분에서 Weave를 초기화하여 추적을 시작합니다.잘못된 코드 신고
복사
AI에게 묻기
import weave
weave.init("autogen-demo")
단순 모델 클라이언트 추적하기
Weave는 AutoGen 내의 모델 클라이언트에 대한 직접적인 호출을 추적할 수 있습니다.클라이언트 create 호출 추적
이 예제는OpenAIChatCompletionClient에 대한 단일 호출을 추적하는 방법을 보여줍니다.
잘못된 코드 신고
복사
AI에게 묻기
import asyncio
from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
# from autogen_ext.models.anthropic import AnthropicChatCompletionClient
async def simple_client_call(model_name = "gpt-4o"):
model_client = OpenAIChatCompletionClient(
model=model_name,
)
# 또는 Anthropic이나 다른 모델 클라이언트를 사용할 수 있습니다.
# model_client = AnthropicChatCompletionClient(
# model="claude-3-haiku-20240307"
# )
response = await model_client.create(
[UserMessage(content="Hello, how are you?", source="user")]
)
print(response)
asyncio.run(simple_client_call())
스트리밍을 포함한 클라이언트 create 호출 추적
Weave는 스트리밍 응답 추적도 지원합니다.잘못된 코드 신고
복사
AI에게 묻기
async def simple_client_call_stream(model_name = "gpt-4o"):
openai_model_client = OpenAIChatCompletionClient(model=model_name)
async for item in openai_model_client.create_stream(
[UserMessage(content="Hello, how are you?", source="user")]
):
print(item, flush=True, end="")
asyncio.run(simple_client_call_stream())
Weave의 캐시된 호출 기록
AutoGen의ChatCompletionCache를 사용할 수 있으며, Weave는 이러한 상호작용을 추적하여 응답이 캐시에서 왔는지 아니면 새로 호출되었는지 보여줍니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_ext.models.cache import ChatCompletionCache
async def run_cache_client(model_name = "gpt-4o"):
openai_model_client = OpenAIChatCompletionClient(model=model_name)
cache_client = ChatCompletionCache(openai_model_client,)
response = await cache_client.create(
[UserMessage(content="Hello, how are you?", source="user")]
)
print(response) # OpenAI로부터의 응답을 출력해야 합니다
response = await cache_client.create(
[UserMessage(content="Hello, how are you?", source="user")]
)
print(response) # 캐시된 응답을 출력해야 합니다
asyncio.run(run_cache_client())
도구 호출을 포함한 에이전트 추적
Weave는 에이전트와 도구 사용을 추적하여 에이전트가 도구를 선택하고 실행하는 과정에 대한 가시성을 제공합니다.잘못된 코드 신고
복사
AI에게 묻기
from autogen_agentchat.agents import AssistantAgent
async def get_weather(city: str) -> str:
return f"{city}의 날씨는 화창하며 기온은 73도입니다."
async def run_agent_with_tools(model_name = "gpt-4o"):
model_client = OpenAIChatCompletionClient(model=model_name)
agent = AssistantAgent(
name="weather_agent",
model_client=model_client,
tools=[get_weather],
system_message="당신은 도움이 되는 비서입니다.",
reflect_on_tool_use=True,
)
# 콘솔로 출력을 스트리밍하려면:
# await Console(agent.run_stream(task="뉴욕 날씨는 어때?"))
res = await agent.run(task="뉴욕 날씨는 어때?")
print(res)
await model_client.close()
asyncio.run(run_agent_with_tools())
GroupChat 추적 - RoundRobin
RoundRobinGroupChat과 같은 그룹 채팅 내의 상호작용은 Weave에 의해 추적되어 에이전트 간의 대화 흐름을 따라갈 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
# 전체 그룹 채팅을 추적하고 싶으므로 여기에 weave op를 추가합니다.
# 이는 전적으로 선택 사항이지만 사용을 적극 권장합니다.
@weave.op
async def run_round_robin_group_chat(model_name="gpt-4o"):
model_client = OpenAIChatCompletionClient(model=model_name)
primary_agent = AssistantAgent(
"primary",
model_client=model_client,
system_message="당신은 도움이 되는 AI 비서입니다.",
)
critic_agent = AssistantAgent(
"critic",
model_client=model_client,
system_message="건설적인 피드백을 제공하세요. 피드백이 반영되면 'APPROVE'라고 응답하세요.",
)
text_termination = TextMentionTermination("APPROVE")
team = RoundRobinGroupChat(
[primary_agent, critic_agent], termination_condition=text_termination
)
await team.reset()
# 콘솔로 출력을 스트리밍하려면:
# await Console(team.run_stream(task="가을에 관한 짧은 시를 써주세요."))
result = await team.run(task="가을에 관한 짧은 시를 써주세요.")
print(result)
await model_client.close()
asyncio.run(run_round_robin_group_chat())
메모리 추적
AutoGen의 메모리 컴포넌트는 Weave로 추적할 수 있습니다. 가독성을 높이기 위해@weave.op()를 사용하여 단일 추적 아래에 메모리 작업을 그룹화할 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
# 메모리 추가 호출과 메모리 가져오기 호출을 단일 추적 아래에서
# 추적하기 위해 여기에 weave op를 추가합니다.
# 이는 전적으로 선택 사항이지만 사용을 적극 권장합니다.
@weave.op
async def run_memory_agent(model_name="gpt-4o"):
user_memory = ListMemory()
await user_memory.add(
MemoryContent(
content="날씨 정보는 미터법 단위를 사용해야 합니다.",
mime_type=MemoryMimeType.TEXT,
)
)
await user_memory.add(
MemoryContent(
content="식사 레시피는 반드시 비건이어야 합니다.", mime_type=MemoryMimeType.TEXT
)
)
async def get_weather(city: str, units: str = "imperial") -> str:
if units == "imperial":
return f"{city}의 날씨는 73 °F이며 화창합니다."
elif units == "metric":
return f"{city}의 날씨는 23 °C이며 화창합니다."
else:
return f"죄송합니다, {city}의 날씨를 알 수 없습니다."
model_client = OpenAIChatCompletionClient(model=model_name)
assistant_agent = AssistantAgent(
name="assistant_agent",
model_client=model_client,
tools=[get_weather],
memory=[user_memory],
)
# 콘솔로 출력을 스트리밍하려면:
# stream = assistant_agent.run_stream(task="뉴욕 날씨는 어때?")
# await Console(stream)
result = await assistant_agent.run(task="뉴욕 날씨는 어때?")
print(result)
await model_client.close()
asyncio.run(run_memory_agent())
RAG 워크플로우 추적하기
ChromaDBVectorMemory와 같은 메모리 시스템을 사용한 문서 인덱싱 및 검색을 포함한 Retrieval Augmented Generation (RAG) 워크플로우를 추적할 수 있습니다. RAG 프로세스에 @weave.op() 데코레이터를 사용하면 전체 흐름을 시각화하는 데 도움이 됩니다.
RAG 예제에는
chromadb가 필요합니다. pip install chromadb로 설치하세요.잘못된 코드 신고
복사
AI에게 묻기
# !pip install -q chromadb
# 환경에 chromadb가 설치되어 있는지 확인하세요: `pip install chromadb`
import re
from typing import List
import os
from pathlib import Path
import aiofiles
import aiohttp
from autogen_core.memory import Memory, MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import (
ChromaDBVectorMemory,
PersistentChromaDBVectorMemoryConfig,
)
class SimpleDocumentIndexer:
def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
self.memory = memory
self.chunk_size = chunk_size
async def _fetch_content(self, source: str) -> str:
if source.startswith(("http://", "https://")):
async with aiohttp.ClientSession() as session:
async with session.get(source) as response:
return await response.text()
else:
async with aiofiles.open(source, "r", encoding="utf-8") as f:
return await f.read()
def _strip_html(self, text: str) -> str:
text = re.sub(r"<[^>]*>", " ", text)
text = re.sub(r"\\s+", " ", text)
return text.strip()
def _split_text(self, text: str) -> List[str]:
chunks: list[str] = []
for i in range(0, len(text), self.chunk_size):
chunk = text[i : i + self.chunk_size]
chunks.append(chunk.strip())
return chunks
async def index_documents(self, sources: List[str]) -> int:
total_chunks = 0
for source in sources:
try:
content = await self._fetch_content(source)
if "<" in content and ">" in content:
content = self._strip_html(content)
chunks = self._split_text(content)
for i, chunk in enumerate(chunks):
await self.memory.add(
MemoryContent(
content=chunk,
mime_type=MemoryMimeType.TEXT,
metadata={"source": source, "chunk_index": i},
)
)
total_chunks += len(chunks)
except Exception as e:
print(f"인덱싱 중 오류 발생 {source}: {str(e)}")
return total_chunks
@weave.op
async def run_rag_agent(model_name="gpt-4o"):
rag_memory = ChromaDBVectorMemory(
config=PersistentChromaDBVectorMemoryConfig(
collection_name="autogen_docs",
persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen_weave"),
k=3,
score_threshold=0.4,
)
)
# await rag_memory.clear() # 필요한 경우 기존 메모리를 지우려면 주석 해제
async def index_autogen_docs() -> None:
indexer = SimpleDocumentIndexer(memory=rag_memory)
sources = [
"https://raw.githubusercontent.com/microsoft/autogen/main/README.md",
"https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html",
]
chunks: int = await indexer.index_documents(sources)
print(f"{len(sources)}개의 AutoGen 문서에서 {chunks}개의 청크를 인덱싱했습니다.")
# 컬렉션이 비어 있거나 다시 인덱싱하려는 경우에만 인덱싱합니다.
# 데모 목적을 위해 매번 인덱싱하거나 이미 인덱싱되었는지 확인할 수 있습니다.
# 이 예제는 매 실행마다 인덱싱을 시도합니다. 체크 로직 추가를 고려해 보세요.
await index_autogen_docs()
model_client = OpenAIChatCompletionClient(model=model_name)
rag_assistant = AssistantAgent(
name="rag_assistant",
model_client=model_client,
memory=[rag_memory],
)
# 콘솔로 출력을 스트리밍하려면:
# stream = rag_assistant.run_stream(task="AgentChat이 무엇인가요?")
# await Console(stream)
result = await rag_assistant.run(task="AgentChat이 무엇인가요?")
print(result)
await rag_memory.close()
await model_client.close()
asyncio.run(run_rag_agent())
에이전트 런타임 추적
Weave는SingleThreadedAgentRuntime과 같은 AutoGen의 에이전트 런타임 내 작업을 추적할 수 있습니다. 런타임 실행 함수 주변에 @weave.op()를 사용하면 관련 추적을 그룹화할 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from dataclasses import dataclass
from typing import Callable
from autogen_core import (
DefaultTopicId,
MessageContext,
RoutedAgent,
default_subscription,
message_handler,
AgentId,
SingleThreadedAgentRuntime
)
@dataclass
class Message:
content: int
@default_subscription
class Modifier(RoutedAgent):
def __init__(self, modify_val: Callable[[int], int]) -> None:
super().__init__("A modifier agent.")
self._modify_val = modify_val
@message_handler
async def handle_message(self, message: Message, ctx: MessageContext) -> None:
val = self._modify_val(message.content)
print(f"{'-'*80}\\nModifier:\\nModified {message.content} to {val}")
await self.publish_message(Message(content=val), DefaultTopicId())
@default_subscription
class Checker(RoutedAgent):
def __init__(self, run_until: Callable[[int], bool]) -> None:
super().__init__("A checker agent.")
self._run_until = run_until
@message_handler
async def handle_message(self, message: Message, ctx: MessageContext) -> None:
if not self._run_until(message.content):
print(f"{'-'*80}\\nChecker:\\n{message.content} passed the check, continue.")
await self.publish_message(
Message(content=message.content), DefaultTopicId()
)
else:
print(f"{'-'*80}\\nChecker:\\n{message.content} failed the check, stopping.")
# 전체 에이전트 런타임 호출을 단일 추적 아래에서
# 추적하기 위해 여기에 weave op를 추가합니다.
# 이는 전적으로 선택 사항이지만 사용을 적극 권장합니다.
@weave.op
async def run_agent_runtime() -> None:
runtime = SingleThreadedAgentRuntime()
await Modifier.register(
runtime,
"modifier",
lambda: Modifier(modify_val=lambda x: x - 1),
)
await Checker.register(
runtime,
"checker",
lambda: Checker(run_until=lambda x: x <= 1),
)
runtime.start()
await runtime.send_message(Message(content=3), AgentId("checker", "default"))
await runtime.stop_when_idle()
asyncio.run(run_agent_runtime())
워크플로우 추적 (Sequential)
에이전트 상호작용의 시퀀스를 정의하는 복잡한 에이전트 워크플로우를 추적할 수 있습니다.@weave.op()를 사용하여 전체 워크플로우에 대한 상위 수준의 추적을 제공할 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_core import TopicId, type_subscription
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage
@dataclass
class WorkflowMessage:
content: str
concept_extractor_topic_type = "ConceptExtractorAgent"
writer_topic_type = "WriterAgent"
format_proof_topic_type = "FormatProofAgent"
user_topic_type = "User"
@type_subscription(topic_type=concept_extractor_topic_type)
class ConceptExtractorAgent(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("A concept extractor agent.")
self._system_message = SystemMessage(
content=(
"당신은 마케팅 분석가입니다. 제품 설명이 주어지면 다음을 식별하세요:\n"
"- 주요 기능\n"
"- 타겟 오디언스\n"
"- 고유한 판매 포인트(USP)\n\n"
)
)
self._model_client = model_client
@message_handler
async def handle_user_description(self, message: WorkflowMessage, ctx: MessageContext) -> None:
prompt = f"Product description: {message.content}"
llm_result = await self._model_client.create(
messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
cancellation_token=ctx.cancellation_token,
)
response = llm_result.content
assert isinstance(response, str)
print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
await self.publish_message(
WorkflowMessage(response), topic_id=TopicId(writer_topic_type, source=self.id.key)
)
@type_subscription(topic_type=writer_topic_type)
class WriterAgent(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("A writer agent.")
self._system_message = SystemMessage(
content=(
"당신은 마케팅 카피라이터입니다. 기능, 오디언스, USP를 설명하는 텍스트 블록이 주어지면 "
"이러한 점을 강조하는 매력적인 마케팅 카피(뉴스레터 섹션 등)를 작성하세요. "
"결과물은 짧아야 하며(약 150단어), 단일 텍스트 블록으로 카피만 출력하세요."
)
)
self._model_client = model_client
@message_handler
async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
prompt = f"Below is the info about the product:\\n\\n{message.content}"
llm_result = await self._model_client.create(
messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
cancellation_token=ctx.cancellation_token,
)
response = llm_result.content
assert isinstance(response, str)
print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
await self.publish_message(
WorkflowMessage(response), topic_id=TopicId(format_proof_topic_type, source=self.id.key)
)
@type_subscription(topic_type=format_proof_topic_type)
class FormatProofAgent(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("A format & proof agent.")
self._system_message = SystemMessage(
content=(
"당신은 에디터입니다. 초안 카피가 주어지면 문법을 수정하고, 명확성을 높이며, 일관된 어조를 보장하고, "
"형식을 갖추어 세련되게 만드세요. 최종 개선된 카피를 단일 텍스트 블록으로 출력하세요."
)
)
self._model_client = model_client
@message_handler
async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
prompt = f"Draft copy:\\n{message.content}."
llm_result = await self._model_client.create(
messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
cancellation_token=ctx.cancellation_token,
)
response = llm_result.content
assert isinstance(response, str)
print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
await self.publish_message(
WorkflowMessage(response), topic_id=TopicId(user_topic_type, source=self.id.key)
)
@type_subscription(topic_type=user_topic_type)
class UserAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("A user agent that outputs the final copy to the user.")
@message_handler
async def handle_final_copy(self, message: WorkflowMessage, ctx: MessageContext) -> None:
print(f"\\n{'-'*80}\\n{self.id.type} received final copy:\\n{message.content}")
# 전체 에이전트 워크플로우를 단일 추적 아래에서
# 추적하기 위해 여기에 weave op를 추가합니다.
# 이는 전적으로 선택 사항이지만 사용을 적극 권장합니다.
@weave.op(call_display_name="Sequential Agent Workflow")
async def run_agent_workflow(model_name="gpt-4o"):
model_client = OpenAIChatCompletionClient(model=model_name)
runtime = SingleThreadedAgentRuntime()
await ConceptExtractorAgent.register(runtime, type=concept_extractor_topic_type, factory=lambda: ConceptExtractorAgent(model_client=model_client))
await WriterAgent.register(runtime, type=writer_topic_type, factory=lambda: WriterAgent(model_client=model_client))
await FormatProofAgent.register(runtime, type=format_proof_topic_type, factory=lambda: FormatProofAgent(model_client=model_client))
await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())
runtime.start()
await runtime.publish_message(
WorkflowMessage(
content="24시간 동안 음료를 차갑게 유지해주는 친환경 스테인리스 스틸 물병"
),
topic_id=TopicId(concept_extractor_topic_type, source="default"),
)
await runtime.stop_when_idle()
await model_client.close()
asyncio.run(run_agent_workflow())
코드 실행기(Code Executor) 추적
Docker 필요
이 예제는 Docker를 사용한 코드 실행을 포함하며 모든 환경(예: 직접적인 Colab)에서 작동하지 않을 수 있습니다. 시도하기 전에 로컬에서 Docker가 실행 중인지 확인하세요.
잘못된 코드 신고
복사
AI에게 묻기
import tempfile
from autogen_core import DefaultTopicId
from autogen_core.code_executor import CodeBlock, CodeExecutor
from autogen_core.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
@dataclass
class CodeGenMessage:
content: str
@default_subscription
class Assistant(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("An assistant agent.")
self._model_client = model_client
self._chat_history: List[LLMMessage] = [
SystemMessage(
content="""Python 스크립트를 마크다운 블록에 작성하면 실행됩니다.
항상 현재 디렉토리의 파일에 그림을 저장하세요. plt.show()를 사용하지 마세요. 이 작업을 완료하는 데 필요한 모든 코드는 단일 응답 내에 포함되어야 합니다.""",
)
]
@message_handler
async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
self._chat_history.append(UserMessage(content=message.content, source="user"))
result = await self._model_client.create(self._chat_history)
print(f"\\n{'-'*80}\\nAssistant:\\n{result.content}")
self._chat_history.append(AssistantMessage(content=result.content, source="assistant"))
await self.publish_message(CodeGenMessage(content=result.content), DefaultTopicId())
def extract_markdown_code_blocks(markdown_text: str) -> List[CodeBlock]:
pattern = re.compile(r"```(?:\\s*([\\w\\+\\-]+))?\\n([\\s\\S]*?)```")
matches = pattern.findall(markdown_text)
code_blocks: List[CodeBlock] = []
for match in matches:
language = match[0].strip() if match[0] else ""
code_content = match[1]
code_blocks.append(CodeBlock(code=code_content, language=language))
return code_blocks
@default_subscription
class Executor(RoutedAgent):
def __init__(self, code_executor: CodeExecutor) -> None:
super().__init__("An executor agent.")
self._code_executor = code_executor
@message_handler
async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
code_blocks = extract_markdown_code_blocks(message.content)
if code_blocks:
result = await self._code_executor.execute_code_blocks(
code_blocks, cancellation_token=ctx.cancellation_token
)
print(f"\\n{'-'*80}\\nExecutor:\\n{result.output}")
await self.publish_message(CodeGenMessage(content=result.output), DefaultTopicId())
# 전체 코드 생성 워크플로우를 단일 추적 아래에서
# 추적하기 위해 여기에 weave op를 추가합니다.
# 이는 전적으로 선택 사항이지만 사용을 적극 권장합니다.
@weave.op(call_display_name="CodeGen Agent Workflow")
async def run_codegen(model_name="gpt-4o"): # 업데이트된 모델
work_dir = tempfile.mkdtemp()
runtime = SingleThreadedAgentRuntime()
# 이 예제를 위해 Docker가 실행 중인지 확인하세요
try:
async with DockerCommandLineCodeExecutor(work_dir=work_dir) as executor:
model_client = OpenAIChatCompletionClient(model=model_name)
await Assistant.register(runtime, "assistant", lambda: Assistant(model_client=model_client))
await Executor.register(runtime, "executor", lambda: Executor(executor))
runtime.start()
await runtime.publish_message(
CodeGenMessage(content="2024-01-01부터 현재까지 NVDA 대 TSLA 주식 수익률 플롯을 생성하세요."),
DefaultTopicId(),
)
await runtime.stop_when_idle()
await model_client.close()
except Exception as e:
print(f"Docker 코드 실행기 예제를 실행할 수 없습니다: {e}")
print("Docker가 설치되어 있고 실행 중인지 확인하세요.")
finally:
import shutil
shutil.rmtree(work_dir)
asyncio.run(run_codegen())
더 알아보기
- Weave:
- AutoGen: