LangGraph의 astream_events: 에이전트 실행을 실시간으로 들여다보기
💡 한 줄 요약:
astream_events()는 LangGraph 그래프 실행 중 발생하는 모든 이벤트(LLM 토큰, 툴 호출, 노드 시작/종료 등)를 실시간 스트림으로 받을 수 있는 비동기 API입니다.
들어가며
AI 에이전트가 응답을 생성하는 데 25초가 걸린다고 상상해보세요. 사용자는 평균 8초 후에 페이지를 이탈합니다. 하지만 스트리밍을 도입하면 어떨까요? 총 연산 시간은 동일하지만, 첫 번째 토큰이 0.4초 만에 나타납니다. 이탈률이 급격히 줄어듭니다.
이것은 단순한 성능 최적화가 아닙니다. 사용자 인식의 전환입니다. 답변 전체를 기다리는 것과, 단어 하나하나가 나타나는 것을 보는 경험은 인지적으로 완전히 다릅니다.
LangGraph는 이를 위해 여러 스트리밍 API를 제공합니다. 이 글에서는 그 중 가장 강력한 astream_events()에 대해 깊이 알아봅니다.
이 글을 읽고 나면 다음을 할 수 있습니다:
✅
stream(),astream(),astream_events()의 차이를 이해한다✅ 이벤트 타입을 필터링하여 원하는 정보만 추출한다
✅ 토큰 단위 스트리밍을 실제 코드로 구현한다
✅ 특정 노드의 이벤트만 선택적으로 처리한다
사전 지식
이 글을 이해하려면 아래 개념에 대한 기본 지식이 필요합니다:
LangGraph 기본 구조: 노드(Node), 엣지(Edge), 그래프(Graph) 개념
Python async/await: 비동기 프로그래밍 기초
LangChain 메시지 형식:
HumanMessage,AIMessage등
LangGraph의 스트리밍 API 비교
LangGraph는 목적에 따라 세 가지 스트리밍 방식을 제공합니다.
| 메서드 | 스트리밍 단위 | LLM 토큰 포함 | 주요 용도 |
|---|---|---|---|
stream("values") |
노드 완료 후 전체 상태 | ❌ | 디버깅, 백엔드 오케스트레이션 |
stream("updates") |
노드 완료 후 변경된 키만 | ❌ | 진행 상태 표시 |
astream_events() |
LLM 토큰 + 모든 생명주기 이벤트 | ✅ | 프론트엔드 실시간 렌더링 |
stream()과 astream()은 노드가 완료된 후 결과를 받습니다. 반면 astream_events()는 노드 실행 도중 발생하는 모든 이벤트를 실시간으로 받을 수 있습니다. 특히 LLM이 토큰을 하나씩 생성할 때마다 스트리밍할 수 있다는 점이 핵심 차이입니다.
astream_events()란?
개념 정의
astream_events()는 LangGraph 그래프 실행 중 발생하는 모든 이벤트를 비동기 이터레이터로 반환하는 메서드입니다. LangChain의 콜백 인프라를 내부적으로 활용하여, 노드 시작/종료, LLM 토큰 생성, 툴 호출 등 그래프 실행의 전체 생명주기를 세밀하게 관찰할 수 있습니다.
쉽게 말하면, 에이전트 내부에 카메라를 달아서 매 순간 무슨 일이 벌어지는지 실시간 중계방송처럼 볼 수 있는 API입니다.
왜 필요한가?
기존 방식 (stream()) |
astream_events() 사용 시 |
|---|---|
| 노드가 끝나야 결과를 받음 | 노드 실행 중 실시간으로 토큰 수신 |
| LLM 생성 과정 불투명 | 토큰 하나하나를 즉시 UI에 렌더링 가능 |
| 어느 노드에서 왔는지 파악 어려움 | langgraph_node 메타데이터로 출처 확인 |
| 툴 호출 시점 알 수 없음 | on_tool_start/end 이벤트로 정확히 파악 |
핵심 원리 이해하기
1단계: 기본 사용법
astream_events()는 async for 루프와 함께 사용합니다. version="v2" 파라미터는 LangGraph 그래프에서 필수입니다.
from langchain_core.messages import HumanMessage
async for event in graph.astream_events(
{"messages": [HumanMessage(content="서울 날씨 알려줘")]},
version="v2",
config={"configurable": {"thread_id": "session-1"}},
):
print(event)
각 이벤트(event)는 다음과 같은 구조의 딕셔너리입니다:
{
"event": "on_chat_model_stream", # 이벤트 타입
"name": "ChatOpenAI", # 컴포넌트 이름
"data": {"chunk": AIMessageChunk}, # 실제 데이터
"metadata": {
"langgraph_node": "chatbot", # 이벤트가 발생한 노드
"langgraph_step": 1, # 실행 단계
}
}
2단계: 이벤트 타입 이해하기
astream_events()가 방출하는 주요 이벤트 타입은 다음과 같습니다:
| 이벤트 타입 | 발생 시점 |
|---|---|
on_chain_start / on_chain_end |
LangGraph 노드가 시작 / 완료될 때 |
on_chat_model_start |
LLM이 프롬프트를 받을 때 |
on_chat_model_stream |
LLM이 토큰을 하나 생성할 때마다 |
on_chat_model_end |
LLM이 응답 생성을 완료할 때 |
on_tool_start / on_tool_end |
툴(도구)이 호출되고 결과를 반환할 때 |
on_custom_event |
노드에서 수동으로 emit한 커스텀 이벤트 |
3단계: 토큰 스트리밍 구현하기
가장 흔한 사용 사례인 토큰 단위 스트리밍을 구현합니다.
async def stream_tokens(graph, user_input: str):
"""LLM이 생성하는 토큰을 하나씩 yield"""
async for event in graph.astream_events(
{"messages": [HumanMessage(content=user_input)]},
version="v2",
):
event_type = event["event"]
# LLM이 토큰을 생성할 때만 처리
if event_type == "on_chat_model_stream":
chunk = event["data"]["chunk"]
token = chunk.content
if token: # 빈 청크 제외 (tool-call 델타 등)
yield token
# 사용 예시
async for token in stream_tokens(graph, "파이썬이란 무엇인가요?"):
print(token, end="", flush=True)
💬 포인트:
chunk.content가 빈 문자열인 경우도 있습니다. 툴 호출 델타처럼 텍스트 토큰이 아닌 청크가 섞여 오기 때문에 반드시if token:체크가 필요합니다.
실제 활용 예시
예시 1: 특정 노드의 이벤트만 필터링하기
에이전트에 여러 노드가 있을 때, chatbot 노드에서 생성되는 토큰만 받고 싶다면:
TARGET_NODE = "chatbot"
async for event in graph.astream_events(input, version="v2"):
# 노드 정보는 metadata에서 꺼냅니다
node_name = event.get("metadata", {}).get("langgraph_node", "")
if event["event"] == "on_chat_model_stream" and node_name == TARGET_NODE:
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
예시 2: 툴 호출 감지 + 토큰 스트리밍 함께 처리하기
인터랙티브 UI를 구축할 때 "검색 중..." 같은 상태 메시지와 실제 토큰을 함께 처리합니다:
async for event in graph.astream_events(input, version="v2"):
event_type = event["event"]
if event_type == "on_tool_start":
tool_name = event["name"]
print(f"\n🔧 도구 실행 중: {tool_name}...")
elif event_type == "on_tool_end":
print(f"\n✅ 도구 완료!")
elif event_type == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
elif event_type == "on_chain_end" and event["name"] == "LangGraph":
print("\n\n[생성 완료]")
장점과 주의사항
✅ 장점
최고의 UX: LLM 토큰을 한 글자씩 받으므로 ChatGPT와 같은 타이핑 효과를 구현할 수 있음
세밀한 관찰: 그래프 실행의 모든 단계를 이벤트로 포착 가능
노드별 필터링:
langgraph_node메타데이터로 원하는 노드의 이벤트만 선택 가능멀티 에이전트 지원: 서브그래프(subgraph) 이벤트도
ns(namespace) 필드로 구분 가능
⚠️ 주의사항 / 한계
비동기 전용:
async for가 필요하므로 동기 코드베이스에서 사용하려면 별도 처리 필요이벤트 노이즈: 이벤트 수가 많으므로 필요한 타입만 필터링하는 것이 중요
version="v2"필수: LangGraph 그래프에서는 반드시version="v2"를 명시해야 함빈 청크 처리:
chunk.content가 빈 문자열인 케이스를 반드시 핸들링해야 함
🔄 언제 사용하고, 언제 피해야 할까?
| 사용하기 좋은 경우 | 피해야 하는 경우 |
|---|---|
| 프론트엔드에서 토큰 단위 렌더링이 필요할 때 | 단순히 최종 결과만 필요할 때 |
| 에이전트 내부 동작을 디버깅할 때 | 동기 코드베이스에서 간단히 사용할 때 |
| 툴 호출 시점을 정확히 알아야 할 때 | 노드 완료 상태만 추적하면 충분할 때 |
| 실시간 대시보드, 채팅 UI 구현 시 | 서버 사이드 배치 처리만 할 때 |
요약 & 마무리
이번 글에서 배운 내용을 정리합니다:
astream_events()는 이벤트 기반 스트리밍 — 노드 완료가 아닌, 실행 중 발생하는 모든 이벤트를 실시간으로 수신합니다.핵심 이벤트 타입 —
on_chat_model_stream(토큰),on_tool_start/end(툴),on_chain_start/end(노드) 세 가지를 기억하세요.필터링이 핵심 —
event["event"]로 이벤트 타입을,metadata["langgraph_node"]로 노드를 필터링하면 원하는 정보만 깔끔하게 처리할 수 있습니다.
astream_events()를 활용하면 ChatGPT나 Claude처럼 토큰이 하나씩 타이핑되는 UX를 직접 구현할 수 있습니다.
참고 자료
이 글은 Claude의 도움을 받아 작성되었습니다.