Cassandra

CassandraVectorStore

Bases: BasePydanticVectorStore

Cassandra Vector Store: an abstraction of a Cassandra table with vector-similarity search. Documents and their embeddings are stored in a Cassandra table, and a vector-capable index is used for searches. The table does not need to exist beforehand; if necessary, it is created behind the scenes. All Cassandra operations are done through the CassIO library.

Note: in recent versions, only table and embedding_dimension can be passed positionally. This accommodates a leaner usage in which the DB connection is set globally through a cassio.init(...) call; the DB details then no longer need to be specified when creating a vector store, unless desired.

Parameters:

table (str)

The table name to use. If it does not exist, it will be created.

embedding_dimension (int)

Length of the embedding vectors in use.
session (optional, Session)

The Cassandra session to use. Can be omitted, or equivalently set to None, to use the DB connection set globally through cassio.init() beforehand.

keyspace (optional, str)

The name of the Cassandra keyspace to work in. Can be omitted, or equivalently set to None, to use the DB connection set globally through cassio.init() beforehand.

ttl_seconds (optional, int)

Expiration time for inserted entries. Default is no expiration (None).

insertion_batch_size (optional, int)

How many vectors are inserted concurrently during bulk inserts. Defaults to 20 (DEFAULT_INSERTION_BATCH_SIZE).

Examples

`pip install llama-index-vector-stores-cassandra`

```python
from llama_index.vector_stores.cassandra import CassandraVectorStore

vector_store = CassandraVectorStore(
    table="cass_v_table", embedding_dimension=1536
)
```
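The example above assumes the database connection was set globally through cassio.init() beforehand. A minimal sketch of that setup, assuming a plain Cassandra cluster on localhost (the contact point and keyspace name are placeholders):

```python
from cassandra.cluster import Cluster

import cassio
from llama_index.vector_stores.cassandra import CassandraVectorStore

# Open a session with the Cassandra driver (contact point is a placeholder).
session = Cluster(["127.0.0.1"]).connect()

# Register session and keyspace globally; CassandraVectorStore picks them up.
cassio.init(session=session, keyspace="my_keyspace")

vector_store = CassandraVectorStore(
    table="cass_v_table", embedding_dimension=1536
)
```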
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
class CassandraVectorStore(BasePydanticVectorStore):
    """
    Cassandra Vector Store.

    An abstraction of a Cassandra table with
    vector-similarity-search. Documents, and their embeddings, are stored
    in a Cassandra table and a vector-capable index is used for searches.
    The table does not need to exist beforehand: if necessary it will
    be created behind the scenes.

    All Cassandra operations are done through the CassIO library.

    Note: in recent versions, only `table` and `embedding_dimension` can be
    passed positionally. Please revise your code if needed.
    This is to accommodate for a leaner usage, whereby the DB connection
    is set globally through a `cassio.init(...)` call: then, the DB details
    are not to be specified anymore when creating a vector store, unless
    desired.

    Args:
        table (str): table name to use. If not existing, it will be created.
        embedding_dimension (int): length of the embedding vectors in use.
        session (optional, cassandra.cluster.Session): the Cassandra session
            to use.
            Can be omitted, or equivalently set to None, to use the
            DB connection set globally through cassio.init() beforehand.
        keyspace (optional. str): name of the Cassandra keyspace to work in
            Can be omitted, or equivalently set to None, to use the
            DB connection set globally through cassio.init() beforehand.
        ttl_seconds (optional, int): expiration time for inserted entries.
            Default is no expiration (None).
        insertion_batch_size (optional, int): how many vectors are inserted
            concurrently, for use by bulk inserts. Defaults to 20.

    Examples:
        `pip install llama-index-vector-stores-cassandra`

        ```python
        from llama_index.vector_stores.cassandra import CassandraVectorStore

        vector_store = CassandraVectorStore(
            table="cass_v_table", embedding_dimension=1536
        )
        ```

    """

    stores_text: bool = True
    flat_metadata: bool = True

    _session: Optional[Any] = PrivateAttr()
    _keyspace: Optional[Any] = PrivateAttr()
    _table: str = PrivateAttr()
    _embedding_dimension: int = PrivateAttr()
    _ttl_seconds: Optional[int] = PrivateAttr()
    _insertion_batch_size: int = PrivateAttr()
    _vector_table: ClusteredMetadataVectorCassandraTable = PrivateAttr()

    def __init__(
        self,
        table: str,
        embedding_dimension: int,
        *,
        session: Optional[Any] = None,
        keyspace: Optional[str] = None,
        ttl_seconds: Optional[int] = None,
        insertion_batch_size: int = DEFAULT_INSERTION_BATCH_SIZE,
    ) -> None:
        super().__init__()

        self._session = session
        self._keyspace = keyspace
        self._table = table
        self._embedding_dimension = embedding_dimension
        self._ttl_seconds = ttl_seconds
        self._insertion_batch_size = insertion_batch_size

        _logger.debug("Creating the Cassandra table")
        self._vector_table = ClusteredMetadataVectorCassandraTable(
            session=self._session,
            keyspace=self._keyspace,
            table=self._table,
            vector_dimension=self._embedding_dimension,
            primary_key_type=["TEXT", "TEXT"],
            # a conservative choice here, to make everything searchable
            # except the bulky "_node_content" key (it'd make little sense to):
            metadata_indexing=("default_to_searchable", ["_node_content"]),
        )

    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """
        Add nodes to index.

        Args:
            nodes (List[BaseNode]): list of nodes with embeddings

        """
        node_ids = []
        node_contents = []
        node_metadatas = []
        node_embeddings = []
        for node in nodes:
            metadata = node_to_metadata_dict(
                node,
                remove_text=True,
                flat_metadata=self.flat_metadata,
            )
            node_ids.append(node.node_id)
            node_contents.append(node.get_content(metadata_mode=MetadataMode.NONE))
            node_metadatas.append(metadata)
            node_embeddings.append(node.get_embedding())

        _logger.debug(f"Adding {len(node_ids)} rows to table")
        # Concurrent batching of inserts:
        insertion_tuples = zip(node_ids, node_contents, node_metadatas, node_embeddings)
        for insertion_batch in _batch_iterable(
            insertion_tuples, batch_size=self._insertion_batch_size
        ):
            futures = []
            for (
                node_id,
                node_content,
                node_metadata,
                node_embedding,
            ) in insertion_batch:
                node_ref_doc_id = node_metadata["ref_doc_id"]
                futures.append(
                    self._vector_table.put_async(
                        row_id=node_id,
                        body_blob=node_content,
                        vector=node_embedding,
                        metadata=node_metadata,
                        partition_id=node_ref_doc_id,
                        ttl_seconds=self._ttl_seconds,
                    )
                )
            for future in futures:
                _ = future.result()

        return node_ids

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """
        Delete nodes using ref_doc_id.

        Args:
            ref_doc_id (str): The doc_id of the document to delete.

        """
        _logger.debug("Deleting a document from the Cassandra table")
        self._vector_table.delete_partition(
            partition_id=ref_doc_id,
        )

    @property
    def client(self) -> Any:
        """Return the underlying cassIO vector table object."""
        return self._vector_table

    @staticmethod
    def _query_filters_to_dict(query_filters: MetadataFilters) -> Dict[str, Any]:
        if any(
            not isinstance(f, ExactMatchFilter) for f in query_filters.legacy_filters()
        ):
            raise NotImplementedError("Only `ExactMatchFilter` filters are supported")
        return {f.key: f.value for f in query_filters.filters}

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """
        Query index for top k most similar nodes.

        Supported query modes: 'default' (most similar vectors) and 'mmr'.

        Args:
            query (VectorStoreQuery): the basic query definition. Defines:
                mode (VectorStoreQueryMode): one of the supported modes
                query_embedding (List[float]): query embedding to search against
                similarity_top_k (int): top k most similar nodes
                mmr_threshold (Optional[float]): this is the 0-to-1 MMR lambda.
                    If present, takes precedence over the kwargs parameter.
                    Ignored except for MMR queries.

        Args for query.mode == 'mmr' (ignored otherwise):
            mmr_threshold (Optional[float]): this is the 0-to-1 lambda for MMR.
                Note that in principle mmr_threshold could come in the query
            mmr_prefetch_factor (Optional[float]): factor applied to top_k
                for prefetch pool size. Defaults to 4.0
            mmr_prefetch_k (Optional[int]): prefetch pool size. This cannot be
                passed together with mmr_prefetch_factor

        """
        _available_query_modes = [
            VectorStoreQueryMode.DEFAULT,
            VectorStoreQueryMode.MMR,
        ]
        if query.mode not in _available_query_modes:
            raise NotImplementedError(f"Query mode {query.mode} not available.")
        #
        query_embedding = cast(List[float], query.query_embedding)

        # metadata filtering
        if query.filters is not None:
            # raise NotImplementedError("No metadata filtering yet")
            query_metadata = self._query_filters_to_dict(query.filters)
        else:
            query_metadata = {}

        _logger.debug(
            f"Running ANN search on the Cassandra table (query mode: {query.mode})"
        )
        if query.mode == VectorStoreQueryMode.DEFAULT:
            matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=query.similarity_top_k,
                    metric="cos",
                    metric_threshold=None,
                    metadata=query_metadata,
                )
            )
            top_k_scores = [match["distance"] for match in matches]
        elif query.mode == VectorStoreQueryMode.MMR:
            # Querying a larger number of vectors and then doing MMR on them.
            if (
                kwargs.get("mmr_prefetch_factor") is not None
                and kwargs.get("mmr_prefetch_k") is not None
            ):
                raise ValueError(
                    "'mmr_prefetch_factor' and 'mmr_prefetch_k' "
                    "cannot coexist in a call to query()"
                )
            else:
                if kwargs.get("mmr_prefetch_k") is not None:
                    prefetch_k0 = int(kwargs["mmr_prefetch_k"])
                else:
                    prefetch_k0 = int(
                        query.similarity_top_k
                        * kwargs.get("mmr_prefetch_factor", DEFAULT_MMR_PREFETCH_FACTOR)
                    )
            prefetch_k = max(prefetch_k0, query.similarity_top_k)
            #
            prefetch_matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=prefetch_k,
                    metric="cos",
                    metric_threshold=None,  # this is not `mmr_threshold`
                    metadata=query_metadata,
                )
            )
            #
            mmr_threshold = query.mmr_threshold or kwargs.get("mmr_threshold")
            if prefetch_matches:
                pf_match_indices, pf_match_embeddings = zip(
                    *enumerate(match["vector"] for match in prefetch_matches)
                )
            else:
                pf_match_indices, pf_match_embeddings = [], []
            pf_match_indices = list(pf_match_indices)
            pf_match_embeddings = list(pf_match_embeddings)
            mmr_similarities, mmr_indices = get_top_k_mmr_embeddings(
                query_embedding,
                pf_match_embeddings,
                similarity_top_k=query.similarity_top_k,
                embedding_ids=pf_match_indices,
                mmr_threshold=mmr_threshold,
            )
            #
            matches = [prefetch_matches[mmr_index] for mmr_index in mmr_indices]
            top_k_scores = mmr_similarities

        top_k_nodes = []
        top_k_ids = []
        for match in matches:
            node = metadata_dict_to_node(match["metadata"])
            node.set_content(match["body_blob"])
            top_k_nodes.append(node)
            top_k_ids.append(match["row_id"])

        return VectorStoreQueryResult(
            nodes=top_k_nodes,
            similarities=top_k_scores,
            ids=top_k_ids,
        )

client property #

client: Any

Return the underlying cassIO vector table object.
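The property is a plain accessor; a short sketch of reaching the raw table through it (the variable name is illustrative):

```python
# The underlying cassIO ClusteredMetadataVectorCassandraTable, useful for
# operations not exposed by the vector store wrapper.
raw_table = vector_store.client
```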

add #

add(nodes: List[BaseNode], **add_kwargs: Any) -> List[str]

Add nodes to the index.

Parameters:

nodes (List[BaseNode])

List of nodes with embeddings.
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
def add(
    self,
    nodes: List[BaseNode],
    **add_kwargs: Any,
) -> List[str]:
    """
    Add nodes to index.

    Args:
        nodes (List[BaseNode]): list of nodes with embeddings

    """
    node_ids = []
    node_contents = []
    node_metadatas = []
    node_embeddings = []
    for node in nodes:
        metadata = node_to_metadata_dict(
            node,
            remove_text=True,
            flat_metadata=self.flat_metadata,
        )
        node_ids.append(node.node_id)
        node_contents.append(node.get_content(metadata_mode=MetadataMode.NONE))
        node_metadatas.append(metadata)
        node_embeddings.append(node.get_embedding())

    _logger.debug(f"Adding {len(node_ids)} rows to table")
    # Concurrent batching of inserts:
    insertion_tuples = zip(node_ids, node_contents, node_metadatas, node_embeddings)
    for insertion_batch in _batch_iterable(
        insertion_tuples, batch_size=self._insertion_batch_size
    ):
        futures = []
        for (
            node_id,
            node_content,
            node_metadata,
            node_embedding,
        ) in insertion_batch:
            node_ref_doc_id = node_metadata["ref_doc_id"]
            futures.append(
                self._vector_table.put_async(
                    row_id=node_id,
                    body_blob=node_content,
                    vector=node_embedding,
                    metadata=node_metadata,
                    partition_id=node_ref_doc_id,
                    ttl_seconds=self._ttl_seconds,
                )
            )
        for future in futures:
            _ = future.result()

    return node_ids
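A minimal usage sketch (values are illustrative): the embedding length must match the store's embedding_dimension, and the node's SOURCE relationship supplies the ref_doc_id used as the partition key.

```python
from llama_index.core.schema import NodeRelationship, RelatedNodeInfo, TextNode

# Hypothetical node carrying a precomputed embedding of the right dimension.
node = TextNode(text="hello world", embedding=[0.1] * 1536)
# node_to_metadata_dict derives ref_doc_id from the SOURCE relationship.
node.relationships[NodeRelationship.SOURCE] = RelatedNodeInfo(node_id="doc-1")

node_ids = vector_store.add([node])
```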

delete #

delete(ref_doc_id: str, **delete_kwargs: Any) -> None

Delete nodes using ref_doc_id.

Parameters:

ref_doc_id (str)

The doc_id of the document to delete.
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    """
    Delete nodes using ref_doc_id.

    Args:
        ref_doc_id (str): The doc_id of the document to delete.

    """
    _logger.debug("Deleting a document from the Cassandra table")
    self._vector_table.delete_partition(
        partition_id=ref_doc_id,
    )
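For example, to drop every node ingested from the document above (the id is illustrative):

```python
# Deletes the whole partition for this ref_doc_id in one call.
vector_store.delete(ref_doc_id="doc-1")
```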

query #

query(query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult

Query the index for the top k most similar nodes.

Supported query modes: 'default' (most similar vectors) and 'mmr'.

Parameters:

query (VectorStoreQuery)

The basic query definition. Defines:
mode (VectorStoreQueryMode): one of the supported modes
query_embedding (List[float]): the query embedding to search against
similarity_top_k (int): top k most similar nodes
mmr_threshold (Optional[float]): the 0-to-1 MMR lambda. If present, it takes precedence over the kwargs parameter. Ignored except for MMR queries.

Keyword arguments for query.mode == 'mmr' (ignored otherwise):
mmr_threshold (Optional[float]): the 0-to-1 lambda for MMR; note that in principle mmr_threshold can also come in the query
mmr_prefetch_factor (Optional[float]): factor applied to similarity_top_k to size the prefetch pool. Defaults to 4.0
mmr_prefetch_k (Optional[int]): prefetch pool size. Cannot be passed together with mmr_prefetch_factor

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """
    Query index for top k most similar nodes.

    Supported query modes: 'default' (most similar vectors) and 'mmr'.

    Args:
        query (VectorStoreQuery): the basic query definition. Defines:
            mode (VectorStoreQueryMode): one of the supported modes
            query_embedding (List[float]): query embedding to search against
            similarity_top_k (int): top k most similar nodes
            mmr_threshold (Optional[float]): this is the 0-to-1 MMR lambda.
                If present, takes precedence over the kwargs parameter.
                Ignored except for MMR queries.

    Args for query.mode == 'mmr' (ignored otherwise):
        mmr_threshold (Optional[float]): this is the 0-to-1 lambda for MMR.
            Note that in principle mmr_threshold could come in the query
        mmr_prefetch_factor (Optional[float]): factor applied to top_k
            for prefetch pool size. Defaults to 4.0
        mmr_prefetch_k (Optional[int]): prefetch pool size. This cannot be
            passed together with mmr_prefetch_factor

    """
    _available_query_modes = [
        VectorStoreQueryMode.DEFAULT,
        VectorStoreQueryMode.MMR,
    ]
    if query.mode not in _available_query_modes:
        raise NotImplementedError(f"Query mode {query.mode} not available.")
    #
    query_embedding = cast(List[float], query.query_embedding)

    # metadata filtering
    if query.filters is not None:
        # raise NotImplementedError("No metadata filtering yet")
        query_metadata = self._query_filters_to_dict(query.filters)
    else:
        query_metadata = {}

    _logger.debug(
        f"Running ANN search on the Cassandra table (query mode: {query.mode})"
    )
    if query.mode == VectorStoreQueryMode.DEFAULT:
        matches = list(
            self._vector_table.metric_ann_search(
                vector=query_embedding,
                n=query.similarity_top_k,
                metric="cos",
                metric_threshold=None,
                metadata=query_metadata,
            )
        )
        top_k_scores = [match["distance"] for match in matches]
    elif query.mode == VectorStoreQueryMode.MMR:
        # Querying a larger number of vectors and then doing MMR on them.
        if (
            kwargs.get("mmr_prefetch_factor") is not None
            and kwargs.get("mmr_prefetch_k") is not None
        ):
            raise ValueError(
                "'mmr_prefetch_factor' and 'mmr_prefetch_k' "
                "cannot coexist in a call to query()"
            )
        else:
            if kwargs.get("mmr_prefetch_k") is not None:
                prefetch_k0 = int(kwargs["mmr_prefetch_k"])
            else:
                prefetch_k0 = int(
                    query.similarity_top_k
                    * kwargs.get("mmr_prefetch_factor", DEFAULT_MMR_PREFETCH_FACTOR)
                )
        prefetch_k = max(prefetch_k0, query.similarity_top_k)
        #
        prefetch_matches = list(
            self._vector_table.metric_ann_search(
                vector=query_embedding,
                n=prefetch_k,
                metric="cos",
                metric_threshold=None,  # this is not `mmr_threshold`
                metadata=query_metadata,
            )
        )
        #
        mmr_threshold = query.mmr_threshold or kwargs.get("mmr_threshold")
        if prefetch_matches:
            pf_match_indices, pf_match_embeddings = zip(
                *enumerate(match["vector"] for match in prefetch_matches)
            )
        else:
            pf_match_indices, pf_match_embeddings = [], []
        pf_match_indices = list(pf_match_indices)
        pf_match_embeddings = list(pf_match_embeddings)
        mmr_similarities, mmr_indices = get_top_k_mmr_embeddings(
            query_embedding,
            pf_match_embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=pf_match_indices,
            mmr_threshold=mmr_threshold,
        )
        #
        matches = [prefetch_matches[mmr_index] for mmr_index in mmr_indices]
        top_k_scores = mmr_similarities

    top_k_nodes = []
    top_k_ids = []
    for match in matches:
        node = metadata_dict_to_node(match["metadata"])
        node.set_content(match["body_blob"])
        top_k_nodes.append(node)
        top_k_ids.append(match["row_id"])

    return VectorStoreQueryResult(
        nodes=top_k_nodes,
        similarities=top_k_scores,
        ids=top_k_ids,
    )
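A usage sketch combining MMR mode with an exact-match metadata filter (import paths follow llama_index.core; the embedding is a placeholder). With similarity_top_k=3 and the default mmr_prefetch_factor of 4.0, the prefetch pool holds max(3 * 4.0, 3) = 12 candidates before MMR re-ranking:

```python
from llama_index.core.vector_stores.types import (
    ExactMatchFilter,
    MetadataFilters,
    VectorStoreQuery,
    VectorStoreQueryMode,
)

query = VectorStoreQuery(
    query_embedding=[0.1] * 1536,  # placeholder; use a real query embedding
    similarity_top_k=3,
    mode=VectorStoreQueryMode.MMR,
    mmr_threshold=0.5,  # the 0-to-1 MMR lambda
    # Only ExactMatchFilter is supported by this store's filter translation.
    filters=MetadataFilters(filters=[ExactMatchFilter(key="author", value="alice")]),
)

result = vector_store.query(query)  # prefetches 12 candidates, returns top 3
for node, score in zip(result.nodes, result.similarities):
    print(score, node.get_content())
```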