THE DEVLOG

scribbly.

LangChain JS 치트시트

2025.07.08 05:06:43

LangGraph

LangGraph는 LangChain 기반한 상태 머신 프레임워크다. LLM을 이용해 유한 상태 머신을 만든다고 보면 된다.
Graph는 노드와 엣지를 이용하여 분기를 처리함을 의미한다.

그래프 선언

import { StateGraph, MessagesAnnotation } from "@langchain/langgraph";

const graph = new StateGraph(MessagesAnnotation)

StateGraph를 이용하여 상태를 이용한 랭그래프를 선언한다. StateGraph는 상태의 스키마를 파라미터로 받는다.

MessagesAnnotation은 Messages 배열을 기반으로 상태를 정의하는 프리셋이다.

노드 추가

import { StateGraph, MessagesAnnotation } from "@langchain/langgraph";

graph.addNode("greeting", (state) => {
  return {
    messages: [new AIMessage("안녕하세요! 저는 개쩌는 인공지능입니다.")],
  };
});

addNode를 이용해 노드를 추가한다.
첫번째 인자는 노드의 이름, 두 번째 인자는 공유 상태(state)를 받아 Partial``` 또는 Promise<Partial````>`를 반환하는 함수(Action)이다.
Action이 반환하는 상태는 기존 state에 누적된다.

엣지 추가

graph.addEdges(START, "greeting")
graph.addEdges("greeting", "chatNode")
graph.addEdges("chatNode", END)

addEdges를 이용해 노드들을 연결해준다.
노드들을 연결할 때에는 필수적으로 START노드와 END노드가 포함되어야 한다.

조건부 엣지 추가

  graph.addConditionalEdges("analyze", (state) => {
    if (state.messages.at(-1)?.content.includes("검색")) return "search";
    return "chat";
  }, {
    search: "googleNode",
    chat: "chatNode",
  });

addConditionalEdges의 첫번째 인자는 분기가 시작하는 노드의 이름이다.
두번째 인자는 문자열을 반환하는 함수인데, 해당 문제는 Map의 Key가 된다.
세번째 인자는 문자열을 Key로, 실행할 노드의 이름을 Value로 가지는 Map이다.

위의 예제는 analyze 노드가 실행된 후, state의 마지막 메시지를 확인한다. 메시지에 "검색"이라는 단어가 있으면 googleNode를 실행한다.

상태가 누적된다는 점을 이용하여 아래와 같이 활용하게 된다.

// LLM을 이용하여 검색 여부를 질의하고, shouldSearch라는 상태를 추가한다.
graph.addNode("decision", async (state) => {
  const lastUserMessage = state.messages.at(-1)?.content || "";

  const prompt = `다음 사용자의 질문을 보고 웹 검색이 필요한지 "true" 또는 "false"로만 판단해줘.\n\n질문: ${lastUserMessage}`;

  const llmResponse = await LLM.invoke(prompt); 
  const shouldSearch = llmResponse.toLowerCase().includes("true");

  return { shouldSearch };
});

// shouldSearch 상태를 조회하여 노드를 선택한다.
graph.addConditionalEdges(
  "decision",
  (state) => {
    return state.shouldSearch ? "search" : "chat";
  },
  {
    search: "googleNode",
    chat: "chatNode",
  }
);

 

그래프 생성 및 실행하기

graph.compile()

const result = await graph.invoke({
  messages: [new HumanMessage("프랑스 혁명에 대해 설명해줘")],
}); 

 compile을 실행하면 graph가 LangChain의 Runnerble 객체로 컴파일 된다.
컴파일 된 그래프는 LangChain의 Runnerble처럼 사용하면 된다.

Runnerble 실행 메서드설명사용 예
.invoke()한 번 실행단일 질문/응답
.stream()실시간 스트리밍채팅, 토큰 출력
.bind()설정 고정한 새 Runnable 반환temperature, tools 지정
.pipe()단계별 처리 연결프롬프트 → LLM → 파서

 

