블로그 이미지
Peter Note
Web & LLM FullStacker, Application Architecter, KnowHow Dispenser and Bike Rider

Publication

Category

Recent Post

2024. 8. 25. 19:55 LLM FullStacker/Python

with get_openai_callback() as cb:는 Python의 컨텍스트 관리자(context manager)를 사용하여 get_openai_callback 함수가 반환하는 객체(cb)를 생성하고, 그 객체를 사용하는 블록을 정의하는 구문입니다. 이 구문을 이해하기 위해서는 Python의 컨텍스트 관리자가 어떻게 작동하는지와 get_openai_callback이 어떤 역할을 하는지를 아는 것이 중요합니다.

 

1. 컨텍스트 관리자 (Context Manager)

 

컨텍스트 관리자는 with 블록의 시작과 종료 시 특정 코드를 자동으로 실행하게 해줍니다. 일반적으로, 컨텍스트 관리자는 자원(resource)을 할당하고 해제하는 작업에 사용됩니다. 예를 들어, 파일을 열고 작업을 한 후 자동으로 파일을 닫는 데 사용할 수 있습니다.

 

__enter__(): with 블록이 시작될 때 호출됩니다. 이 메서드는 일반적으로 어떤 자원을 할당하거나 초기화합니다.

__exit__(): with 블록이 끝날 때 호출됩니다. 이 메서드는 자원을 해제하거나, 예외가 발생했을 때 이를 처리합니다.

 

2. get_openai_callback의 역할

 

get_openai_callback은 OpenAI API 호출과 관련된 메트릭을 수집하는 콜백 객체를 반환합니다. 이 콜백 객체는 컨텍스트 관리자에서 사용될 때 API 호출 동안의 토큰 사용량, 비용 등을 추적합니다.

 

3. with get_openai_callback() as cb:의 의미

 

get_openai_callback()은 컨텍스트 관리자 역할을 하는 객체를 반환합니다.

with 블록이 시작되면, cb 변수에 이 객체가 할당됩니다.

with 블록 내에서 OpenAI API 호출이 이루어지면, cb 객체는 API 호출 관련 데이터를 수집합니다.

with 블록이 종료되면, cb 객체는 수집한 데이터를 자동으로 정리하고, 필요한 경우 자원을 해제합니다.

 

예시 코드 분석

from langchain.llms import OpenAI
from langchain.callbacks import get_openai_callback

llm = OpenAI(model="text-davinci-003")

with get_openai_callback() as cb:
    response = llm("What is the capital of France?")
    print(response)
    print(f"Total Tokens: {cb.total_tokens}")
    print(f"Total Cost: {cb.total_cost}")

get_openai_callback(): 콜백 객체를 생성하여 반환합니다.

with ... as cb:: cb 변수에 콜백 객체를 할당하고, with 블록 내에서 이 객체를 사용합니다.

cb.total_tokens, cb.total_cost: with 블록이 끝난 후, API 호출 동안 사용된 총 토큰 수와 총 비용을 출력합니다.

 

이 구문을 사용함으로써 개발자는 OpenAI API 호출의 성능을 모니터링하고 리소스 사용량을 효율적으로 관리할 수 있습니다.

 

get_openai_callback

소스: langchain_community/callbacks/manager.py

from contextlib import contextmanager

@contextmanager
def get_openai_callback() -> Generator[OpenAICallbackHandler, None, None]:
    """Get the OpenAI callback handler in a context manager.
    which conveniently exposes token and cost information.

    Returns:
        OpenAICallbackHandler: The OpenAI callback handler.

    Example:
        >>> with get_openai_callback() as cb:
        ...     # Use the OpenAI callback handler
    """
    cb = OpenAICallbackHandler()
    openai_callback_var.set(cb)
    yield cb
    openai_callback_var.set(None)
posted by Peter Note
2024. 8. 14. 17:07 LLM FullStacker/Python

Pydantic은 Python에서 데이터 유효성 검사 및 설정 관리를 위한 라이브러리입니다. 주로 FastAPI와 같은 웹 프레임워크와 함께 사용되며, 데이터를 구조화하고 검증하는 데 유용합니다. BaseModel은 Pydantic의 핵심 클래스 중 하나로, 데이터 모델을 정의하는 데 사용됩니다.

 

Pydantic의 주요 기능

 

1. 유효성 검사 및 변환: 필드에 대해 타입을 지정하면, 입력 데이터가 자동으로 그 타입으로 변환되며, 유효성 검사가 수행됩니다.

2. 자동 완성 및 타입 힌팅 지원: IDE의 자동 완성과 타입 힌팅을 통해 개발 생산성을 높입니다.

3. 데이터 직렬화 및 역직렬화: 모델 인스턴스를 JSON으로 직렬화하거나 JSON으로부터 역직렬화할 수 있습니다.

4. 데이터 검증 오류 관리: 잘못된 데이터를 입력하면, Pydantic이 자동으로 유효성 검사 오류를 생성합니다.

 

BaseModel 사용 예시

 

다음은 PydanticBaseModel을 사용하여 간단한 사용자 데이터를 관리하는 예제입니다.

from pydantic import BaseModel, EmailStr, Field
from typing import Optional

class User(BaseModel):
    id: int
    name: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    age: Optional[int] = Field(None, ge=18)
    is_active: bool = True

# Example usage
user_data = {
    "id": 1,
    "name": "John Doe",
    "email": "johndoe@example.com",
    "age": 25,
}

user = User(**user_data)
print(user)
print(user.dict())

 

코드 설명

 

1. 필드 정의:

id: 정수형 필드.

name: 길이가 3에서 50 사이인 문자열 필드.

email: 이메일 형식의 문자열을 요구하는 필드. EmailStr 타입은 이메일 주소가 올바른 형식인지 검증합니다.

age: 선택적 필드로, 값이 주어지면 18 이상이어야 합니다.

is_active: 기본값이 True인 불리언 필드.

2. 필드 유효성 검사:

Field를 사용하여 각 필드에 대한 추가적인 제약 조건을 지정합니다.

3. 데이터 생성 및 출력:

user_data 딕셔너리를 통해 User 객체를 생성합니다. 생성된 객체를 출력하거나, .dict() 메서드를 사용하여 객체를 딕셔너리 형태로 변환할 수 있습니다.

 

이와 같이 Pydantic을 사용하면 데이터 모델을 간단하고 명확하게 정의할 수 있으며, 자동으로 타입 변환과 유효성 검사를 수행할 수 있습니다. 이를 통해 데이터 처리의 신뢰성과 안정성을 높일 수 있습니다.

 

 

 

