lephant_
lephant_
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
But the stream function will eventually call this function:
def _transform_stream_with_config(
self,
input: Iterator[Input],
transformer: Union[
Callable[[Iterator[Input]], Iterator[Output]],
Callable[[Iterator[Input], CallbackManagerForChainRun], Iterator[Output]],
Callable[
[
Iterator[Input],
CallbackManagerForChainRun,
RunnableConfig,
],
Iterator[Output],
],
],
config: Optional[RunnableConfig],
run_type: Optional[str] = None,
**kwargs: Optional[Any],
) -> Iterator[Output]:
"""Helper method to transform an Iterator of Input values into an Iterator of
Output values, with callbacks.
Use this to implement `stream()` or `transform()` in Runnable subclasses."""
# tee the input so we can iterate over it twice
input_for_tracing, input_for_transform = tee(input, 2)
# Start the input iterator to ensure the input runnable starts before this one
final_input: Optional[Input] = next(input_for_tracing, None)
final_input_supported = True
final_output: Optional[Output] = None
final_output_supported = True

config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
{"input": ""},
run_type=run_type,
name=config.get("run_name") or self.get_name(),
)

def _transform_stream_with_config(
self,
input: Iterator[Input],
transformer: Union[
Callable[[Iterator[Input]], Iterator[Output]],
Callable[[Iterator[Input], CallbackManagerForChainRun], Iterator[Output]],
Callable[
[
Iterator[Input],
CallbackManagerForChainRun,
RunnableConfig,
],
Iterator[Output],
],
],
config: Optional[RunnableConfig],
run_type: Optional[str] = None,
**kwargs: Optional[Any],
) -> Iterator[Output]:
"""Helper method to transform an Iterator of Input values into an Iterator of
Output values, with callbacks.
Use this to implement `stream()` or `transform()` in Runnable subclasses."""
# tee the input so we can iterate over it twice
input_for_tracing, input_for_transform = tee(input, 2)
# Start the input iterator to ensure the input runnable starts before this one
final_input: Optional[Input] = next(input_for_tracing, None)
final_input_supported = True
final_output: Optional[Output] = None
final_output_supported = True

config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
{"input": ""},
run_type=run_type,
name=config.get("run_name") or self.get_name(),
)

Where they now just call on_chain_start with an empty string as the value for input 💀
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
I think it's inside LangChain's code. For their RunnableSequence invoke function they have:
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
from langchain_core.beta.runnables.context import config_with_context

# setup callbacks and context
config = config_with_context(ensure_config(config), self.steps)
callback_manager = get_callback_manager_for_config(config)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self), input, name=config.get("run_name") or self.get_name()
)

# invoke all steps in sequence
try:
for i, step in enumerate(self.steps):
input = step.invoke(
input,
# mark each step as a child run
patch_config(
config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
)
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return cast(Output, input)
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
from langchain_core.beta.runnables.context import config_with_context

# setup callbacks and context
config = config_with_context(ensure_config(config), self.steps)
callback_manager = get_callback_manager_for_config(config)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self), input, name=config.get("run_name") or self.get_name()
)

# invoke all steps in sequence
try:
for i, step in enumerate(self.steps):
input = step.invoke(
input,
# mark each step as a child run
patch_config(
config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
)
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return cast(Output, input)
Which invokes the callback_manager
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
Yeah, sure! I'm also happy to work on this — can you maybe give some quick insight into how this could be fixed? I understand it's somewhere in the callback handler (on_chain_start). Also, isn't it something on LangChain's side, since on_chain_start should receive the inputs?
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
No description
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
Yep it works with invoke
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
InputsRag is a wrapper class (`class InputsRag(RunnableParallel): pass`), since Langfuse does not parse the chain names from LCEL.
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
And the initial input chain is just gathering the question, chat history, and docs, and checking if the prompt is toxic:
_inputs = InputsRag(
{
"question": GetQuestion(lambda x: x["question"]),
"chat_history": FormatHistory(
first=RunnableLambda(lambda x: x["history"]),
last=RunnableLambda(_format_chat_history),
),
"docs": RetrieveDocs(
first=(_search_query | retriever),
last=RunnableLambda(remove_embedding_from_documents),
),
"error": ModerationChain(
first=(
RunnableLambda(lambda x: x["question"])
| prompt_moderation_with_config
),
last=RunnableLambda(lambda _: False),
),
}
)
_inputs = InputsRag(
{
"question": GetQuestion(lambda x: x["question"]),
"chat_history": FormatHistory(
first=RunnableLambda(lambda x: x["history"]),
last=RunnableLambda(_format_chat_history),
),
"docs": RetrieveDocs(
first=(_search_query | retriever),
last=RunnableLambda(remove_embedding_from_documents),
),
"error": ModerationChain(
first=(
RunnableLambda(lambda x: x["question"])
| prompt_moderation_with_config
),
last=RunnableLambda(lambda _: False),
),
}
)
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
Nope running locally
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
mychain = (
_inputs
| _answer
| (
response_moderation
if response_moderation_with_config.moderation_config.filters
else RunnablePassthrough()
)
)
mychain = (
_inputs
| _answer
| (
response_moderation
if response_moderation_with_config.moderation_config.filters
else RunnablePassthrough()
)
)
19 replies
Langfuse
Created by lephant_ on 1/24/2024 in #get-support
LCEL streaming
No description
19 replies