class NileVectorStore(BasePydanticVectorStore):
    """
    Nile (Multi-tenant Postgres) Vector Store.

    Examples:
        `pip install llama-index-vector-stores-nile`

        ```python
        from llama_index.vector_stores.nile import NileVectorStore

        # Create NileVectorStore instance
        vector_store = NileVectorStore.from_params(
            service_url="postgresql://user:[email protected]:5432/niledb",
            table_name="test_table",
            tenant_aware=True,
            num_dimensions=1536,
        )
        ```

    """

    stores_text: bool = True
    flat_metadata: bool = False
    service_url: str
    table_name: str
    num_dimensions: int
    tenant_aware: bool

    _sync_conn: Any = PrivateAttr()
    _async_conn: Any = PrivateAttr()

    def _create_clients(self) -> None:
        self._sync_conn = psycopg.connect(self.service_url)
        self._async_conn = psycopg.connect(self.service_url)

    def _create_tables(self) -> None:
        _logger.info(
            f"Creating tables for {self.table_name} with {self.num_dimensions} dimensions"
        )
        with self._sync_conn.cursor() as cursor:
            if self.tenant_aware:
                query = sql.SQL(
                    """
                    CREATE TABLE IF NOT EXISTS {table_name}
                    (id UUID DEFAULT (gen_random_uuid()), tenant_id UUID,
                     embedding VECTOR({num_dimensions}), content TEXT, metadata JSONB)
                    """
                ).format(
                    table_name=sql.Identifier(self.table_name),
                    num_dimensions=sql.Literal(self.num_dimensions),
                )
                cursor.execute(query)
            else:
                query = sql.SQL(
                    """
                    CREATE TABLE IF NOT EXISTS {table_name}
                    (id UUID DEFAULT (gen_random_uuid()),
                     embedding VECTOR({num_dimensions}), content TEXT, metadata JSONB)
                    """
                ).format(
                    table_name=sql.Identifier(self.table_name),
                    num_dimensions=sql.Literal(self.num_dimensions),
                )
                cursor.execute(query)
        self._sync_conn.commit()

    def __init__(
        self,
        service_url: str,
        table_name: str,
        tenant_aware: bool = False,
        num_dimensions: int = DEFAULT_EMBEDDING_DIM,
    ) -> None:
        super().__init__(
            service_url=service_url,
            table_name=table_name,
            num_dimensions=num_dimensions,
            tenant_aware=tenant_aware,
        )
        self._create_clients()
        self._create_tables()

    @classmethod
    def class_name(cls) -> str:
        return "NileVectorStore"

    @property
    def client(self) -> Any:
        return self._sync_conn

    async def close(self) -> None:
        self._sync_conn.close()
        await self._async_conn.close()

    @classmethod
    def from_params(
        cls,
        service_url: str,
        table_name: str,
        tenant_aware: bool = False,
        num_dimensions: int = DEFAULT_EMBEDDING_DIM,
    ) -> "NileVectorStore":
        return cls(
            service_url=service_url,
            table_name=table_name,
            tenant_aware=tenant_aware,
            num_dimensions=num_dimensions,
        )

    # We extract tenant_id from the node metadata.
    def _node_to_row(self, node: BaseNode) -> Any:
        metadata = node_to_metadata_dict(
            node,
            remove_text=True,
            flat_metadata=self.flat_metadata,
        )
        tenant_id = node.metadata.get("tenant_id", None)
        return [
            tenant_id,
            metadata,
            node.get_content(metadata_mode=MetadataMode.NONE),
            node.embedding,
        ]

    def _insert_row(self, cursor: Any, row: Any) -> str:
        _logger.debug(f"Inserting row into {self.table_name} with tenant_id {row[0]}")
        if self.tenant_aware:
            if row[0] is None:
                # Nile would fail the insert itself, but this saves the DB call and is easier to test
                raise ValueError("tenant_id cannot be None if tenant_aware is True")
            query = sql.SQL(
                """
                INSERT INTO {} (tenant_id, metadata, content, embedding)
                VALUES (%(tenant_id)s, %(metadata)s, %(content)s, %(embedding)s)
                returning id
                """
            ).format(sql.Identifier(self.table_name))
            cursor.execute(
                query,
                {
                    "tenant_id": row[0],
                    "metadata": json.dumps(row[1]),
                    "content": row[2],
                    "embedding": row[3],
                },
            )
        else:
            query = sql.SQL(
                """
                INSERT INTO {} (metadata, content, embedding)
                VALUES (%(metadata)s, %(content)s, %(embedding)s)
                returning id
                """
            ).format(sql.Identifier(self.table_name))
            cursor.execute(
                query,
                {
                    # row layout is [tenant_id, metadata, content, embedding]; tenant_id is skipped here
                    "metadata": json.dumps(row[1]),
                    "content": row[2],
                    "embedding": row[3],
                },
            )
        id = cursor.fetchone()[0]
        self._sync_conn.commit()
        return id

    def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
        rows_to_insert = [self._node_to_row(node) for node in nodes]
        ids = []
        with self._sync_conn.cursor() as cursor:
            for row in rows_to_insert:
                # this will throw an error if tenant_id is None and tenant_aware is True, which is what we want
                ids.append(self._insert_row(cursor, row))
        # commit is called in _insert_row
        return ids

    async def async_add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
        rows_to_insert = [self._node_to_row(node) for node in nodes]
        ids = []
        async with self._async_conn.cursor() as cursor:
            for row in rows_to_insert:
                ids.append(self._insert_row(cursor, row))
            await self._async_conn.commit()
        return ids

    def _set_tenant_context(self, cursor: Any, tenant_id: Any) -> None:
        if self.tenant_aware:
            cursor.execute(
                sql.SQL(""" set local nile.tenant_id = {} """).format(
                    sql.Literal(tenant_id)
                )
            )

    def _to_postgres_operator(self, operator: FilterOperator) -> str:
        if operator == FilterOperator.EQ:
            return "="
        elif operator == FilterOperator.GT:
            return ">"
        elif operator == FilterOperator.LT:
            return "<"
        elif operator == FilterOperator.NE:
            return "!="
        elif operator == FilterOperator.GTE:
            return ">="
        elif operator == FilterOperator.LTE:
            return "<="
        elif operator == FilterOperator.IN:
            return "IN"
        elif operator == FilterOperator.NIN:
            return "NOT IN"
        elif operator == FilterOperator.CONTAINS:
            return "@>"
        elif operator == FilterOperator.TEXT_MATCH:
            return "LIKE"
        elif operator == FilterOperator.TEXT_MATCH_INSENSITIVE:
            return "ILIKE"
        else:
            _logger.warning(f"Unknown operator: {operator}, fallback to '='")
            return "="

    def _create_where_clause(self, filters: MetadataFilters) -> Tuple[sql.SQL, dict]:
        where_clauses = []
        params = {}
        param_counter = 0
        if filters is None:
            return sql.SQL(""), params

        _logger.debug(f"Filters: {filters}")
        for filter in filters.filters:
            param_counter += 1
            param_name = f"param_{param_counter}"
            if isinstance(filter, MetadataFilters):
                raise ValueError("Nested MetadataFilters are not supported yet")
            if isinstance(filter, MetadataFilter):
                key_param = f"key_{param_counter}"
                params[key_param] = filter.key
                if filter.operator in [FilterOperator.IN, FilterOperator.NIN]:
                    params[param_name] = filter.value
                    where_clauses.append(
                        sql.SQL("metadata->>%({})s {} %({})s").format(
                            sql.Identifier(key_param),
                            sql.SQL(self._to_postgres_operator(filter.operator)),
                            sql.Identifier(param_name),
                        )
                    )
                elif filter.operator in [FilterOperator.CONTAINS]:
                    params[param_name] = filter.value
                    where_clauses.append(
                        sql.SQL("metadata->%({})s @> %({})s::jsonb").format(
                            sql.Identifier(key_param), sql.Identifier(param_name)
                        )
                    )
                elif (
                    filter.operator == FilterOperator.TEXT_MATCH
                    or filter.operator == FilterOperator.TEXT_MATCH_INSENSITIVE
                ):
                    # Safely handle text match operations
                    # Add wildcards in the parameter, not in the SQL
                    params[param_name] = f"%{filter.value}%"
                    where_clauses.append(
                        sql.SQL("metadata->>%({})s {} %({})s").format(
                            sql.Identifier(key_param),
                            sql.SQL(self._to_postgres_operator(filter.operator)),
                            sql.Identifier(param_name),
                        )
                    )
                else:
                    params[param_name] = filter.value
                    where_clauses.append(
                        sql.SQL("metadata->>%({})s {} %({})s").format(
                            sql.Identifier(key_param),
                            sql.SQL(self._to_postgres_operator(filter.operator)),
                            sql.Identifier(param_name),
                        )
                    )

        _logger.debug(f"Where clauses: {where_clauses}")
        if len(where_clauses) == 0:
            return sql.SQL(""), params
        else:
            # Ensure the condition is either 'AND' or 'OR'
            safe_condition = "AND"
            if hasattr(filters, "condition") and filters.condition.upper() in [
                "AND",
                "OR",
            ]:
                safe_condition = filters.condition.upper()
            return (
                sql.SQL(" WHERE {}").format(
                    sql.SQL(f" {safe_condition} ").join(where_clauses)
                ),
                params,
            )

    def _execute_query(
        self,
        cursor: Any,
        query_embedding: VectorStoreQuery,
        tenant_id: Any = None,
        ivfflat_probes: Any = None,
        hnsw_ef_search: Any = None,
    ) -> List[Any]:
        _logger.info(f"Querying {self.table_name} with tenant_id {tenant_id}")
        self._set_tenant_context(cursor, tenant_id)
        if ivfflat_probes is not None:
            cursor.execute(
                sql.SQL("""SET ivfflat.probes = {}""").format(
                    sql.Literal(ivfflat_probes)
                )
            )
        if hnsw_ef_search is not None:
            cursor.execute(
                sql.SQL("""SET hnsw.ef_search = {}""").format(
                    sql.Literal(hnsw_ef_search)
                )
            )
        where_clause, where_params = self._create_where_clause(query_embedding.filters)
        query_params = {
            "query_embedding": query_embedding.query_embedding,
            **where_params,  # Merge the where clause parameters
        }
        query = sql.SQL(
            """
            SELECT id, metadata, content,
                   %(query_embedding)s::vector <=> embedding as distance
            FROM {table_name} {where_clause}
            ORDER BY distance LIMIT {limit}
            """
        ).format(
            table_name=sql.Identifier(self.table_name),
            where_clause=where_clause,
            limit=sql.Literal(query_embedding.similarity_top_k),
        )
        cursor.execute(query, query_params)
        return cursor.fetchall()

    def _process_query_results(self, results: List[Any]) -> VectorStoreQueryResult:
        nodes = []
        similarities = []
        ids = []
        for row in results:
            node = metadata_dict_to_node(row[1])
            node.set_content(row[2])
            nodes.append(node)
            similarities.append(row[3])
            ids.append(row[0])
        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

    # NOTE: Maybe handle tenant_id specified in filter vs. kwargs
    # NOTE: Add support for additional query modes
    def query(
        self, query_embedding: VectorStoreQuery, **kwargs: Any
    ) -> VectorStoreQueryResult:
        # get and validate tenant_id
        tenant_id = kwargs.get("tenant_id")
        ivfflat_probes = kwargs.get("ivfflat_probes")
        hnsw_ef_search = kwargs.get("hnsw_ef_search")
        if self.tenant_aware and tenant_id is None:
            raise ValueError(
                "tenant_id must be specified in kwargs if tenant_aware is True"
            )
        # check query mode
        if query_embedding.mode != VectorStoreQueryMode.DEFAULT:
            raise ValueError("Only DEFAULT mode is currently supported")
        # query
        with self._sync_conn.cursor() as cursor:
            self._set_tenant_context(cursor, tenant_id)
            results = self._execute_query(
                cursor, query_embedding, tenant_id, ivfflat_probes, hnsw_ef_search
            )
            self._sync_conn.commit()
        return self._process_query_results(results)

    async def aquery(
        self, query_embedding: VectorStoreQuery, **kwargs: Any
    ) -> VectorStoreQueryResult:
        tenant_id = kwargs.get("tenant_id")
        if self.tenant_aware and tenant_id is None:
            raise ValueError(
                "tenant_id must be specified in kwargs if tenant_aware is True"
            )
        async with self._async_conn.cursor() as cursor:
            results = self._execute_query(cursor, query_embedding, tenant_id)
            await self._async_conn.commit()
        return self._process_query_results(results)

    def create_tenant(self, tenant_name: str) -> uuid.UUID:
        """
        Create a new tenant and return the tenant_id.

        Parameters
        ----------
            tenant_name (str): The name of the tenant to create.

        Returns
        -------
            tenant_id (uuid.UUID): The id of the newly created tenant.
        """
        with self._sync_conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO tenants (name)
                VALUES (%(tenant_name)s)
                returning id
                """,
                {"tenant_name": tenant_name},
            )
            tenant_id = cursor.fetchone()[0]
            self._sync_conn.commit()
        return tenant_id

    def create_index(self, index_type: IndexType, **kwargs: Any) -> None:
        """
        Create an index of the specified type.

        Run this after populating the table. We intentionally throw an error if the
        index already exists. Since you may want to try a different type or
        parameters, we recommend dropping the index first.

        Parameters
        ----------
            index_type (IndexType): The type of index to create.
            m (optional int): The number of neighbors to consider during construction for the PGVECTOR_HNSW index.
            ef_construction (optional int): The construction parameter for the PGVECTOR_HNSW index.
            nlists (optional int): The number of lists for the PGVECTOR_IVFFLAT index.
        """
        _logger.info(f"Creating index of type {index_type} for {self.table_name}")
        if index_type == IndexType.PGVECTOR_HNSW:
            m = kwargs.get("m")
            ef_construction = kwargs.get("ef_construction")
            if m is None or ef_construction is None:
                raise ValueError(
                    "m and ef_construction must be specified in kwargs for PGVECTOR_HNSW index"
                )
            query = sql.SQL(
                """
                CREATE INDEX {index_name} ON {table_name}
                USING hnsw (embedding vector_cosine_ops)
                WITH (m = {m}, ef_construction = {ef_construction});
                """
            ).format(
                table_name=sql.Identifier(self.table_name),
                index_name=sql.Identifier(f"{self.table_name}_embedding_idx"),
                m=sql.Literal(m),
                ef_construction=sql.Literal(ef_construction),
            )
            with self._sync_conn.cursor() as cursor:
                try:
                    cursor.execute(query)
                    self._sync_conn.commit()
                except psycopg.errors.DuplicateTable:
                    self._sync_conn.rollback()
                    raise psycopg.errors.DuplicateTable(
                        f"Index {self.table_name}_embedding_idx already exists"
                    )
        elif index_type == IndexType.PGVECTOR_IVFFLAT:
            nlists = kwargs.get("nlists")
            if nlists is None:
                raise ValueError(
                    "nlists must be specified in kwargs for PGVECTOR_IVFFLAT index"
                )
            query = sql.SQL(
                """
                CREATE INDEX {index_name} ON {table_name}
                USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = {nlists});
                """
            ).format(
                table_name=sql.Identifier(self.table_name),
                index_name=sql.Identifier(f"{self.table_name}_embedding_idx"),
                nlists=sql.Literal(nlists),
            )
            with self._sync_conn.cursor() as cursor:
                try:
                    cursor.execute(query)
                    self._sync_conn.commit()
                except psycopg.errors.DuplicateTable:
                    self._sync_conn.rollback()
                    raise psycopg.errors.DuplicateTable(
                        f"Index {self.table_name}_embedding_idx already exists"
                    )
        else:
            raise ValueError(f"Unknown index type: {index_type}")

    def drop_index(self) -> None:
        _logger.info(f"Dropping index for {self.table_name}")
        query = sql.SQL(
            """
            DROP INDEX IF EXISTS {index_name};
            """
        ).format(index_name=sql.Identifier(f"{self.table_name}_embedding_idx"))
        with self._sync_conn.cursor() as cursor:
            cursor.execute(query)
        self._sync_conn.commit()

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        tenant_id = delete_kwargs.get("tenant_id")
        _logger.info(f"Deleting document {ref_doc_id} with tenant_id {tenant_id}")
        if self.tenant_aware and tenant_id is None:
            raise ValueError(
                "tenant_id must be specified in delete_kwargs if tenant_aware is True"
            )
        with self._sync_conn.cursor() as cursor:
            self._set_tenant_context(cursor, tenant_id)
            cursor.execute(
                sql.SQL(
                    "DELETE FROM {} WHERE metadata->>'doc_id' = %(ref_doc_id)s"
                ).format(sql.Identifier(self.table_name)),
                {"ref_doc_id": ref_doc_id},
            )
        self._sync_conn.commit()

    async def adelete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        tenant_id = delete_kwargs.get("tenant_id")
        _logger.info(f"Deleting document {ref_doc_id} with tenant_id {tenant_id}")
        if self.tenant_aware and tenant_id is None:
            raise ValueError(
                "tenant_id must be specified in delete_kwargs if tenant_aware is True"
            )
        async with self._async_conn.cursor() as cursor:
            self._set_tenant_context(cursor, tenant_id)
            cursor.execute(
                sql.SQL(
                    "DELETE FROM {} WHERE metadata->>'doc_id' = %(ref_doc_id)s"
                ).format(sql.Identifier(self.table_name)),
                {"ref_doc_id": ref_doc_id},
            )
        await self._async_conn.commit()
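

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the store's API).
#
# This helper shows one way the tenant-aware flow above fits together: create
# the store, create a tenant, attach the tenant_id to node metadata so
# _node_to_row() picks it up, and pass tenant_id again at query time so
# _set_tenant_context() scopes the search to that tenant. The function name,
# the connection URL, and the toy 3-dimensional embeddings are placeholders,
# not part of this module; run it only against a reachable Nile database.
# ---------------------------------------------------------------------------
def _example_tenant_aware_usage() -> None:
    from llama_index.core.schema import TextNode
    from llama_index.core.vector_stores.types import VectorStoreQuery

    store = NileVectorStore.from_params(
        service_url="postgresql://user:[email protected]:5432/niledb",  # placeholder
        table_name="test_table",
        tenant_aware=True,
        num_dimensions=3,  # toy dimension for the sketch
    )

    # Tenants live in Nile's built-in `tenants` table; create_tenant returns the new id.
    tenant_id = store.create_tenant("customer-1")

    # tenant_id must be present in node metadata (as a string, so it serializes to JSON).
    node = TextNode(
        text="Hello from customer-1",
        metadata={"tenant_id": str(tenant_id), "category": "greeting"},
        embedding=[0.1, 0.2, 0.3],
    )
    store.add([node])

    # Queries against a tenant-aware store must pass tenant_id as a kwarg.
    result = store.query(
        VectorStoreQuery(query_embedding=[0.1, 0.2, 0.3], similarity_top_k=1),
        tenant_id=str(tenant_id),
    )
    print(result.ids, result.similarities)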
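

# ---------------------------------------------------------------------------
# Illustrative sketch of metadata filtering (assumptions flagged inline).
#
# _create_where_clause() turns LlamaIndex MetadataFilters into a WHERE clause
# over the JSONB `metadata` column (e.g. FilterOperator.EQ becomes
# `metadata->>key = value`). The sketch below shows how such filters are meant
# to be attached to a query; the store instance, tenant id, and filter values
# are placeholders. Nested MetadataFilters are explicitly rejected by this
# store.
# ---------------------------------------------------------------------------
def _example_metadata_filter_query(store: "NileVectorStore", tenant_id: str) -> None:
    from llama_index.core.vector_stores.types import (
        FilterOperator,
        MetadataFilter,
        MetadataFilters,
        VectorStoreQuery,
    )

    # Only hits whose metadata has category == "greeting" are considered.
    filters = MetadataFilters(
        filters=[
            MetadataFilter(
                key="category", value="greeting", operator=FilterOperator.EQ
            ),
        ]
    )
    query = VectorStoreQuery(
        query_embedding=[0.1, 0.2, 0.3],  # toy dimension, must match num_dimensions
        similarity_top_k=5,
        filters=filters,
    )
    # tenant_id is still required at query time when the store is tenant-aware.
    result = store.query(query, tenant_id=tenant_id)
    for node, distance in zip(result.nodes, result.similarities):
        print(node.node_id, distance)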
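

# ---------------------------------------------------------------------------
# Illustrative sketch of index management (assumptions flagged inline).
#
# create_index() builds a pgvector index (HNSW or IVFFlat) on the embedding
# column and raises psycopg.errors.DuplicateTable if one already exists, so a
# common pattern is to drop and recreate when changing parameters. The m,
# ef_construction, and nlists values below are arbitrary example settings, and
# hnsw_ef_search is an optional per-query kwarg handled by _execute_query().
# ---------------------------------------------------------------------------
def _example_index_management(store: "NileVectorStore") -> None:
    # Create an HNSW index after the table has been populated.
    try:
        store.create_index(IndexType.PGVECTOR_HNSW, m=16, ef_construction=64)
    except psycopg.errors.DuplicateTable:
        # An index already exists; drop it and recreate to change parameters.
        store.drop_index()
        store.create_index(IndexType.PGVECTOR_HNSW, m=16, ef_construction=64)

    # Alternatively, an IVFFlat index takes an nlists parameter instead:
    # store.create_index(IndexType.PGVECTOR_IVFFLAT, nlists=100)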