step(*args: Any, workflow: Optional[Type[Workflow]] = None, pass_context: bool = False, num_workers: int = 4, retry_policy: Optional[RetryPolicy] = None) -> Callable
用于将方法和函数标记为工作流步骤的装饰器。
装饰器在导入时求值,但通信通道必须等到运行时才能启动。因此,我们把该步骤要消费的事件列表临时存储在函数对象本身上。
参数
名称 |
类型 |
描述 |
默认值 |
workflow
|
Optional[Type[Workflow]]
|
将被装饰的步骤添加到其中的工作流类。仅在使用装饰器修饰自由函数而非类方法时需要。
|
无
|
num_workers
|
int
|
将处理此装饰步骤事件的工作程序数量。默认值在大多数情况下都有效。
|
4
|
retry_policy
|
Optional[RetryPolicy]
|
用于重试在运行过程中遇到错误的步骤的策略。
|
无
|
源代码位于 llama-index-core/llama_index/core/workflow/decorators.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def step(
    *args: Any,
    workflow: Optional[Type["Workflow"]] = None,
    pass_context: bool = False,
    num_workers: int = 4,
    retry_policy: Optional[RetryPolicy] = None,
) -> Callable:
    """
    Mark a method or a free function as a workflow step.

    Works both bare (``@step``) and with arguments (``@step(...)``).
    Decorators are evaluated at import time, while the communication
    channels can only be started at runtime. To bridge the two, the step
    configuration (accepted events, return types, worker count, ...) is
    attached to the decorated function object itself.

    Args:
        *args: When the decorator is applied without parentheses, the first
            positional argument is the function being decorated.
        workflow: Workflow class to which the decorated step will be added.
            Only needed when decorating free functions instead of class
            methods.
        pass_context: Accepted in the signature but not read by this function.
        num_workers: The number of workers that will process events for the
            decorated step. Must be an integer greater than 0.
        retry_policy: The policy used to retry a step that encountered an
            error while running.

    Returns:
        The decorated function when used as ``@step``; otherwise a decorator
        that configures a function and returns that same function.

    Raises:
        WorkflowValidationError: If ``num_workers`` is not a positive integer,
            or if a free function is decorated without a ``workflow`` class.
            Signature validation may also raise, with a message describing
            the specific failure.
    """

    def _configure(func: Callable) -> Callable:
        workers_ok = isinstance(num_workers, int) and num_workers > 0
        if not workers_ok:
            raise WorkflowValidationError(
                "num_workers must be an integer greater than 0"
            )

        # Validation raises with a message describing the specific failure.
        signature_spec = inspect_signature(func)
        validate_step_signature(signature_spec)

        # Only the first entry of the accepted-events mapping is used.
        first_entry = next(iter(signature_spec.accepted_events.items()))
        event_name, accepted_events = first_entry

        step_config = StepConfig(
            accepted_events=accepted_events,
            event_name=event_name,
            return_types=signature_spec.return_types,
            context_parameter=signature_spec.context_parameter,
            num_workers=num_workers,
            requested_services=signature_spec.requested_services or [],
            retry_policy=retry_policy,
        )
        # Keep the configuration on the function object until runtime.
        func.__step_config = step_config  # type: ignore[attr-defined]

        # Free functions are not collected from a class body, so they have to
        # be registered on the workflow class explicitly.
        if is_free_function(func.__qualname__):
            if workflow is None:
                msg = f"To decorate {func.__name__} please pass a workflow class to the @step decorator."
                raise WorkflowValidationError(msg)
            workflow.add_step(func)

        return func

    if not args:
        # Used with parentheses, like `@step(...)`: hand back the decorator.
        return _configure

    # Used without parentheses, like `@step`: args[0] is the function itself.
    target = args[0]
    _configure(target)
    return target
|