跳到内容

Elasticsearch

ElasticsearchReader #

基类: BasePydanticReader

从 Elasticsearch/Opensearch 索引读取文档。

这些文档随后可在下游 Llama Index 数据结构中使用。

参数

名称 类型 描述 默认值
endpoint str

集群 URL (http/https)

必需
index str

索引名称 (必需)

必需
httpx_client_args dict

传递给 httpx.Client 的可选附加参数

源代码位于 llama-index-integrations/readers/llama-index-readers-elasticsearch/llama_index/readers/elasticsearch/base.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class ElasticsearchReader(BasePydanticReader):
    """
    Read documents from an Elasticsearch/Opensearch index.

    These documents can then be used in a downstream Llama Index data structure.

    Args:
        endpoint (str): URL (http/https) of cluster
        index (str): Name of the index (required)
        httpx_client_args (dict): Optional additional args to pass to the `httpx.Client`

    """

    is_remote: bool = True
    endpoint: str
    index: str
    httpx_client_args: Optional[dict] = None

    _client: Any = PrivateAttr()

    def __init__(
        self, endpoint: str, index: str, httpx_client_args: Optional[dict] = None
    ):
        """Initialize with parameters."""
        super().__init__(
            endpoint=endpoint, index=index, httpx_client_args=httpx_client_args
        )
        import_err_msg = """
            `httpx` package not found. Install via `pip install httpx`
        """
        try:
            import httpx
        except ImportError:
            raise ImportError(import_err_msg)
        self._client = httpx.Client(base_url=endpoint, **(httpx_client_args or {}))

    @classmethod
    def class_name(cls) -> str:
        return "ElasticsearchReader"

    def load_data(
        self,
        field: str,
        query: Optional[dict] = None,
        embedding_field: Optional[str] = None,
        metadata_fields: Optional[List[str]] = None,
    ) -> List[Document]:
        """
        Read data from the Elasticsearch index.

        Args:
            field (str): Field in the document to retrieve text from
            query (Optional[dict]): Elasticsearch JSON query DSL object.
                For example:
                {"query": {"match": {"message": {"query": "this is a test"}}}}
            embedding_field (Optional[str]): If there are embeddings stored in
                this index, this field can be used
                to set the embedding field on the returned Document list.
            metadata_fields (Optional[List[str]]): Fields used as metadata. Default
                is all fields in the document except those specified by the
                field and embedding_field parameters.

        Returns:
            List[Document]: A list of documents.

        """
        res = self._client.post(f"{self.index}/_search", json=query).json()
        documents = []
        for hit in res["hits"]["hits"]:
            doc_id = hit["_id"]
            value = hit["_source"][field]
            embedding = hit["_source"].get(embedding_field or "", None)
            if metadata_fields:
                metadata = {
                    k: v for k, v in hit["_source"].items() if k in metadata_fields
                }
            else:
                hit["_source"].pop(field)
                hit["_source"].pop(embedding_field or "", None)
                metadata = hit["_source"]
            documents.append(
                Document(id_=doc_id, text=value, metadata=metadata, embedding=embedding)
            )
        return documents

load_data #

load_data(field: str, query: Optional[dict] = None, embedding_field: Optional[str] = None, metadata_fields: Optional[List[str]] = None) -> List[Document]

从 Elasticsearch 索引读取数据。

参数

名称 类型 描述 默认值
field str

文档中用于检索文本的字段

必需
可选[dict] query

Elasticsearch JSON 查询 DSL 对象。例如:{"query": {"match": {"message": {"query": "this is a test"}}}}

embedding_field 可选[str]

如果此索引中存储了嵌入,此字段可用于设置返回的 Document 列表上的嵌入字段。

metadata_fields 可选[List[str]]

用作元数据的字段。默认值是文档中除 field 和 embedding_field 参数指定的字段外的所有字段。

返回值

类型 描述
List[Document]

List[Document]:文档列表。

源代码位于 llama-index-integrations/readers/llama-index-readers-elasticsearch/llama_index/readers/elasticsearch/base.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def load_data(
    self,
    field: str,
    query: Optional[dict] = None,
    embedding_field: Optional[str] = None,
    metadata_fields: Optional[List[str]] = None,
) -> List[Document]:
    """
    Read data from the Elasticsearch index.

    Args:
        field (str): Field in the document to retrieve text from
        query (Optional[dict]): Elasticsearch JSON query DSL object.
            For example:
            {"query": {"match": {"message": {"query": "this is a test"}}}}
        embedding_field (Optional[str]): If there are embeddings stored in
            this index, this field can be used
            to set the embedding field on the returned Document list.
        metadata_fields (Optional[List[str]]): Fields used as metadata. Default
            is all fields in the document except those specified by the
            field and embedding_field parameters.

    Returns:
        List[Document]: A list of documents.

    """
    res = self._client.post(f"{self.index}/_search", json=query).json()
    documents = []
    for hit in res["hits"]["hits"]:
        doc_id = hit["_id"]
        value = hit["_source"][field]
        embedding = hit["_source"].get(embedding_field or "", None)
        if metadata_fields:
            metadata = {
                k: v for k, v in hit["_source"].items() if k in metadata_fields
            }
        else:
            hit["_source"].pop(field)
            hit["_source"].pop(embedding_field or "", None)
            metadata = hit["_source"]
        documents.append(
            Document(id_=doc_id, text=value, metadata=metadata, embedding=embedding)
        )
    return documents