跳到内容

Neptune

基类: NeptuneBaseGraphStore

源码位于 llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/analytics.py

query #
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class NeptuneAnalyticsGraphStore(NeptuneBaseGraphStore):
    def __init__(
        self,
        graph_identifier: str,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        node_label: str = "Entity",
        **kwargs: Any,
    ) -> None:
        """Create a new Neptune Analytics graph wrapper instance."""
        self.node_label = node_label
        self._client = create_neptune_analytics_client(
            graph_identifier, client, credentials_profile_name, region_name
        )
        self.graph_identifier = graph_identifier

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query Neptune Analytics graph."""
        try:
            logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
            resp = self.client.execute_query(
                graphIdentifier=self.graph_identifier,
                queryString=query,
                parameters=params,
                language="OPEN_CYPHER",
            )
            return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                    "query": query,
                    "parameters": str(params),
                }
            )

    def _get_summary(self) -> Dict:
        try:
            response = self.client.get_graph_summary(
                graphIdentifier=self.graph_identifier, mode="detailed"
            )
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": ("Summary API error occurred on Neptune Analytics"),
                    "details": str(e),
                }
            )

        try:
            summary = response["graphSummary"]
        except Exception:
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": response.content.decode(),
                }
            )
        else:
            return summary

查询 Neptune Analytics 图。

query(query: str, params: dict = {}) -> Dict[str, Any]

NeptuneAnalyticsPropertyGraphStore #

query #
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
    """Query Neptune Analytics graph."""
    try:
        logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
        resp = self.client.execute_query(
            graphIdentifier=self.graph_identifier,
            queryString=query,
            parameters=params,
            language="OPEN_CYPHER",
        )
        return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
    except Exception as e:
        raise NeptuneQueryException(
            {
                "message": "An error occurred while executing the query.",
                "details": str(e),
                "query": query,
                "parameters": str(params),
            }
        )

基类: NeptuneBasePropertyGraph

源码位于 llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/analytics_property_graph.py

structured_query #
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class NeptuneAnalyticsPropertyGraphStore(NeptuneBasePropertyGraph):
    supports_vector_queries: bool = True

    def __init__(
        self,
        graph_identifier: str,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Create a new Neptune Analytics graph wrapper instance."""
        self._client = create_neptune_analytics_client(
            graph_identifier, client, credentials_profile_name, region_name
        )
        self.graph_identifier = graph_identifier

    def structured_query(self, query: str, param_map: Dict[str, Any] = None) -> Any:
        """
        Run the structured query.

        Args:
            query (str): The query to run
            param_map (Dict[str, Any] | None, optional): A dictionary of query parameters. Defaults to None.

        Raises:
            NeptuneQueryException: An exception from Neptune with details

        Returns:
            Any: The results of the query

        """
        param_map = param_map or {}

        try:
            logger.debug(
                f"structured_query() query: {query} parameters: {json.dumps(param_map)}"
            )
            resp = self.client.execute_query(
                graphIdentifier=self.graph_identifier,
                queryString=query,
                parameters=param_map,
                language="OPEN_CYPHER",
            )
            return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                    "query": query,
                    "parameters": str(param_map),
                }
            )

    def vector_query(self, query: VectorStoreQuery, **kwargs: Any) -> Tuple[List[Any]]:
        """
        Query the graph store with a vector store query.

        Returns:
            (nodes, score): The nodes and their associated score

        """
        conditions = None
        if query.filters:
            conditions = [
                f"e.{filter.key} {filter.operator.value} {filter.value}"
                for filter in query.filters.filters
            ]
        filters = (
            f" {query.filters.condition.value} ".join(conditions).replace("==", "=")
            if conditions is not None
            else "1 = 1"
        )

        data = self.structured_query(
            f"""MATCH (e:`{BASE_ENTITY_LABEL}`)
            WHERE ({filters})
            CALL neptune.algo.vectors.get(e)
            YIELD embedding
            WHERE embedding IS NOT NULL
            CALL neptune.algo.vectors.topKByNode(e)
            YIELD node, score
            WITH e, score
            ORDER BY score DESC LIMIT $limit
            RETURN e.id AS name,
                [l in labels(e) WHERE l <> '{BASE_ENTITY_LABEL}' | l][0] AS type,
                e{{.* , embedding: Null, name: Null, id: Null}} AS properties,
                score""",
            param_map={
                "embedding": query.query_embedding,
                "dimension": len(query.query_embedding),
                "limit": query.similarity_top_k,
            },
        )
        data = data if data else []

        nodes = []
        scores = []
        for record in data:
            node = EntityNode(
                name=record["name"],
                label=record["type"],
                properties=remove_empty_values(record["properties"]),
            )
            nodes.append(node)
            scores.append(record["score"])

        return (nodes, scores)

    def upsert_nodes(self, nodes: List[LabelledNode]) -> None:
        """
        Upsert the nodes in the graph.

        Args:
            nodes (List[LabelledNode]): The list of nodes to upsert

        """
        # Lists to hold separated types
        entity_dicts: List[dict] = []
        chunk_dicts: List[dict] = []

        # Sort by type
        for item in nodes:
            if isinstance(item, EntityNode):
                entity_dicts.append({**item.dict(), "id": item.id})
            elif isinstance(item, ChunkNode):
                chunk_dicts.append({**item.dict(), "id": item.id})
            else:
                # Log that we do not support these types of nodes
                # Or raise an error?
                pass

        if chunk_dicts:
            for d in chunk_dicts:
                self.structured_query(
                    """
                    WITH $data AS row
                    MERGE (c:Chunk {id: row.id})
                    SET c.text = row.text
                    SET c += removeKeyFromMap(row.properties, '')
                    WITH c, row.embedding as e
                    WHERE e IS NOT NULL
                    CALL neptune.algo.vectors.upsert(c, e)
                    RETURN count(*)
                    """,
                    param_map={"data": d},
                )

        if entity_dicts:
            for d in entity_dicts:
                self.structured_query(
                    f"""
                    WITH $data AS row
                    MERGE (e:`{BASE_NODE_LABEL}` {{id: row.id}})
                    SET e += removeKeyFromMap(row.properties, '')
                    SET e.name = row.name, e:`{BASE_ENTITY_LABEL}`
                    SET e:`{d['label']}`
                    WITH e, row
                    WHERE removeKeyFromMap(row.properties, '').triplet_source_id IS NOT NULL
                    MERGE (c:Chunk {{id: removeKeyFromMap(row.properties, '').triplet_source_id}})
                    MERGE (e)<-[:MENTIONS]-(c)
                    WITH e, row.embedding as em
                    CALL neptune.algo.vectors.upsert(e, em)
                    RETURN count(*) as count
                    """,
                    param_map={"data": d},
                )

    def _get_summary(self) -> Dict:
        """
        Get the Summary of the graph topology.

        Returns:
            Dict: The graph summary

        """
        try:
            response = self.client.get_graph_summary(
                graphIdentifier=self.graph_identifier, mode="detailed"
            )
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": ("Summary API error occurred on Neptune Analytics"),
                    "details": str(e),
                }
            )

        try:
            summary = response["graphSummary"]
        except Exception:
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": response.content.decode(),
                }
            )
        else:
            return summary