BaseModel은 자동으로 __init__ 을 실행

Pydantic의 BaseModel을 사용하면 클래스 수준에서 필드를 정의할 수 있으며, 이러한 필드는 마치 __init__ 메서드에서 self.name과 같이 인스턴스 변수로 설정된 것처럼 동작합니다. Pydantic은 이러한 필드를 기반으로 자동으로 __init__ 메서드를 생성하고, 필드에 대한 타입 검사를 수행합니다.

 

이 방식은 일반적인 Python 클래스에서의 인스턴스 변수 설정과는 약간 다릅니다. 일반 Python 클래스에서는 인스턴스 변수를 __init__ 메서드 내에서 self를 통해 설정해야 하지만, Pydantic의 BaseModel을 사용하면 클래스 정의 시 필드의 타입과 기본값을 지정하여 더 간결하고 명확하게 모델을 정의할 수 있습니다.

 

예시 비교

 

일반 Python 클래스

class User:
    def __init__(self, id: int, name: str, email: str, age: int, is_active: bool = True):
        self.id = id
        self.name = name
        self.email = email
        self.age = age
        self.is_active = is_active

 

Pydantic BaseModel

from pydantic import BaseModel, EmailStr, Field
from typing import Optional

class User(BaseModel):
    id: int
    name: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    age: Optional[int] = Field(None, ge=18)
    is_active: bool = True

 

차이점 설명

 

일반 클래스에서는 __init__ 메서드 내에서 self를 사용하여 인스턴스 변수를 직접 설정합니다.

Pydantic BaseModel에서는 클래스 정의 시 필드를 직접 설정하고, Pydantic이 자동으로 __init__ 메서드를 생성하여 필드 초기화, 타입 검사, 유효성 검사를 수행합니다.

 

이렇게 Pydantic의 BaseModel을 사용하면 코드가 더 간결해지며, 데이터 유효성 검사가 자동으로 처리되므로 안전하고 유지보수하기 쉬운 코드를 작성할 수 있습니다.

posted by Peter Note
2024. 8. 13. 00:24 LLM FullStacker/LangChain

RunnableConfig는 체인생성후 Runnble의 필드값을 변경하거나, Runnable을 교체하라 수 있다. 

 

 

configurable_fields 메서드 이해

Runnable의 특정 필드값을 설정할 수 있도록 해준다. runnable의 .bind 메서드와 관련이 있다. 

  - RunnableSerializable 클래스의 configurable_fileds 메서드 (소스)

  - Model 생성시 configurable_fileds 사설정 -> Model 인스턴스 with_config 런타임 설정

class RunnableSerializable(Serializable, Runnable[Input, Output]):
    """Runnable that can be serialized to JSON."""
    
    def configurable_fields(
        self, **kwargs: AnyConfigurableField
    ) -> RunnableSerializable[Input, Output]:
        """Configure particular Runnable fields at runtime.

        Args:
            **kwargs: A dictionary of ConfigurableField instances to configure.

        Returns:
            A new Runnable with the fields configured.

        .. code-block:: python

            from langchain_core.runnables import ConfigurableField
            from langchain_openai import ChatOpenAI

            model = ChatOpenAI(max_tokens=20).configurable_fields(
                max_tokens=ConfigurableField(
                    id="output_token_number",
                    name="Max tokens in the output",
                    description="The maximum number of tokens in the output",
                )
            )

            # max_tokens = 20
            print(
                "max_tokens_20: ",
                model.invoke("tell me something about chess").content
            )

            # max_tokens = 200
            print("max_tokens_200: ", model.with_config(
                configurable={"output_token_number": 200}
                ).invoke("tell me something about chess").content
            )
        """
        from langchain_core.runnables.configurable import RunnableConfigurableFields

        for key in kwargs:
            if key not in self.__fields__:
                raise ValueError(
                    f"Configuration key {key} not found in {self}: "
                    f"available keys are {self.__fields__.keys()}"
                )

        return RunnableConfigurableFields(default=self, fields=kwargs)

 

Runnable 클래스의 with_config 메서드 

class Runnable(Generic[Input, Output], ABC):
    ... 중략 ...
    def with_config(
        self,
        config: Optional[RunnableConfig] = None,
        # Sadly Unpack is not well-supported by mypy so this will have to be untyped
        **kwargs: Any,
    ) -> Runnable[Input, Output]:
        """
        Bind config to a Runnable, returning a new Runnable.

        Args:
            config: The config to bind to the Runnable.
            kwargs: Additional keyword arguments to pass to the Runnable.

        Returns:
            A new Runnable with the config bound.
        """
        return RunnableBinding(
            bound=self,
            config=cast(
                RunnableConfig,
                {**(config or {}), **kwargs},
            ),  # type: ignore[misc]
            kwargs={},
        )

 

실제 예제 

  - Model 생성시 ConfigurableField 통해 설정

  - model 인스턴스 사용시 with_config 통해 설정

from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(temperature=0).configurable_fields(
    temperature=ConfigurableField(
        id="llm_temperature",
        name="LLM Temperature",
        description="The temperature of the LLM",
    )
)

