Skip to content

feat: create hybrid search capable vector store table [2/N] #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: hybrid_search_1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion langchain_postgres/v2/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from .hybrid_search_config import HybridSearchConfig

T = TypeVar("T")


Expand Down Expand Up @@ -156,6 +158,7 @@ async def _ainit_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -178,6 +181,8 @@ async def _ainit_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Default: None.

Raises:
:class:`DuplicateTableError <asyncpg.exceptions.DuplicateTableError>`: if table already exists.
Expand All @@ -186,6 +191,7 @@ async def _ainit_vectorstore_table(

schema_name = self._escape_postgres_identifier(schema_name)
table_name = self._escape_postgres_identifier(table_name)
hybrid_search_default_column_name = content_column + "_tsv"
content_column = self._escape_postgres_identifier(content_column)
embedding_column = self._escape_postgres_identifier(embedding_column)
if metadata_columns is None:
Expand Down Expand Up @@ -226,10 +232,22 @@ async def _ainit_vectorstore_table(
id_data_type = id_column["data_type"]
id_column_name = id_column["name"]

hybrid_search_column = "" # Default is no TSV column for hybrid search
if hybrid_search_config:
hybrid_search_column_name = (
hybrid_search_config.tsv_column or hybrid_search_default_column_name
)
hybrid_search_column_name = self._escape_postgres_identifier(
hybrid_search_column_name
)
hybrid_search_config.tsv_column = hybrid_search_column_name
hybrid_search_column = f',"{self._escape_postgres_identifier(hybrid_search_column_name)}" TSVECTOR NOT NULL'

query = f"""CREATE TABLE "{schema_name}"."{table_name}"(
"{id_column_name}" {id_data_type} PRIMARY KEY,
"{content_column}" TEXT NOT NULL,
"{embedding_column}" vector({vector_size}) NOT NULL"""
"{embedding_column}" vector({vector_size}) NOT NULL
{hybrid_search_column}"""
for column in metadata_columns:
if isinstance(column, Column):
nullable = "NOT NULL" if not column.nullable else ""
Expand Down Expand Up @@ -258,6 +276,7 @@ async def ainit_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -280,6 +299,8 @@ async def ainit_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Default: None.
"""
await self._run_as_async(
self._ainit_vectorstore_table(
Expand All @@ -293,6 +314,7 @@ async def ainit_vectorstore_table(
id_column=id_column,
overwrite_existing=overwrite_existing,
store_metadata=store_metadata,
hybrid_search_config=hybrid_search_config,
)
)

Expand All @@ -309,6 +331,7 @@ def init_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -331,6 +354,8 @@ def init_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Default: None.
"""
self._run_as_sync(
self._ainit_vectorstore_table(
Expand All @@ -344,6 +369,7 @@ def init_vectorstore_table(
id_column=id_column,
overwrite_existing=overwrite_existing,
store_metadata=store_metadata,
hybrid_search_config=hybrid_search_config,
)
)

Expand Down
71 changes: 63 additions & 8 deletions tests/unit_tests/v2/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@
from sqlalchemy.pool import NullPool

from langchain_postgres import Column, PGEngine
from langchain_postgres.v2.hybrid_search_config import HybridSearchConfig
from tests.utils import VECTORSTORE_CONNECTION_STRING as CONNECTION_STRING

DEFAULT_TABLE = "default" + str(uuid.uuid4()).replace("-", "_")
CUSTOM_TABLE = "custom" + str(uuid.uuid4()).replace("-", "_")
HYBRID_SEARCH_TABLE = "hybrid" + str(uuid.uuid4()).replace("-", "_")
CUSTOM_TYPEDDICT_TABLE = "custom_td" + str(uuid.uuid4()).replace("-", "_")
INT_ID_CUSTOM_TABLE = "custom_int_id" + str(uuid.uuid4()).replace("-", "_")
DEFAULT_TABLE_SYNC = "default_sync" + str(uuid.uuid4()).replace("-", "_")
CUSTOM_TABLE_SYNC = "custom_sync" + str(uuid.uuid4()).replace("-", "_")
HYBRID_SEARCH_TABLE_SYNC = "hybrid_sync" + str(uuid.uuid4()).replace("-", "_")
CUSTOM_TYPEDDICT_TABLE_SYNC = "custom_td_sync" + str(uuid.uuid4()).replace("-", "_")
INT_ID_CUSTOM_TABLE_SYNC = "custom_int_id_sync" + str(uuid.uuid4()).replace("-", "_")
VECTOR_SIZE = 768
Expand Down Expand Up @@ -68,10 +71,11 @@ async def engine(self) -> AsyncIterator[PGEngine]:
engine = PGEngine.from_connection_string(url=CONNECTION_STRING, **kwargs)

yield engine
await aexecute(engine, f'DROP TABLE "{CUSTOM_TABLE}"')
await aexecute(engine, f'DROP TABLE "{CUSTOM_TYPEDDICT_TABLE}"')
await aexecute(engine, f'DROP TABLE "{DEFAULT_TABLE}"')
await aexecute(engine, f'DROP TABLE "{INT_ID_CUSTOM_TABLE}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{CUSTOM_TABLE}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{HYBRID_SEARCH_TABLE}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{CUSTOM_TYPEDDICT_TABLE}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{DEFAULT_TABLE}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{INT_ID_CUSTOM_TABLE}"')
await engine.close()

async def test_init_table(self, engine: PGEngine) -> None:
Expand Down Expand Up @@ -110,6 +114,31 @@ async def test_init_table_custom(self, engine: PGEngine) -> None:
for row in results:
assert row in expected

async def test_init_table_hybrid_search(self, engine: PGEngine) -> None:
    """Verify that passing a hybrid search config adds a tsvector column.

    Creates a table through the async API with a default
    ``HybridSearchConfig`` and a content column named ``my-content``, then
    reads the table's columns back from ``information_schema.columns`` and
    compares them with the expected schema, which includes a
    ``my-content_tsv`` column of type ``tsvector``.
    """
    await engine.ainit_vectorstore_table(
        HYBRID_SEARCH_TABLE,
        VECTOR_SIZE,
        id_column="uuid",
        content_column="my-content",
        embedding_column="my_embedding",
        metadata_columns=[Column("page", "TEXT"), Column("source", "TEXT")],
        store_metadata=True,
        hybrid_search_config=HybridSearchConfig(),
    )
    stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{HYBRID_SEARCH_TABLE}';"
    results = await afetch(engine, stmt)
    expected = [
        {"column_name": "uuid", "data_type": "uuid"},
        {"column_name": "my_embedding", "data_type": "USER-DEFINED"},
        {"column_name": "langchain_metadata", "data_type": "json"},
        {"column_name": "my-content", "data_type": "text"},
        {"column_name": "my-content_tsv", "data_type": "tsvector"},
        {"column_name": "page", "data_type": "text"},
        {"column_name": "source", "data_type": "text"},
    ]
    # No unexpected columns were created.
    for row in results:
        assert row in expected
    # Every expected column exists. Without this reverse check the test would
    # pass vacuously on an empty result or when the tsvector column is missing.
    for row in expected:
        assert row in results

async def test_invalid_typed_dict(self, engine: PGEngine) -> None:
with pytest.raises(TypeError):
await engine.ainit_vectorstore_table(
Expand Down Expand Up @@ -230,10 +259,11 @@ class TestEngineSync:
async def engine(self) -> AsyncIterator[PGEngine]:
engine = PGEngine.from_connection_string(url=CONNECTION_STRING)
yield engine
await aexecute(engine, f'DROP TABLE "{CUSTOM_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE "{DEFAULT_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE "{INT_ID_CUSTOM_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE "{CUSTOM_TYPEDDICT_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{CUSTOM_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{HYBRID_SEARCH_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{DEFAULT_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{INT_ID_CUSTOM_TABLE_SYNC}"')
await aexecute(engine, f'DROP TABLE IF EXISTS "{CUSTOM_TYPEDDICT_TABLE_SYNC}"')
await engine.close()

async def test_init_table(self, engine: PGEngine) -> None:
Expand Down Expand Up @@ -269,6 +299,31 @@ async def test_init_table_custom(self, engine: PGEngine) -> None:
for row in results:
assert row in expected

async def test_init_table_hybrid_search(self, engine: PGEngine) -> None:
    """Verify that the sync API adds a tsvector column for hybrid search.

    Creates a table through ``init_vectorstore_table`` with a default
    ``HybridSearchConfig`` and a content column named ``my-content``, then
    reads the table's columns back from ``information_schema.columns`` and
    compares them with the expected schema, which includes a
    ``my-content_tsv`` column of type ``tsvector``.
    """
    engine.init_vectorstore_table(
        HYBRID_SEARCH_TABLE_SYNC,
        VECTOR_SIZE,
        id_column="uuid",
        content_column="my-content",
        embedding_column="my_embedding",
        metadata_columns=[Column("page", "TEXT"), Column("source", "TEXT")],
        store_metadata=True,
        hybrid_search_config=HybridSearchConfig(),
    )
    stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{HYBRID_SEARCH_TABLE_SYNC}';"
    results = await afetch(engine, stmt)
    expected = [
        {"column_name": "uuid", "data_type": "uuid"},
        {"column_name": "my_embedding", "data_type": "USER-DEFINED"},
        {"column_name": "langchain_metadata", "data_type": "json"},
        {"column_name": "my-content", "data_type": "text"},
        {"column_name": "my-content_tsv", "data_type": "tsvector"},
        {"column_name": "page", "data_type": "text"},
        {"column_name": "source", "data_type": "text"},
    ]
    # No unexpected columns were created.
    for row in results:
        assert row in expected
    # Every expected column exists. Without this reverse check the test would
    # pass vacuously on an empty result or when the tsvector column is missing.
    for row in expected:
        assert row in results

async def test_invalid_typed_dict(self, engine: PGEngine) -> None:
with pytest.raises(TypeError):
engine.init_vectorstore_table(
Expand Down