跳到内容

Snowflake

SnowflakeReader #

基类: BaseReader

初始化 SnowflakeReader 的新实例。

此类使用 SQLAlchemy 建立与 Snowflake 的连接,执行查询并将每一行连接成 LlamaIndex 使用的 Document。

属性

名称 类型 描述
engine Optional[Engine]

数据库连接的 SQLAlchemy Engine 对象。

account Optional[str]

Snowflake 账户标识符。

user Optional[str]

Snowflake 账户用户名。

password Optional[str]

Snowflake 账户的密码。

database Optional[str]

Snowflake 数据库名称。

schema Optional[str]

Snowflake 模式名称。

warehouse Optional[str]

Snowflake 仓库名称。

proxy Optional[str]

连接的代理设置。

源代码位于 llama-index-integrations/readers/llama-index-readers-snowflake/llama_index/readers/snowflake/base.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class SnowflakeReader(BaseReader):
    """
    Initializes a new instance of the SnowflakeReader.

    This class establishes a connection to Snowflake using SQLAlchemy, executes query
    and concatenates each row into Document used by LlamaIndex.

    Attributes:
        engine (Optional[Engine]): SQLAlchemy Engine object of the database connection.

        OR

        account (Optional[str]): Snowflake account identifier.
        user (Optional[str]): Snowflake account username.
        password (Optional[str]): Password for the Snowflake account.
        database (Optional[str]): Snowflake database name.
        schema (Optional[str]): Snowflake schema name.
        warehouse (Optional[str]): Snowflake warehouse name.
        proxy (Optional[str]): Proxy setting for the connection.

    """

    def __init__(
        self,
        account: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        warehouse: Optional[str] = None,
        role: Optional[str] = None,
        proxy: Optional[str] = None,
        engine: Optional[Engine] = None,
    ) -> None:
        """
        Initializes the SnowflakeReader with optional connection details, proxy configuration, or an engine directly.

        Args:
            account (Optional[str]): Snowflake account identifier.
            user (Optional[str]): Snowflake account username.
            password (Optional[str]): Password for the Snowflake account.
            database (Optional[str]): Snowflake database name.
            schema (Optional[str]): Snowflake schema name.
            warehouse (Optional[str]): Snowflake warehouse name.
            role (Optional[str]): Snowflake role name.
            proxy (Optional[str]): Proxy setting for the connection.
            engine (Optional[Engine]): Existing SQLAlchemy engine.

        """
        from snowflake.sqlalchemy import URL

        if engine is None:
            connect_args = {}
            if proxy:
                connect_args["proxy"] = proxy

            # Create an SQLAlchemy engine for Snowflake
            self.engine = create_engine(
                URL(
                    account=account or "",
                    user=user or "",
                    password=password or "",
                    database=database or "",
                    schema=schema or "",
                    warehouse=warehouse or "",
                    role=role or "",
                ),
                connect_args=connect_args,
            )
        else:
            self.engine = engine

        # Create a sessionmaker bound to the engine
        self.Session = sessionmaker(bind=self.engine)

    def execute_query(self, query_string: str) -> List[Any]:
        """
        Executes a SQL query and returns the fetched results.

        Args:
            query_string (str): The SQL query to be executed.

        Returns:
            List[Any]: The fetched results from the query.

        """
        # Create a session and execute the query
        session = self.Session()
        try:
            result = session.execute(text(query_string))
            return result.fetchall()
        finally:
            # Ensure the session is closed after query execution
            session.close()

    def load_data(self, query: str) -> List[Document]:
        """
        Query and load data from the Database, returning a list of Documents.

        Args:
            query (str): Query parameter to filter tables and rows.

        Returns:
            List[Document]: A list of Document objects.

        """
        documents = []

        if query is None:
            raise ValueError("A query parameter is necessary to filter the data")

        try:
            result = self.execute_query(query)

            for item in result:
                # fetch each item
                doc_str = ", ".join([str(entry) for entry in item])
                documents.append(Document(text=doc_str))
            return documents
        except Exception as e:
            logger.error(
                f"An error occurred while loading the data: {e}", exc_info=True
            )

execute_query #

execute_query(query_string: str) -> List[Any]

执行 SQL 查询并返回获取的结果。

参数

名称 类型 描述 默认值
query_string str

要执行的 SQL 查询。

必需

返回

类型 描述
List[Any]

List[Any]:从查询中获取的结果。

源代码位于 llama-index-integrations/readers/llama-index-readers-snowflake/llama_index/readers/snowflake/base.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def execute_query(self, query_string: str) -> List[Any]:
    """
    Executes a SQL query and returns the fetched results.

    Args:
        query_string (str): The SQL query to be executed.

    Returns:
        List[Any]: The fetched results from the query.

    """
    # Create a session and execute the query
    session = self.Session()
    try:
        result = session.execute(text(query_string))
        return result.fetchall()
    finally:
        # Ensure the session is closed after query execution
        session.close()

load_data #

load_data(query: str) -> List[Document]

查询并从数据库加载数据,返回 Document 列表。

参数

名称 类型 描述 默认值
query str

用于过滤表和行的查询参数。

必需

返回

类型 描述
List[Document]

List[Document]:Document 对象列表。

源代码位于 llama-index-integrations/readers/llama-index-readers-snowflake/llama_index/readers/snowflake/base.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def load_data(self, query: str) -> List[Document]:
    """
    Query and load data from the Database, returning a list of Documents.

    Args:
        query (str): Query parameter to filter tables and rows.

    Returns:
        List[Document]: A list of Document objects.

    """
    documents = []

    if query is None:
        raise ValueError("A query parameter is necessary to filter the data")

    try:
        result = self.execute_query(query)

        for item in result:
            # fetch each item
            doc_str = ", ".join([str(entry) for entry in item])
            documents.append(Document(text=doc_str))
        return documents
    except Exception as e:
        logger.error(
            f"An error occurred while loading the data: {e}", exc_info=True
        )