//--- case-1
model.invoke("pick a random number")
// 결과
AIMessage(content='17', response_metadata={'token_usage': {'completion_tokens': 1, 'prompt_tokens': 11, 'total_tokens': 12}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-ba26a0da-0a69-4533-ab7f-21178a73d303-0')

//--- case-2
model.with_config(configurable={"llm_temperature": 0.9}).invoke("pick a random number")
// 결과 
AIMessage(content='12', response_metadata={'token_usage': {'completion_tokens': 1, 'prompt_tokens': 11, 'total_tokens': 12}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-ba8422ad-be77-4cb1-ac45-ad0aae74e3d9-0')

//--- case-3
prompt = PromptTemplate.from_template("Pick a random number above {x}")
chain = prompt | model
chain.invoke({"x": 0})
// 결과 
AIMessage(content='27', response_metadata={'token_usage': {'completion_tokens': 1, 'prompt_tokens': 14, 'total_tokens': 15}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-ecd4cadd-1b72-4f92-b9a0-15e08091f537-0')

//--- case-4
chain.with_config(configurable={"llm_temperature": 0.9}).invoke({"x": 0})
// 결과 
AIMessage(content='35', response_metadata={'token_usage': {'completion_tokens': 1, 'prompt_tokens': 14, 'total_tokens': 15}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-a916602b-3460-46d3-a4a8-7c926ec747c0-0')

 

 

 

configurable_alternatives 메서드 이해

Chain에 연결할 Runnable을 교체할 수 있게 한다. 

class RunnableSerializable(Serializable, Runnable[Input, Output]):
    ... 중략 ...
    def configurable_alternatives(
        self,
        which: ConfigurableField,
        *,
        default_key: str = "default",
        prefix_keys: bool = False,
        **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]],
    ) -> RunnableSerializable[Input, Output]:
        """Configure alternatives for Runnables that can be set at runtime.

        Args:
            which: The ConfigurableField instance that will be used to select the
                alternative.
            default_key: The default key to use if no alternative is selected.
                Defaults to "default".
            prefix_keys: Whether to prefix the keys with the ConfigurableField id.
                Defaults to False.
            **kwargs: A dictionary of keys to Runnable instances or callables that
                return Runnable instances.

        Returns:
            A new Runnable with the alternatives configured.

        .. code-block:: python

            from langchain_anthropic import ChatAnthropic
            from langchain_core.runnables.utils import ConfigurableField
            from langchain_openai import ChatOpenAI

            model = ChatAnthropic(
                model_name="claude-3-sonnet-20240229"
            ).configurable_alternatives(
                ConfigurableField(id="llm"),
                default_key="anthropic",
                openai=ChatOpenAI()
            )

            # uses the default model ChatAnthropic
            print(model.invoke("which organization created you?").content)

            # uses ChatOpenAI
            print(
                model.with_config(
                    configurable={"llm": "openai"}
                ).invoke("which organization created you?").content
            )
        """
        from langchain_core.runnables.configurable import (
            RunnableConfigurableAlternatives,
        )

        return RunnableConfigurableAlternatives(
            which=which,
            default=self,
            alternatives=kwargs,
            default_key=default_key,
            prefix_keys=prefix_keys,
        )

 

 

실제 예제

    - Anthropic 모델 생성시, OpenAI 모델을 alternative로 설정한다. 

    - 상황에 따라 OpenAI 모델을 사용한다. 

from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

llm = ChatAnthropic(
    model="claude-3-haiku-20240307", temperature=0
).configurable_alternatives(
    # This gives this field an id
    # When configuring the end runnable, we can then use this id to configure this field
    ConfigurableField(id="llm"),
    # This sets a default_key.
    # If we specify this key, the default LLM (ChatAnthropic initialized above) will be used
    default_key="anthropic",
    # This adds a new option, with name `openai` that is equal to `ChatOpenAI()`
    openai=ChatOpenAI(),
    # This adds a new option, with name `gpt4` that is equal to `ChatOpenAI(model="gpt-4")`
    gpt4=ChatOpenAI(model="gpt-4"),
    # You can add more configuration options here
)
prompt = PromptTemplate.from_template("Tell me a joke about {topic}")
chain = prompt | llm


//--- case-1 
# By default it will call Anthropic
chain.invoke({"topic": "bears"})
// 결과
AIMessage(content="Here's a bear joke for you:\n\nWhy don't bears wear socks? \nBecause they have bear feet!\n\nHow's that? I tried to come up with a simple, silly pun-based joke about bears. Puns and wordplay are a common way to create humorous bear jokes. Let me know if you'd like to hear another one!", response_metadata={'id': 'msg_018edUHh5fUbWdiimhrC3dZD', 'model': 'claude-3-haiku-20240307', 'stop_reason': 'end_turn', 'stop_sequence': None, 'usage': {'input_tokens': 13, 'output_tokens': 80}}, id='run-775bc58c-28d7-4e6b-a268-48fa6661f02f-0')

//--- case-2
# We can use `.with_config(configurable={"llm": "openai"})` to specify an llm to use
chain.with_config(configurable={"llm": "openai"}).invoke({"topic": "bears"})
// 결과
AIMessage(content="Why don't bears like fast food?\n\nBecause they can't catch it!", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 13, 'total_tokens': 28}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-7bdaa992-19c9-4f0d-9a0c-1f326bc992d4-0')

//--- case-3
# If we use the `default_key` then it uses the default
chain.with_config(configurable={"llm": "anthropic"}).invoke({"topic": "bears"})
// 결과
AIMessage(content="Here's a bear joke for you:\n\nWhy don't bears wear socks? \nBecause they have bear feet!\n\nHow's that? I tried to come up with a simple, silly pun-based joke about bears. Puns and wordplay are a common way to create humorous bear jokes. Let me know if you'd like to hear another one!", response_metadata={'id': 'msg_01BZvbmnEPGBtcxRWETCHkct', 'model': 'claude-3-haiku-20240307', 'stop_reason': 'end_turn', 'stop_sequence': None, 'usage': {'input_tokens': 13, 'output_tokens': 80}}, id='run-59b6ee44-a1cd-41b8-a026-28ee67cdd718-0')

 

 

 

bind 메서드 이해 

Runnable 클래스의 bind 메서드

  - RunnableBinding 클래스 인스턴스를 리턴한다. 

class Runnable(Generic[Input, Output], ABC):
   ... 중략 ...
   def bind(self, **kwargs: Any) -> Runnable[Input, Output]:
        """
        Bind arguments to a Runnable, returning a new Runnable.

        Useful when a Runnable in a chain requires an argument that is not
        in the output of the previous Runnable or included in the user input.

        Args:
            kwargs: The arguments to bind to the Runnable.

        Returns:
            A new Runnable with the arguments bound.

        Example:

        .. code-block:: python

            from langchain_community.chat_models import ChatOllama
            from langchain_core.output_parsers import StrOutputParser

            llm = ChatOllama(model='llama2')

            # Without bind.
            chain = (
                llm
                | StrOutputParser()
            )

            chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
            # Output is 'One two three four five.'

            # With bind.
            chain = (
                llm.bind(stop=["three"])
                | StrOutputParser()
            )

            chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
            # Output is 'One two'

        """
        return RunnableBinding(bound=self, kwargs=kwargs, config={})

 

RunnableBinding 클래스

    - bind: Bind kwargs to pass to the underlying Runnable when running it.
    - with_config: Bind config to pass to the underlying Runnable when running it.
    - with_listeners:  Bind lifecycle listeners to the underlying Runnable.
    - with_types: Override the input and output types of the underlying Runnable.
    - with_retry: Bind a retry policy to the underlying Runnable.
    - with_fallbacks: Bind a fallback policy to the underlying Runnable.

class RunnableBinding(RunnableBindingBase[Input, Output]):
    """Wrap a Runnable with additional functionality.

    A RunnableBinding can be thought of as a "runnable decorator" that
    preserves the essential features of Runnable; i.e., batching, streaming,
    and async support, while adding additional functionality.

    Any class that inherits from Runnable can be bound to a `RunnableBinding`.
    Runnables expose a standard set of methods for creating `RunnableBindings`
    or sub-classes of `RunnableBindings` (e.g., `RunnableRetry`,
    `RunnableWithFallbacks`) that add additional functionality.

    These methods include:
    - `bind`: Bind kwargs to pass to the underlying Runnable when running it.
    - `with_config`: Bind config to pass to the underlying Runnable when running it.
    - `with_listeners`:  Bind lifecycle listeners to the underlying Runnable.
    - `with_types`: Override the input and output types of the underlying Runnable.
    - `with_retry`: Bind a retry policy to the underlying Runnable.
    - `with_fallbacks`: Bind a fallback policy to the underlying Runnable.

    Example:

    `bind`: Bind kwargs to pass to the underlying Runnable when running it.

        .. code-block:: python

            # Create a Runnable binding that invokes the ChatModel with the
            # additional kwarg `stop=['-']` when running it.
            from langchain_community.chat_models import ChatOpenAI
            model = ChatOpenAI()
            model.invoke('Say "Parrot-MAGIC"', stop=['-']) # Should return `Parrot`
            # Using it the easy way via `bind` method which returns a new
            # RunnableBinding
            runnable_binding = model.bind(stop=['-'])
            runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`

        Can also be done by instantiating a RunnableBinding directly (not recommended):

        .. code-block:: python

            from langchain_core.runnables import RunnableBinding
            runnable_binding = RunnableBinding(
                bound=model,
                kwargs={'stop': ['-']} # <-- Note the additional kwargs
            )
            runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`
    """

 

 

<참조>

- LangChain Configurable: https://python.langchain.com/v0.2/docs/how_to/configure/

 

How to configure runtime chain internals | 🦜️🔗 LangChain

This guide assumes familiarity with the following concepts:

python.langchain.com

- LangChain KR: https://wikidocs.net/235704

 

06. configurable_fields, configurable_alternatives

.custom { background-color: #008d8d; color: white; padding: 0.25em 0.5…

wikidocs.net

 

posted by Peter Note
2024. 8. 13. 00:00 LLM FullStacker/LangChain

RAG 구성시에 마지막 chain을 만들 때 retriever를 설정할 때 RunnableParallel을 사용한다. 

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

# The prompt expects input with keys for "context" and "question"
prompt = ChatPromptTemplate.from_template(template)

model = ChatOpenAI()

retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

// 아래 3가지의 사용방식은 동일한다. 
// ref: https://python.langchain.com/v0.2/docs/how_to/parallel/
// {"context": retriever, "question": RunnablePassthrough()}
// RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
// RunnableParallel(context=retriever, question=runnablePassthrough())

retrieval_chain.invoke("where did harrison work?")

 

RunnablePassthrough 클래스는 RunnableSerializable을 상속받고 있다. 

class RunnablePassthrough(RunnableSerializable[Other, Other]):
    ... 중략 ..
    
    def invoke(
        self, input: Other, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Other:
        if self.func is not None:
            call_func_with_variable_args(
                self.func, input, ensure_config(config), **kwargs
            )
        return self._call_with_config(identity, input, config)

 

RunnablePassthrough 사용 예

from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

runnable = RunnableParallel(
    origin=RunnablePassthrough(),
    modified=lambda x: x+1
)

runnable.invoke(1) # {'origin': 1, 'modified': 2}


def fake_llm(prompt: str) -> str: # Fake LLM for the example
    return "completion"

chain = RunnableLambda(fake_llm) | {
    'original': RunnablePassthrough(), # Original LLM output
    'parsed': lambda text: text[::-1] # Parsing logic
}

chain.invoke('hello') # {'original': 'completion', 'parsed': 'noitelpmoc'}

 

 

여러 PromptTemplate을 병렬로 사용하기

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
)

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

map_chain.invoke({"topic": "bear"})

// 결과
{'joke': AIMessage(content="Why don't bears like fast food? Because they can't catch it!", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 13, 'total_tokens': 28}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_d9767fc5b9', 'finish_reason': 'stop', 'logprobs': None}, id='run-fe024170-c251-4b7a-bfd4-64a3737c67f2-0'),
 'poem': AIMessage(content='In the quiet of the forest, the bear roams free\nMajestic and wild, a sight to see.', response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 15, 'total_tokens': 39}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-2707913e-a743-4101-b6ec-840df4568a76-0')}

 

 

RunnableParallel에 추가 dictionary 정보를 주고 싶을 경우 

RunnableAssign을 사용한다. 

from typing import Dict
from langchain_core.runnables.passthrough import (
    RunnableAssign,
    RunnableParallel,
)
from langchain_core.runnables.base import RunnableLambda

def add_ten(x: Dict[str, int]) -> Dict[str, int]:
    return {"added": x["input"] + 10}

mapper = RunnableParallel(
    {"add_step": RunnableLambda(add_ten),}
)

runnable_assign = RunnableAssign(mapper)

# Synchronous example
runnable_assign.invoke({"input": 5})
# returns {'input': 5, 'add_step': {'added': 15}}

# Asynchronous example
await runnable_assign.ainvoke({"input": 5})
# returns {'input': 5, 'add_step': {'added': 15}}

 

RunnablePassthrough.assign 에서도 사용한다. 

  class RunnablePassthrough(RunnableSerializable[Other, Other]):
    ... 중략 ...
    
    @classmethod
    def assign(
        cls,
        **kwargs: Union[
            Runnable[Dict[str, Any], Any],
            Callable[[Dict[str, Any]], Any],
            Mapping[
                str,
                Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]],
            ],
        ],
    ) -> "RunnableAssign":
        """Merge the Dict input with the output produced by the mapping argument.

        Args:
            **kwargs: Runnable, Callable or a Mapping from keys to Runnables
                or Callables.

        Returns:
            A Runnable that merges the Dict input with the output produced by the
            mapping argument.
        """
        return RunnableAssign(RunnableParallel(kwargs))

 

Dyamic Chain 예를 보자. 

from operator import itemgetter

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnablePassthrough, chain

contextualize_instructions = """Convert the latest user question into a standalone question given the chat history. Don't answer the question, return the question and nothing else (no descriptive text)."""
contextualize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_instructions),
        ("placeholder", "{chat_history}"),
        ("human", "{question}"),
    ]
)
contextualize_question = contextualize_prompt | llm | StrOutputParser()

qa_instructions = (
    """Answer the user question given the following context:\n\n{context}."""
)
qa_prompt = ChatPromptTemplate.from_messages(
    [("system", qa_instructions), ("human", "{question}")]
)


@chain
def contextualize_if_needed(input_: dict) -> Runnable:
    if input_.get("chat_history"):
        # NOTE: This is returning another Runnable, not an actual output.
        return contextualize_question
    else:
        return RunnablePassthrough() | itemgetter("question")


@chain
def fake_retriever(input_: dict) -> str:
    return "egypt's population in 2024 is about 111 million"


full_chain = (
    RunnablePassthrough.assign(question=contextualize_if_needed).assign(
        context=fake_retriever
    )
    | qa_prompt
    | llm
    | StrOutputParser()
)

full_chain.invoke(
    {
        "question": "what about egypt",
        "chat_history": [
            ("human", "what's the population of indonesia"),
            ("ai", "about 276 million"),
        ],
    }
)

// 결과
"According to the context provided, Egypt's population in 2024 is estimated to be about 111 million."

 

 

Custom Function을 Runnable로 사용하기  - @chain == RunnableLambda

chaining 시에 custom function은 RunnableLambda 를 사용하거나, @chain 데코레이터를  사용한다. 

from operator import itemgetter

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI


def length_function(text):
    return len(text)


def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)


def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])


