🔥 고급2026-05-127~9분
DAG 기반 에이전트 오케스트레이션: 병렬 실행과 의존성 관리로 지연 40% 단축
멀티 에이전트 파이프라인에서 선형 실행 대신 DAG(방향 비순환 그래프)로 태스크 의존성을 모델링하면 독립 태스크를 병렬화해 전체 레이턴시를 대폭 줄일 수 있다. 의존성 해석·실패 전파·부분 재실행 패턴까지 다룬다.
multi-agentorchestrationasync
왜 선형 에이전트 체인이 문제인가
단순 순차 파이프라인(A → B → C → D)은 구현이 쉽지만 독립 태스크까지 직렬화한다. 예를 들어 '웹 검색', 'DB 조회', '코드 실행'이 모두 병렬 가능함에도 순서를 강제하면 각 스텝 평균 2초 × 4스텝 = 8초가 걸린다. DAG로 독립 노드를 병렬 실행하면 크리티컬 패스만 남아 3~5초로 단축되는 사례가 흔하다.
실패 모드: 의존성을 잘못 모델링하면 경쟁 조건(race condition)이 발생하거나, 하나의 실패가 불필요하게 전체를 블록한다.
DAG 실행 엔진 구현
import asyncio
from collections import defaultdict
from typing import Callable, Any
import anthropic
client = anthropic.Anthropic()
class DAGExecutor:
def __init__(self):
self.tasks: dict[str, Callable] = {}
self.deps: dict[str, list[str]] = defaultdict(list)
self.results: dict[str, Any] = {}
def register(self, name: str, deps: list[str] = None):
def decorator(fn):
self.tasks[name] = fn
self.deps[name] = deps or []
return fn
return decorator
async def run(self) -> dict[str, Any]:
in_degree = {n: len(d) for n, d in self.deps.items()}
ready = [n for n, d in in_degree.items() if d == 0]
pending: dict[str, asyncio.Task] = {}
while ready or pending:
# 준비된 태스크 모두 병렬 시작
for name in ready:
pending[name] = asyncio.create_task(
self.tasks[name](self.results)
)
ready.clear()
# 하나라도 완료되면 의존성 해소
done, _ = await asyncio.wait(
pending.values(), return_when=asyncio.FIRST_COMPLETED
)
for task in done:
finished = next(k for k, v in pending.items() if v is task)
self.results[finished] = task.result()
del pending[finished]
for node, node_deps in self.deps.items():
if finished in node_deps:
in_degree[node] -= 1
if in_degree[node] == 0:
ready.append(node)
return self.results
# 사용 예
executor = DAGExecutor()
@executor.register("search", deps=[])
async def search_web(ctx):
resp = client.messages.create(
model="claude-opus-4-5",
max_tokens=256,
messages=[{"role": "user", "content": "최신 AI 뉴스 요약"}]
)
return resp.content[0].text
@executor.register("db_query", deps=[])
async def query_db(ctx):
await asyncio.sleep(0.5) # DB 시뮬레이션
return {"records": 142}
@executor.register("synthesize", deps=["search", "db_query"])
async def synthesize(ctx):
resp = client.messages.create(
model="claude-opus-4-5",
max_tokens=512,
messages=[{"role": "user",
"content": f"검색: {ctx['search']}\nDB: {ctx['db_query']}"}]
)
return resp.content[0].text
트레이드오프와 운영 체크리스트
트레이드오프
- 병렬 LLM 호출은 TPM(분당 토큰) 한도를 빠르게 소진한다. 동시 실행 수를
asyncio.Semaphore(4)로 제한하면 Rate Limit 없이 40~60% 레이턴시 절감을 유지할 수 있다. - 노드 수가 20개를 넘으면 DAG 직렬화·디버깅 비용이 증가한다. 10개 이하의 매크로 노드로 묶는 계층 설계를 권장한다.
부분 재실행: 실패 노드만 재실행하려면 self.results를 Redis에 체크포인트로 저장하고, 완료된 노드는 건너뛴다. 이로써 전체 파이프라인 비용의 최대 70%를 절약할 수 있다.
운영 체크리스트
- [ ] 각 노드에
timeout=30s설정, 초과 시asyncio.TimeoutError처리 - [ ] 크리티컬 패스 길이를 사전 계산하여 SLA 예측에 활용
- [ ] 노드별 토큰 사용량을
usage필드로 집계, 비용 어트리뷰션 확보 - [ ] DAG 시각화(Graphviz 등)를 CI에 포함해 의존성 회귀 감지
- [ ] Rate Limit 429 시 지수 백오프 + jitter 적용(초기 1s, 최대 32s)