跳到主内容

重试策略

源代码位于 llama-index-core/llama_index/core/workflow/retry_policy.py

next #

决定是否应进行另一次重试,返回下次运行前等待的秒数。
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@runtime_checkable
class RetryPolicy(Protocol):
    def next(
        self, elapsed_time: float, attempts: int, error: Exception
    ) -> Optional[float]:
        """
        Decides if we should make another retry, returning the number of seconds to wait before the next run.

        Args:
            elapsed_time: Time in seconds that passed since the last attempt.
            attempts: The number of attempts done so far.
            error: The last error occurred.

        Returns:
            The amount of seconds to wait before the next attempt, or None if we stop retrying.

        """

参数

next(elapsed_time: float, attempts: int, error: Exception) -> Optional[float]

名称

类型

描述 默认值 elapsed_time float
自上次尝试以来经过的时间(秒)。 必需

attempts

int
到目前为止已进行的尝试次数。 error

Exception

int
发生的最后一个错误。 返回值

可选[float]

int

下次尝试前等待的秒数,如果停止重试则为 None。

默认值 elapsed_time
ConstantDelayRetryPolicy #

一种简单的策略,按固定间隔重试步骤多次。

决定是否应进行另一次重试,返回下次运行前等待的秒数。
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def next(
    self, elapsed_time: float, attempts: int, error: Exception
) -> Optional[float]:
    """
    Decides if we should make another retry, returning the number of seconds to wait before the next run.

    Args:
        elapsed_time: Time in seconds that passed since the last attempt.
        attempts: The number of attempts done so far.
        error: The last error occurred.

    Returns:
        The amount of seconds to wait before the next attempt, or None if we stop retrying.

    """

返回顶部

决定是否应进行另一次重试,返回下次运行前等待的秒数。
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class ConstantDelayRetryPolicy:
    """A simple policy that retries a step at regular intervals for a number of times."""

    def __init__(self, maximum_attempts: int = 3, delay: float = 5) -> None:
        """
        Creates a ConstantDelayRetryPolicy instance.

        Args:
            maximum_attempts: How many consecutive times the workflow should try to run the step in case of an error.
            delay: how much time in seconds must pass before another attempt.

        """
        self.maximum_attempts = maximum_attempts
        self.delay = delay

    def next(
        self, elapsed_time: float, attempts: int, error: Exception
    ) -> Optional[float]:
        if attempts >= self.maximum_attempts:
            return None

        return self.delay