PDF 검색 멀티-에이전트 만들기

Chroma DB를 이용한 RAG
 에서 ReAct Agent로 구현한 모델은 아래와 같은 문제점이 있었다.

  1. 기본적으로 작동 시간이 오래 걸린다.
  2. Qwen 3를 사용하기 때문에 PDF 분석 정확도가 떨어진다.
  3. 응답 정확도를 높이기 위해 PDF 갯수를 늘리면, 메모리 부족으로 더더욱 부정확한 응답을 내놓는다.

따라서 아래와 같이 여러 모델을 사용한 에이전트를 구현하고자 한다.

  1. Qwen 3가 사용자의 메시지를 읽고 tools 사용 여부를 판단한다.
  2. tools를 사용하게 되는 경우 qwen3-embedding과 qwen3-reranker를 이용해 pdf 검색 결과를 반환한다.
  3. HyperCLOVA X SEED가 최종 응답을 반환한다.

즉 일반적으로 성능이 좋은 Qwen3를 이용해 주요 비즈니스 로직을 처리한 후,

한국어 분석 능력이 뛰어나고 정제된 응답을 하는 HyperCLOVA X SEED로 최종 응답을 생성한다.

그래프 설정하기

// 랭그래프 워크플로우 생성
function createWorkflow() {
  const graph = new StateGraph(MessagesAnnotationWithToolCalls)
    .addNode("decision", decisionNode)
    .addNode("response", responseNode)
    .addNode("tools", toolsNode)
    .addEdge(START, "decision")
    .addConditionalEdges("decision", shouldContinue)
    .addEdge("tools", "decision")
    .addEdge("response", END);

  return graph.compile();
}

// 워크플로우 인스턴스
const workflow = createWorkflow();

 node는 decision, tools, response 세 개의 노드로 나뉜다.
decision : qwen3:1.7b를 이용하여 사용자의 메시지가 tools를 사용해야 하는지 아닌지를 판단한다.
tools : 도구를 실행한다. PDFSearchTools는 Chroma DB, Qwen3-Embedding-0.6B, Qwen3-Reranker-0.6B를 이용한다.
response: 응답을 생성한다. HyperCLOVA-X-SEED-Vision-Instruct-3B를 사용한다.

그리고 상태에 따라 tools 노드와 response 노드를 결정하는 shouldContinue라는 조건부 엣지가 있다.

따라서 아래와 같은 다이어그램이 만들어 진다.

0.42

 

Annotation

import { BaseMessage } from "@langchain/core/messages";
import { Annotation, messagesStateReducer } from "@langchain/langgraph";

export const MessagesAnnotationWithToolCalls = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    default: () => [],
    reducer: messagesStateReducer, // MessagesAnnotation과 동일
  }),
  tool_calls: Annotation<{ name: string; args: { userInput: string } }[]>({
    default: () => [],
    reducer: (_, next) => next, // 항상 덮어쓰기
  }),
  tools_checked: Annotation<boolean>({
    default: () => false,
    reducer: (_, next) => next, // 항상 덮어쓰기
  }),
});

messages 어노테이션에 tools에 대한 의사결정 여부를 저장하는 flag 하나와 필요한 tools를 저장하는 tool_calls를 추가하였다.

시작

    let messages: BaseMessage[] = []
    // 사용자 메시지 추가
    messages.push(new HumanMessage({ content: userInput }));

    // 워크플로우 실행
    const result = await workflow.invoke({
      messages: messages,
    });

 사용자의 메시지를 첫 메시지로 하여 시작한다

Decision 노드

export const DECISION_MODEL_NAME = "qwen3:1.7b";

