跳到内容

Google

GmailToolSpec #

基类: BaseToolSpec

GMail 工具规范。

赋予代理读取、起草和发送 Gmail 消息的能力

源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
class GmailToolSpec(BaseToolSpec):
    """
    GMail tool spec.

    Gives the agent the ability to read, draft and send gmail messages

    """

    spec_functions = [
        "load_data",
        "search_messages",
        "create_draft",
        "update_draft",
        "get_draft",
        "send_draft",
    ]
    # Default Gmail search query used by load_data().
    query: str = None
    # Selects extract_message_body_iterative() instead of extract_message_body().
    use_iterative_parser: bool = False
    # Fallback result limit when search_messages() is called without max_results.
    max_results: int = 10
    # Cached Gmail API client; built lazily by _cache_service().
    service: Any = None

    def _cache_service(self) -> None:
        """Build the Gmail API client once and keep it on ``self.service``."""
        # Skip the credential lookup (file I/O, possibly an interactive OAuth
        # flow) when a client is already available.
        if self.service:
            return

        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        self.service = build("gmail", "v1", credentials=credentials)

    def load_data(self) -> List[Document]:
        """Load emails from the user's account using the configured ``query``."""
        self._cache_service()

        # search_messages() requires a query; use the instance-level default.
        return self.search_messages(query=self.query)

    def _get_credentials(self) -> Any:
        """
        Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.

        """
        import os

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def search_messages(self, query: str, max_results: Optional[int] = None):
        """
        Searches email messages given a query string and the maximum number
        of results requested by the user
           Returns: List of relevant message objects up to the maximum number of results.

        Args:
            query (str): The user's query
            max_results (Optional[int]): The maximum number of search results
            to return.

        """
        if not max_results:
            max_results = self.max_results

        self._cache_service()

        messages = (
            self.service.users()
            .messages()
            .list(userId="me", q=query, maxResults=int(max_results))
            .execute()
            .get("messages", [])
        )

        results = []
        try:
            for message in messages:
                message_data = self.get_message_data(message)
                if not message_data:
                    # get_message_data() returns None for messages without a
                    # body; skip them instead of failing on None.pop().
                    continue
                text = message_data.pop("body")
                results.append(Document(text=text, extra_info=message_data))
        except Exception as e:
            raise Exception("Can't get message data" + str(e)) from e

        return results

    def get_message_data(self, message):
        """
        Fetch one message in raw format and extract its body.

        Args:
            message: A dict with an "id" key, as found in the list response.

        Returns:
            A dict with "id", "threadId", "snippet" and "body", or None when
            no body text could be extracted.

        """
        # Allow direct calls before any other method has built the client.
        self._cache_service()

        message_id = message["id"]
        message_data = (
            self.service.users()
            .messages()
            .get(format="raw", userId="me", id=message_id)
            .execute()
        )
        if self.use_iterative_parser:
            body = self.extract_message_body_iterative(message_data)
        else:
            body = self.extract_message_body(message_data)

        if not body:
            return None

        return {
            "id": message_data["id"],
            "threadId": message_data["threadId"],
            "snippet": message_data["snippet"],
            "body": body,
        }

    def extract_message_body_iterative(self, message: dict):
        """
        Recursively collect the text/plain content of a message.

        Args:
            message: Either a message dict with a base64url "raw" entry, or
                (on recursion) an already parsed MIME part.

        Returns:
            The concatenated plain-text parts; "" when there are none.

        """
        # For a parsed MIME part this is a header lookup, which yields None
        # when no header named "raw" exists, so the part is used as-is.
        if message["raw"]:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf8"))
            mime_msg = email.message_from_bytes(body)
        else:
            mime_msg = message

        if mime_msg.get_content_type() == "text/plain":
            charset = mime_msg.get_content_charset("utf-8")
            return mime_msg.get_payload(decode=True).decode(charset)

        if mime_msg.get_content_maintype() == "multipart":
            return "".join(
                self.extract_message_body_iterative(msg_part)
                for msg_part in mime_msg.get_payload()
            )

        return ""

    def extract_message_body(self, message: dict):
        """
        Decode the base64url "raw" payload of a message into text.

        Args:
            message: A message dict with a "raw" entry.

        Returns:
            The decoded message as a string.

        Raises:
            Exception: If the payload cannot be decoded or parsed.

        """
        from bs4 import BeautifulSoup

        try:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf-8"))
            mime_msg = email.message_from_bytes(body)

            # If the message body contains HTML, parse it with BeautifulSoup
            # NOTE(review): `in` on an email Message tests header *names*, so
            # this branch only runs when a header literally named "text/html"
            # exists -- confirm whether a content-type check was intended.
            if "text/html" in mime_msg:
                soup = BeautifulSoup(body, "html.parser")
                # get_text() already returns str, which has no .decode().
                return soup.get_text()
            return body.decode("utf-8")
        except Exception as e:
            raise Exception("Can't parse message body" + str(e)) from e

    def _build_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
    ) -> dict:
        """
        Build the request body for the drafts create/update calls.

        Args:
            to: Recipient addresses, as a list or a single string.
            subject: The subject line; the header is omitted when empty.
            message: The plain-text body; None is treated as an empty body.

        Returns:
            ``{"message": {"raw": <base64url encoded message>}}``.

        """
        email_message = EmailMessage()

        # set_content() has no handler for None, so fall back to an empty body.
        email_message.set_content(message or "")

        # Only set headers that were provided; assigning None would either
        # raise (Subject) or produce a literal "None" recipient (To).
        if to:
            email_message["To"] = to if isinstance(to, str) else ", ".join(to)
        if subject:
            email_message["Subject"] = subject

        encoded_message = base64.urlsafe_b64encode(email_message.as_bytes()).decode()

        return {"message": {"raw": encoded_message}}

    def create_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
    ) -> str:
        """
        Create and insert a draft email.
           Print the returned draft's message and id.
           Returns: Draft object, including draft id and message meta data.

        Args:
            to (Optional[List[str]]): The email addresses to send the message to
            subject (Optional[str]): The subject for the email
            message (Optional[str]): The message body for the email

        """
        self._cache_service()
        service = self.service

        return (
            service.users()
            .drafts()
            .create(userId="me", body=self._build_draft(to, subject, message))
            .execute()
        )

    def update_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
        draft_id: str = None,
    ) -> str:
        """
        Update a draft email.
           Print the returned draft's message and id.
           This function is required to be passed a draft_id that is obtained when creating messages
           Returns: Draft object, including draft id and message meta data.

        Args:
            to (Optional[List[str]]): The email addresses to send the message to;
                the draft's current recipients are kept when omitted
            subject (Optional[str]): The subject for the email; the draft's
                current subject is kept when omitted
            message (Optional[str]): The new message body; an omitted message
                results in an empty body
            draft_id (str): the id of the draft to be updated

        """
        # Validate first so a missing id never triggers authentication.
        if draft_id is None:
            return (
                "You did not provide a draft id when calling this function. If you"
                " previously created or retrieved the draft, the id is available in"
                " context"
            )

        self._cache_service()
        service = self.service

        # Reuse the stored To/Subject headers for any field the caller omitted.
        draft = self.get_draft(draft_id)
        headers = draft["message"]["payload"]["headers"]
        for header in headers:
            if header["name"] == "To" and not to:
                to = header["value"]
            elif header["name"] == "Subject" and not subject:
                subject = header["value"]

        return (
            service.users()
            .drafts()
            .update(
                userId="me", id=draft_id, body=self._build_draft(to, subject, message)
            )
            .execute()
        )

    def get_draft(self, draft_id: str = None) -> str:
        """
        Get a draft email.
           Print the returned draft's message and id.
           Returns: Draft object, including draft id and message meta data.

        Args:
            draft_id (str): the id of the draft to be retrieved

        """
        self._cache_service()
        service = self.service
        return service.users().drafts().get(userId="me", id=draft_id).execute()

    def send_draft(self, draft_id: str = None) -> str:
        """
        Sends a draft email.
           Print the returned draft's message and id.
           Returns: The result of the send request.

        Args:
            draft_id (str): the id of the draft to be sent

        """
        self._cache_service()
        service = self.service
        return (
            service.users().drafts().send(userId="me", body={"id": draft_id}).execute()
        )

