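JSONalyze Query Engine
The JSONalyze Query Engine is typically wired in after an API call (made by an agent, for example) that returns a large batch of rows, where the next step is to run statistical analysis on that data.
Under the hood, JSONalyze creates an in-memory SQLite table and loads the JSON list into it. The query engine can then run SQL queries against the data and return the query result as the answer to an analytical question.
This notebook walks through an implementation of the JSON Analyze Query Engine using Workflows.
Specifically, we will implement JSONalyzeQueryEngine.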
!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
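Since workflows are async-first, all of this runs fine in a notebook. If you are running in your own code and no async event loop is running yet, use asyncio.run() to start one: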
async def main():
    <async code>

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
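As a minimal, self-contained sketch of that pattern, the snippet below drives a placeholder coroutine with asyncio.run(). The coroutine main, the helper run_main, and the returned string are illustrative stand-ins for a call such as await w.run(...); nothing here is specific to LlamaIndex.

```python
import asyncio


async def main() -> str:
    # Stand-in for `await w.run(...)`; a real script would await the workflow here.
    await asyncio.sleep(0)
    return "done"


def run_main() -> str:
    # asyncio.run() creates a new event loop, runs the coroutine to completion,
    # and closes the loop. It raises RuntimeError if a loop is already running
    # in this thread (as in a notebook), where a plain `await` is used instead.
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_main())
```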
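The Workflow
jsonalyzer
- It takes a StartEvent as input and returns a JsonAnalyzerEvent.
- The function sets up an in-memory SQLite database, loads the JSON data, uses an LLM to generate a SQL query from the user's query, executes it, and returns the results along with the SQL query and table schema.
synthesize
- The function uses an LLM to synthesize a response from the SQL query, table schema, and query results.
The steps use the built-in StartEvent and StopEvent events.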
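The core of the jsonalyzer step can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the workflow's code: it uses Python's built-in sqlite3 module in place of sqlite-utils, a hand-written SQL string in place of the LLM-generated query, a hypothetical helper named load_rows, and a three-row subset of the notebook's data.

```python
import sqlite3
from typing import Any, Dict, List


def load_rows(conn: sqlite3.Connection, table: str, rows: List[Dict[str, Any]]) -> None:
    """Create `table` from the keys of the first row and insert all rows."""
    columns = list(rows[0].keys())
    col_sql = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    conn.execute(f'CREATE TABLE "{table}" ({col_sql})')
    conn.executemany(
        f'INSERT INTO "{table}" ({col_sql}) VALUES ({placeholders})',
        [tuple(row[c] for c in columns) for row in rows],
    )


rows = [
    {"name": "John Doe", "age": 25, "state": "NY"},
    {"name": "Jane Smith", "age": 30, "state": "CA"},
    {"name": "Michael Johnson", "age": 35, "state": "IL"},
]

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets each result row be converted to a dict
load_rows(conn, "items", rows)

# In the workflow this SQL would come from the LLM; here it is written by hand.
sql_query = "SELECT MAX(age) AS max_age FROM items"
results = [dict(r) for r in conn.execute(sql_query)]
print(results)
```

The list of dictionaries in results plays the role of the results field that the step passes on to synthesize.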
Define Event
from llama_index.core.workflow import Event
from typing import Dict, List, Any


class JsonAnalyzerEvent(Event):
    """
    Event containing results of JSON analysis.

    Attributes:
        sql_query (str): The generated SQL query.
        table_schema (Dict[str, Any]): Schema of the analyzed table.
        results (List[Dict[str, Any]]): Query execution results.
    """

    sql_query: str
    table_schema: Dict[str, Any]
    results: List[Dict[str, Any]]
Prompt Templates
Here we define the defaults DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL, DEFAULT_RESPONSE_SYNTHESIS_PROMPT, and DEFAULT_TABLE_NAME.
from llama_index.core.prompts.prompt_type import PromptType
from llama_index.core.prompts import PromptTemplate

DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL = (
    "Given a query, synthesize a response based on SQL query results"
    " to satisfy the query. Only include details that are relevant to"
    " the query. If you don't know the answer, then say that.\n"
    "SQL Query: {sql_query}\n"
    "Table Schema: {table_schema}\n"
    "SQL Response: {sql_response}\n"
    "Query: {query_str}\n"
    "Response: "
)

DEFAULT_RESPONSE_SYNTHESIS_PROMPT = PromptTemplate(
    DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL,
    prompt_type=PromptType.SQL_RESPONSE_SYNTHESIS,
)

DEFAULT_TABLE_NAME = "items"
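To see roughly what the synthesis LLM receives, the template can be filled in by hand. The sketch below uses plain str.format on a copy of the template string rather than PromptTemplate, and the SQL query, schema, and result values are illustrative examples, not captured output.

```python
# Copy of the template string defined above, so this snippet runs on its own.
TEMPLATE = (
    "Given a query, synthesize a response based on SQL query results"
    " to satisfy the query. Only include details that are relevant to"
    " the query. If you don't know the answer, then say that.\n"
    "SQL Query: {sql_query}\n"
    "Table Schema: {table_schema}\n"
    "SQL Response: {sql_response}\n"
    "Query: {query_str}\n"
    "Response: "
)

# Illustrative values; in the workflow these come from the jsonalyzer step.
filled = TEMPLATE.format(
    sql_query="SELECT MAX(age) FROM items",
    table_schema={"name": str, "age": int},
    sql_response=[{"MAX(age)": 35}],
    query_str="What is the maximum age among the individuals?",
)
print(filled)
```

The prompt ends with "Response: " so that the model's completion is the synthesized answer itself.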
from llama_index.core.base.response.schema import Response
from llama_index.core.indices.struct_store.sql_retriever import (
    DefaultSQLParser,
)
from llama_index.core.prompts.default_prompts import DEFAULT_JSONALYZE_PROMPT
from llama_index.core.utils import print_text
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.openai import OpenAI
from IPython.display import Markdown, display


class JSONAnalyzeQueryEngineWorkflow(Workflow):
    @step
    async def jsonalyzer(
        self, ctx: Context, ev: StartEvent
    ) -> JsonAnalyzerEvent:
        """
        Analyze JSON data using a SQL-like query approach.

        This asynchronous method sets up an in-memory SQLite database, loads JSON data,
        generates a SQL query based on a natural language question, executes the query,
        and returns the results.

        Args:
            ctx (Context): The context object for storing data during execution.
            ev (StartEvent): The event object containing input parameters.

        Returns:
            JsonAnalyzerEvent: An event object containing the SQL query, table schema, and query results.

        The method performs the following steps:
        1. Imports the required 'sqlite-utils' package.
        2. Extracts necessary data from the input event.
        3. Sets up an in-memory SQLite database and loads the JSON data.
        4. Generates a SQL query using a LLM based on the input question.
        5. Executes the SQL query and retrieves the results.
        6. Returns the results along with the SQL query and table schema.

        Note:
            This method requires the 'sqlite-utils' package to be installed.
        """
        try:
            import sqlite_utils
        except ImportError as exc:
            IMPORT_ERROR_MSG = (
                "sqlite-utils is needed to use this Query Engine:\n"
                "pip install sqlite-utils"
            )
            raise ImportError(IMPORT_ERROR_MSG) from exc

        # Read the inputs once, then store what the next step needs in the context
        query = ev.get("query")
        llm = ev.get("llm")
        table_name = ev.get("table_name")
        list_of_dict = ev.get("list_of_dict")
        await ctx.set("query", query)
        await ctx.set("llm", llm)

        prompt = DEFAULT_JSONALYZE_PROMPT

        # Instantiate in-memory SQLite database
        db = sqlite_utils.Database(memory=True)
        try:
            # Load list of dictionaries into SQLite database
            db[table_name].insert_all(list_of_dict)
        except sqlite_utils.utils.sqlite3.IntegrityError as exc:
            print_text(
                f"Error inserting into table {table_name}, expected format:"
            )
            print_text("[{col1: val1, col2: val2, ...}, ...]")
            raise ValueError("Invalid list_of_dict") from exc

        # Get the table schema
        table_schema = db[table_name].columns_dict

        # Get the SQL query with text-to-SQL prompt
        response_str = await llm.apredict(
            prompt=prompt,
            table_name=table_name,
            table_schema=table_schema,
            question=query,
        )

        sql_parser = DefaultSQLParser()
        sql_query = sql_parser.parse_response_to_sql(response_str, query)

        try:
            # Execute the SQL query
            results = list(db.query(sql_query))
        except sqlite_utils.utils.sqlite3.OperationalError as exc:
            print_text(f"Error executing query: {sql_query}")
            raise ValueError("Invalid query") from exc

        return JsonAnalyzerEvent(
            sql_query=sql_query, table_schema=table_schema, results=results
        )

    @step
    async def synthesize(
        self, ctx: Context, ev: JsonAnalyzerEvent
    ) -> StopEvent:
        """Synthesize the response."""
        llm = await ctx.get("llm", default=None)
        query = await ctx.get("query", default=None)

        # Use the async predict call so the step does not block the event loop
        response_str = await llm.apredict(
            DEFAULT_RESPONSE_SYNTHESIS_PROMPT,
            sql_query=ev.sql_query,
            table_schema=ev.table_schema,
            sql_response=ev.results,
            query_str=query,
        )

        response_metadata = {
            "sql_query": ev.sql_query,
            "table_schema": str(ev.table_schema),
        }

        response = Response(response=response_str, metadata=response_metadata)
        return StopEvent(result=response)
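Two details of the jsonalyzer step are worth isolating: pulling the SQL out of the LLM's text, and turning a failed query into a ValueError. The sketch below is a simplified stand-in, not DefaultSQLParser itself. The function names extract_sql and run_query and the assumed "SQLQuery:" / "SQLResult:" markers are illustrative assumptions, and the built-in sqlite3 module replaces sqlite-utils.

```python
import sqlite3
from typing import Any, List, Tuple


def extract_sql(response: str) -> str:
    """Hypothetical parser: keep the text between 'SQLQuery:' and 'SQLResult:'."""
    start = response.find("SQLQuery:")
    if start != -1:
        response = response[start + len("SQLQuery:"):]
    end = response.find("SQLResult:")
    if end != -1:
        response = response[:end]
    return response.strip()


def run_query(conn: sqlite3.Connection, sql_query: str) -> List[Tuple[Any, ...]]:
    """Execute the query, re-raising SQLite errors as ValueError as the step does."""
    try:
        return conn.execute(sql_query).fetchall()
    except sqlite3.OperationalError as exc:
        raise ValueError("Invalid query") from exc


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO items VALUES (?, ?)", [("John Doe", 25), ("Jane Smith", 30)]
)

# Made-up LLM output in the assumed marker format.
llm_text = "SQLQuery: SELECT COUNT(*) FROM items\nSQLResult: ..."
sql_query = extract_sql(llm_text)
print(sql_query, run_query(conn, sql_query))

try:
    # The column does not exist, so SQLite raises OperationalError.
    run_query(conn, "SELECT missing_column FROM items")
except ValueError as err:
    print("rejected:", err)
```

Wrapping the database error this way keeps callers from depending on SQLite-specific exception types.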
Create JSON List
json_list = [
{
"name": "John Doe",
"age": 25,
"major": "Computer Science",
"email": "[email protected]",
"address": "123 Main St",
"city": "New York",
"state": "NY",
"country": "USA",
"phone": "+1 123-456-7890",
"occupation": "Software Engineer",
},
{
"name": "Jane Smith",
"age": 30,
"major": "Business Administration",
"email": "[email protected]",
"address": "456 Elm St",
"city": "San Francisco",
"state": "CA",
"country": "USA",
"phone": "+1 234-567-8901",
"occupation": "Marketing Manager",
},
{
"name": "Michael Johnson",
"age": 35,
"major": "Finance",
"email": "[email protected]",
"address": "789 Oak Ave",
"city": "Chicago",
"state": "IL",
"country": "USA",
"phone": "+1 345-678-9012",
"occupation": "Financial Analyst",
},
{
"name": "Emily Davis",
"age": 28,
"major": "Psychology",
"email": "[email protected]",
"address": "234 Pine St",
"city": "Los Angeles",
"state": "CA",
"country": "USA",
"phone": "+1 456-789-0123",
"occupation": "Psychologist",
},
{
"name": "Alex Johnson",
"age": 27,
"major": "Engineering",
"email": "[email protected]",
"address": "567 Cedar Ln",
"city": "Seattle",
"state": "WA",
"country": "USA",
"phone": "+1 567-890-1234",
"occupation": "Civil Engineer",
},
{
"name": "Jessica Williams",
"age": 32,
"major": "Biology",
"email": "[email protected]",
"address": "890 Walnut Ave",
"city": "Boston",
"state": "MA",
"country": "USA",
"phone": "+1 678-901-2345",
"occupation": "Biologist",
},
{
"name": "Matthew Brown",
"age": 26,
"major": "English Literature",
"email": "[email protected]",
"address": "123 Peach St",
"city": "Atlanta",
"state": "GA",
"country": "USA",
"phone": "+1 789-012-3456",
"occupation": "Writer",
},
{
"name": "Olivia Wilson",
"age": 29,
"major": "Art",
"email": "[email protected]",
"address": "456 Plum Ave",
"city": "Miami",
"state": "FL",
"country": "USA",
"phone": "+1 890-123-4567",
"occupation": "Artist",
},
{
"name": "Daniel Thompson",
"age": 31,
"major": "Physics",
"email": "[email protected]",
"address": "789 Apple St",
"city": "Denver",
"state": "CO",
"country": "USA",
"phone": "+1 901-234-5678",
"occupation": "Physicist",
},
{
"name": "Sophia Clark",
"age": 27,
"major": "Sociology",
"email": "[email protected]",
"address": "234 Orange Ln",
"city": "Austin",
"state": "TX",
"country": "USA",
"phone": "+1 012-345-6789",
"occupation": "Social Worker",
},
{
"name": "Christopher Lee",
"age": 33,
"major": "Chemistry",
"email": "[email protected]",
"address": "567 Mango St",
"city": "San Diego",
"state": "CA",
"country": "USA",
"phone": "+1 123-456-7890",
"occupation": "Chemist",
},
{
"name": "Ava Green",
"age": 28,
"major": "History",
"email": "[email protected]",
"address": "890 Cherry Ave",
"city": "Philadelphia",
"state": "PA",
"country": "USA",
"phone": "+1 234-567-8901",
"occupation": "Historian",
},
{
"name": "Ethan Anderson",
"age": 30,
"major": "Business",
"email": "[email protected]",
"address": "123 Lemon Ln",
"city": "Houston",
"state": "TX",
"country": "USA",
"phone": "+1 345-678-9012",
"occupation": "Entrepreneur",
},
{
"name": "Isabella Carter",
"age": 28,
"major": "Mathematics",
"email": "[email protected]",
"address": "456 Grape St",
"city": "Phoenix",
"state": "AZ",
"country": "USA",
"phone": "+1 456-789-0123",
"occupation": "Mathematician",
},
{
"name": "Andrew Walker",
"age": 32,
"major": "Economics",
"email": "[email protected]",
"address": "789 Berry Ave",
"city": "Portland",
"state": "OR",
"country": "USA",
"phone": "+1 567-890-1234",
"occupation": "Economist",
},
{
"name": "Mia Evans",
"age": 29,
"major": "Political Science",
"email": "[email protected]",
"address": "234 Lime St",
"city": "Washington",
"state": "DC",
"country": "USA",
"phone": "+1 678-901-2345",
"occupation": "Political Analyst",
},
]
Define LLM
llm = OpenAI(model="gpt-3.5-turbo")
Run the Workflow!
w = JSONAnalyzeQueryEngineWorkflow()
# Run a query
query = "What is the maximum age among the individuals?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: What is the maximum age among the individuals?
Answer: The maximum age among the individuals is 35.
query = "How many individuals have an occupation related to science or engineering?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: How many individuals have an occupation related to science or engineering?
Answer: There are 0 individuals with an occupation related to science or engineering.
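An answer of 0 may look surprising, since the data includes a "Software Engineer" and a "Civil Engineer". The generated SQL is not shown in this output (it is available in the response metadata under "sql_query"), so the following is only one plausible explanation: if the query matched the literal words "science" or "engineering", no occupation would match. The sketch reproduces that hypothesis with sqlite3 and the occupation values from the JSON list. Both SQL strings are hand-written assumptions, not the LLM's output.

```python
import sqlite3

# Occupation values copied from the JSON list, in order.
occupations = [
    "Software Engineer", "Marketing Manager", "Financial Analyst", "Psychologist",
    "Civil Engineer", "Biologist", "Writer", "Artist", "Physicist", "Social Worker",
    "Chemist", "Historian", "Entrepreneur", "Mathematician", "Economist",
    "Political Analyst",
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (occupation TEXT)")
conn.executemany("INSERT INTO items VALUES (?)", [(o,) for o in occupations])

# Hypothetical query that matches the question's wording literally.
literal = (
    "SELECT COUNT(*) FROM items "
    "WHERE occupation LIKE '%science%' OR occupation LIKE '%engineering%'"
)
# Hypothetical broader pattern that also catches 'Engineer'.
broader = "SELECT COUNT(*) FROM items WHERE occupation LIKE '%engineer%'"

literal_count = conn.execute(literal).fetchone()[0]
broader_count = conn.execute(broader).fetchone()[0]
print(literal_count, broader_count)
```

Under this assumption the literal pattern finds no rows while the broader one finds the two engineers, which shows how sensitive text-to-SQL answers can be to the pattern the LLM chooses.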
query = "How many individuals have a phone number starting with '+1 234'?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: How many individuals have a phone number starting with '+1 234'?
Answer: There are 2 individuals with a phone number starting with '+1 234'.
query = "What is the percentage of individuals residing in California (CA)?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: What is the percentage of individuals residing in California (CA)?
Answer: The percentage of individuals residing in California (CA) is 18.75%.
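The 18.75% figure can be checked by hand: three of the sixteen records have state "CA", and 3 / 16 = 0.1875. A small sketch of the same calculation in SQL follows, using sqlite3 and the state values from the JSON list. The query is written by hand and is not necessarily what the LLM generated.

```python
import sqlite3

# State values copied from the JSON list, in order.
states = ["NY", "CA", "IL", "CA", "WA", "MA", "GA", "FL",
          "CO", "TX", "CA", "PA", "TX", "AZ", "OR", "DC"]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (state TEXT)")
conn.executemany("INSERT INTO items VALUES (?)", [(s,) for s in states])

# In SQLite a comparison evaluates to 1 or 0, so SUM counts the matching rows.
sql_query = "SELECT 100.0 * SUM(state = 'CA') / COUNT(*) FROM items"
percentage = conn.execute(sql_query).fetchone()[0]
print(percentage)
```

Multiplying by 100.0 rather than 100 forces floating-point arithmetic; with integers only, SQLite would truncate the division.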
query = "How many individuals have a major in Psychology?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: How many individuals have a major in Psychology?
Answer: There is 1 individual with a major in Psychology.