// 의사결정 노드: 도구 필요 여부만 판단
export async function decisionNode(
  state: typeof MessagesAnnotationWithToolCalls.State
) {

  const toolCalls: { name: string; args: { userInput: MessageContent } }[] = [];
  
  if (!state.tools_checked) {
    const qwenModel = new ChatOllama({
      baseUrl: "http://localhost:11434",
      model: DECISION_MODEL_NAME,
      streaming: false,
    });

    // 마지막 사용자 메시지 찾기
    const lastUserMessage = state.messages
      .slice()
      .reverse()
      .find((msg) => msg._getType() === "human");

    const userInput = lastUserMessage?.content || "";

    // 의사결정 프롬프트
    const decisionPrompt = `사용자의 질문을 분석하여 PDF 검색이 필요한지 판단하세요.
      
      사용자 질문: ${userInput}

      다음 중 하나로만 응답하세요:
      - "YES"
      - "NO"
      
      응답:`;

    const decisionResponse = await qwenModel.invoke([
      new HumanMessage(decisionPrompt),
    ]);
    const decision = decisionResponse.content.toString().trim().toUpperCase();

    if (decision.endsWith("YES")) {
      toolCalls.push({
        name: "pdf_search",
        args: { userInput },
      });
    }
  }

  return {
    ...state,
    tools_checked: true,
    tool_calls: toolCalls,
  };
}

toolsChecked가 false이면 qwen3 1.7b를 호출하여 도구를 사용할지 여부를 묻는다. qwen3 1.7b의 응답이 "YES"이면 PDF 검색 도구를 toolCalls 배열에 추가한다.

toolsChecked가 이미 true거나, qwen3 1.7b가 도구 사용이 필요 없다는 의미로 "NO"를 응답하면, 비어있는 toolCalls를 반환한다.

 

조건부 엣지

export function shouldContinue(
  state: typeof MessagesAnnotationWithToolCalls.State
) {

  if (state.tool_calls.length) {
    return "tools";
  }

  return "response";
}

Decision 노드의 실행이 끝난 이후 상태를 확인하는 분기이다.
tool_calls가 비어있으면 tools 노드를 실행할 필요가 없는 것으로 보고 response 노드를 실행한다.

Tools 노드

export async function toolsNode(
  state: typeof MessagesAnnotationWithToolCalls.State
) {
  const toolCalls = state.tool_calls || [];
  const messages = state.messages ? [...state.messages] : [];

  for (const toolCall of toolCalls) {
    if (toolCall.name === "pdf_search") {
      try {
        const searchResult = await pdfSearchTool.invoke({
          query: [toolCall.args.userInput],
        });
        messages.push(
          new AIMessage({
            content: `PDF 검색 결과: ${String(searchResult)}`,
            name: "pdf_search_results",
          })
        );
      } catch {
        messages.push(
          new AIMessage({
            content: "PDF 검색 중 오류가 발생했습니다.",
            name: "pdf_search_error",
          })
        );
      }
    }
  }

  return {
    ...state,
    messages,
    tool_calls: [],
  };
}

toolCalls 배열을 순회하면서 적절한 도구들을 실행한다.
도구를 실행한 결과는 messages에 저장한다.
도구 실행이 끝난 후에는 messages 상태를 업데이트하고, 도구 실행이 끝났다는 의미로 비어있는 toolCalls를 반환한다.

PDFSearchTool

const EMBEDDING_MODEL = "hf.co/Qwen/Qwen3-Embedding-0.6B-GGUF:Q8_0";