load_data #

load_data() -> List[Document]

从用户账户加载电子邮件。

源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
45
46
47
48
49
def load_data(self) -> List[Document]:
    """Load emails from the user's account using the configured ``query``."""
    self._cache_service()

    # search_messages() requires a query argument; calling it bare raises
    # TypeError, so pass the instance-level default query.
    return self.search_messages(query=self.query)

search_messages #

search_messages(query: str, max_results: Optional[int] = None)

根据查询字符串和用户请求的最大结果数搜索电子邮件。返回:最多包含最大结果数的相关消息对象列表。

参数

名称 类型 描述 默认值
query str

用户查询

必需
max_results Optional[int]

最大搜索结果数

None
源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def search_messages(self, query: str, max_results: Optional[int] = None):
    """
    Searches email messages given a query string and the maximum number
    of results requested by the user
       Returns: List of relevant message objects up to the maximum number of results.

    Args:
        query (str): The user's query
        max_results (Optional[int]): The maximum number of search results
        to return.

    """
    if not max_results:
        max_results = self.max_results

    self._cache_service()

    messages = (
        self.service.users()
        .messages()
        .list(userId="me", q=query, maxResults=int(max_results))
        .execute()
        .get("messages", [])
    )

    results = []
    try:
        for message in messages:
            message_data = self.get_message_data(message)
            if not message_data:
                # get_message_data() may return None (no body extracted);
                # skip such messages instead of failing on None.pop().
                continue
            text = message_data.pop("body")
            results.append(Document(text=text, extra_info=message_data))
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception("Can't get message data" + str(e)) from e

    return results

