Couchbase

CouchbaseReader

Bases: BaseReader

Couchbase document loader.

Loads data from a Couchbase cluster into the Document objects used by LlamaIndex.

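Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client | Optional[Any] | The Couchbase client to use. If not provided, a client is created from connection_string and the database credentials. | None |
| connection_string | Optional[str] | The connection string for the Couchbase cluster. Required when client is not provided. | None |
| db_username | Optional[str] | The username for connecting to the Couchbase cluster. Required when client is not provided. | None |
| db_password | Optional[str] | The password for connecting to the Couchbase cluster. Required when client is not provided. | None |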
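
The constructor accepts either an existing client or a connection string plus both credentials. The following is a minimal sketch of that rule, not the library's own code: `reader_kwargs` and `build_reader` are hypothetical helpers, and the import path `llama_index.readers.couchbase` is an assumption based on the package layout shown in the source path.

```python
from typing import Any, Dict, Optional


def reader_kwargs(
    client: Optional[Any] = None,
    connection_string: Optional[str] = None,
    db_username: Optional[str] = None,
    db_password: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: validate and return kwargs for CouchbaseReader."""
    if client:
        # An existing client takes precedence; the other arguments are not needed.
        return {"client": client}
    credentials = {
        "connection_string": connection_string,
        "db_username": db_username,
        "db_password": db_password,
    }
    # Without a client, all three values must be present.
    missing = [name for name, value in credentials.items() if not value]
    if missing:
        raise ValueError(f"Missing arguments: {', '.join(missing)}")
    return credentials


def build_reader(**kwargs: Any) -> Any:
    """Hypothetical helper: create the reader from validated arguments."""
    # Imported lazily so the validation above works without the package installed.
    # The import path is an assumption, not confirmed by this page.
    from llama_index.readers.couchbase import CouchbaseReader

    return CouchbaseReader(**reader_kwargs(**kwargs))
```

For example, `build_reader(connection_string="couchbase://localhost", db_username="user", db_password="secret")` would create a reader that connects on its own; the connection string and credentials here are placeholders.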

Source code in llama-index-integrations/readers/llama-index-readers-couchbase/llama_index/readers/couchbase/base.py
class CouchbaseReader(BaseReader):
    """
    Couchbase document loader.

    Loads data from a Couchbase cluster into Document used by LlamaIndex.

    Args:
        client (Optional[Any]): A Couchbase client to use.
            If not provided, the client will be created based on the connection_string
            and database credentials.
        connection_string (Optional[str]): The connection string to the Couchbase cluster.
        db_username (Optional[str]): The username to connect to the Couchbase cluster.
        db_password (Optional[str]): The password to connect to the Couchbase cluster.

    """

    def __init__(
        self,
        client: Optional[Any] = None,
        connection_string: Optional[str] = None,
        db_username: Optional[str] = None,
        db_password: Optional[str] = None,
    ) -> None:
        """Initialize Couchbase document loader."""
        import_err_msg = "`couchbase` package not found, please run `pip install --upgrade couchbase`"
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as err:
            # Chain the original error so the underlying import failure stays visible.
            raise ImportError(import_err_msg) from err

        if not client:
            if not connection_string or not db_username or not db_password:
                raise ValueError(
                    "Provide either a Couchbase client or a connection_string together with db_username and db_password."
                )
            else:
                auth = PasswordAuthenticator(
                    db_username,
                    db_password,
                )

                self._client: Cluster = Cluster(connection_string, ClusterOptions(auth))
        else:
            self._client = client

    def lazy_load_data(
        self,
        query: str,
        text_fields: Optional[List[str]] = None,
        metadata_fields: Optional[List[str]] = [],
    ) -> Iterable[Document]:
        """
        Load data from the Couchbase cluster lazily.

        Args:
            query (str): The SQL++ query to execute.
            text_fields (Optional[List[str]]): The columns to write into the
                `text` field of the document. By default, all columns are
                written.
            metadata_fields (Optional[List[str]]): The columns to write into the
                `metadata` field of the document. By default, no columns are written.

        """
        from datetime import timedelta

        if not query:
            raise ValueError("Query must be provided.")

        # Ensure connection to Couchbase cluster
        self._client.wait_until_ready(timedelta(seconds=5))

        # Run SQL++ Query
        result = self._client.query(query)
        for row in result:
            if not text_fields:
                text_fields = list(row.keys())

            # metadata_fields may be None (the default in load_data), so fall back to no fields.
            metadata = {field: row[field] for field in metadata_fields or []}

            document = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k in text_fields
            )

            yield (Document(text=document, metadata=metadata))

    def load_data(
        self,
        query: str,
        text_fields: Optional[List[str]] = None,
        metadata_fields: Optional[List[str]] = None,
    ) -> List[Document]:
        """
        Load data from the Couchbase cluster.

        Args:
            query (str): The SQL++ query to execute.
            text_fields (Optional[List[str]]): The columns to write into the
                `text` field of the document. By default, all columns are
                written.
            metadata_fields (Optional[List[str]]): The columns to write into the
                `metadata` field of the document. By default, no columns are written.

        """
        return list(self.lazy_load_data(query, text_fields, metadata_fields))

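lazy_load_data

lazy_load_data(query: str, text_fields: Optional[List[str]] = None, metadata_fields: Optional[List[str]] = []) -> Iterable[Document]

Load data from the Couchbase cluster lazily.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | The SQL++ query to execute. | required |
| text_fields | Optional[List[str]] | The columns to write into the document's text field. By default, all columns are written. | None |
| metadata_fields | Optional[List[str]] | The columns to write into the document's metadata field. By default, no columns are written. | [] |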
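
How text_fields and metadata_fields shape each document can be sketched without a cluster. The function below is an illustrative stand-in that mirrors the loop in the source listing using plain dictionaries; `rows_to_text_and_metadata` is a hypothetical name and the sample rows are made up.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple


def rows_to_text_and_metadata(
    rows: Iterable[Dict[str, Any]],
    text_fields: Optional[List[str]] = None,
    metadata_fields: Optional[List[str]] = None,
) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (text, metadata) pairs the way the reader builds its documents."""
    metadata_fields = metadata_fields or []
    for row in rows:
        if not text_fields:
            # As in the listed source, the default is taken from the first
            # row's keys and then reused for every later row.
            text_fields = list(row.keys())
        metadata = {field: row[field] for field in metadata_fields}
        # Each selected column becomes one "key: value" line of the text.
        text = "\n".join(f"{k}: {v}" for k, v in row.items() if k in text_fields)
        yield text, metadata


# Made-up sample rows standing in for SQL++ query results.
rows = [
    {"id": 10, "name": "Air France", "country": "France"},
    {"id": 11, "name": "Lufthansa", "country": "Germany"},
]
records = list(
    rows_to_text_and_metadata(
        rows, text_fields=["name", "country"], metadata_fields=["id"]
    )
)
```

Here the first record is the text `name: Air France` and `country: France` on two lines, with metadata `{"id": 10}`. Because the default text_fields comes from the first row, rows with differing keys are safer to handle with an explicit text_fields list.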
Source code in llama-index-integrations/readers/llama-index-readers-couchbase/llama_index/readers/couchbase/base.py
def lazy_load_data(
    self,
    query: str,
    text_fields: Optional[List[str]] = None,
    metadata_fields: Optional[List[str]] = [],
) -> Iterable[Document]:
    """
    Load data from the Couchbase cluster lazily.

    Args:
        query (str): The SQL++ query to execute.
        text_fields (Optional[List[str]]): The columns to write into the
            `text` field of the document. By default, all columns are
            written.
        metadata_fields (Optional[List[str]]): The columns to write into the
            `metadata` field of the document. By default, no columns are written.

    """
    from datetime import timedelta

    if not query:
        raise ValueError("Query must be provided.")

    # Ensure connection to Couchbase cluster
    self._client.wait_until_ready(timedelta(seconds=5))

    # Run SQL++ Query
    result = self._client.query(query)
    for row in result:
        if not text_fields:
            text_fields = list(row.keys())

        # metadata_fields may be None (the default in load_data), so fall back to no fields.
        metadata = {field: row[field] for field in metadata_fields or []}

        document = "\n".join(
            f"{k}: {v}" for k, v in row.items() if k in text_fields
        )

        yield (Document(text=document, metadata=metadata))

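load_data

load_data(query: str, text_fields: Optional[List[str]] = None, metadata_fields: Optional[List[str]] = None) -> List[Document]

Load data from the Couchbase cluster.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | The SQL++ query to execute. | required |
| text_fields | Optional[List[str]] | The columns to write into the document's text field. By default, all columns are written. | None |
| metadata_fields | Optional[List[str]] | The columns to write into the document's metadata field. By default, no columns are written. | None |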
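
load_data returns a list built from lazy_load_data, so the practical difference is when rows are consumed. The sketch below illustrates that difference with a plain Python generator; `fake_query` is a hypothetical stand-in for a query result, and whether the real Couchbase result streams rows in the same way is an assumption.

```python
from itertools import islice
from typing import Dict, Iterator, List


def fake_query(rows: List[Dict[str, str]], fetched: List[str]) -> Iterator[Dict[str, str]]:
    """Stand-in for a query result; records each row name as it is fetched."""
    for row in rows:
        fetched.append(row["name"])
        yield row


rows = [{"name": "a"}, {"name": "b"}, {"name": "c"}]

# Lazy style (like lazy_load_data): only the rows actually consumed are fetched.
lazy_fetched: List[str] = []
first_two = list(islice(fake_query(rows, lazy_fetched), 2))

# Eager style (like load_data): list() drains the whole iterator up front.
eager_fetched: List[str] = []
everything = list(fake_query(rows, eager_fetched))
```

The lazy form suits large result sets where only part of the output is needed or documents are processed one at a time; the eager form is simpler when the full list fits in memory.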

Source code in llama-index-integrations/readers/llama-index-readers-couchbase/llama_index/readers/couchbase/base.py
def load_data(
    self,
    query: str,
    text_fields: Optional[List[str]] = None,
    metadata_fields: Optional[List[str]] = None,
) -> List[Document]:
    """
    Load data from the Couchbase cluster.

    Args:
        query (str): The SQL++ query to execute.
        text_fields (Optional[List[str]]): The columns to write into the
            `text` field of the document. By default, all columns are
            written.
        metadata_fields (Optional[List[str]]): The columns to write into the
            `metadata` field of the document. By default, no columns are written.

    """
    return list(self.lazy_load_data(query, text_fields, metadata_fields))