export const pdfSearchTool = tool(
  async (input: {
    query: string[] | string;
    limit?: number;
    filename?: string;
  }) => {
    try {
      const { query, limit = 3, filename } = input;

      // Chroma 클라이언트 초기화
      const client = new ChromaClient({
        host: "localhost",
        port: 8000,
      });

      // Ollama 임베딩 함수 초기화
      const embedder = new OllamaEmbeddingFunction({
        model: EMBEDDING_MODEL,
        url: "http://localhost:11434",
      });

      // 컬렉션 가져오기 또는 생성
      const chromaCollection = await client.getOrCreateCollection({
        name: "pdfs",
        embeddingFunction: embedder,
        metadata: {
          "hnsw:space": "cosine",
        },
      });

      // 크로마DB 검색조건
      const searchOptions: {
        queryTexts: string[];
        nResults: number;
        include: ("documents" | "metadatas" | "distances")[];
        where?: { filename: string };
      } = {
        queryTexts: typeof query === "string" ? [query] : [...query],
        nResults: 10,
        include: ["documents", "metadatas", "distances"],
      };

      // 파일명 필터 추가
      if (filename) {
        searchOptions.where = { filename: filename };
      }

      let results = await chromaCollection.query(searchOptions);

      if (filename && !results.documents?.[0]?.length) {
        delete searchOptions.where;
        results = await chromaCollection.query(searchOptions);
      }

      // 검색 결과가 없으면 바로 반환
      if (!results.documents?.[0]?.length) {
        return JSON.stringify(
          {
            query,
            results: [],
            totalFound: 0,
          },
          null,
          2
        );
      }

      // 리랭킹 적용
      const candidates: string[] = (results.documents?.[0] || []).filter(
        (d): d is string => typeof d === "string"
      );
      
      const rerankResult = await rerankTool.invoke({
        query: typeof query === "string" ? query : query[0],
        candidates,
      });

      // 상위 3개만 사용
      let top3: { rank: number; content: string; score: number }[] = [];
      if (Array.isArray(rerankResult)) {
        top3 = rerankResult;
      }

      // 리랭킹 결과 파싱
      const formattedResults = top3.map(
        (item: { rank: number; content: string; score: number }) => {
          // 원본 인덱스 찾기
          const origIdx = candidates.indexOf(item.content);
          return {
            id: item.rank,
            content: item.content,
            metadata: results.metadatas?.[0]?.[origIdx] || {},
            score: item.score,
          };
        }
      );

      return JSON.stringify(
        {
          query,
          results: formattedResults,
          totalFound: formattedResults.length,
        },
        null,
        2
      );
    } catch (error) {
      console.error("PDF 검색 중 오류:", error);
      return `PDF 검색 중 오류가 발생했습니다: ${
        error instanceof Error ? error.message : "알 수 없는 오류"
      }`;
    }
  },
  {
    name: "pdf_search",
    description: "PDF 문서에서 특정 내용을 검색합니다.",
    schema: z.object({
      query: z.array(z.string()).describe("검색할 질문이나 키워드 (string[]}"),
      limit: z.number().optional().describe("반환할 결과 수. (기본값: 3)"),
      filename: z.string().optional().describe("기본값: undefined"),
    }),
  }
);

크로마 DB와 Qwen3-Embedding-0.6B을 이용하여 임베딩 검색을 수행한다.
코사인 유사도를 바탕으로 10개의 결과를 불러온다.
그리고 rerankTool을 이용하여 상위 3개의 결과를 반환한다.

RerankTool

RerankTool은 http://localhost:8001/rerank를 호출하여 Qwen3-reranker-0.6b를 실행한다

# reranker_server.py
from fastapi import FastAPI, Request
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

app = FastAPI()

MODEL_NAME = "Qwen/Qwen3-Reranker-0.6B"
DEVICE = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

print("🔄 모델 로딩 중...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, padding_side='left')
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME, torch_dtype=torch.float16
).to(DEVICE).eval()
print("✅ 모델 로딩 완료")


# ===== 입력 구조 =====
class RerankInput(BaseModel):
    query: str
    documents: list[str]
    instruction: str | None = None  # optional


# ===== 입력 포맷 포장 =====
def format_input(instruction, query, doc):
    if instruction is None:
        instruction = "Given a web search query, retrieve relevant passages that answer the query"
    return f"<Instruct>: {instruction}\n<Query>: {query}\n<Document>: {doc}"