create_draft #

create_draft(to: Optional[List[str]] = None, subject: Optional[str] = None, message: Optional[str] = None) -> str

创建并插入一封草稿电子邮件。打印返回的草稿消息和ID。返回:草稿对象,包括草稿ID和消息元数据。

参数

名称 类型 描述 默认值
to Optional[List[str]]

收件人电子邮件地址

None
subject Optional[str]

电子邮件主题

None
message Optional[str]

电子邮件正文

None
源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def create_draft(
    self,
    to: Optional[List[str]] = None,
    subject: Optional[str] = None,
    message: Optional[str] = None,
) -> str:
    """
    Create and insert a draft email.
       Print the returned draft's message and id.
       Returns: Draft object, including draft id and message meta data.

    Args:
        to (Optional[List[str]]): The email addresses to send the message to
        subject (Optional[str]): The subject for the email
        message (Optional[str]): The message body for the email

    """
    self._cache_service()

    # Resolve the drafts resource first, then build the payload and submit it.
    drafts_api = self.service.users().drafts()
    payload = self._build_draft(to, subject, message)
    request = drafts_api.create(userId="me", body=payload)
    return request.execute()

update_draft #

update_draft(to: Optional[List[str]] = None, subject: Optional[str] = None, message: Optional[str] = None, draft_id: str = None) -> str

更新草稿电子邮件。打印返回的草稿消息和ID。此函数需要传入创建消息时获得的draft_id。返回:草稿对象,包括草稿ID和消息元数据。

参数

名称 类型 描述 默认值
to Optional[List[str]]

收件人电子邮件地址

None
subject Optional[str]

电子邮件主题

None
message Optional[str]

