블로그 이미지
Peter Note
AI & Web FullStacker, Application Architecter, KnowHow Dispenser and Bike Rider

Publication

Category

Recent Post

2024. 8. 12. 22:45 LLM FullStacker/LangChain

LCEL로 연결되는 인스턴스는 Runnable을 상속받아야 한다. 공식 문서의 Runnable을 살펴본다. 

출처 DeepLearning.ai

 

 

 

Custom Chain 생성을 하려면 Runnable을 상속 구현

  - invoke: abstractmethod로 반드시 구현해야 한다. call the chain on an input (ainvoke)

  - stream: stream back chunks of the response (astream)

  - batch: call the chain on a list of inputs (abatch)

 

invoke의 Input, Output 타입은 Generic으로 구현체의 타입을 따른다. 현재 Runnable의 invoke로 호출된 Output은 다음 Runnable의 Input 값이 된다. 

class Runnable(Generic[Input, Output], ABC):
    @abstractmethod
    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Transform a single input into an output. Override to implement.

        Args:
            input: The input to the Runnable.
            config: A config to use when invoking the Runnable.
               The config supports standard keys like 'tags', 'metadata' for tracing
               purposes, 'max_concurrency' for controlling how much work to do
               in parallel, and other keys. Please refer to the RunnableConfig
               for more details.

        Returns:
            The output of the Runnable.
        """

 

Generic[Input, Output] 의 타입표이고, LangChain의 컴포넌트들이 Runable을 상속받고 있다.

  - Prompt, ChatModel, LLM, OutputParser, Retriever, Tool

 

| pipe operator 이해

Runnable 체인의 연결은 pipe operatore( | ) 또는 Runnable의 .pipe() 메서드를 사용한다. 이는 마치 RxJS의 pipe 연결과 유사하다. 

 

파이썬은 클래스 인스턴스를 + 기호로 쓸 수 있다. 이때 클래스의 __add__ 를 구현한다. 마찬가지로 | 기호를 사용하기 위해서 __or____ror__ 을 구현하고 있다. 

    // a | b -> a.__or__(b)
    def __or__(
        self,
        other: Union[
            Runnable[Any, Other],
            Callable[[Any], Other],
            Callable[[Iterator[Any]], Iterator[Other]],
            Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
        ],
    ) -> RunnableSerializable[Input, Other]:
        """Compose this Runnable with another object to create a RunnableSequence."""
        return RunnableSequence(self, coerce_to_runnable(other))
        
    // a | b (if not working) -> b.__ror_(a)
    def __ror__(
        self,
        other: Union[
            Runnable[Other, Any],
            Callable[[Other], Any],
            Callable[[Iterator[Other]], Iterator[Any]],
            Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],
        ],
    ) -> RunnableSerializable[Other, Output]:
        """Compose this Runnable with another object to create a RunnableSequence."""
        return RunnableSequence(coerce_to_runnable(other), self)

    // 끝에 dictionary 타입을 Runnable로 만들어 준다. 
    def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
    """Coerce a Runnable-like object into a Runnable.

    Args:
        thing: A Runnable-like object.

    Returns:
        A Runnable.

    Raises:
        TypeError: If the object is not Runnable-like.
    """
    if isinstance(thing, Runnable):
        return thing
    elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
        return RunnableGenerator(thing)
    elif callable(thing):
        return RunnableLambda(cast(Callable[[Input], Output], thing))
    elif isinstance(thing, dict):
        return cast(Runnable[Input, Output], RunnableParallel(thing))
    else:
        raise TypeError(
            f"Expected a Runnable, callable or dict."
            f"Instead got an unsupported type: {type(thing)}"
        )

 

 

이 코드에서 정의된 __or__ 메서드는 Python의 특수 메서드 중 하나로, 비트 연산자 | 를 오버로딩하여 객체 간의 조합을 가능하게 합니다. 이 메서드는 RunnableSerializable 클래스에 정의되어 있으며, 이 클래스의 인스턴스를 다른 객체와 조합하여 새로운 RunnableSequence를 생성합니다.

 

__or__ 메서드의 의미와 역할

1. __or__ 메서드 오버로딩

  • 이 메서드는 | 연산자를 사용했을 때 호출됩니다. 예를 들어, a | b와 같이 사용할 때, a.__or__(b)가 호출됩니다.
  • self는 __or__ 메서드가 호출되는 객체(RunnableSerializable의 인스턴스)를 가리킵니다.
  • other는 | 연산자의 오른쪽에 있는 객체를 가리킵니다.

2. 매개변수 other의 타입

other는 다음 중 하나일 수 있습니다:

  • Runnable[Any, Other]: 제네릭 Runnable 객체로, 임의의 입력(Any)을 받아서 Other 타입의 출력을 반환하는 실행 가능한 객체.
  • Callable[[Any], Other]: 함수 또는 람다와 같이, 임의의 입력을 받아 Other 타입의 출력을 반환하는 호출 가능한 객체.
  • Callable[[Iterator[Any]], Iterator[Other]]: 이터레이터를 받아 이터레이터를 반환하는 호출 가능한 객체.
  • Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]]: Runnable이나 Callable 또는 다른 값으로 구성된 맵핑 객체(예: 딕셔너리).

3. 리턴 타입

  • __or__ 메서드는 RunnableSerializable[Input, Other] 타입의 객체를 반환합니다. 이 반환 객체는 RunnableSequence로, 두 개의 Runnable 객체를 순차적으로 연결한 것입니다.

4. 코드 동작

  • RunnableSequence(self, coerce_to_runnable(other))를 반환하여 두 객체를 조합합니다. 여기서 self는 현재 객체이고, other는 조합할 다른 객체입니다.
  • coerce_to_runnable(other)는 other를 Runnable 객체로 변환하는 함수입니다. 만약 other가 Runnable이 아니더라도, 이 함수가 이를 적절한 Runnable 객체로 변환합니다.

 

__ror__ 메서드의 의미와 역할

1. __ror__ 메서드 오버로딩

  • __ror__는 | 연산자가 왼쪽 객체에서 작동하지 않을 때 오른쪽 객체에 의해 호출됩니다. 즉, a | b에서 a가 __or__ 메서드를 정의하지 않았거나 NotImplemented를 반환하면, b.__ror__(a)가 호출됩니다.
  • self는 이 메서드가 정의된 클래스의 인스턴스(즉, RunnableSerializable)를 가리킵니다.
  • other는 | 연산자의 왼쪽에 있는 객체를 가리킵니다.

2. 매개변수 other의 타입

  • other는 다음 중 하나일 수 있습니다:
    • Runnable[Other, Any]: Other 타입의 입력을 받아 Any 타입의 출력을 반환하는 실행 가능한 객체.
    • Callable[[Other], Any]: Other 타입의 입력을 받아 Any 타입의 출력을 반환하는 호출 가능한 객체.
    • Callable[[Iterator[Other]], Iterator[Any]]: Other 타입의 이터레이터를 받아 Any 타입의 이터레이터를 반환하는 호출 가능한 객체.
    • Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]]: Runnable이나 Callable 또는 다른 값으로 구성된 맵핑 객체(예: 딕셔너리).

3. 리턴 타입

  • 이 메서드는 RunnableSerializable[Other, Output] 타입의 객체를 반환합니다. 반환된 객체는 RunnableSequence로, 이는 두 개의 Runnable 객체를 순차적으로 연결한 것입니다.

4. 코드 동작

  • RunnableSequence(coerce_to_runnable(other), self)를 반환하여, other 객체와 self 객체를 결합한 새로운 RunnableSequence를 생성합니다.
  • coerce_to_runnable(other)는 other 객체가 Runnable이 아니더라도 이를 Runnable 객체로 변환하는 함수입니다.

이 코드의 동작은 other | self 연산이 수행될 때, other를 Runnable로 변환하고, 이를 self와 결합하여 순차적인 실행 체인을 만드는 것입니다.

 

pipe 메서드의 의미와 역할

Runnable 의 pipe 메서드 구현의 반환값이 __or__ 의 반환값이 동일하다.

    def pipe(
        self,
        *others: Union[Runnable[Any, Other], Callable[[Any], Other]],
        name: Optional[str] = None,
    ) -> RunnableSerializable[Input, Other]:
        """Compose this Runnable with Runnable-like objects to make a RunnableSequence.

        Equivalent to `RunnableSequence(self, *others)` or `self | others[0] | ...`

        Example:
            .. code-block:: python

                from langchain_core.runnables import RunnableLambda

                def add_one(x: int) -> int:
                    return x + 1

                def mul_two(x: int) -> int:
                    return x * 2

                runnable_1 = RunnableLambda(add_one)
                runnable_2 = RunnableLambda(mul_two)
                sequence = runnable_1.pipe(runnable_2)
                # Or equivalently:
                # sequence = runnable_1 | runnable_2
                # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
                sequence.invoke(1)
                await sequence.ainvoke(1)
                # -> 4

                sequence.batch([1, 2, 3])
                await sequence.abatch([1, 2, 3])
                # -> [4, 6, 8]
        """
        return RunnableSequence(self, *others, name=name)

1. pipe 메서드의 역할

  • pipe 메서드는 현재 객체와 다른 Runnable 또는 Callable 객체들을 연결하여 하나의 실행 체인을 만듭니다.
  • 이 체인은 여러 단계를 거쳐 데이터를 처리하며, 각 단계는 이전 단계의 출력을 받아 다음 단계의 입력으로 사용합니다.

2. 매개변수

  • self: pipe 메서드가 호출되는 객체(즉, RunnableSerializable 클래스의 인스턴스)를 가리킵니다.
  • *others: 가변 인자 형태로, 여러 개의 Runnable[Any, Other] 또는 Callable[[Any], Other] 객체들을 받아들입니다. 이 객체들은 순차적으로 연결되어 실행됩니다.
    • Runnable[Any, Other]: 임의의 입력(Any)을 받아 Other 타입의 출력을 생성하는 실행 가능한 객체.
    • Callable[[Any], Other]: 함수를 포함하여 입력을 받아 출력을 반환하는 호출 가능한 객체.
  • name (선택적): 실행 체인의 이름을 지정하는 선택적인 매개변수입니다. 이 이름은 디버깅 또는 로깅 목적으로 사용할 수 있습니다.

3. 리턴 타입

  • RunnableSerializable[Input, Other]: 여러 단계를 연결한 새로운 RunnableSerializable 객체를 반환합니다. 이 객체는 입력(Input)을 받아 최종 출력(Other)을 생성하는 실행 체인입니다.

4. 코드 동작

  • 이 메서드는 현재 객체(self)와 others로 전달된 모든 Runnable 또는 Callable 객체들을 연결하여 하나의 RunnableSequence 객체를 생성합니다.
  • RunnableSequence는 각 단계의 출력을 다음 단계의 입력으로 사용하여 최종 결과를 생성합니다.

사용예

from langchain_core.runnables import RunnableParallel

composed_chain_with_pipe = (
    RunnableParallel({"joke": chain})
    .pipe(analysis_prompt)
    .pipe(model)
    .pipe(StrOutputParser())
)

composed_chain_with_pipe.invoke({"topic": "battlestar galactica"})

// 결과
"I cannot reproduce any copyrighted material verbatim, but I can try to analyze the humor in the joke you provided without quoting it directly.\n\nThe joke plays on the idea that the Cylon raiders, who are the antagonists in the Battlestar Galactica universe, failed to locate the human survivors after attacking their home planets (the Twelve Colonies) due to using an outdated and poorly performing operating system (Windows Vista) for their targeting systems.\n\nThe humor stems from the juxtaposition of a futuristic science fiction setting with a relatable real-world frustration – the use of buggy, slow, or unreliable software or technology. It pokes fun at the perceived inadequacies of Windows Vista, which was widely criticized for its performance issues and other problems when it was released.\n\nBy attributing the Cylons' failure to locate the humans to their use of Vista, the joke creates an amusing and unexpected connection between a fictional advanced race of robots and a familiar technological annoyance experienced by many people in the real world.\n\nOverall, the joke relies on incongruity and relatability to generate humor, but without reproducing any copyrighted material directly."

// 또는 일렇게 사용도 가능하다. 
composed_chain_with_pipe = RunnableParallel({"joke": chain}).pipe(
    analysis_prompt, model, StrOutputParser()
)

 

파이프 예제 및 Runnable 구현체들

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

// model runnable
model = ChatOpenAI(model="gpt-4o-mini")
// prompt runnable
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
// output parser runnable
chain = prompt | model | StrOutputParser()

chain.invoke({"topic": "bears"})

// 결과
"Here's a bear joke for you:\n\nWhy did the bear dissolve in water?\nBecause it was a polar bear!"

 

Runnable 구현체들은 langchain_core.runnables 폴더의  base.py에 구현되어 있다.

  - RunnableSerializable : Serialize 는 보통 파일, 네트워크 전송을 가능하게 한다. 여기서는 json 으로 serialization이 가능하다. 

  - RunnableSequence : Runnable을 순차 처리한다. 

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
# Or equivalently:
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)

// 결과 
4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])

// 결과
[4, 6, 8]

 

  - RunnableParallel  : 예 소스 Runnable을 병렬 처리한다. 

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

def mul_three(x: int) -> int:
    return x * 3

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)

sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
#     {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
#     mul_two=runnable_2,
#     mul_three=runnable_3,
# )

sequence.invoke(1)
await sequence.ainvoke(1)

// 결과
{'mul_two': 4, 'mul_three': 6}


sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])

// 결과
[{'mul_two': 4, 'mul_three': 6},
 {'mul_two': 6, 'mul_three': 9},
 {'mul_two': 8, 'mul_three': 12}]

 

LLM 호출 RunnableParallel 예제

from langchain_core.output_parsers import StrOutputParser

analysis_prompt = ChatPromptTemplate.from_template("is this a funny joke? {joke}")
composed_chain = {"joke": chain} | analysis_prompt | model | StrOutputParser()

// RunnableParallel 로 전환된다. 
composed_chain.invoke({"topic": "bears"})

// 결과 
'Haha, that\'s a clever play on words! Using "polar" to imply the bear dissolved or became polar/polarized when put in water. Not the most hilarious joke ever, but it has a cute, groan-worthy pun that makes it mildly amusing. I appreciate a good pun or wordplay joke.'

 

chain 의 output 을 lambda 의 input으로 받아 dictionary 를 만들어 체인을 구성할 수도 있다. 

composed_chain_with_lambda = (
    chain
    | (lambda input: {"joke": input})
    | analysis_prompt
    | model
    | StrOutputParser()
)

composed_chain_with_lambda.invoke({"topic": "beets"})

// 결과
"Haha, that's a cute and punny joke! I like how it plays on the idea of beets blushing or turning red like someone blushing. Food puns can be quite amusing. While not a total knee-slapper, it's a light-hearted, groan-worthy dad joke that would make me chuckle and shake my head. Simple vegetable humor!"

 

  - RunnableLambda  

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)

runnable.invoke(1) # returns 2
runnable.batch([1, 2, 3]) # returns [2, 3, 4]

# Async is supported by default by delegating to the sync implementation
await runnable.ainvoke(1) # returns 2
await runnable.abatch([1, 2, 3]) # returns [2, 3, 4]


# Alternatively, can provide both synd and sync implementations
async def add_one_async(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one, afunc=add_one_async)
runnable.invoke(1) # Uses add_one
await runnable.ainvoke(1) # Uses add_one_async

 

  - RunnableGenerator

from typing import Any, AsyncIterator, Iterator

from langchain_core.runnables import RunnableGenerator


def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


runnable = RunnableGenerator(gen)
runnable.invoke(None)  # "Have a nice day"
list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]


# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token

runnable = RunnableGenerator(agen)
await runnable.ainvoke(None)  # "Have a nice day"
[p async for p in runnable.astream(None)] # ["Have", " a", " nice", " day"]

  - 그외 RunnableEachBase, RunableEach, RunnableBindingBase, RunnableBinding

 

모든 Runnable은 input 과 output schema를 노출한다. 

   - input_schema: input pydantic model

   - output_schema: output pydantic model

 

 

예제는 주로 invoke, ainvoke를 한번에 output을 받는 것이고, LLM을 통해 응답을 받을때는 UX개선을 위하 stream으로 받아 처리한다. Runnable stream 처리에 대한 다양한 접근법은 공식문서를 참조한다. 

 

 

 

Runnable 개념 정리

LangChain Runnable과 LangChain Expression Language (LCEL)은 LLM(대형 언어 모델)의 기능을 활용하여 견고한 프로덕션 수준의 프로그램을 만들 수 있는 강력한 도구입니다. 다음은 이들의 동작 방식과 중요성에 대한 개요입니다.

LangChain Runnable

LangChain Runnable은 실행할 수 있는 작업이나 기능을 캡슐화한 개념으로, 일반적으로 일련의 계산 단계를 나타냅니다. Runnable은 LangChain에서 복잡한 워크플로우 또는 체인을 구성할 수 있는 기본 구성 요소입니다. 각 Runnable은 유연하게 설계되어 동기, 비동기, 배치, 스트리밍 작업을 처리할 수 있습니다.

LangChain Expression Language (LCEL)

LangChain Expression Language (LCEL)은 이러한 워크플로우 또는 프로그램을 구성할 수 있는 선언적 언어입니다. LCEL을 사용하면 여러 Runnable을 명확하고 구조적인 방식으로 결합할 수 있습니다. LCEL을 통해 복잡한 작업 체인을 정의하고 여러 Runnable을 하나의 프로그램으로 결합할 수 있습니다.

LCEL과 Runnables의 주요 기능

  1. 동기 작업: 전통적인 차단 작업으로, 각 단계가 완료된 후 다음 단계로 이동합니다.
  2. 비동기 작업: 차단되지 않는 작업으로, 특정 작업이 완료되기를 기다리는 동안 프로그램이 다른 작업을 처리할 수 있습니다. 이는 서버 환경에서 여러 요청을 동시에 처리할 때 특히 중요합니다.
  3. 배치 처리: 여러 입력을 동시에 처리할 수 있는 기능입니다. 대량의 데이터를 처리하거나 여러 사용자의 쿼리를 동시에 처리할 때 유용합니다.
  4. 스트리밍: 중간 결과가 생성됨에 따라 이를 스트리밍할 수 있는 기능으로, 사용자 경험을 개선할 수 있습니다. 예를 들어, LLM으로 텍스트를 생성할 때 전체 텍스트가 생성되기 전에 부분적인 응답을 사용자에게 스트리밍할 수 있습니다.

사용 예시

  • 배치 처리: LLM을 통해 고객 쿼리 데이터를 분류해야 하는 시나리오를 상상해 보십시오. LCEL과 Runnables을 사용하여 이 쿼리들을 배치 처리하고 병렬로 처리할 수 있어 작업 속도가 크게 향상됩니다.
  • 비동기 작업: 여러 클라이언트 요청을 처리하는 웹 서버에서 LCEL을 사용하여 이러한 요청을 비동기로 관리함으로써, 서버가 여러 트래픽을 동시에 처리할 수 있게 합니다.
  • 스트리밍 결과: 실시간으로 사용자와 상호작용하는 챗봇의 경우, LCEL을 사용하여 부분적으로 생성된 텍스트로 바로 응답을 시작함으로써 더 인터랙티브하고 반응성이 좋은 사용자 경험을 제공할 수 있습니다.

프로덕션 수준 프로그램에서의 LCEL의 역할

LCEL의 설계는 LLM의 기능을 활용하는 복잡한 워크플로우를 보다 쉽게 구축하고 관리할 수 있도록 하는 데 중점을 둡니다. 이는 다양한 작업 모드(동기, 비동기, 배치, 스트리밍)를 처리하는 데 있어 복잡성을 추상화하여, 개발자가 워크플로우의 논리 정의에 집중할 수 있도록 합니다. 이러한 접근 방식은 특히 신뢰성, 확장성, 성능이 중요한 프로덕션 환경에서 매우 가치가 있습니다.

구현 및 스키마

LangChain Runnables 및 LCEL의 구현은 프로그램의 다양한 구성 요소가 어떻게 상호작용하는지를 설명하는 스키마를 정의하는 것입니다. 이러한 스키마는 체인의 각 구성 요소가 올바른 유형의 입력을 받고 예상되는 유형의 출력을 생성하도록 보장합니다. 이러한 구조화된 접근 방식은 신뢰할 수 있는 프로그램을 구축하는 데 도움이 될 뿐만 아니라, 프로그램을 유지하고 확장하는 데도 유리합니다.

결론

LangChain Runnables과 LCEL은 LLM을 통합한 워크플로우를 구축하고 관리할 수 있는 강력한 패러다임을 제공합니다. 이들은 다양한 작업 모드를 지원하는 유연하고 확장 가능한 프레임워크를 제공하여, 복잡한 데이터 처리 작업을 간단하게 구성하고 강력한 처리 체인을 만들 수 있습니다. LCEL과 Runnables을 활용하여 효율적이고 반응성이 뛰어난 확장 가능한 솔루션을 만들 수 있습니다.

 

 

<참조>

- Runnable Conceptual Guide: https://python.langchain.com/v0.2/docs/concepts/#runnable-interface

 

Conceptual guide | 🦜️🔗 LangChain

This section contains introductions to key parts of LangChain.

python.langchain.com

- How-to Guides Runnable Sequence: https://python.langchain.com/v0.2/docs/how_to/sequence/

 

How to chain runnables | 🦜️🔗 LangChain

This guide assumes familiarity with the following concepts:

python.langchain.com

- How-to Guides Runnable stream: https://python.langchain.com/v0.2/docs/how_to/streaming/

 

How to stream runnables | 🦜️🔗 LangChain

This guide assumes familiarity with the following concepts:

python.langchain.com

 

posted by Peter Note