def compute_scores(query: str, docs: list[str], instruction: str | None = None):
    # 데이터를 파싱
    prefix = "<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Answer only \"yes\" or \"no\".<|im_end|>\n<|im_start|>user\n"
    suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
    prefix_ids = tokenizer.encode(prefix, add_special_tokens=False)
    suffix_ids = tokenizer.encode(suffix, add_special_tokens=False)

    # 결과 점수를 저장할 리스트
    scores = []

    # 각 문서에 대해 반복하며 점수 계산
    for doc in docs:
        # 텍스트를 데이터로 만들고 파이토치 실행
        text = format_input(instruction, query, doc)
        input_ids = tokenizer.encode(text, add_special_tokens=False)
        final_input = prefix_ids + input_ids + suffix_ids
        final_input = torch.tensor([final_input], device=DEVICE)

        with torch.no_grad(): # gradient 계산(Training의 방식) 비활성화
            # 모델 실행: 마지막 토큰(즉, <think> 이후)의 로짓(logit) 추출
            logits = model(final_input).logits[0, -1]

            # "yes"와 "no" 토큰의 ID를 가져옴
            yes_token = tokenizer.convert_tokens_to_ids("yes")
            no_token = tokenizer.convert_tokens_to_ids("no")

            # 해당 두 토큰에 대해 softmax로 확률화
            probs = torch.softmax(logits[[no_token, yes_token]], dim=0)
            
            # "yes"의 확률 값을 score로 사용 (index 1)
            score = probs[1].item()

            # 점수 리스트에 추가
            scores.append(score)

    return scores



@app.post("/rerank")
async def rerank(input: RerankInput):
    scores = compute_scores(input.query, input.documents, input.instruction)
    return {"scores": scores}

위 로직을 간단하게 설명하면,
Reranker가 사용자의 질문과 문서의 내용을 비교한 후, 연관이 있을 정도(YES)과 연관이 없을 정도(NO)를 반환한다. (logits)
그리고 이를 softmax를 통해 연관이 있을 확률과 연관이 없을 확률로 변환한다. (probs)
YES가 나올 probs를 점수로 사용한다.

RerankTool은 점수 상위 3개만을 선정하여 반환한다.

Response 노드

마지막 Response 노드는 하이퍼클로바X시드 3B모델을 이용해서 프롬프트 엔지니어링을 하여 대답하게 한다.

import { HumanMessage } from "@langchain/core/messages";
import { MessagesAnnotation } from "@langchain/langgraph";
import { ChatOllama } from "@langchain/ollama";
export const CHAT_MODEL =
  "hf.co/cherryDavid/HyperCLOVA-X-SEED-Vision-Instruct-3B-Llamafied-Q4_K_S-GGUF:latest";

// 최종 응답 생성 노드 (hyperclovaxseed 모델 사용)
export async function responseNode(state: typeof MessagesAnnotation.State) {

  const chatModel = new ChatOllama({
    baseUrl: "http://localhost:11434",
    model: CHAT_MODEL,
    streaming: false,
  });

  // 마지막 사용자 메시지 찾기
  const lastUserMessage = state.messages
    .slice()
    .reverse()
    .find((msg) => msg._getType() === "human");

  const userInput = lastUserMessage?.content || "";

  // PDF 검색 결과가 있는지 확인
  const pdfResults = state.messages
    .slice()
    .reverse()
    .find((msg) => msg.name === "pdf_search_results");

  let prompt = `사용자에게 한국어로 친절하고 정확하게 답변하세요.
  
  사용자 질문: ${userInput}`;

  if (pdfResults) {
    prompt += `
  
  PDF 검색 결과:
  ${pdfResults.content}
  
  위 검색 결과를 바탕으로 사용자의 질문에 답변하세요. 검색 결과가 없다면 그 사실을 명시하고 일반적인 답변을 제공하세요.`;
  } else {
    prompt += `
  
  PDF 검색이 필요하지 않은 질문입니다. 일반적인 대화나 인사에 적절히 응답하세요.`;
  }

  const response = await chatModel.invoke([new HumanMessage(prompt)]);

  return {
    messages: [response],
  };
}

한편 현재 구조에서는 사용자 경험을 더 낫게 하기 위해 최종 응답을 생성하는 chatOllama를 streaming : true로 하여 LangGraph와 별도로 분리 시키는 것도 가능하다. (LangGraph는 응답을 streaming으로 반환하지 못한다.)