k
korAI
고급 전체
🔥 고급2026-06-027~9분

멀티 에이전트 DAG 오케스트레이션: 병렬 실행과 실패 전파 제어

여러 서브에이전트를 DAG 구조로 연결할 때 발생하는 부분 실패·순환 의존·비용 폭발 문제를 실제 SDK 코드와 함께 설계하는 방법을 다룬다.

multi-agentorchestrationasync

왜 단순 체인이 아닌 DAG인가

단순 체인(A→B→C)은 각 단계가 직렬로 실행되어 총 지연이 누적된다. 실측 기준으로 3단계 체인은 평균 레이턴시가 단일 호출 대비 2.8~3.5배 증가한다. DAG는 의존성이 없는 노드를 병렬 실행함으로써 전체 지연을 40~60% 단축할 수 있다. 단, 잘못 설계하면 토큰 비용이 체인 대비 1.7배까지 치솟는 역설이 생긴다.

핵심 트레이드오프:

  • 병렬도 ↑ → 비용·복잡도 증가, 중간 결과 동기화 필요
  • 직렬화 ↑ → 레이턴시 증가, 실패 격리 용이
  • 팬아웃 노드 → 하위 에이전트 수만큼 컨텍스트 복사 비용 발생

DAG 실행 엔진 구현

import asyncio
from dataclasses import dataclass, field
from typing import Callable, Any
import anthropic

@dataclass
class Node:
    id: str
    prompt_fn: Callable[[dict], str]  # 이전 결과 → 프롬프트
    deps: list[str] = field(default_factory=list)
    max_tokens: int = 1024

async def run_node(client: anthropic.AsyncAnthropic, node: Node,
                   results: dict, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:  # 동시 실행 최대 4개 제한
        prompt = node.prompt_fn(results)
        msg = await client.messages.create(
            model="claude-opus-4-5",
            max_tokens=node.max_tokens,
            messages=[{"role": "user", "content": prompt}]
        )
        return msg.content[0].text

async def run_dag(nodes: list[Node], timeout_per_node: float = 30.0) -> dict:
    client = anthropic.AsyncAnthropic()
    sem = asyncio.Semaphore(4)
    results: dict[str, Any] = {}
    graph = {n.id: n for n in nodes}

    async def execute(node_id: str):
        node = graph[node_id]
        # 의존 노드 완료 대기
        dep_tasks = [execute(d) for d in node.deps if d not in results]
        if dep_tasks:
            await asyncio.gather(*dep_tasks)
        if node_id not in results:
            try:
                results[node_id] = await asyncio.wait_for(
                    run_node(client, node, results, sem),
                    timeout=timeout_per_node
                )
            except asyncio.TimeoutError:
                results[node_id] = "__TIMEOUT__"  # 실패 마킹, 하위 전파

    await asyncio.gather(*[execute(n.id) for n in nodes])
    return results

Semaphore(4)로 동시 호출을 4개로 제한하면 rate limit 429 에러를 95% 이상 방지할 수 있다. timeout은 노드별 30초가 현실적 기본값이며, LLM 호출 P99 레이턴시(약 20~25초)에 버퍼를 더한 수치다.

실패 전파와 운영 체크리스트

실패 모드 3가지

  1. 노드 타임아웃: 결과를 __TIMEOUT__으로 마킹하고, 하위 노드의 prompt_fn에서 graceful degradation 처리
  2. API 오류(529 등): 지수 백오프 재시도, 최대 3회. 3회 초과 시 해당 브랜치 스킵
  3. 순환 의존 감지: DAG 등록 시점에 DFS로 사이클 검사, 발견 즉시 ValueError 발생

비용 폭발 방지

  • 팬아웃 노드의 컨텍스트는 요약본(200토큰 이내)만 하위로 전달
  • 전체 DAG 완료 전 예상 비용을 노드 수 × 평균 토큰으로 사전 추정, 임계값(예: $0.50) 초과 시 실행 거부

운영 체크리스트

  • [ ] 모든 노드 ID 고유성 검증 및 사이클 감지 테스트
  • [ ] 각 노드에 timeout_per_node 명시적 설정
  • [ ] __TIMEOUT__ / 오류 결과를 하위 노드가 올바르게 처리하는지 단위 테스트
  • [ ] 전체 DAG 실행 비용 사전 추정 로직 추가
  • [ ] Semaphore 값을 API tier rate limit에 맞게 조정 (Tier 1: 4, Tier 2: 8 권장)