[LCC-10] Understanding LangChain RunnableParallel and @chain

Peter Note 2024. 8. 13. 00:00

When building the final chain of a RAG pipeline, RunnableParallel is what wires the retriever into the prompt input.

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

# The prompt expects input with keys for "context" and "question"
prompt = ChatPromptTemplate.from_template(template)

model = ChatOpenAI()

retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# The three forms below are equivalent.
# ref: https://python.langchain.com/v0.2/docs/how_to/parallel/
# {"context": retriever, "question": RunnablePassthrough()}
# RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
# RunnableParallel(context=retriever, question=RunnablePassthrough())

retrieval_chain.invoke("where did harrison work?")
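The dict at the head of the chain is coerced into a RunnableParallel: every value receives the same input, and the results are collected under the matching keys. The following is a toy model of that fan-out in plain Python, not LangChain's implementation. The names `run_parallel`, `fake_retriever`, and `passthrough` are hypothetical and exist only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_parallel(steps: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Call every step with the same input and collect the results by key."""
    with ThreadPoolExecutor() as pool:
        futures = {key: pool.submit(step, value) for key, step in steps.items()}
        return {key: future.result() for key, future in futures.items()}


def fake_retriever(question: str) -> List[str]:
    # Hypothetical stand-in for retriever.invoke(question)
    return ["harrison worked at kensho"]


def passthrough(value: Any) -> Any:
    # Stand-in for RunnablePassthrough(): returns the input unchanged
    return value


prompt_input = run_parallel(
    {"context": fake_retriever, "question": passthrough},
    "where did harrison work?",
)
print(prompt_input)
# {'context': ['harrison worked at kensho'], 'question': 'where did harrison work?'}
```

The resulting dict has exactly the "context" and "question" keys that the prompt template expects.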

 

The RunnablePassthrough class inherits from RunnableSerializable.

class RunnablePassthrough(RunnableSerializable[Other, Other]):
    ... (omitted) ...
    
    def invoke(
        self, input: Other, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Other:
        if self.func is not None:
            call_func_with_variable_args(
                self.func, input, ensure_config(config), **kwargs
            )
        return self._call_with_config(identity, input, config)
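Reading the invoke method above: if a function was supplied, it is called for its side effect only, and the input is then returned unchanged. A minimal sketch of that behavior in plain Python follows. `ToyPassthrough` is a hypothetical class for illustration and leaves out config handling and tracing.

```python
from typing import Any, Callable, List, Optional


class ToyPassthrough:
    """Toy pass-through step: optional side effect, input returned as-is."""

    def __init__(self, func: Optional[Callable[[Any], Any]] = None) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        if self.func is not None:
            self.func(value)  # the return value is deliberately ignored
        return value


seen: List[Any] = []
step = ToyPassthrough(seen.append)  # side effect: record every input
result = step.invoke({"question": "hi"})

print(result)  # {'question': 'hi'}
print(seen)    # [{'question': 'hi'}]
```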

 

RunnablePassthrough usage example

from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

runnable = RunnableParallel(
    origin=RunnablePassthrough(),
    modified=lambda x: x+1
)

runnable.invoke(1) # {'origin': 1, 'modified': 2}


def fake_llm(prompt: str) -> str: # Fake LLM for the example
    return "completion"

chain = RunnableLambda(fake_llm) | {
    'original': RunnablePassthrough(), # Original LLM output
    'parsed': lambda text: text[::-1] # Parsing logic
}

chain.invoke('hello') # {'original': 'completion', 'parsed': 'noitelpmoc'}

 

 

Running multiple PromptTemplates in parallel

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
)

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

map_chain.invoke({"topic": "bear"})

# Result
{'joke': AIMessage(content="Why don't bears like fast food? Because they can't catch it!", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 13, 'total_tokens': 28}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_d9767fc5b9', 'finish_reason': 'stop', 'logprobs': None}, id='run-fe024170-c251-4b7a-bfd4-64a3737c67f2-0'),
 'poem': AIMessage(content='In the quiet of the forest, the bear roams free\nMajestic and wild, a sight to see.', response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 15, 'total_tokens': 39}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-2707913e-a743-4101-b6ec-840df4568a76-0')}
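To see why running the two chains in parallel matters, the sketch below replaces the model calls with hypothetical slow functions (`fake_joke_chain`, `fake_poem_chain`) and runs them on a thread pool. It assumes each branch is dominated by waiting on I/O, as an LLM call would be. It is not LangChain code, only an illustration of the timing effect.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


def fake_joke_chain(inputs: Dict[str, str]) -> str:
    time.sleep(0.2)  # simulate network latency of a model call
    return f"joke about {inputs['topic']}"


def fake_poem_chain(inputs: Dict[str, str]) -> str:
    time.sleep(0.2)
    return f"poem about {inputs['topic']}"


steps = {"joke": fake_joke_chain, "poem": fake_poem_chain}

start = time.perf_counter()
with ThreadPoolExecutor() as pool:
    # Both branches receive the same input dict
    futures = {key: pool.submit(step, {"topic": "bear"}) for key, step in steps.items()}
    results = {key: future.result() for key, future in futures.items()}
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")
```

Because both branches wait at the same time, the elapsed time should be close to that of the slowest branch rather than the sum of the two.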

 

 

Adding extra keys to the input dictionary

To keep the original input dictionary and add the output of a RunnableParallel to it as new keys, use RunnableAssign.

from typing import Dict
from langchain_core.runnables.passthrough import (
    RunnableAssign,
    RunnableParallel,
)
from langchain_core.runnables.base import RunnableLambda

def add_ten(x: Dict[str, int]) -> Dict[str, int]:
    return {"added": x["input"] + 10}

mapper = RunnableParallel(
    {"add_step": RunnableLambda(add_ten),}
)

runnable_assign = RunnableAssign(mapper)

# Synchronous example
runnable_assign.invoke({"input": 5})
# returns {'input': 5, 'add_step': {'added': 15}}

# Asynchronous example (top-level await needs a notebook or another async context)
await runnable_assign.ainvoke({"input": 5})
# returns {'input': 5, 'add_step': {'added': 15}}
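The result above is the input dict merged with the mapper's output. As a rough mental model, and not the library's actual code, the merge can be written in a few lines. `toy_assign` and `toy_mapper` are hypothetical names.

```python
from typing import Any, Callable, Dict


def toy_assign(
    mapper: Callable[[Dict[str, Any]], Dict[str, Any]],
    data: Dict[str, Any],
) -> Dict[str, Any]:
    """Return the input dict extended with the keys produced by the mapper."""
    return {**data, **mapper(data)}


def toy_mapper(data: Dict[str, int]) -> Dict[str, Dict[str, int]]:
    # Plays the role of RunnableParallel({"add_step": RunnableLambda(add_ten)})
    return {"add_step": {"added": data["input"] + 10}}


merged = toy_assign(toy_mapper, {"input": 5})
print(merged)  # {'input': 5, 'add_step': {'added': 15}}
```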

 

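RunnablePassthrough.assign is built on the same class: it wraps its keyword arguments in a RunnableParallel and returns a RunnableAssign.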

class RunnablePassthrough(RunnableSerializable[Other, Other]):
    ... (omitted) ...
    
    @classmethod
    def assign(
        cls,
        **kwargs: Union[
            Runnable[Dict[str, Any], Any],
            Callable[[Dict[str, Any]], Any],
            Mapping[
                str,
                Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]],
            ],
        ],
    ) -> "RunnableAssign":
        """Merge the Dict input with the output produced by the mapping argument.

        Args:
            **kwargs: Runnable, Callable or a Mapping from keys to Runnables
                or Callables.

        Returns:
            A Runnable that merges the Dict input with the output produced by the
            mapping argument.
        """
        return RunnableAssign(RunnableParallel(kwargs))
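Since each assign call returns the merged dict, calls can be chained: a later step sees the keys written by an earlier one, and a key that already exists is overwritten. The sketch below models this with a hypothetical `toy_assign` helper in plain Python. It assumes dict-merge semantics where new values win, and it is not the library's implementation.

```python
from typing import Any, Callable, Dict


def toy_assign(data: Dict[str, Any], **steps: Callable[[Dict[str, Any]], Any]) -> Dict[str, Any]:
    """Compute each new key from the current dict, then merge (new values win)."""
    new_values = {key: step(data) for key, step in steps.items()}
    return {**data, **new_values}


state = {"question": "what about egypt"}

# First assign overwrites the existing "question" key
state = toy_assign(state, question=lambda d: d["question"].upper())

# Second assign already sees the rewritten question
state = toy_assign(state, context=lambda d: f"docs for: {d['question']}")

print(state)
# {'question': 'WHAT ABOUT EGYPT', 'context': 'docs for: WHAT ABOUT EGYPT'}
```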

 

Let's look at a dynamic chain example, where a function decides at run time which Runnable to execute.

from operator import itemgetter

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnablePassthrough, chain
from langchain_openai import ChatOpenAI

# Chat model used by the chains below
llm = ChatOpenAI()

contextualize_instructions = """Convert the latest user question into a standalone question given the chat history. Don't answer the question, return the question and nothing else (no descriptive text)."""
contextualize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_instructions),
        ("placeholder", "{chat_history}"),
        ("human", "{question}"),
    ]
)
contextualize_question = contextualize_prompt | llm | StrOutputParser()

qa_instructions = (
    """Answer the user question given the following context:\n\n{context}."""
)
qa_prompt = ChatPromptTemplate.from_messages(
    [("system", qa_instructions), ("human", "{question}")]
)


@chain
def contextualize_if_needed(input_: dict) -> Runnable:
    if input_.get("chat_history"):
        # NOTE: This is returning another Runnable, not an actual output.
        return contextualize_question
    else:
        return RunnablePassthrough() | itemgetter("question")


@chain
def fake_retriever(input_: dict) -> str:
    return "egypt's population in 2024 is about 111 million"


full_chain = (
    RunnablePassthrough.assign(question=contextualize_if_needed).assign(
        context=fake_retriever
    )
    | qa_prompt
    | llm
    | StrOutputParser()
)

full_chain.invoke(
    {
        "question": "what about egypt",
        "chat_history": [
            ("human", "what's the population of indonesia"),
            ("ai", "about 276 million"),
        ],
    }
)

# Result
"According to the context provided, Egypt's population in 2024 is estimated to be about 111 million."
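The key point in contextualize_if_needed is that the function returns another Runnable rather than a value, and that Runnable is then invoked with the same input. The sketch below models this routing in plain Python. `ToyStep`, `rewrite_question`, `pick_question`, and `route` are hypothetical names, and the string rewriting stands in for the LLM call.

```python
from typing import Any, Callable


class ToyStep:
    """Toy runnable: if the wrapped function returns another ToyStep, run that too."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        output = self.func(value)
        if isinstance(output, ToyStep):
            # The returned step is invoked with the same input
            return output.invoke(value)
        return output


rewrite_question = ToyStep(lambda d: f"standalone({d['question']})")
pick_question = ToyStep(lambda d: d["question"])


def route(d: dict) -> ToyStep:
    # Choose a step at run time based on the input
    return rewrite_question if d.get("chat_history") else pick_question


contextualize = ToyStep(route)

with_history = contextualize.invoke(
    {"question": "what about egypt", "chat_history": [("human", "hi")]}
)
without_history = contextualize.invoke({"question": "what about egypt"})

print(with_history)     # standalone(what about egypt)
print(without_history)  # what about egypt
```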

 

 

Using a custom function as a Runnable - @chain == RunnableLambda

To use a custom function in a chain, either wrap it in RunnableLambda or apply the @chain decorator.

from operator import itemgetter

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI


def length_function(text):
    return len(text)


def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)


def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])


model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("what is {a} + {b}")

chain1 = prompt | model

chain = (
    {
        "a": itemgetter("foo") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
)

chain.invoke({"foo": "bar", "bar": "gah"})

# Result
AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-73728de3-e483-49e3-ad54-51bd9570e71a-0')
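The values 3 and 9 in the result can be checked without calling a model. A function wrapped in RunnableLambda receives a single argument, which is why multiple_length_function takes a dict and unpacks it. The sketch below reproduces the two branches with plain function calls. The `inputs`, `a`, and `b` variables are introduced here only for illustration.

```python
from operator import itemgetter


def length_function(text: str) -> int:
    return len(text)


def _multiple_length_function(text1: str, text2: str) -> int:
    return len(text1) * len(text2)


def multiple_length_function(_dict: dict) -> int:
    # Single-argument wrapper around the two-argument function
    return _multiple_length_function(_dict["text1"], _dict["text2"])


inputs = {"foo": "bar", "bar": "gah"}

# Branch "a": itemgetter("foo") | length_function
a = length_function(itemgetter("foo")(inputs))

# Branch "b": build {"text1": ..., "text2": ...}, then multiply the lengths
b = multiple_length_function(
    {"text1": itemgetter("foo")(inputs), "text2": itemgetter("bar")(inputs)}
)

print(a, b)  # 3 9
```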

 

Using the @chain decorator

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain
from langchain_openai import ChatOpenAI

prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


@chain
def custom_chain(text):
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = ChatOpenAI().invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})


custom_chain.invoke("bears")

# Result
'The subject of the joke is the bear and his girlfriend.'

 

The source code of the chain decorator shows that it simply converts the function into a RunnableLambda.

def chain(
    func: Union[
        Callable[[Input], Output],
        Callable[[Input], Iterator[Output]],
        Callable[[Input], Coroutine[Any, Any, Output]],
        Callable[[Input], AsyncIterator[Output]],
    ],
) -> Runnable[Input, Output]:
    """Decorate a function to make it a Runnable.
    Sets the name of the Runnable to the name of the function.
    Any runnables called by the function will be traced as dependencies.

    Args:
        func: A callable.

    Returns:
        A Runnable.

    Example:

    .. code-block:: python

        from langchain_core.runnables import chain
        from langchain_core.prompts import PromptTemplate
        from langchain_openai import OpenAI

        @chain
        def my_func(fields):
            prompt = PromptTemplate("Hello, {name}!")
            llm = OpenAI()
            formatted = prompt.invoke(**fields)

            for chunk in llm.stream(formatted):
                yield chunk
    """
    return RunnableLambda(func)
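The decorator pattern itself is small. Below is a minimal sketch in plain Python that wraps a function in an object with an invoke method and records the function's name, as the docstring above describes. `ToyRunnable` and `toy_chain` are hypothetical names, and tracing is left out.

```python
from typing import Any, Callable


class ToyRunnable:
    """Toy wrapper that gives a plain function an invoke() method and a name."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func
        self.name = func.__name__

    def invoke(self, value: Any) -> Any:
        return self.func(value)


def toy_chain(func: Callable[[Any], Any]) -> ToyRunnable:
    """Decorator: turn a function into a ToyRunnable."""
    return ToyRunnable(func)


@toy_chain
def shout(text: str) -> str:
    return text.upper() + "!"


print(shout.invoke("bears"))  # BEARS!
print(shout.name)             # shout
```

After decoration, shout is no longer a plain function but a wrapper object, which is why it is called through invoke.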

 

Alternatively, a plain function placed next to a Runnable with the | operator is automatically coerced into a RunnableLambda.

prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")

model = ChatOpenAI()

chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])

chain_with_coerced_function.invoke({"topic": "bears"})

# Result
'Once '
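The coercion works because the object on the left of | is a Runnable whose operator method wraps the right-hand side when it is a plain callable. A toy version of that mechanism is sketched below. `ToyRunnable` is a hypothetical class, and the real library handles more cases, such as dicts and a callable on the left-hand side.

```python
from typing import Any, Callable, Union


class ToyRunnable:
    """Toy runnable that supports `a | b` composition with automatic wrapping."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: Union["ToyRunnable", Callable[[Any], Any]]) -> "ToyRunnable":
        # Coerce a plain callable into a ToyRunnable before composing
        nxt = other if isinstance(other, ToyRunnable) else ToyRunnable(other)
        first = self
        return ToyRunnable(lambda value: nxt.invoke(first.invoke(value)))


fake_model = ToyRunnable(lambda d: f"story about {d['topic']}")

# The lambda on the right is wrapped automatically by __or__
pipeline = fake_model | (lambda text: text[:5])

print(pipeline.invoke({"topic": "bears"}))  # story
```

Note that at least one operand must already be a runnable object: two plain Python functions cannot be combined with |, because functions do not define that operator.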

 

<References>

- LangChain parallel official documentation: https://python.langchain.com/v0.2/docs/how_to/parallel/

 


- LangChain Runnable source: https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/runnables/base.py

 


- LangChain custom functions official documentation: https://python.langchain.com/v0.2/docs/how_to/functions/

 


- LangChain LCEL how-to official documentation: https://python.langchain.com/v0.2/docs/how_to/#langchain-expression-language-lcel

 
