k
korAI
고급 전체
🔥 고급2026-05-127~9분

DAG 기반 에이전트 오케스트레이션: 병렬 실행과 의존성 관리로 지연 40% 단축

멀티 에이전트 파이프라인에서 선형 실행 대신 DAG(방향 비순환 그래프)로 태스크 의존성을 모델링하면 독립 태스크를 병렬화해 전체 레이턴시를 대폭 줄일 수 있다. 의존성 해석·실패 전파·부분 재실행 패턴까지 다룬다.

multi-agentorchestrationasync

왜 선형 에이전트 체인이 문제인가

단순 순차 파이프라인(A → B → C → D)은 구현이 쉽지만 독립 태스크까지 직렬화한다. 예를 들어 '웹 검색', 'DB 조회', '코드 실행'이 모두 병렬 가능함에도 순서를 강제하면 각 스텝 평균 2초 × 4스텝 = 8초가 걸린다. DAG로 독립 노드를 병렬 실행하면 크리티컬 패스만 남아 3~5초로 단축되는 사례가 흔하다.

실패 모드: 의존성을 잘못 모델링하면 경쟁 조건(race condition)이 발생하거나, 하나의 실패가 불필요하게 전체를 블록한다.

DAG 실행 엔진 구현

import asyncio
from collections import defaultdict
from typing import Callable, Any
import anthropic

client = anthropic.Anthropic()

class DAGExecutor:
    def __init__(self):
        self.tasks: dict[str, Callable] = {}
        self.deps: dict[str, list[str]] = defaultdict(list)
        self.results: dict[str, Any] = {}

    def register(self, name: str, deps: list[str] = None):
        def decorator(fn):
            self.tasks[name] = fn
            self.deps[name] = deps or []
            return fn
        return decorator

    async def run(self) -> dict[str, Any]:
        in_degree = {n: len(d) for n, d in self.deps.items()}
        ready = [n for n, d in in_degree.items() if d == 0]
        pending: dict[str, asyncio.Task] = {}

        while ready or pending:
            # 준비된 태스크 모두 병렬 시작
            for name in ready:
                pending[name] = asyncio.create_task(
                    self.tasks[name](self.results)
                )
            ready.clear()

            # 하나라도 완료되면 의존성 해소
            done, _ = await asyncio.wait(
                pending.values(), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                finished = next(k for k, v in pending.items() if v is task)
                self.results[finished] = task.result()
                del pending[finished]
                for node, node_deps in self.deps.items():
                    if finished in node_deps:
                        in_degree[node] -= 1
                        if in_degree[node] == 0:
                            ready.append(node)
        return self.results

# 사용 예
executor = DAGExecutor()

@executor.register("search", deps=[])
async def search_web(ctx):
    resp = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=256,
        messages=[{"role": "user", "content": "최신 AI 뉴스 요약"}]
    )
    return resp.content[0].text

@executor.register("db_query", deps=[])
async def query_db(ctx):
    await asyncio.sleep(0.5)  # DB 시뮬레이션
    return {"records": 142}

@executor.register("synthesize", deps=["search", "db_query"])
async def synthesize(ctx):
    resp = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=512,
        messages=[{"role": "user",
                   "content": f"검색: {ctx['search']}\nDB: {ctx['db_query']}"}]
    )
    return resp.content[0].text

트레이드오프와 운영 체크리스트

트레이드오프

  • 병렬 LLM 호출은 TPM(분당 토큰) 한도를 빠르게 소진한다. 동시 실행 수를 asyncio.Semaphore(4)로 제한하면 Rate Limit 없이 40~60% 레이턴시 절감을 유지할 수 있다.
  • 노드 수가 20개를 넘으면 DAG 직렬화·디버깅 비용이 증가한다. 10개 이하의 매크로 노드로 묶는 계층 설계를 권장한다.

부분 재실행: 실패 노드만 재실행하려면 self.results를 Redis에 체크포인트로 저장하고, 완료된 노드는 건너뛴다. 이로써 전체 파이프라인 비용의 최대 70%를 절약할 수 있다.

운영 체크리스트

  • [ ] 각 노드에 timeout=30s 설정, 초과 시 asyncio.TimeoutError 처리
  • [ ] 크리티컬 패스 길이를 사전 계산하여 SLA 예측에 활용
  • [ ] 노드별 토큰 사용량을 usage 필드로 집계, 비용 어트리뷰션 확보
  • [ ] DAG 시각화(Graphviz 등)를 CI에 포함해 의존성 회귀 감지
  • [ ] Rate Limit 429 시 지수 백오프 + jitter 적용(초기 1s, 최대 32s)