Open
Description
I'm attempting to implement an asynchronous approach using PostgresChatMessageHistory in combination with RunnableWithMessageHistory, but I'm encountering some challenges. While PostgresChatMessageHistory appears to support asynchronous connections with async methods, I haven't found clear guidance on how to properly use them. Additionally, I couldn't locate any relevant documentation addressing this use case.
Any ideas on how to resolve this problem?
Here the a sample code I'm using and the traceback error:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_postgres import PostgresChatMessageHistory
import psycopg
from psycopg import AsyncConnection
import uuid
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}"),
]
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = prompt | llm | StrOutputParser()
def get_session_history(session_id: str = None) -> PostgresChatMessageHistory:
session_id = session_id or str(uuid.uuid4())
sync_connection = psycopg.connect(DATABASE_URL) # the db url was initialized before
return PostgresChatMessageHistory(
"chat_history",
session_id,
sync_connection=sync_connection
)
async def aget_session_history(session_id: str = None) -> PostgresChatMessageHistory:
session_id = session_id or str(uuid.uuid4())
async with await AsyncConnection.connect(DATABASE_URL) as async_connection:
return PostgresChatMessageHistory(
"chat_history",
session_id,
async_connection=async_connection
)
chain_with_history_sync = (
RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="question",
history_messages_key="chat_history",
)
)
chain_with_history_async = (
RunnableWithMessageHistory(
chain,
aget_session_history,
input_messages_key="question",
history_messages_key="chat_history",
)
)
session_id = str(uuid.uuid4())
answer = chain_with_history_sync.invoke(
{"question": "Good morning!"},
{"configurable": {"session_id": session_id}}
)
print("Sync invoke:\n", answer)
session_id = str(uuid.uuid4())
answer = await chain_with_history_async.ainvoke(
{"question": "Good morning!"},
{"configurable": {"session_id": session_id}}
)
print("Async invoke:\n", answer)
Traceback error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[2], line 71
67 print("Sync invoke:\n", answer)
69 session_id = str(uuid.uuid4())
---> 71 answer = await chain_with_history_async.ainvoke(
72 {"question": "Good morning!"},
73 {"configurable": {"session_id": session_id}}
74 )
75 print("Async invoke:\n", answer)
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:5105, in RunnableBindingBase.ainvoke(self, input, config, **kwargs)
5099 async def ainvoke(
5100 self,
5101 input: Input,
5102 config: Optional[RunnableConfig] = None,
5103 **kwargs: Optional[Any],
5104 ) -> Output:
-> 5105 return await self.bound.ainvoke(
5106 input,
5107 self._merge_configs(config),
5108 **{**self.kwargs, **kwargs},
5109 )
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:5105, in RunnableBindingBase.ainvoke(self, input, config, **kwargs)
5099 async def ainvoke(
5100 self,
5101 input: Input,
5102 config: Optional[RunnableConfig] = None,
5103 **kwargs: Optional[Any],
5104 ) -> Output:
-> 5105 return await self.bound.ainvoke(
5106 input,
5107 self._merge_configs(config),
5108 **{**self.kwargs, **kwargs},
5109 )
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:2921, in RunnableSequence.ainvoke(self, input, config, **kwargs)
2919 part = functools.partial(step.ainvoke, input, config)
2920 if asyncio_accepts_context():
-> 2921 input = await asyncio.create_task(part(), context=context) # type: ignore
2922 else:
2923 input = await asyncio.create_task(part())
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:5105, in RunnableBindingBase.ainvoke(self, input, config, **kwargs)
5099 async def ainvoke(
5100 self,
5101 input: Input,
5102 config: Optional[RunnableConfig] = None,
5103 **kwargs: Optional[Any],
5104 ) -> Output:
-> 5105 return await self.bound.ainvoke(
5106 input,
5107 self._merge_configs(config),
5108 **{**self.kwargs, **kwargs},
5109 )
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/passthrough.py:523, in RunnableAssign.ainvoke(self, input, config, **kwargs)
517 async def ainvoke(
518 self,
519 input: Dict[str, Any],
520 config: Optional[RunnableConfig] = None,
521 **kwargs: Any,
522 ) -> Dict[str, Any]:
--> 523 return await self._acall_with_config(self._ainvoke, input, config, **kwargs)
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:1837, in Runnable._acall_with_config(self, func, input, config, run_type, serialized, **kwargs)
1833 coro = acall_func_with_variable_args(
1834 func, input, config, run_manager, **kwargs
1835 )
1836 if asyncio_accepts_context():
-> 1837 output: Output = await asyncio.create_task(coro, context=context) # type: ignore
1838 else:
1839 output = await coro
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/passthrough.py:510, in RunnableAssign._ainvoke(self, input, run_manager, config, **kwargs)
497 async def _ainvoke(
498 self,
499 input: Dict[str, Any],
(...)
502 **kwargs: Any,
503 ) -> Dict[str, Any]:
504 assert isinstance(
505 input, dict
506 ), "The input to RunnablePassthrough.assign() must be a dict."
508 return {
509 **input,
--> 510 **await self.mapper.ainvoke(
511 input,
512 patch_config(config, callbacks=run_manager.get_child()),
513 **kwargs,
514 ),
515 }
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:3626, in RunnableParallel.ainvoke(self, input, config, **kwargs)
3623 try:
3624 # copy to avoid issues from the caller mutating the steps during invoke()
3625 steps = dict(self.steps__)
-> 3626 results = await asyncio.gather(
3627 *(
3628 _ainvoke_step(
3629 step,
3630 input,
3631 # mark each step as a child run
3632 config,
3633 key,
3634 )
3635 for key, step in steps.items()
3636 )
3637 )
3638 output = {key: value for key, value in zip(steps, results)}
3639 # finish the root run
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:3616, in RunnableParallel.ainvoke.<locals>._ainvoke_step(step, input, config, key)
3614 context.run(_set_config_context, child_config)
3615 if asyncio_accepts_context():
-> 3616 return await asyncio.create_task( # type: ignore
3617 step.ainvoke(input, child_config), context=context
3618 )
3619 else:
3620 return await asyncio.create_task(step.ainvoke(input, child_config))
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:5105, in RunnableBindingBase.ainvoke(self, input, config, **kwargs)
5099 async def ainvoke(
5100 self,
5101 input: Input,
5102 config: Optional[RunnableConfig] = None,
5103 **kwargs: Optional[Any],
5104 ) -> Output:
-> 5105 return await self.bound.ainvoke(
5106 input,
5107 self._merge_configs(config),
5108 **{**self.kwargs, **kwargs},
5109 )
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:4504, in RunnableLambda.ainvoke(self, input, config, **kwargs)
4493 """Invoke this Runnable asynchronously.
4494
4495 Args:
(...)
4501 The output of this Runnable.
4502 """
4503 the_func = self.afunc if hasattr(self, "afunc") else self.func
-> 4504 return await self._acall_with_config(
4505 self._ainvoke,
4506 input,
4507 self._config(config, the_func),
4508 **kwargs,
4509 )
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:1837, in Runnable._acall_with_config(self, func, input, config, run_type, serialized, **kwargs)
1833 coro = acall_func_with_variable_args(
1834 func, input, config, run_manager, **kwargs
1835 )
1836 if asyncio_accepts_context():
-> 1837 output: Output = await asyncio.create_task(coro, context=context) # type: ignore
1838 else:
1839 output = await coro
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/base.py:4430, in RunnableLambda._ainvoke(self, input, run_manager, config, **kwargs)
4428 output = chunk
4429 else:
-> 4430 output = await acall_func_with_variable_args(
4431 cast(Callable, afunc), input, config, run_manager, **kwargs
4432 )
4433 # If the output is a Runnable, invoke it
4434 if isinstance(output, Runnable):
File /usr/local/lib/python3.12/site-packages/langchain_core/runnables/history.py:498, in RunnableWithMessageHistory._aenter_history(self, input, config)
494 async def _aenter_history(
495 self, input: Dict[str, Any], config: RunnableConfig
496 ) -> List[BaseMessage]:
497 hist: BaseChatMessageHistory = config["configurable"]["message_history"]
--> 498 messages = (await hist.aget_messages()).copy()
500 if not self.history_messages_key:
501 # return all messages
502 input_val = (
503 input if not self.input_messages_key else input[self.input_messages_key]
504 )
AttributeError: 'coroutine' object has no attribute 'aget_messages'
The error indicates that 'coroutine' object has no attribute 'aget_messages', which suggests the asynchronous method handling might not be correctly invoked.
langchain==0.2.8
langchain-community==0.2.5
langchain-core==0.2.41
langchain-openai==0.1.8
langchain-postgres==0.0.7
Metadata
Metadata
Assignees
Labels
No labels