import weave
from contextlib import contextmanager
from typing import Dict
# 중첩된 Threads 조정을 위한 전역 thread 컨텍스트
class ThreadContext:
def __init__(self):
self.app_thread_id = None
self.infra_thread_id = None
self.logic_thread_id = None
def setup_for_request(self, request_id: str):
self.app_thread_id = f"app_{request_id}"
self.infra_thread_id = f"{self.app_thread_id}_infra"
self.logic_thread_id = f"{self.app_thread_id}_logic"
# 전역 인스턴스
thread_ctx = ThreadContext()
class InfrastructureLayer:
"""전용 thread에서 모든 인프라 작업을 처리합니다."""
@weave.op
def authenticate_user(self, user_id: str) -> Dict:
# 인증 로직...
return {"user_id": user_id, "authenticated": True}
@weave.op
def call_payment_gateway(self, amount: float) -> Dict:
# 결제 처리...
return {"status": "approved", "amount": amount}
@weave.op
def update_inventory(self, product_id: str, quantity: int) -> Dict:
# 재고 관리...
return {"product_id": product_id, "updated": True}
def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
"""전용 thread 컨텍스트에서 모든 인프라 작업을 실행합니다."""
with weave.thread(thread_ctx.infra_thread_id):
auth_result = self.authenticate_user(user_id)
payment_result = self.call_payment_gateway(order_data["amount"])
inventory_result = self.update_inventory(order_data["product_id"], order_data["quantity"])
return {
"auth": auth_result,
"payment": payment_result,
"inventory": inventory_result
}
class BusinessLogicLayer:
"""전용 thread에서 비즈니스 로직을 처리합니다."""
@weave.op
def validate_order(self, order_data: Dict) -> Dict:
# 유효성 검사 로직...
return {"valid": True}
@weave.op
def calculate_pricing(self, order_data: Dict) -> Dict:
# 가격 계산...
return {"total": order_data["amount"], "tax": order_data["amount"] * 0.08}
@weave.op
def apply_business_rules(self, order_data: Dict) -> Dict:
# 비즈니스 규칙...
return {"rules_applied": ["standard_processing"], "priority": "normal"}
def execute_logic(self, order_data: Dict) -> Dict:
"""전용 thread 컨텍스트에서 모든 비즈니스 로직을 실행합니다."""
with weave.thread(thread_ctx.logic_thread_id):
validation = self.validate_order(order_data)
pricing = self.calculate_pricing(order_data)
rules = self.apply_business_rules(order_data)
return {"validation": validation, "pricing": pricing, "rules": rules}
class OrderProcessingApp:
"""메인 애플리케이션 오케스트레이터"""
def __init__(self):
self.infra = InfrastructureLayer()
self.business = BusinessLogicLayer()
@weave.op
def process_order(self, user_id: str, order_data: Dict) -> Dict:
"""메인 주문 처리 - 앱 thread에서 하나의 턴이 됩니다."""
# 전용 Threads에서 중첩된 작업 실행
infra_results = self.infra.execute_operations(user_id, order_data)
logic_results = self.business.execute_logic(order_data)
# 최종 오케스트레이션
return {
"order_id": f"order_12345",
"status": "completed",
"infra_results": infra_results,
"logic_results": logic_results
}
# 전역 thread 컨텍스트 조정을 통한 사용법
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
# 이 요청에 대한 thread 컨텍스트 설정
thread_ctx.setup_for_request(request_id)
# 앱 thread 컨텍스트에서 실행
with weave.thread(thread_ctx.app_thread_id):
app = OrderProcessingApp()
result = app.process_order(user_id, order_data)
return result
# 사용 예시
order_result = handle_order_request(
request_id="req_789",
user_id="user_001",
order_data={"product_id": "laptop", "quantity": 1, "amount": 1299.99}
)
# 예상 Thread 구조:
#
# App Thread: app_req_789
# └── Turn: process_order() ← 메인 오케스트레이션
#
# Infra Thread: app_req_789_infra
# ├── Turn: authenticate_user() ← 인프라 작업 1
# ├── Turn: call_payment_gateway() ← 인프라 작업 2
# └── Turn: update_inventory() ← 인프라 작업 3
#
# Logic Thread: app_req_789_logic
# ├── Turn: validate_order() ← 비즈니스 로직 작업 1
# ├── Turn: calculate_pricing() ← 비즈니스 로직 작업 2
# └── Turn: apply_business_rules() ← 비즈니스 로직 작업 3
#
# 이점:
# - Threads 간 관심사의 명확한 분리
# - thread ID를 파라미터로 계속 전달할 필요 없음
# - 앱/인프라/로직 레이어의 독립적인 모니터링
# - thread 컨텍스트를 통한 전역 조정