电子邮件正文

None
draft_id str

要更新的草稿ID

None
源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def update_draft(
    self,
    to: Optional[List[str]] = None,
    subject: Optional[str] = None,
    message: Optional[str] = None,
    draft_id: str = None,
) -> str:
    """
    Update a draft email.
       Print the returned draft's message and id.
       This function is required to be passed a draft_id that is obtained when creating messages
       Returns: Draft object, including draft id and message meta data.

    Args:
        to (Optional[List[str]]): The email addresses to send the message to
        subject (Optional[str]): The subject for the email
        message (Optional[str]): The message body for the email
        draft_id (str): the id of the draft to be updated

    """
    self._cache_service()
    gmail = self.service

    if draft_id is None:
        return (
            "You did not provide a draft id when calling this function. If you"
            " previously created or retrieved the draft, the id is available in"
            " context"
        )

    # Fall back to the stored To/Subject headers for any field not supplied.
    existing = self.get_draft(draft_id)
    for stored in existing["message"]["payload"]["headers"]:
        header_name = stored["name"]
        if header_name == "To" and not to:
            to = stored["value"]
        elif header_name == "Subject" and not subject:
            subject = stored["value"]

    drafts_api = gmail.users().drafts()
    payload = self._build_draft(to, subject, message)
    request = drafts_api.update(userId="me", id=draft_id, body=payload)
    return request.execute()

get_draft #

get_draft(draft_id: str = None) -> str

获取草稿电子邮件。打印返回的草稿消息和ID。返回:草稿对象,包括草稿ID和消息元数据。

参数

名称 类型 描述 默认值
draft_id str

要获取的草稿ID

None
源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
273
274
275
276
277
278
279
280
281
282
283
284
285
def get_draft(self, draft_id: str = None) -> str:
    """
    Get a draft email.
       Print the returned draft's message and id.
       Returns: Draft object, including draft id and message meta data.

    Args:
        draft_id (str): the id of the draft to be retrieved

    """
    self._cache_service()

    drafts_api = self.service.users().drafts()
    request = drafts_api.get(userId="me", id=draft_id)
    return request.execute()

send_draft #

send_draft(draft_id: str = None) -> str

发送草稿电子邮件。打印返回的草稿消息和ID。返回:草稿对象,包括草稿ID和消息元数据。

参数

名称 类型 描述 默认值
draft_id str

要发送的草稿ID

None
源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def send_draft(self, draft_id: str = None) -> str:
    """
    Sends a draft email.
       Print the returned draft's message and id.
       Returns: The result of the send request.

    Args:
        draft_id (str): the id of the draft to be sent

    """
    self._cache_service()

    drafts_api = self.service.users().drafts()
    request = drafts_api.send(userId="me", body={"id": draft_id})
    return request.execute()

GoogleCalendarToolSpec #

基类: BaseToolSpec

Google Calendar 工具规范。

目前只是数据加载器的简单包装。TODO:向Google Calendar规范添加更多方法。