运行结构化查询。

structured_query(query: str, param_map: Dict[str, Any] = None) -> Any

参数

名称

类型 描述 默认值 str
NeptuneAnalyticsPropertyGraphStore 要运行的查询

必需

param_map
Dict[str, Any] | None 查询参数字典。默认为 None。

抛出异常

NeptuneQueryException

描述 默认值
来自 Neptune 的异常,包含详细信息

返回值

Any

类型 描述 默认值
查询结果 查询结果

vector_query #

structured_query #
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def structured_query(self, query: str, param_map: Dict[str, Any] = None) -> Any:
    """
    Run the structured query.

    Args:
        query (str): The query to run
        param_map (Dict[str, Any] | None, optional): A dictionary of query parameters. Defaults to None.

    Raises:
        NeptuneQueryException: An exception from Neptune with details

    Returns:
        Any: The results of the query

    """
    param_map = param_map or {}

    try:
        logger.debug(
            f"structured_query() query: {query} parameters: {json.dumps(param_map)}"
        )
        resp = self.client.execute_query(
            graphIdentifier=self.graph_identifier,
            queryString=query,
            parameters=param_map,
            language="OPEN_CYPHER",
        )
        return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
    except Exception as e:
        raise NeptuneQueryException(
            {
                "message": "An error occurred while executing the query.",
                "details": str(e),
                "query": query,
                "parameters": str(param_map),
            }
        )

使用向量存储查询图存储。

vector_query(query: VectorStoreQuery, **kwargs: Any) -> Tuple[List[Any]]

(节点, 分数)

Any

描述 默认值
节点及其相关分数

upsert_nodes #