model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("what is {a} + {b}")

chain1 = prompt | model

chain = (
    {
        "a": itemgetter("foo") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
)

chain.invoke({"foo": "bar", "bar": "gah"})

// 결과
AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-73728de3-e483-49e3-ad54-51bd9570e71a-0')

 

@chain 데코레이터 사용하기 

from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import chain

prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


@chain
def custom_chain(text):
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = ChatOpenAI().invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})


custom_chain.invoke("bears")

// 결과
'The subject of the joke is the bear and his girlfriend.'

 

chain 데코레이터 소스 코드를 보면, function을 RunnableLambda로 변환한다. 

def chain(
    func: Union[
        Callable[[Input], Output],
        Callable[[Input], Iterator[Output]],
        Callable[[Input], Coroutine[Any, Any, Output]],
        Callable[[Input], AsyncIterator[Output]],
    ],
) -> Runnable[Input, Output]:
    """Decorate a function to make it a Runnable.
    Sets the name of the Runnable to the name of the function.
    Any runnables called by the function will be traced as dependencies.

    Args:
        func: A callable.

    Returns:
        A Runnable.

    Example:

    .. code-block:: python

        from langchain_core.runnables import chain
        from langchain_core.prompts import PromptTemplate
        from langchain_openai import OpenAI

        @chain
        def my_func(fields):
            prompt = PromptTemplate("Hello, {name}!")
            llm = OpenAI()
            formatted = prompt.invoke(**fields)

            for chunk in llm.stream(formatted):
                yield chunk
    """
    return RunnableLambda(func)

 

