[LCC-10] Understanding LangChain RunnableParallel and @chain
When building a RAG pipeline, RunnableParallel is used to wire the retriever into the final chain.
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
# The prompt expects input with keys for "context" and "question"
prompt = ChatPromptTemplate.from_template(template)
model = ChatOpenAI()
retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)
# The following three forms are equivalent.
# ref: https://python.langchain.com/v0.2/docs/how_to/parallel/
# {"context": retriever, "question": RunnablePassthrough()}
# RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
# RunnableParallel(context=retriever, question=RunnablePassthrough())
retrieval_chain.invoke("where did harrison work?")
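The dictionary at the head of the chain acts as a fan-out step: every value receives the same input, and the results are collected under the matching keys before being passed to the prompt. The sketch below models that behavior in plain Python instead of calling LangChain. The names `run_parallel`, `fake_retriever`, and `passthrough` are hypothetical, and the use of a thread pool is an assumption about how the branches could run concurrently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(steps: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Call every step with the same input and collect the results by key."""
    with ThreadPoolExecutor() as pool:
        futures = {key: pool.submit(step, value) for key, step in steps.items()}
        return {key: future.result() for key, future in futures.items()}


def fake_retriever(question: str) -> str:
    # Hypothetical stand-in for vectorstore.as_retriever()
    return "harrison worked at kensho"


def passthrough(value: Any) -> Any:
    # Returns the input unchanged, like the "question" branch in the chain
    return value


prompt_input = run_parallel(
    {"context": fake_retriever, "question": passthrough},
    "where did harrison work?",
)
print(prompt_input)
```

The resulting dictionary has exactly the "context" and "question" keys that the prompt template expects.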
The RunnablePassthrough class inherits from RunnableSerializable.
class RunnablePassthrough(RunnableSerializable[Other, Other]):
    # ... (omitted) ...
    def invoke(
        self, input: Other, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Other:
        if self.func is not None:
            call_func_with_variable_args(
                self.func, input, ensure_config(config), **kwargs
            )
        return self._call_with_config(identity, input, config)
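Reading the `invoke` source above, the optional `func` is called only for its side effect and the input is returned unchanged. A minimal sketch of that behavior, assuming a simplified class `MiniPassthrough` (a hypothetical name, not the LangChain class) that ignores config handling and tracing:

```python
from typing import Any, Callable, List, Optional


class MiniPassthrough:
    """Simplified model of a passthrough step (not the LangChain class)."""

    def __init__(self, func: Optional[Callable[[Any], Any]] = None) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        if self.func is not None:
            # The callback runs only for its side effect; its result is dropped.
            self.func(value)
        return value


seen: List[Any] = []
step = MiniPassthrough(seen.append)
result = step.invoke({"question": "where did harrison work?"})
print(result)  # the input, unchanged
print(seen)    # the callback observed the same input
```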
RunnablePassthrough usage example
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
)
runnable = RunnableParallel(
    origin=RunnablePassthrough(),
    modified=lambda x: x+1
)
runnable.invoke(1) # {'origin': 1, 'modified': 2}
def fake_llm(prompt: str) -> str: # Fake LLM for the example
    return "completion"
chain = RunnableLambda(fake_llm) | {
    'original': RunnablePassthrough(), # Original LLM output
    'parsed': lambda text: text[::-1] # Parsing logic
}
chain.invoke('hello') # {'original': 'completion', 'parsed': 'noitelpmoc'}
Running multiple PromptTemplates in parallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
)
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
map_chain.invoke({"topic": "bear"})
# Result
{'joke': AIMessage(content="Why don't bears like fast food? Because they can't catch it!", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 13, 'total_tokens': 28}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_d9767fc5b9', 'finish_reason': 'stop', 'logprobs': None}, id='run-fe024170-c251-4b7a-bfd4-64a3737c67f2-0'),
'poem': AIMessage(content='In the quiet of the forest, the bear roams free\nMajestic and wild, a sight to see.', response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 15, 'total_tokens': 39}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-2707913e-a743-4101-b6ec-840df4568a76-0')}
To merge the output of a RunnableParallel into the input dictionary as additional keys,
use RunnableAssign.
from typing import Dict
from langchain_core.runnables.passthrough import (
RunnableAssign,
RunnableParallel,
)
from langchain_core.runnables.base import RunnableLambda
def add_ten(x: Dict[str, int]) -> Dict[str, int]:
    return {"added": x["input"] + 10}
mapper = RunnableParallel(
    {"add_step": RunnableLambda(add_ten),}
)
runnable_assign = RunnableAssign(mapper)
# Synchronous example
runnable_assign.invoke({"input": 5})
# returns {'input': 5, 'add_step': {'added': 15}}
# Asynchronous example
await runnable_assign.ainvoke({"input": 5})
# returns {'input': 5, 'add_step': {'added': 15}}
RunnablePassthrough.assign also uses RunnableAssign internally.
class RunnablePassthrough(RunnableSerializable[Other, Other]):
    # ... (omitted) ...
    @classmethod
    def assign(
        cls,
        **kwargs: Union[
            Runnable[Dict[str, Any], Any],
            Callable[[Dict[str, Any]], Any],
            Mapping[
                str,
                Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]],
            ],
        ],
    ) -> "RunnableAssign":
        """Merge the Dict input with the output produced by the mapping argument.
        Args:
            **kwargs: Runnable, Callable or a Mapping from keys to Runnables
                or Callables.
        Returns:
            A Runnable that merges the Dict input with the output produced by the
            mapping argument.
        """
        return RunnableAssign(RunnableParallel(kwargs))
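As the docstring says, `assign` merges the input dictionary with the output of the mapping. A rough plain-Python model of that merge, assuming a hypothetical helper `mini_assign` that treats each step as an ordinary function and skips everything LangChain does around config and tracing:

```python
from typing import Any, Callable, Dict

Step = Callable[[Dict[str, Any]], Any]


def mini_assign(**steps: Step) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    """Return a function that adds one computed key per step to its input dict."""

    def run(data: Dict[str, Any]) -> Dict[str, Any]:
        computed = {key: step(data) for key, step in steps.items()}
        # Original keys are kept; computed keys are merged on top.
        return {**data, **computed}

    return run


def add_ten(x: Dict[str, int]) -> Dict[str, int]:
    return {"added": x["input"] + 10}


def double_added(x: Dict[str, Any]) -> int:
    # Hypothetical second step that reads the key added by the first one
    return x["add_step"]["added"] * 2


merged = mini_assign(add_step=add_ten)({"input": 5})
print(merged)  # {'input': 5, 'add_step': {'added': 15}}

chained = mini_assign(doubled=double_added)(merged)
print(chained)  # {'input': 5, 'add_step': {'added': 15}, 'doubled': 30}
```

Because each call returns the full merged dictionary, a later step can read keys added by an earlier one, which is what makes chaining `.assign(...).assign(...)` useful.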
Let's look at a dynamic chain example.
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnablePassthrough, chain
contextualize_instructions = """Convert the latest user question into a standalone question given the chat history. Don't answer the question, return the question and nothing else (no descriptive text)."""
contextualize_prompt = ChatPromptTemplate.from_messages(
[
("system", contextualize_instructions),
("placeholder", "{chat_history}"),
("human", "{question}"),
]
)
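# NOTE: `llm` is not defined in this snippet; it is assumed to be a chat model created earlier, e.g. llm = ChatOpenAI().
contextualize_question = contextualize_prompt | llm | StrOutputParser()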
qa_instructions = (
"""Answer the user question given the following context:\n\n{context}."""
)
qa_prompt = ChatPromptTemplate.from_messages(
[("system", qa_instructions), ("human", "{question}")]
)
@chain
def contextualize_if_needed(input_: dict) -> Runnable:
    if input_.get("chat_history"):
        # NOTE: This is returning another Runnable, not an actual output.
        return contextualize_question
    else:
        return RunnablePassthrough() | itemgetter("question")
@chain
def fake_retriever(input_: dict) -> str:
    return "egypt's population in 2024 is about 111 million"
full_chain = (
    RunnablePassthrough.assign(question=contextualize_if_needed).assign(
        context=fake_retriever
    )
    | qa_prompt
    | llm
    | StrOutputParser()
)
full_chain.invoke(
    {
        "question": "what about egypt",
        "chat_history": [
            ("human", "what's the population of indonesia"),
            ("ai", "about 276 million"),
        ],
    }
)
# Result
"According to the context provided, Egypt's population in 2024 is estimated to be about 111 million."
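The key point of the dynamic chain is that `contextualize_if_needed` returns another Runnable rather than a value, and that returned Runnable is then invoked with the same input. The sketch below models this routing in plain Python, with ordinary functions standing in for Runnables. `run_dynamic`, `fake_contextualize`, and `pick_question` are hypothetical names, and the rewritten question is a made-up placeholder rather than real model output.

```python
from typing import Any, Callable, Dict


def run_dynamic(func: Callable[[Dict[str, Any]], Any], data: Dict[str, Any]) -> Any:
    """Call func; if it returns another step, run that step on the same input."""
    output = func(data)
    if callable(output):
        output = output(data)
    return output


def fake_contextualize(data: Dict[str, Any]) -> str:
    # Hypothetical stand-in for contextualize_prompt | llm | StrOutputParser()
    return "what is the population of egypt"


def pick_question(data: Dict[str, Any]) -> str:
    # Stand-in for RunnablePassthrough() | itemgetter("question")
    return data["question"]


def contextualize_if_needed(data: Dict[str, Any]) -> Callable[[Dict[str, Any]], str]:
    if data.get("chat_history"):
        return fake_contextualize
    return pick_question


with_history = run_dynamic(
    contextualize_if_needed,
    {
        "question": "what about egypt",
        "chat_history": [("human", "what's the population of indonesia")],
    },
)
without_history = run_dynamic(contextualize_if_needed, {"question": "what about egypt"})
print(with_history)     # routed through the contextualize step
print(without_history)  # question passed through unchanged
```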
Using a custom function as a Runnable - @chain == RunnableLambda
To use a custom function in a chain, either wrap it in RunnableLambda or decorate it with @chain.
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
def length_function(text):
    return len(text)
def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)
def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("what is {a} + {b}")
chain1 = prompt | model
chain = (
    {
        "a": itemgetter("foo") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
)
chain.invoke({"foo": "bar", "bar": "gah"})
# Result
AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-73728de3-e483-49e3-ad54-51bd9570e71a-0')
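To see where the 3 and the 9 in the answer come from, the two branches can be evaluated by hand without a model. This is a plain-Python walk-through under the assumption that `str.format` is a fair stand-in for the prompt template. It applies the same functions to the same input and renders the resulting prompt text.

```python
from operator import itemgetter


def length_function(text: str) -> int:
    return len(text)


def multiple_length_function(_dict: dict) -> int:
    return len(_dict["text1"]) * len(_dict["text2"])


inputs = {"foo": "bar", "bar": "gah"}

# Branch "a": pick "foo", then take its length -> len("bar")
a = length_function(itemgetter("foo")(inputs))

# Branch "b": build a small dict from both fields, then multiply the lengths
b = multiple_length_function(
    {"text1": itemgetter("foo")(inputs), "text2": itemgetter("bar")(inputs)}
)

rendered = "what is {a} + {b}".format(a=a, b=b)
print(rendered)  # what is 3 + 9
```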
Using the @chain decorator
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain
from langchain_openai import ChatOpenAI
prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")
@chain
def custom_chain(text):
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = ChatOpenAI().invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})
custom_chain.invoke("bears")
# Result
'The subject of the joke is the bear and his girlfriend.'
Looking at the source of the chain decorator, it simply converts the function into a RunnableLambda.
def chain(
    func: Union[
        Callable[[Input], Output],
        Callable[[Input], Iterator[Output]],
        Callable[[Input], Coroutine[Any, Any, Output]],
        Callable[[Input], AsyncIterator[Output]],
    ],
) -> Runnable[Input, Output]:
    """Decorate a function to make it a Runnable.
    Sets the name of the Runnable to the name of the function.
    Any runnables called by the function will be traced as dependencies.
    Args:
        func: A callable.
    Returns:
        A Runnable.
    Example:
    .. code-block:: python
        from langchain_core.runnables import chain
        from langchain_core.prompts import PromptTemplate
        from langchain_openai import OpenAI
        @chain
        def my_func(fields):
            prompt = PromptTemplate("Hello, {name}!")
            llm = OpenAI()
            formatted = prompt.invoke(**fields)
            for chunk in llm.stream(formatted):
                yield chunk
    """
    return RunnableLambda(func)
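The decorator is therefore only a thin wrapper: it hands the function to a class that exposes `invoke`. A minimal sketch of that pattern in plain Python follows. `MiniLambda` and `mini_chain` are hypothetical names, and the sketch leaves out streaming, async support, and tracing.

```python
from typing import Any, Callable


class MiniLambda:
    """Simplified model of a function wrapped as an invokable step."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func
        # Mirror the docstring above: the step is named after the function.
        self.name = func.__name__

    def invoke(self, value: Any) -> Any:
        return self.func(value)


def mini_chain(func: Callable[[Any], Any]) -> MiniLambda:
    """Decorator form: equivalent to writing MiniLambda(func) by hand."""
    return MiniLambda(func)


@mini_chain
def shout(text: str) -> str:
    return text.upper() + "!"


result = shout.invoke("bears")
print(shout.name)  # shout
print(result)      # BEARS!
```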
Alternatively, the | operator automatically coerces a plain function into a RunnableLambda.
prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")
model = ChatOpenAI()
chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])
chain_with_coerced_function.invoke({"topic": "bears"})
# Result
'Once '
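The coercion relies on operator overloading: the left-hand object defines `__or__` and wraps whatever appears on the right if it is not already a step. The sketch below shows the idea in plain Python, assuming a hypothetical `Step` class and a fake model that returns a fixed sentence. It is not how LangChain is implemented in detail.

```python
from typing import Any, Callable, Union


class Step:
    """Simplified pipeline step that supports the | operator."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: Union["Step", Callable[[Any], Any]]) -> "Step":
        # Coerce a bare function into a Step, then compose left-to-right.
        nxt = other if isinstance(other, Step) else Step(other)
        return Step(lambda value: nxt.invoke(self.invoke(value)))


# Hypothetical stand-in for prompt | model
fake_model = Step(lambda topic: f"Once upon a time there was a {topic}.")

pipeline = fake_model | (lambda text: text[:5])
output = pipeline.invoke("bear")
print(repr(output))  # 'Once '
```

The parentheses around the lambda matter for the same reason as in the original example: without them, the `|` would be parsed as part of the lambda body.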
<References>
- LangChain parallel official docs: https://python.langchain.com/v0.2/docs/how_to/parallel/
- LangChain Runnable source: https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/runnables/base.py
- LangChain functions official docs: https://python.langchain.com/v0.2/docs/how_to/functions/
- LangChain LCEL how-to official docs: https://python.langchain.com/v0.2/docs/how_to/#langchain-expression-language-lcel