structured_query #
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def vector_query(self, query: VectorStoreQuery, **kwargs: Any) -> Tuple[List[Any]]:
    """
    Query the graph store with a vector store query.

    Returns:
        (nodes, score): The nodes and their associated score

    """
    conditions = None
    if query.filters:
        conditions = [
            f"e.{filter.key} {filter.operator.value} {filter.value}"
            for filter in query.filters.filters
        ]
    filters = (
        f" {query.filters.condition.value} ".join(conditions).replace("==", "=")
        if conditions is not None
        else "1 = 1"
    )

    data = self.structured_query(
        f"""MATCH (e:`{BASE_ENTITY_LABEL}`)
        WHERE ({filters})
        CALL neptune.algo.vectors.get(e)
        YIELD embedding
        WHERE embedding IS NOT NULL
        CALL neptune.algo.vectors.topKByNode(e)
        YIELD node, score
        WITH e, score
        ORDER BY score DESC LIMIT $limit
        RETURN e.id AS name,
            [l in labels(e) WHERE l <> '{BASE_ENTITY_LABEL}' | l][0] AS type,
            e{{.* , embedding: Null, name: Null, id: Null}} AS properties,
            score""",
        param_map={
            "embedding": query.query_embedding,
            "dimension": len(query.query_embedding),
            "limit": query.similarity_top_k,
        },
    )
    data = data if data else []

    nodes = []
    scores = []
    for record in data:
        node = EntityNode(
            name=record["name"],
            label=record["type"],
            properties=remove_empty_values(record["properties"]),
        )
        nodes.append(node)
        scores.append(record["score"])

    return (nodes, scores)

在图中 upsert 节点。

upsert_nodes(nodes: List[LabelledNode]) -> None

nodes

名称

类型 描述 默认值 str
List[LabelledNode] 要 upsert 的节点列表

NeptuneDatabaseGraphStore #

param_map
structured_query #
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def upsert_nodes(self, nodes: List[LabelledNode]) -> None:
    """
    Upsert the nodes in the graph.

    Args:
        nodes (List[LabelledNode]): The list of nodes to upsert

    """
    # Lists to hold separated types
    entity_dicts: List[dict] = []
    chunk_dicts: List[dict] = []

    # Sort by type
    for item in nodes:
        if isinstance(item, EntityNode):
            entity_dicts.append({**item.dict(), "id": item.id})
        elif isinstance(item, ChunkNode):
            chunk_dicts.append({**item.dict(), "id": item.id})
        else:
            # Log that we do not support these types of nodes
            # Or raise an error?
            pass

    if chunk_dicts:
        for d in chunk_dicts:
            self.structured_query(
                """
                WITH $data AS row
                MERGE (c:Chunk {id: row.id})
                SET c.text = row.text
                SET c += removeKeyFromMap(row.properties, '')
                WITH c, row.embedding as e
                WHERE e IS NOT NULL
                CALL neptune.algo.vectors.upsert(c, e)
                RETURN count(*)
                """,
                param_map={"data": d},
            )

    if entity_dicts:
        for d in entity_dicts:
            self.structured_query(
                f"""
                WITH $data AS row
                MERGE (e:`{BASE_NODE_LABEL}` {{id: row.id}})
                SET e += removeKeyFromMap(row.properties, '')
                SET e.name = row.name, e:`{BASE_ENTITY_LABEL}`
                SET e:`{d['label']}`
                WITH e, row
                WHERE removeKeyFromMap(row.properties, '').triplet_source_id IS NOT NULL
                MERGE (c:Chunk {{id: removeKeyFromMap(row.properties, '').triplet_source_id}})
                MERGE (e)<-[:MENTIONS]-(c)
                WITH e, row.embedding as em
                CALL neptune.algo.vectors.upsert(e, em)
                RETURN count(*) as count
                """,
                param_map={"data": d},
            )

源码位于 llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/database.py

源码位于 llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/analytics.py