源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class GoogleCalendarToolSpec(BaseToolSpec):
    """
    Google Calendar tool spec.

    Currently a simple wrapper around the data loader.
    TODO: add more methods to the Google Calendar spec.

    """

    spec_functions = ["load_data", "create_event", "get_date"]

    def load_data(
        self,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """
        Load data from user's calendar.

        Args:
            number_of_results (Optional[int]): the number of events to return. Defaults to 100.
            start_date (Optional[Union[str, datetime.date]]): the start date to return events from in date isoformat. Defaults to today.

        Returns:
            One Document per event, whose text lists the status, summary,
            start/end time and organizer; an empty list when there are no events.

        """
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        if start_date is None:
            start_date = datetime.date.today()
        elif isinstance(start_date, str):
            start_date = datetime.date.fromisoformat(start_date)

        start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
        # NOTE(review): the value is a naive midnight formatted with a literal
        # "Z" suffix; no timezone conversion happens here -- confirm intent.
        start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=start_datetime_utc,
                maxResults=number_of_results,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        results = []
        for event in events_result.get("items", []):
            # Timed events carry "dateTime"; otherwise fall back to "date".
            start = event["start"]
            end = event["end"]
            start_time = start["dateTime"] if "dateTime" in start else start["date"]
            end_time = end["dateTime"] if "dateTime" in end else end["date"]

            # Events may lack a summary (no title); do not fail on those.
            event_string = f"Status: {event.get('status', 'N/A')}, "
            event_string += f"Summary: {event.get('summary', 'N/A')}, "
            event_string += f"Start time: {start_time}, "
            event_string += f"End time: {end_time}, "

            organizer = event.get("organizer", {})
            display_name = organizer.get("displayName", "N/A")
            organizer_email = organizer.get("email", "N/A")
            if display_name != "N/A":
                event_string += f"Organizer: {display_name} ({organizer_email})"
            else:
                event_string += f"Organizer: {organizer_email}"

            results.append(Document(text=event_string))

        return results

    def _get_credentials(self) -> Any:
        """
        Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.

        """
        from google_auth_oauthlib.flow import InstalledAppFlow

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    @staticmethod
    def _format_event_datetime(value: Union[str, datetime.datetime]) -> str:
        """Convert a datetime or "%Y-%m-%dT%H:%M:%S%z" string to the request format."""
        # The signature of create_event() allows datetime objects, which
        # strptime() rejects, so only parse when given something else.
        if not isinstance(value, datetime.datetime):
            value = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
        return value.astimezone().strftime("%Y-%m-%dT%H:%M:%S.%f%z")

    def create_event(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
        location: Optional[str] = None,
        start_datetime: Optional[Union[str, datetime.datetime]] = None,
        end_datetime: Optional[Union[str, datetime.datetime]] = None,
        attendees: Optional[List[str]] = None,
    ) -> str:
        """
            Create an event on the users calendar.

        Args:
            title (Optional[str]): The title for the event
            description (Optional[str]): The description for the event
            location (Optional[str]): The location for the event
            start_datetime Optional[Union[str, datetime.datetime]]: The start datetime for the event,
                as a datetime or a string like 2024-01-31T09:00:00+0000
            end_datetime Optional[Union[str, datetime.datetime]]: The end datetime for the event,
                as a datetime or a string like 2024-01-31T10:00:00+0000
            attendees Optional[List[str]]: A list of email address to invite to the event

        Returns:
            A confirmation message once the insert request has completed.

        Raises:
            TypeError: If start_datetime or end_datetime is omitted.
            ValueError: If a datetime string does not match the expected format.

        """
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        attendees_list = (
            [{"email": attendee} for attendee in attendees] if attendees else []
        )

        start_time = self._format_event_datetime(start_datetime)
        end_time = self._format_event_datetime(end_datetime)

        event = {
            "summary": title,
            "location": location,
            "description": description,
            "start": {
                "dateTime": start_time,
            },
            "end": {
                "dateTime": end_time,
            },
            "attendees": attendees_list,
        }
        service.events().insert(calendarId="primary", body=event).execute()
        return (
            "Your calendar event has been created successfully! You can move on to the"
            " next step."
        )

    def get_date(self):
        """
        A function to return todays date. Call this before any other functions if you are unaware of the date.
        """
        return datetime.date.today()

load_data #

load_data(number_of_results: Optional[int] = 100, start_date: Optional[Union[str, date]] = None) -> List[Document]

从用户日历加载数据。

参数

名称 类型 描述 默认值
number_of_results Optional[int]

要返回的事件数量。默认为100。

100
start_date Optional[Union[str, date]]

以日期 ISO 格式返回事件的开始日期。默认为今天。

None
源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def load_data(
    self,
    number_of_results: Optional[int] = 100,
    start_date: Optional[Union[str, datetime.date]] = None,
) -> List[Document]:
    """
    Load data from user's calendar.

    Args:
        number_of_results (Optional[int]): the number of events to return. Defaults to 100.
        start_date (Optional[Union[str, datetime.date]]): the start date to return events from in date isoformat. Defaults to today.

    Returns:
        One Document per event, whose text lists the status, summary,
        start/end time and organizer; an empty list when there are no events.

    """
    from googleapiclient.discovery import build

    credentials = self._get_credentials()
    service = build("calendar", "v3", credentials=credentials)

    if start_date is None:
        start_date = datetime.date.today()
    elif isinstance(start_date, str):
        start_date = datetime.date.fromisoformat(start_date)

    start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
    # NOTE(review): the value is a naive midnight formatted with a literal
    # "Z" suffix; no timezone conversion happens here -- confirm intent.
    start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    events_result = (
        service.events()
        .list(
            calendarId="primary",
            timeMin=start_datetime_utc,
            maxResults=number_of_results,
            singleEvents=True,
            orderBy="startTime",
        )
        .execute()
    )

    results = []
    for event in events_result.get("items", []):
        # Timed events carry "dateTime"; otherwise fall back to "date".
        start = event["start"]
        end = event["end"]
        start_time = start["dateTime"] if "dateTime" in start else start["date"]
        end_time = end["dateTime"] if "dateTime" in end else end["date"]

        # Events may lack a summary (no title); do not fail on those.
        event_string = f"Status: {event.get('status', 'N/A')}, "
        event_string += f"Summary: {event.get('summary', 'N/A')}, "
        event_string += f"Start time: {start_time}, "
        event_string += f"End time: {end_time}, "

        organizer = event.get("organizer", {})
        display_name = organizer.get("displayName", "N/A")
        organizer_email = organizer.get("email", "N/A")
        if display_name != "N/A":
            event_string += f"Organizer: {display_name} ({organizer_email})"
        else:
            event_string += f"Organizer: {organizer_email}"

        results.append(Document(text=event_string))

    return results

create_event #

create_event(title: Optional[str] = None, description: Optional[str] = None, location: Optional[str] = None, start_datetime: Optional[Union[str, datetime]] = None, end_datetime: Optional[Union[str, datetime]] = None, attendees: Optional[List[str]] = None) -> str
在用户日历上创建事件。

参数

名称 类型 描述 默认值
title Optional[str]

事件标题

None
description Optional[str]

事件描述

None
location Optional[str]

事件地点

None
start_datetime Optional[Union[str, datetime]]

事件开始日期时间

None
end_datetime Optional[Union[str, datetime]]

事件结束日期时间

None
attendees Optional[List[str]]

要邀请参加事件的电子邮件地址列表

None
源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def create_event(
    self,
    title: Optional[str] = None,
    description: Optional[str] = None,
    location: Optional[str] = None,
    start_datetime: Optional[Union[str, datetime.datetime]] = None,
    end_datetime: Optional[Union[str, datetime.datetime]] = None,
    attendees: Optional[List[str]] = None,
) -> str:
    """
        Create an event on the users calendar.

    Args:
        title (Optional[str]): The title for the event
        description (Optional[str]): The description for the event
        location (Optional[str]): The location for the event
        start_datetime Optional[Union[str, datetime.datetime]]: The start datetime for the event,
            as a datetime or a string like 2024-01-31T09:00:00+0000
        end_datetime Optional[Union[str, datetime.datetime]]: The end datetime for the event,
            as a datetime or a string like 2024-01-31T10:00:00+0000
        attendees Optional[List[str]]: A list of email address to invite to the event

    Returns:
        A confirmation message once the insert request has completed.

    Raises:
        TypeError: If start_datetime or end_datetime is omitted.
        ValueError: If a datetime string does not match the expected format.

    """
    from googleapiclient.discovery import build

    def _as_request_time(value):
        # The signature allows datetime objects, which strptime() rejects,
        # so only parse when given something else.
        if not isinstance(value, datetime.datetime):
            value = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
        return value.astimezone().strftime("%Y-%m-%dT%H:%M:%S.%f%z")

    credentials = self._get_credentials()
    service = build("calendar", "v3", credentials=credentials)

    attendees_list = (
        [{"email": attendee} for attendee in attendees] if attendees else []
    )

    start_time = _as_request_time(start_datetime)
    end_time = _as_request_time(end_datetime)

    event = {
        "summary": title,
        "location": location,
        "description": description,
        "start": {
            "dateTime": start_time,
        },
        "end": {
            "dateTime": end_time,
        },
        "attendees": attendees_list,
    }
    service.events().insert(calendarId="primary", body=event).execute()
    return (
        "Your calendar event has been created successfully! You can move on to the"
        " next step."
    )

get_date #

get_date()

返回今天日期的函数。如果您不知道日期,请在调用其他函数之前调用此函数。

源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
204
205
206
207
208
def get_date(self):
    """
    Return today's date. Call this before any other functions if you are unaware of the date.
    """
    today = datetime.date.today()
    return today

GoogleSearchToolSpec #

基类: BaseToolSpec

Google Search 工具规范。

源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/search/base.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class GoogleSearchToolSpec(BaseToolSpec):
    """Google Search tool spec."""

    spec_functions = ["google_search"]

    def __init__(self, key: str, engine: str, num: Optional[int] = None) -> None:
        """
        Store the search settings.

        Args:
            key (str): Value substituted for ``key`` in the query URL.
            engine (str): Value substituted for ``engine`` in the query URL.
            num (Optional[int]): The number of search results to request.
                Defaults to None, which leaves the parameter out of the URL.

        """
        self.key, self.engine, self.num = key, engine, num

    def google_search(self, query: str):
        """
        Make a query to the Google search engine to receive a list of results.

        The number of results is taken from the ``num`` value given to the
        constructor; it is not an argument of this call.

        Args:
            query (str): The query to be passed to Google search.

        Raises:
            ValueError: If the 'num' is not an integer between 1 and 10.

        """
        encoded_query = urllib.parse.quote_plus(query)
        url = QUERY_URL_TMPL.format(
            key=self.key, engine=self.engine, query=encoded_query
        )

        result_count = self.num
        if result_count is not None:
            if not (1 <= result_count <= 10):
                raise ValueError("num should be an integer between 1 and 10, inclusive")
            url = f"{url}&num={result_count}"

        # The raw response text is wrapped in a single Document.
        return [Document(text=requests.get(url).text)]
google_search(query: str)

向 Google 搜索引擎发出查询以接收结果列表。

参数

名称 类型 描述 默认值
query str

要传递给 Google 搜索的查询。

必需
num int

要返回的搜索结果数量(通过构造函数的 num 参数设置,而非本方法的参数)。默认为 None。

None

抛出

类型 描述
ValueError

如果 'num' 不是介于 1 和 10 之间的整数。

源代码位于 llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/search/base.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def google_search(self, query: str):
    """
    Make a query to the Google search engine to receive a list of results.

    The number of results is read from ``self.num``; it is not an argument
    of this call.

    Args:
        query (str): The query to be passed to Google search.

    Raises:
        ValueError: If the 'num' is not an integer between 1 and 10.

    """
    encoded_query = urllib.parse.quote_plus(query)
    url = QUERY_URL_TMPL.format(key=self.key, engine=self.engine, query=encoded_query)

    result_count = self.num
    if result_count is not None:
        if not (1 <= result_count <= 10):
            raise ValueError("num should be an integer between 1 and 10, inclusive")
        url = f"{url}&num={result_count}"

    # The raw response text is wrapped in a single Document.
    return [Document(text=requests.get(url).text)]