lephant_
lephant_•8mo ago

LCEL streaming

Hi all, I'm using langchain LCEL + stream, so chain.stream(...). But now the inputs at on_chain_start are initially empty ({'input': ''}):
FIRST: -------> {'input': ''}
(↑ repeated 13×)
FIRST: -------> {'input': "Wow that's interesting!"}
FIRST: -------> {'input': ''}
(↑ repeated 13×)
FIRST: -------> {'input': "Wow that's interesting!"}
Also, in the traces the inputs are empty.
Marc
Marc•8mo ago
Can you share more details on your implementation? Happy to help debug this
lephant_
lephant_•8mo ago
Yea sure! It starts with a "RunnableSequence", which is in fact running in parallel:
lephant_
lephant_•8mo ago
mychain = (
    _inputs
    | _answer
    | (
        response_moderation
        if response_moderation_with_config.moderation_config.filters
        else RunnablePassthrough()
    )
)
Marc
Marc•8mo ago
Is this on langfuse cloud?
lephant_
lephant_•8mo ago
Nope, running locally. And the initial input chain just gathers the question, chat history, and docs, and checks whether the prompt is toxic:
_inputs = InputsRag(
    {
        "question": GetQuestion(lambda x: x["question"]),
        "chat_history": FormatHistory(
            first=RunnableLambda(lambda x: x["history"]),
            last=RunnableLambda(_format_chat_history),
        ),
        "docs": RetrieveDocs(
            first=(_search_query | retriever),
            last=RunnableLambda(remove_embedding_from_documents),
        ),
        "error": ModerationChain(
            first=(
                RunnableLambda(lambda x: x["question"])
                | prompt_moderation_with_config
            ),
            last=RunnableLambda(lambda _: False),
        ),
    }
)
InputsRag is a wrapper class (class InputsRag(RunnableParallel): pass), since langfuse does not parse the chain names from LCEL.
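The reason this wrapper trick works can be illustrated without langchain. A toy sketch below (the RunnableParallel here is a minimal stand-in class, not the real langchain_core one) shows why a tracing integration that falls back to the class name picks up a readable name from the trivial subclass:

```python
# Toy sketch: "RunnableParallel" is a stand-in, NOT the real
# langchain_core class. It only demonstrates why wrapping a runnable
# in a trivially named subclass gives tracing integrations a readable
# chain name when they fall back to the class name.
class RunnableParallel:
    def get_name(self) -> str:
        # langchain's Runnable.get_name() defaults to the class name
        return type(self).__name__


class InputsRag(RunnableParallel):
    """Wrapper whose only purpose is a readable name in traces."""


print(RunnableParallel().get_name())  # RunnableParallel
print(InputsRag().get_name())         # InputsRag
```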
Marc
Marc•8mo ago
Can you check whether it works without streaming? (this would confirm that it is a problem caused by how the langfuse integration captures streamed lcel chains)
lephant_
lephant_•8mo ago
Yep it works with invoke
Marc
Marc•8mo ago
Since langfuse does not parse the chain names from LCEL
there's an open PR for this and I'm keen to get it fixed soon; we're just putting all of our core effort into some big releases first
lephant_
lephant_•8mo ago
So this is the expected behavior when using .invoke instead of .stream:
Marc
Marc•8mo ago
Can you add an issue to track this feature request (LCEL streaming support)? Open to contributions on this and we'll see how we can prioritize this soon as well
lephant_
lephant_•8mo ago
Yea sure! Also happy to work on this. Can you maybe give me a quick insight into how this could be fixed? I understand it's somewhere in the callback handler (on_chain_start). Also, isn't this something on langchain's side? Shouldn't on_chain_start receive the inputs?
Marc
Marc•8mo ago
Happy to support on this as well if you run into questions while trying to fix it. I'd make the same assumption as you (on_chain_start should include this), however the implementation of these interfaces in the langchain library is sometimes not super stable, so you'd need to try it locally to see where exactly it breaks. Currently I'd just print all of the data to quickly investigate and fix it.
lephant_
lephant_•8mo ago
I think it's inside langchain's code. For their RunnableSequence invoke func they have:
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
    from langchain_core.beta.runnables.context import config_with_context

    # setup callbacks and context
    config = config_with_context(ensure_config(config), self.steps)
    callback_manager = get_callback_manager_for_config(config)
    # start the root run
    run_manager = callback_manager.on_chain_start(
        dumpd(self), input, name=config.get("run_name") or self.get_name()
    )

    # invoke all steps in sequence
    try:
        for i, step in enumerate(self.steps):
            input = step.invoke(
                input,
                # mark each step as a child run
                patch_config(
                    config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
                ),
            )
    # finish the root run
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    else:
        run_manager.on_chain_end(input)
        return cast(Output, input)
Which invokes the callback_manager with the real input. But the stream function will eventually call this function:
def _transform_stream_with_config(
    self,
    input: Iterator[Input],
    transformer: Union[
        Callable[[Iterator[Input]], Iterator[Output]],
        Callable[[Iterator[Input], CallbackManagerForChainRun], Iterator[Output]],
        Callable[
            [
                Iterator[Input],
                CallbackManagerForChainRun,
                RunnableConfig,
            ],
            Iterator[Output],
        ],
    ],
    config: Optional[RunnableConfig],
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> Iterator[Output]:
    """Helper method to transform an Iterator of Input values into an Iterator of
    Output values, with callbacks.
    Use this to implement `stream()` or `transform()` in Runnable subclasses."""
    # tee the input so we can iterate over it twice
    input_for_tracing, input_for_transform = tee(input, 2)
    # Start the input iterator to ensure the input runnable starts before this one
    final_input: Optional[Input] = next(input_for_tracing, None)
    final_input_supported = True
    final_output: Optional[Output] = None
    final_output_supported = True

    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(
        dumpd(self),
        {"input": ""},
        run_type=run_type,
        name=config.get("run_name") or self.get_name(),
    )

Where they now just call on_chain_start with an empty string as the value for input 💀
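That the real first input is actually available at that point can be shown with a stdlib-only sketch of the same tee pattern (function and variable names here are made up; this mimics the helper above, it is not langchain code):

```python
from itertools import tee


def start_run(input_iter):
    """Mimics _transform_stream_with_config's setup: tee the input
    iterator, pull the first chunk for tracing, and build the
    on_chain_start payload. langchain passes the literal {"input": ""}
    even though final_input already holds the real first value."""
    input_for_tracing, input_for_transform = tee(input_iter, 2)
    final_input = next(input_for_tracing, None)

    what_langchain_sends = {"input": ""}
    # The value a fixed version could send instead:
    what_is_available = final_input if final_input is not None else {"input": ""}
    return what_langchain_sends, what_is_available, input_for_transform


sent, available, rest = start_run(iter([{"input": "Wow that's interesting!"}]))
print(sent)       # {'input': ''}
print(available)  # {'input': "Wow that's interesting!"}
```

Because tee already buffers the consumed chunk, the transform side still sees the full stream; only the tracing payload is hard-coded to the empty placeholder.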
Marc
Marc•8mo ago
Thanks for the detail on this! Then we need to wait for a fix in langchain; maybe you can also just contribute a PR there to get this fixed.