query #
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class NeptuneDatabaseGraphStore(NeptuneBaseGraphStore):
    def __init__(
        self,
        host: str,
        port: int = 8182,
        use_https: bool = True,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        sign: bool = True,
        node_label: str = "Entity",
        **kwargs: Any,
    ) -> None:
        """Create a new Neptune Database graph wrapper instance."""
        self.node_label = node_label
        self._client = create_neptune_database_client(
            host, port, client, credentials_profile_name, region_name, sign, use_https
        )

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query Neptune database."""
        try:
            logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
            return self.client.execute_open_cypher_query(
                openCypherQuery=query, parameters=json.dumps(params)
            )["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                    "query": query,
                    "parameters": str(params),
                }
            )

    def _get_summary(self) -> Dict:
        try:
            response = self.client.get_propertygraph_summary()
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": (
                        "Summary API is not available for this instance of Neptune,"
                        "ensure the engine version is >=1.2.1.0"
                    ),
                    "details": str(e),
                }
            )

        try:
            summary = response["payload"]["graphSummary"]
        except Exception:
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": response.content.decode(),
                }
            )
        else:
            return summary

查询 Neptune 数据库。

query(query: str, params: dict = {}) -> Dict[str, Any]

NeptuneDatabasePropertyGraphStore #

query #
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
    """Query Neptune database."""
    try:
        logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
        return self.client.execute_open_cypher_query(
            openCypherQuery=query, parameters=json.dumps(params)
        )["results"]
    except Exception as e:
        raise NeptuneQueryException(
            {
                "message": "An error occurred while executing the query.",
                "details": str(e),
                "query": query,
                "parameters": str(params),
            }
        )

源码位于 llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/database_property_graph.py

源码位于 llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/analytics_property_graph.py

structured_query #
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class NeptuneDatabasePropertyGraphStore(NeptuneBasePropertyGraph):
    supports_vector_queries: bool = False

    def __init__(
        self,
        host: str,
        port: int = 8182,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        sign: bool = True,
        use_https: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Init.

        Args:
            host (str): The host endpoint
            port (int, optional): The port. Defaults to 8182.
            client (Any, optional): If provided, this is the client that will be used. Defaults to None.
            credentials_profile_name (Optional[str], optional): If provided this is the credentials profile that will be used. Defaults to None.
            region_name (Optional[str], optional): The region to use. Defaults to None.
            sign (bool, optional): True will SigV4 sign all requests, False will not. Defaults to True.
            use_https (bool, optional): True to use https, False to use http. Defaults to True.

        """
        self._client = create_neptune_database_client(
            host, port, client, credentials_profile_name, region_name, sign, use_https
        )

    def structured_query(self, query: str, param_map: Dict[str, Any] = None) -> Any:
        """
        Run the structured query.

        Args:
            query (str): The query to run
            param_map (Dict[str, Any] | None, optional): A dictionary of query parameters. Defaults to None.

        Raises:
            NeptuneQueryException: An exception from Neptune with details

        Returns:
            Any: The results of the query

        """
        param_map = param_map or {}

        try:
            logger.debug(
                f"structured_query() query: {query} parameters: {json.dumps(param_map)}"
            )

            return self.client.execute_open_cypher_query(
                openCypherQuery=query, parameters=json.dumps(param_map)
            )["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                    "query": query,
                    "parameters": str(param_map),
                }
            )

    def vector_query(self, query: VectorStoreQuery, **kwargs: Any) -> Tuple[List[Any]]:
        """
        NOT SUPPORTED.

        Args:
            query (VectorStoreQuery): _description_

        Raises:
            NotImplementedError: _description_

        Returns:
            Tuple[List[LabelledNode] | List[float]]: _description_

        """
        raise NotImplementedError

    def upsert_nodes(self, nodes: List[LabelledNode]) -> None:
        """
        Upsert the nodes in the graph.

        Args:
            nodes (List[LabelledNode]): The list of nodes to upsert

        """
        # Lists to hold separated types
        entity_dicts: List[dict] = []
        chunk_dicts: List[dict] = []

        # Sort by type
        for item in nodes:
            if isinstance(item, EntityNode):
                entity_dicts.append({**item.dict(), "id": item.id})
            elif isinstance(item, ChunkNode):
                chunk_dicts.append({**item.dict(), "id": item.id})
            else:
                # Log that we do not support these types of nodes
                # Or raise an error?
                pass

        if chunk_dicts:
            for d in chunk_dicts:
                self.structured_query(
                    """
                    WITH $data AS row
                    MERGE (c:Chunk {id: row.id})
                    SET c.text = row.text
                    SET c += removeKeyFromMap(row.properties, '')
                    RETURN count(*)
                    """,
                    param_map={"data": d},
                )

        if entity_dicts:
            for d in entity_dicts:
                self.structured_query(
                    f"""
                    WITH $data AS row
                    MERGE (e:`{BASE_NODE_LABEL}` {{id: row.id}})
                    SET e += removeKeyFromMap(row.properties, '')
                    SET e.name = row.name, e:`{BASE_ENTITY_LABEL}`
                    SET e:`{d['label']}`
                    WITH e, row
                    WHERE removeKeyFromMap(row.properties, '').triplet_source_id IS NOT NULL
                    MERGE (c:Chunk {{id: removeKeyFromMap(row.properties, '').triplet_source_id}})
                    MERGE (e)<-[:MENTIONS]-(c)
                    RETURN count(*) as count
                    """,
                    param_map={"data": d},
                )

    def _get_summary(self) -> Dict:
        """
        Get the Summary of the graph schema.

        Returns:
            Dict: The graph summary

        """
        try:
            response = self.client.get_propertygraph_summary()
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": (
                        "Summary API is not available for this instance of Neptune,"
                        "ensure the engine version is >=1.2.1.0"
                    ),
                    "details": str(e),
                }
            )

        try:
            summary = response["payload"]["graphSummary"]
        except Exception:
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": response.content.decode(),
                }
            )
        else:
            return summary