또는 | 오프레이터를 통해 자동 RunnableLambda를 적용할 수 있다. 

prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")

model = ChatOpenAI()

chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])

chain_with_coerced_function.invoke({"topic": "bears"})

// 결과 
'Once '

 

<참조>

- LangChain parallel 공식 문서: https://python.langchain.com/v0.2/docs/how_to/parallel/

 

How to invoke runnables in parallel | 🦜️🔗 LangChain

This guide assumes familiarity with the following concepts:

python.langchain.com

- LangChain Runnable 소스: https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/runnables/base.py

 

langchain/libs/core/langchain_core/runnables/base.py at master · langchain-ai/langchain

🦜🔗 Build context-aware reasoning applications. Contribute to langchain-ai/langchain development by creating an account on GitHub.

github.com

- LangChain function 공식 문서: https://python.langchain.com/v0.2/docs/how_to/functions/

 

How to run custom functions | 🦜️🔗 LangChain

This guide assumes familiarity with the following concepts:

python.langchain.com

- LangChain LCEL how-to 공식문서: https://python.langchain.com/v0.2/docs/how_to/#langchain-expression-language-lcel

 

How-to guides | 🦜️🔗 LangChain

Here you’ll find answers to “How do I….?” types of questions.

python.langchain.com

 

 

posted by Peter Note
2024. 8. 12. 22:45 LLM FullStacker/LangChain

LCEL로 연결되는 인스턴스는 Runnable을 상속받아야 한다. 공식 문서의 Runnable을 살펴본다. 

출처 DeepLearning.ai

 

 

 

Custom Chain 생성을 하려면 Runnable을 상속 구현

  - invoke: abstractmethod로 반드시 구현해야 한다. call the chain on an input (ainvoke)

  - stream: stream back chunks of the response (astream)

  - batch: call the chain on a list of inputs (abatch)

 

invoke의 Input, Output 타입은 Generic으로 구현체의 타입을 따른다. 현재 Runnable의 invoke로 호출된 Output은 다음 Runnable의 Input 값이 된다. 

