01 — Pourquoi LangGraph s'est imposé
Entre 2022 et 2024, plusieurs frameworks d'agents se sont disputés le terrain — LangChain Agents v0, AutoGen, CrewAI, des projets internes chez chaque éditeur. En 2026, le standard de fait de la production est LangGraph. Pourquoi ?
Une seule raison, structurelle : LangGraph rend l'état d'agent explicite et inspectable. Tous les autres frameworks ont longtemps caché l'état derrière des abstractions imperatives (boucle while implicite, callbacks, agents « intelligents » qui décidaient de tout). Cette opacité est acceptable en démo. Elle est intolérable en production, où il faut pouvoir :
- Suspendre un agent en plein vol et reprendre plus tard.
- Modifier l'état entre deux étapes (correction humaine, ajout d'info).
- Rejouer une trajectoire à l'identique pour la déboguer.
- Audit un agent : montrer la séquence exacte de décisions prises.
LangGraph résout ces quatre besoins en posant une primitive : un graphe de transitions explicites sur un état typé. Tout le reste découle.
02 — Anatomie d'un graphe
Quatre primitives suffisent à modéliser n'importe quel workflow agentique : l'état, les nœuds, les arêtes, le checkpointer.
L'état (TypedDict)
L'état est un TypedDict partagé entre tous les nœuds. Il contient ce qui doit circuler d'une transition à l'autre : messages, scratchpad de raisonnement, résultats d'outils, mémoire de travail, indicateur de terminaison.
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class GraphState(TypedDict):
messages: Annotated[list, add_messages]
plan: str | None
tool_results: list
iteration: int
done: bool
Le Annotated + reducer (add_messages) est important : il indique comment les valeurs successives doivent se combiner. Pour les messages, on append ; pour un compteur, on incrémente ; pour une décision booléenne, on écrase. Bien définir les reducers est l'une des décisions les plus impactantes.
Les nœuds (functions)
Un nœud est une fonction (ou méthode async) qui prend l'état courant et retourne une mise à jour partielle de l'état. Pas un nouvel état complet : juste les champs modifiés.
def planner(state: GraphState) -> dict:
last_msg = state["messages"][-1].content
plan = llm.invoke(f"Planifie pour: {last_msg}").content
return {"plan": plan, "iteration": state["iteration"] + 1}
Les arêtes (edges) et arêtes conditionnelles
Une arête simple connecte deux nœuds : après A, va vers B. Une arête conditionnelle prend l'état et retourne le nom du nœud suivant — c'est le mécanisme de routage de l'agent.
def router(state: GraphState) -> str:
if state["done"]:
return END
if state["iteration"] > MAX_ITER:
return "timeout_handler"
if needs_tool(state["plan"]):
return "call_tool"
return "respond"
graph.add_conditional_edges("planner", router)
Ne jamais rendre un routage dépendant du contenu d'une réponse LLM non-structurée (« cherche le mot 'recherche' dans la sortie »). Toujours router sur des champs typés de l'état, mis à jour explicitement par les nœuds précédents. Sinon, la moindre variation de sortie casse le graphe.
Le checkpointer
C'est ce qui rend tout l'édifice productionnable. Le checkpointer persiste l'état à chaque transition — vous pouvez interrompre, reprendre, time-traveler.
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(POSTGRES_URI)
graph = builder.compile(checkpointer=checkpointer)
# Invocation avec thread_id : reprend là où on s'était arrêté
config = {"configurable": {"thread_id": "session-42"}}
result = graph.invoke({"messages": [...]}, config)
03 — Checkpointer en production
Trois backends sont supportés : SQLite (dev), Postgres (production), Redis (haute fréquence). Notre choix par défaut en production : Postgres.
Pourquoi Postgres : durabilité, transactions ACID, requêtes SQL pour l'inspection, intégration native avec les outils d'audit. Redis est plus rapide mais perd des checkpoints en cas de panne ; tolérable pour une session utilisateur courte, dangereux pour des trajectoires longues.
Le schéma Postgres créé par LangGraph est simple : une table checkpoints et une table writes. Indexées sur thread_id et checkpoint_id. La requête « montre-moi l'état du thread X à l'étape N » est triviale.
Mettre une politique de rétention sur les checkpoints (TTL ou archivage). En 6 mois, un agent productif peut générer plusieurs millions de checkpoints. Sans nettoyage, vous tuez votre Postgres.
04 — Streaming & partial outputs
Pour les interfaces utilisateur (chat, agent d'assistance), il faut streamer : l'utilisateur doit voir les tokens arriver, savoir quel nœud est en train de s'exécuter, quel outil est appelé.
LangGraph propose trois modes de streaming :
| Mode | Ce qu'on reçoit | Usage |
|---|---|---|
values | L'état complet à chaque transition | Debug, replay UI |
updates | Les deltas d'état à chaque transition | Production typique |
messages | Les tokens LLM en flux | Chat UI fluide |
En production, on combine : messages pour les tokens (affichage progressif), updates pour les transitions de nœud (notifier l'UI « l'agent appelle l'outil X »). LangGraph supporte les deux simultanément depuis la 0.3.
async for event in graph.astream(
{"messages": [user_msg]},
config,
stream_mode=["messages", "updates"]
):
mode, payload = event
if mode == "messages":
# token, métadonnées du nœud émetteur
token, meta = payload
yield {"type": "token", "content": token.content, "node": meta["langgraph_node"]}
elif mode == "updates":
# {node_name: delta_state}
for node_name, delta in payload.items():
yield {"type": "transition", "node": node_name}
05 — Human-in-the-loop
Pour les agents qui prennent des décisions à enjeu (envoyer un email, signer une commande, modifier une base), on insère une pause humaine : l'agent s'arrête, propose son action, attend une approbation. LangGraph rend cela natif via interrupt.
from langgraph.types import interrupt, Command
def confirm_send_email(state: GraphState) -> dict:
# Suspend l'exécution et renvoie cette valeur au caller
decision = interrupt({
"type": "approval",
"action": "send_email",
"to": state["recipient"],
"subject": state["subject"],
"body": state["body"],
})
if decision == "approve":
send_email(state)
return {"email_sent": True}
elif decision == "edit":
return {"messages": [HumanMessage("Refais avec ces corrections...")]}
else:
return {"email_sent": False, "done": True}
Côté caller, après une interruption, on reprend via Command(resume=...) :
# 1er appel : l'agent travaille puis s'interrompt sur confirm_send_email
state = graph.invoke({"messages": [user_msg]}, config)
# state contient un __interrupt__ avec la demande d'approbation
# UI montre la demande à l'utilisateur, qui clique « approve »
# 2e appel : on reprend
state = graph.invoke(Command(resume="approve"), config)
Le checkpointer rend cela trivial : l'état est persisté pendant l'attente, peut survivre à un redémarrage de service, peut attendre des minutes ou des jours.
06 — Sub-graphs & composition
À partir d'une certaine complexité (5-10 nœuds), un graphe monolithique devient illisible. La composition en sub-graphs résout ça : un nœud peut être lui-même un graphe.
# Sub-graph "research" : 4 nœuds internes
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_node)
research_graph.add_node("filter", filter_node)
research_graph.add_node("summarize", summarize_node)
research_compiled = research_graph.compile()
# Graphe parent : embarque research comme nœud
parent = StateGraph(ParentState)
parent.add_node("research", research_compiled)
parent.add_node("draft", draft_node)
parent.add_node("review", review_node)
Important : les états du parent et du sub-graph peuvent être différents. LangGraph fournit deux mécanismes pour mapper l'un sur l'autre — partage de champs typés ou transformation explicite à l'entrée/sortie du sub-graph.
Au-delà de la lisibilité, les sub-graphs permettent de versionner indépendamment chaque sous-système, d'A/B tester un sub-graph en isolation, de réutiliser un même sub-graph dans plusieurs agents.
07 — Observabilité LangSmith / MLflow
LangGraph s'instrumente nativement vers LangSmith (de l'éditeur, payant) et vers MLflow Tracing v3 (open source, self-host). Les deux fonctionnent. Notre choix par défaut chez nos clients : MLflow + Langfuse, pour rester sur du self-host EU.
import mlflow
# Auto-instrumentation LangGraph
mlflow.langchain.autolog()
mlflow.set_experiment("acpr-agent-prod")
with mlflow.start_run() as run:
result = graph.invoke({"messages": [msg]}, config)
# Chaque nœud, chaque appel LLM, chaque tool call est tracé
mlflow.log_metric("trajectory_steps", len(result["messages"]))
Côté Langfuse, on configure un handler :
from langfuse.callback import CallbackHandler
handler = CallbackHandler(public_key=..., secret_key=..., host=LANGFUSE_HOST)
result = graph.invoke(input, config={**config, "callbacks": [handler]})
Les deux co-existent sans friction. MLflow garde la trace structurée pour les audits AI Act. Langfuse fournit le dashboard temps-réel pour les opérateurs.
08 — Pièges fréquents
Voici les pièges qu'on a rencontrés systématiquement chez nos clients qui passent de LangGraph en démo à LangGraph en production.
Piège 1 — Reducer manquant ou mal défini
Sans add_messages ou équivalent, le champ messages est écrasé à chaque transition au lieu d'être appendé. L'agent oublie son contexte. Symptôme : l'agent répète les mêmes questions ou refait les mêmes actions. Toujours auditer les reducers avant le premier run.
Piège 2 — État qui enfle
Le scratchpad de raisonnement, les résultats d'outils volumineux, les messages s'accumulent. À l'itération 30, le LLM reçoit 80 000 tokens de contexte. Coût explose, qualité chute. Solution : summarization périodique de l'état, ou pruning explicite après un seuil.
Piège 3 — Tool calls non-déterministes
Si vos outils ne sont pas idempotents (envoi d'email, modification de base), un replay déclenche des doublons. Toujours wrapper les outils à effet de bord dans un dédupeur basé sur un identifiant de transition.
Piège 4 — Pas de budget
Un agent sans limite stricte sur max_iterations, max_tokens, max_cost_eur finira par boucler à l'infini ou exploser votre facture. Implémenter ces budgets dans l'état lui-même, avec un nœud final qui force END.
def budget_guard(state: GraphState) -> str:
if state["iteration"] >= MAX_ITER:
return "timeout_handler"
if state["cost_eur"] > MAX_COST:
return "budget_exceeded"
return "continue"
Piège 5 — Erreurs non typées dans les outils
Un outil qui lève une exception Python brute fait crasher tout le graphe. Toujours retourner une erreur structurée que l'agent peut interpréter et corriger. Pattern : Result[Ok, ToolError] avec retry policy explicite.
Piège 6 — Pas de versioning des prompts
Les prompts vivent dans le code, ils sont modifiés par l'équipe au fil de l'eau, sans review systématique. Un changement de prompt fait régresser silencieusement la qualité. Mettre les prompts dans un MLflow Prompt Registry ou un fichier YAML versionné, avec une suite d'eval qui tourne à chaque PR sur les prompts.
LangGraph à industrialiser ?
Mise en place checkpointer, observabilité MLflow/Langfuse, human-in-the-loop, sub-graphs, suites d'eval en CI/CD. Première analyse de faisabilité offerte.
Échanger avec un expert