vector_query #

structured_query(query: str, param_map: Dict[str, Any] = None) -> Any

参数

名称

类型 描述 默认值 str
NeptuneAnalyticsPropertyGraphStore 要运行的查询

必需

param_map
Dict[str, Any] | None 查询参数字典。默认为 None。

抛出异常

NeptuneQueryException

描述 默认值
来自 Neptune 的异常,包含详细信息

返回值

Any

类型 描述 默认值
查询结果 查询结果

vector_query #

structured_query #
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def structured_query(self, query: str, param_map: Dict[str, Any] = None) -> Any:
    """
    Run the structured query.

    Args:
        query (str): The query to run
        param_map (Dict[str, Any] | None, optional): A dictionary of query parameters. Defaults to None.

    Raises:
        NeptuneQueryException: An exception from Neptune with details

    Returns:
        Any: The results of the query

    """
    param_map = param_map or {}

    try:
        logger.debug(
            f"structured_query() query: {query} parameters: {json.dumps(param_map)}"
        )

        return self.client.execute_open_cypher_query(
            openCypherQuery=query, parameters=json.dumps(param_map)
        )["results"]
    except Exception as e:
        raise NeptuneQueryException(
            {
                "message": "An error occurred while executing the query.",
                "details": str(e),
                "query": query,
                "parameters": str(param_map),
            }
        )

不支持。

vector_query(query: VectorStoreQuery, **kwargs: Any) -> Tuple[List[Any]]

VectorStoreQuery

名称

类型 描述 默认值 str
NeptuneAnalyticsPropertyGraphStore 描述

NotImplementedError

param_map

NeptuneQueryException

描述 默认值
Tuple[List[Any]]

NotImplementedError

Any

描述 默认值
Tuple[List[LabelledNode] | List[float]]: 描述

upsert_nodes #

structured_query #
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def vector_query(self, query: VectorStoreQuery, **kwargs: Any) -> Tuple[List[Any]]:
    """
    NOT SUPPORTED.

    Args:
        query (VectorStoreQuery): _description_

    Raises:
        NotImplementedError: _description_

    Returns:
        Tuple[List[LabelledNode] | List[float]]: _description_

    """
    raise NotImplementedError

返回顶部

upsert_nodes(nodes: List[LabelledNode]) -> None

nodes

名称

类型 描述 默认值 str
List[LabelledNode] 要 upsert 的节点列表

NeptuneDatabaseGraphStore #

param_map
structured_query #
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def upsert_nodes(self, nodes: List[LabelledNode]) -> None:
    """
    Upsert the nodes in the graph.

    Args:
        nodes (List[LabelledNode]): The list of nodes to upsert

    """
    # Lists to hold separated types
    entity_dicts: List[dict] = []
    chunk_dicts: List[dict] = []

    # Sort by type
    for item in nodes:
        if isinstance(item, EntityNode):
            entity_dicts.append({**item.dict(), "id": item.id})
        elif isinstance(item, ChunkNode):
            chunk_dicts.append({**item.dict(), "id": item.id})
        else:
            # Log that we do not support these types of nodes
            # Or raise an error?
            pass

    if chunk_dicts:
        for d in chunk_dicts:
            self.structured_query(
                """
                WITH $data AS row
                MERGE (c:Chunk {id: row.id})
                SET c.text = row.text
                SET c += removeKeyFromMap(row.properties, '')
                RETURN count(*)
                """,
                param_map={"data": d},
            )

    if entity_dicts:
        for d in entity_dicts:
            self.structured_query(
                f"""
                WITH $data AS row
                MERGE (e:`{BASE_NODE_LABEL}` {{id: row.id}})
                SET e += removeKeyFromMap(row.properties, '')
                SET e.name = row.name, e:`{BASE_ENTITY_LABEL}`
                SET e:`{d['label']}`
                WITH e, row
                WHERE removeKeyFromMap(row.properties, '').triplet_source_id IS NOT NULL
                MERGE (c:Chunk {{id: removeKeyFromMap(row.properties, '').triplet_source_id}})
                MERGE (e)<-[:MENTIONS]-(c)
                RETURN count(*) as count
                """,
                param_map={"data": d},
            )