class Runnable(Generic[Input, Output], ABC):
    @abstractmethod
    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Transform a single input into an output. Override to implement.

        Args:
            input: The input to the Runnable.
            config: A config to use when invoking the Runnable.
               The config supports standard keys like 'tags', 'metadata' for tracing
               purposes, 'max_concurrency' for controlling how much work to do
               in parallel, and other keys. Please refer to the RunnableConfig
               for more details.

        Returns:
            The output of the Runnable.
        """

 

Generic[Input, Output] 의 타입표이고, LangChain의 컴포넌트들이 Runable을 상속받고 있다.

  - Prompt, ChatModel, LLM, OutputParser, Retriever, Tool

 

| pipe operator 이해

Runnable 체인의 연결은 pipe operatore( | ) 또는 Runnable의 .pipe() 메서드를 사용한다. 이는 마치 RxJS의 pipe 연결과 유사하다. 

 

파이썬은 클래스 인스턴스를 + 기호로 쓸 수 있다. 이때 클래스의 __add__ 를 구현한다. 마찬가지로 | 기호를 사용하기 위해서 __or____ror__ 을 구현하고 있다. 

    // a | b -> a.__or__(b)
    def __or__(
        self,
        other: Union[
            Runnable[Any, Other],
            Callable[[Any], Other],
            Callable[[Iterator[Any]], Iterator[Other]],
            Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
        ],
    ) -> RunnableSerializable[Input, Other]:
        """Compose this Runnable with another object to create a RunnableSequence."""
        return RunnableSequence(self, coerce_to_runnable(other))
        
    // a | b (if not working) -> b.__ror_(a)
    def __ror__(
        self,
        other: Union[
            Runnable[Other, Any],
            Callable[[Other], Any],
            Callable[[Iterator[Other]], Iterator[Any]],
            Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],
        ],
    ) -> RunnableSerializable[Other, Output]:
        """Compose this Runnable with another object to create a RunnableSequence."""
        return RunnableSequence(coerce_to_runnable(other), self)

    // 끝에 dictionary 타입을 Runnable로 만들어 준다. 
    def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
    """Coerce a Runnable-like object into a Runnable.

    Args:
        thing: A Runnable-like object.

    Returns:
        A Runnable.

    Raises:
        TypeError: If the object is not Runnable-like.
    """
    if isinstance(thing, Runnable):
        return thing
    elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
        return RunnableGenerator(thing)
    elif callable(thing):
        return RunnableLambda(cast(Callable[[Input], Output], thing))
    elif isinstance(thing, dict):
        return cast(Runnable[Input, Output], RunnableParallel(thing))
    else:
        raise TypeError(
            f"Expected a Runnable, callable or dict."
            f"Instead got an unsupported type: {type(thing)}"
        )

 

 

이 코드에서 정의된 __or__ 메서드는 Python의 특수 메서드 중 하나로, 비트 연산자 | 를 오버로딩하여 객체 간의 조합을 가능하게 합니다. 이 메서드는 RunnableSerializable 클래스에 정의되어 있으며, 이 클래스의 인스턴스를 다른 객체와 조합하여 새로운 RunnableSequence를 생성합니다.

 

__or__ 메서드의 의미와 역할

1. __or__ 메서드 오버로딩

  • 이 메서드는 | 연산자를 사용했을 때 호출됩니다. 예를 들어, a | b와 같이 사용할 때, a.__or__(b)가 호출됩니다.
  • self는 __or__ 메서드가 호출되는 객체(RunnableSerializable의 인스턴스)를 가리킵니다.
  • other는 | 연산자의 오른쪽에 있는 객체를 가리킵니다.

2. 매개변수 other의 타입

other는 다음 중 하나일 수 있습니다:

  • Runnable[Any, Other]: 제네릭 Runnable 객체로, 임의의 입력(Any)을 받아서 Other 타입의 출력을 반환하는 실행 가능한 객체.
  • Callable[[Any], Other]: 함수 또는 람다와 같이, 임의의 입력을 받아 Other 타입의 출력을 반환하는 호출 가능한 객체.
  • Callable[[Iterator[Any]], Iterator[Other]]: 이터레이터를 받아 이터레이터를 반환하는 호출 가능한 객체.
  • Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]]: Runnable이나 Callable 또는 다른 값으로 구성된 맵핑 객체(예: 딕셔너리).

3. 리턴 타입

  • __or__ 메서드는 RunnableSerializable[Input, Other] 타입의 객체를 반환합니다. 이 반환 객체는 RunnableSequence로, 두 개의 Runnable 객체를 순차적으로 연결한 것입니다.

4. 코드 동작

  • RunnableSequence(self, coerce_to_runnable(other))를 반환하여 두 객체를 조합합니다. 여기서 self는 현재 객체이고, other는 조합할 다른 객체입니다.
  • coerce_to_runnable(other)는 other를 Runnable 객체로 변환하는 함수입니다. 만약 other가 Runnable이 아니더라도, 이 함수가 이를 적절한 Runnable 객체로 변환합니다.

 

__ror__ 메서드의 의미와 역할

1. __ror__ 메서드 오버로딩

  • __ror__는 | 연산자가 왼쪽 객체에서 작동하지 않을 때 오른쪽 객체에 의해 호출됩니다. 즉, a | b에서 a가 __or__ 메서드를 정의하지 않았거나 NotImplemented를 반환하면, b.__ror__(a)가 호출됩니다.
  • self는 이 메서드가 정의된 클래스의 인스턴스(즉, RunnableSerializable)를 가리킵니다.
  • other는 | 연산자의 왼쪽에 있는 객체를 가리킵니다.

2. 매개변수 other의 타입

  • other는 다음 중 하나일 수 있습니다:
    • Runnable[Other, Any]: Other 타입의 입력을 받아 Any 타입의 출력을 반환하는 실행 가능한 객체.
    • Callable[[Other], Any]: Other 타입의 입력을 받아 Any 타입의 출력을 반환하는 호출 가능한 객체.
    • Callable[[Iterator[Other]], Iterator[Any]]: Other 타입의 이터레이터를 받아 Any 타입의 이터레이터를 반환하는 호출 가능한 객체.
    • Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]]: Runnable이나 Callable 또는 다른 값으로 구성된 맵핑 객체(예: 딕셔너리).

3. 리턴 타입

  • 이 메서드는 RunnableSerializable[Other, Output] 타입의 객체를 반환합니다. 반환된 객체는 RunnableSequence로, 이는 두 개의 Runnable 객체를 순차적으로 연결한 것입니다.

4. 코드 동작

  • RunnableSequence(coerce_to_runnable(other), self)를 반환하여, other 객체와 self 객체를 결합한 새로운 RunnableSequence를 생성합니다.
  • coerce_to_runnable(other)는 other 객체가 Runnable이 아니더라도 이를 Runnable 객체로 변환하는 함수입니다.

이 코드의 동작은 other | self 연산이 수행될 때, other를 Runnable로 변환하고, 이를 self와 결합하여 순차적인 실행 체인을 만드는 것입니다.

 

pipe 메서드의 의미와 역할

Runnable 의 pipe 메서드 구현의 반환값이 __or__ 의 반환값이 동일하다.

    def pipe(
        self,
        *others: Union[Runnable[Any, Other], Callable[[Any], Other]],
        name: Optional[str] = None,
    ) -> RunnableSerializable[Input, Other]:
        """Compose this Runnable with Runnable-like objects to make a RunnableSequence.

        Equivalent to `RunnableSequence(self, *others)` or `self | others[0] | ...`

        Example:
            .. code-block:: python

                from langchain_core.runnables import RunnableLambda

                def add_one(x: int) -> int:
                    return x + 1

                def mul_two(x: int) -> int:
                    return x * 2

                runnable_1 = RunnableLambda(add_one)
                runnable_2 = RunnableLambda(mul_two)
                sequence = runnable_1.pipe(runnable_2)
                # Or equivalently:
                # sequence = runnable_1 | runnable_2
                # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
                sequence.invoke(1)
                await sequence.ainvoke(1)
                # -> 4

                sequence.batch([1, 2, 3])
                await sequence.abatch([1, 2, 3])
                # -> [4, 6, 8]
        """
        return RunnableSequence(self, *others, name=name)

1. pipe 메서드의 역할

  • pipe 메서드는 현재 객체와 다른 Runnable 또는 Callable 객체들을 연결하여 하나의 실행 체인을 만듭니다.
  • 이 체인은 여러 단계를 거쳐 데이터를 처리하며, 각 단계는 이전 단계의 출력을 받아 다음 단계의 입력으로 사용합니다.

2. 매개변수

  • self: pipe 메서드가 호출되는 객체(즉, RunnableSerializable 클래스의 인스턴스)를 가리킵니다.
  • *others: 가변 인자 형태로, 여러 개의 Runnable[Any, Other] 또는 Callable[[Any], Other] 객체들을 받아들입니다. 이 객체들은 순차적으로 연결되어 실행됩니다.
    • Runnable[Any, Other]: 임의의 입력(Any)을 받아 Other 타입의 출력을 생성하는 실행 가능한 객체.
    • Callable[[Any], Other]: 함수를 포함하여 입력을 받아 출력을 반환하는 호출 가능한 객체.
  • name (선택적): 실행 체인의 이름을 지정하는 선택적인 매개변수입니다. 이 이름은 디버깅 또는 로깅 목적으로 사용할 수 있습니다.

3. 리턴 타입

  • RunnableSerializable[Input, Other]: 여러 단계를 연결한 새로운 RunnableSerializable 객체를 반환합니다. 이 객체는 입력(Input)을 받아 최종 출력(Other)을 생성하는 실행 체인입니다.

4. 코드 동작

  • 이 메서드는 현재 객체(self)와 others로 전달된 모든 Runnable 또는 Callable 객체들을 연결하여 하나의 RunnableSequence 객체를 생성합니다.
  • RunnableSequence는 각 단계의 출력을 다음 단계의 입력으로 사용하여 최종 결과를 생성합니다.

사용예

from langchain_core.runnables import RunnableParallel

composed_chain_with_pipe = (
    RunnableParallel({"joke": chain})
    .pipe(analysis_prompt)
    .pipe(model)
    .pipe(StrOutputParser())
)

composed_chain_with_pipe.invoke({"topic": "battlestar galactica"})

// 결과
"I cannot reproduce any copyrighted material verbatim, but I can try to analyze the humor in the joke you provided without quoting it directly.\n\nThe joke plays on the idea that the Cylon raiders, who are the antagonists in the Battlestar Galactica universe, failed to locate the human survivors after attacking their home planets (the Twelve Colonies) due to using an outdated and poorly performing operating system (Windows Vista) for their targeting systems.\n\nThe humor stems from the juxtaposition of a futuristic science fiction setting with a relatable real-world frustration – the use of buggy, slow, or unreliable software or technology. It pokes fun at the perceived inadequacies of Windows Vista, which was widely criticized for its performance issues and other problems when it was released.\n\nBy attributing the Cylons' failure to locate the humans to their use of Vista, the joke creates an amusing and unexpected connection between a fictional advanced race of robots and a familiar technological annoyance experienced by many people in the real world.\n\nOverall, the joke relies on incongruity and relatability to generate humor, but without reproducing any copyrighted material directly."

// 또는 일렇게 사용도 가능하다. 
composed_chain_with_pipe = RunnableParallel({"joke": chain}).pipe(
    analysis_prompt, model, StrOutputParser()
)

 

파이프 예제 및 Runnable 구현체들

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

// model runnable
model = ChatOpenAI(model="gpt-4o-mini")
// prompt runnable
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
// output parser runnable
chain = prompt | model | StrOutputParser()

chain.invoke({"topic": "bears"})

// 결과
"Here's a bear joke for you:\n\nWhy did the bear dissolve in water?\nBecause it was a polar bear!"

 

Runnable 구현체들은 langchain_core.runnables 폴더의  base.py에 구현되어 있다.

  - RunnableSerializable : Serialize 는 보통 파일, 네트워크 전송을 가능하게 한다. 여기서는 json 으로 serialization이 가능하다. 

  - RunnableSequence : Runnable을 순차 처리한다. 

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
# Or equivalently:
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)

// 결과 
4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])

// 결과
[4, 6, 8]

 

  - RunnableParallel  : 예 소스 Runnable을 병렬 처리한다. 

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

def mul_three(x: int) -> int:
    return x * 3

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)

sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
#     {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
#     mul_two=runnable_2,
#     mul_three=runnable_3,
# )

sequence.invoke(1)
await sequence.ainvoke(1)

// 결과
{'mul_two': 4, 'mul_three': 6}


sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])

// 결과
[{'mul_two': 4, 'mul_three': 6},
 {'mul_two': 6, 'mul_three': 9},
 {'mul_two': 8, 'mul_three': 12}]

 

LLM 호출 RunnableParallel 예제

from langchain_core.output_parsers import StrOutputParser

analysis_prompt = ChatPromptTemplate.from_template("is this a funny joke? {joke}")
composed_chain = {"joke": chain} | analysis_prompt | model | StrOutputParser()

// RunnableParallel 로 전환된다. 
composed_chain.invoke({"topic": "bears"})

// 결과 
'Haha, that\'s a clever play on words! Using "polar" to imply the bear dissolved or became polar/polarized when put in water. Not the most hilarious joke ever, but it has a cute, groan-worthy pun that makes it mildly amusing. I appreciate a good pun or wordplay joke.'

 

chain 의 output 을 lambda 의 input으로 받아 dictionary 를 만들어 체인을 구성할 수도 있다. 

composed_chain_with_lambda = (
    chain
    | (lambda input: {"joke": input})
    | analysis_prompt
    | model
    | StrOutputParser()
)

composed_chain_with_lambda.invoke({"topic": "beets"})

// 결과
"Haha, that's a cute and punny joke! I like how it plays on the idea of beets blushing or turning red like someone blushing. Food puns can be quite amusing. While not a total knee-slapper, it's a light-hearted, groan-worthy dad joke that would make me chuckle and shake my head. Simple vegetable humor!"

 

  - RunnableLambda  

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)

runnable.invoke(1) # returns 2
runnable.batch([1, 2, 3]) # returns [2, 3, 4]

# Async is supported by default by delegating to the sync implementation
await runnable.ainvoke(1) # returns 2
await runnable.abatch([1, 2, 3]) # returns [2, 3, 4]


# Alternatively, can provide both synd and sync implementations
async def add_one_async(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one, afunc=add_one_async)
runnable.invoke(1) # Uses add_one
await runnable.ainvoke(1) # Uses add_one_async

 

  - RunnableGenerator

from typing import Any, AsyncIterator, Iterator

from langchain_core.runnables import RunnableGenerator


def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


runnable = RunnableGenerator(gen)
runnable.invoke(None)  # "Have a nice day"
list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]


# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token

runnable = RunnableGenerator(agen)
await runnable.ainvoke(None)  # "Have a nice day"
[p async for p in runnable.astream(None)] # ["Have", " a", " nice", " day"]

  - 그외 RunnableEachBase, RunableEach, RunnableBindingBase, RunnableBinding

 

모든 Runnable은 input 과 output schema를 노출한다. 

   - input_schema: input pydantic model

   - output_schema: output pydantic model

 

 

예제는 주로 invoke, ainvoke를 한번에 output을 받는 것이고, LLM을 통해 응답을 받을때는 UX개선을 위하 stream으로 받아 처리한다. Runnable stream 처리에 대한 다양한 접근법은 공식문서를 참조한다. 

 

 

 

Runnable 개념 정리

LangChain Runnable과 LangChain Expression Language (LCEL)은 LLM(대형 언어 모델)의 기능을 활용하여 견고한 프로덕션 수준의 프로그램을 만들 수 있는 강력한 도구입니다. 다음은 이들의 동작 방식과 중요성에 대한 개요입니다.

LangChain Runnable

LangChain Runnable은 실행할 수 있는 작업이나 기능을 캡슐화한 개념으로, 일반적으로 일련의 계산 단계를 나타냅니다. Runnable은 LangChain에서 복잡한 워크플로우 또는 체인을 구성할 수 있는 기본 구성 요소입니다. 각 Runnable은 유연하게 설계되어 동기, 비동기, 배치, 스트리밍 작업을 처리할 수 있습니다.

LangChain Expression Language (LCEL)

LangChain Expression Language (LCEL)은 이러한 워크플로우 또는 프로그램을 구성할 수 있는 선언적 언어입니다. LCEL을 사용하면 여러 Runnable을 명확하고 구조적인 방식으로 결합할 수 있습니다. LCEL을 통해 복잡한 작업 체인을 정의하고 여러 Runnable을 하나의 프로그램으로 결합할 수 있습니다.

LCEL과 Runnables의 주요 기능

  1. 동기 작업: 전통적인 차단 작업으로, 각 단계가 완료된 후 다음 단계로 이동합니다.
  2. 비동기 작업: 차단되지 않는 작업으로, 특정 작업이 완료되기를 기다리는 동안 프로그램이 다른 작업을 처리할 수 있습니다. 이는 서버 환경에서 여러 요청을 동시에 처리할 때 특히 중요합니다.
  3. 배치 처리: 여러 입력을 동시에 처리할 수 있는 기능입니다. 대량의 데이터를 처리하거나 여러 사용자의 쿼리를 동시에 처리할 때 유용합니다.
  4. 스트리밍: 중간 결과가 생성됨에 따라 이를 스트리밍할 수 있는 기능으로, 사용자 경험을 개선할 수 있습니다. 예를 들어, LLM으로 텍스트를 생성할 때 전체 텍스트가 생성되기 전에 부분적인 응답을 사용자에게 스트리밍할 수 있습니다.

사용 예시

  • 배치 처리: LLM을 통해 고객 쿼리 데이터를 분류해야 하는 시나리오를 상상해 보십시오. LCEL과 Runnables을 사용하여 이 쿼리들을 배치 처리하고 병렬로 처리할 수 있어 작업 속도가 크게 향상됩니다.
  • 비동기 작업: 여러 클라이언트 요청을 처리하는 웹 서버에서 LCEL을 사용하여 이러한 요청을 비동기로 관리함으로써, 서버가 여러 트래픽을 동시에 처리할 수 있게 합니다.
  • 스트리밍 결과: 실시간으로 사용자와 상호작용하는 챗봇의 경우, LCEL을 사용하여 부분적으로 생성된 텍스트로 바로 응답을 시작함으로써 더 인터랙티브하고 반응성이 좋은 사용자 경험을 제공할 수 있습니다.

프로덕션 수준 프로그램에서의 LCEL의 역할

LCEL의 설계는 LLM의 기능을 활용하는 복잡한 워크플로우를 보다 쉽게 구축하고 관리할 수 있도록 하는 데 중점을 둡니다. 이는 다양한 작업 모드(동기, 비동기, 배치, 스트리밍)를 처리하는 데 있어 복잡성을 추상화하여, 개발자가 워크플로우의 논리 정의에 집중할 수 있도록 합니다. 이러한 접근 방식은 특히 신뢰성, 확장성, 성능이 중요한 프로덕션 환경에서 매우 가치가 있습니다.

구현 및 스키마

LangChain Runnables 및 LCEL의 구현은 프로그램의 다양한 구성 요소가 어떻게 상호작용하는지를 설명하는 스키마를 정의하는 것입니다. 이러한 스키마는 체인의 각 구성 요소가 올바른 유형의 입력을 받고 예상되는 유형의 출력을 생성하도록 보장합니다. 이러한 구조화된 접근 방식은 신뢰할 수 있는 프로그램을 구축하는 데 도움이 될 뿐만 아니라, 프로그램을 유지하고 확장하는 데도 유리합니다.

결론

LangChain Runnables과 LCEL은 LLM을 통합한 워크플로우를 구축하고 관리할 수 있는 강력한 패러다임을 제공합니다. 이들은 다양한 작업 모드를 지원하는 유연하고 확장 가능한 프레임워크를 제공하여, 복잡한 데이터 처리 작업을 간단하게 구성하고 강력한 처리 체인을 만들 수 있습니다. LCEL과 Runnables을 활용하여 효율적이고 반응성이 뛰어난 확장 가능한 솔루션을 만들 수 있습니다.

 

 

<참조>

- Runnable Conceptual Guide: https://python.langchain.com/v0.2/docs/concepts/#runnable-interface

 

Conceptual guide | 🦜️🔗 LangChain

This section contains introductions to key parts of LangChain.

python.langchain.com

- How-to Guides Runnable Sequence: https://python.langchain.com/v0.2/docs/how_to/sequence/

 

How to chain runnables | 🦜️🔗 LangChain

This guide assumes familiarity with the following concepts:

python.langchain.com

- How-to Guides Runnable stream: https://python.langchain.com/v0.2/docs/how_to/streaming/

 

How to stream runnables | 🦜️🔗 LangChain

This guide assumes familiarity with the following concepts:

python.langchain.com

 

posted by Peter Note