Skip to content

Commit 2c79c8f

Browse files
committed
feat!: replace decorators with middlewares
1 parent e3939f6 commit 2c79c8f

25 files changed

Lines changed: 497 additions & 625 deletions

poetry.lock

Lines changed: 23 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pytest = ">=7.4,<9.0"
3636
pytest-asyncio = "^0.21.1"
3737
pytest-grpc = "^0.8.0"
3838
pytest-mock = "^3.11.1"
39+
pytest-cov = "^5.0.0"
3940
pylint = ">=2.17.5,<4.0.0"
4041
black = "^25.0.0"
4142
mypy = "^1.10.0"
@@ -93,7 +94,8 @@ markers = [
9394

9495
[tool.coverage.run]
9596
omit = [
96-
"pyzeebe/proto/*"
97+
"pyzeebe/proto/*",
98+
"tests/*",
9799
]
98100

99101
[tool.ruff]

pyzeebe/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
from pyzeebe.client.sync_client import SyncZeebeClient
1010
from pyzeebe.job.job import Job, JobController
1111
from pyzeebe.job.job_status import JobStatus
12+
from pyzeebe.middlewares import BaseMiddleware
1213
from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler
1314
from pyzeebe.task.task_config import TaskConfig
14-
from pyzeebe.task.types import TaskDecorator
1515
from pyzeebe.worker.task_router import ZeebeTaskRouter
1616
from pyzeebe.worker.worker import ZeebeWorker
1717

@@ -28,8 +28,8 @@
2828
"JobStatus",
2929
"ExceptionHandler",
3030
"TaskConfig",
31-
"TaskDecorator",
3231
"ZeebeTaskRouter",
3332
"default_exception_handler",
3433
"ZeebeWorker",
34+
"BaseMiddleware",
3535
)

pyzeebe/job/job.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
from dataclasses import dataclass
4-
from typing import TYPE_CHECKING, Any
4+
from typing import TYPE_CHECKING
55

66
from pyzeebe.job.job_status import JobStatus
77
from pyzeebe.types import Headers, Variables
@@ -27,9 +27,9 @@ class Job:
2727
variables: Variables
2828
tenant_id: str | None = None
2929
status: JobStatus = JobStatus.Running
30-
task_result = None
30+
task_result: Variables | None = None
3131

32-
def set_task_result(self, task_result: Any) -> None:
32+
def set_task_result(self, task_result: Variables) -> None:
3333
object.__setattr__(self, "task_result", task_result)
3434

3535
def _set_status(self, value: JobStatus) -> None:
@@ -46,11 +46,9 @@ def __init__(self, job: Job, zeebe_adapter: ZeebeAdapter) -> None:
4646
self._job = job
4747
self._zeebe_adapter = zeebe_adapter
4848

49-
async def set_running_after_decorators_status(self) -> None:
50-
"""
51-
RunningAfterDecorators status means that the task has been completed as intended and the after decorators will now run.
52-
"""
53-
self._job._set_status(JobStatus.RunningAfterDecorators)
49+
@property
50+
def job(self) -> Job:
51+
return self._job
5452

5553
async def set_success_status(self, variables: Variables | None = None) -> None:
5654
"""

pyzeebe/job/job_status.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,5 @@
44
class JobStatus(Enum):
55
Running = "Running"
66
Completed = "Completed"
7-
RunningAfterDecorators = "RunningAfterDecorators"
87
Failed = "Failed"
98
ErrorThrown = "ErrorThrown"

pyzeebe/middlewares/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .base import BaseMiddleware as BaseMiddleware
2+
from .base import ExceptionMiddleware as ExceptionMiddleware
3+
from .types import ConsumeMiddlewareStack as ConsumeMiddlewareStack
4+
from .types import ExecuteMiddlewareStack as ExecuteMiddlewareStack

pyzeebe/middlewares/base.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from collections.abc import Mapping
5+
6+
from pyzeebe.job.job import Job, JobController
7+
from pyzeebe.task.exception_handler import ExceptionHandler
8+
9+
from .types import ConsumeMiddlewareStack, ExecuteMiddlewareStack
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class BaseMiddleware:
15+
async def consume_scope(self, call_next: ConsumeMiddlewareStack, job: Job) -> Job:
16+
return await call_next(job)
17+
18+
async def execute_scope(self, call_next: ExecuteMiddlewareStack, job: Job, job_controller: JobController) -> Job:
19+
return await call_next(job, job_controller)
20+
21+
22+
class ExceptionMiddleware(BaseMiddleware):
23+
def __init__(self, exception_handlers: Mapping[type[Exception], ExceptionHandler]) -> None:
24+
assert Exception in exception_handlers
25+
26+
self.exception_handlers = exception_handlers
27+
28+
async def execute_scope(self, call_next: ExecuteMiddlewareStack, job: Job, job_controller: JobController) -> Job:
29+
try:
30+
job = await call_next(job, job_controller)
31+
except Exception as err:
32+
handler = self.exception_handlers.get(type(err), self.exception_handlers[Exception])
33+
await handler(err, job, job_controller)
34+
else:
35+
await job_controller.set_success_status(job.task_result)
36+
37+
return job

pyzeebe/middlewares/types.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from collections.abc import Awaitable, Callable
2+
3+
from pyzeebe.job.job import Job, JobController
4+
5+
ConsumeMiddlewareStack = Callable[[Job], Awaitable[Job]]
6+
ExecuteMiddlewareStack = Callable[[Job, JobController], Awaitable[Job]]

pyzeebe/task/task.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
from __future__ import annotations
22

3+
from dataclasses import dataclass
34
from typing import Any
45

56
from pyzeebe.function_tools import Function
67
from pyzeebe.task.task_config import TaskConfig
78
from pyzeebe.task.types import JobHandler
89

910

11+
@dataclass()
1012
class Task:
11-
def __init__(self, original_function: Function[..., Any], job_handler: JobHandler, config: TaskConfig) -> None:
12-
self.original_function = original_function
13-
self.job_handler = job_handler
14-
self.config = config
13+
original_function: Function[..., Any]
14+
job_handler: JobHandler
15+
config: TaskConfig
1516

1617
@property
1718
def type(self) -> str:
1819
return self.config.type
19-
20-
def __repr__(self) -> str:
21-
return (
22-
f"Task(config= {self.config}, original_function={self.original_function}, "
23-
f"job_handler={self.job_handler})"
24-
)

pyzeebe/task/task_builder.py

Lines changed: 21 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import functools
44
import inspect
55
import logging
6-
from collections.abc import Sequence
76
from typing import Any, TypeVar
87

98
from typing_extensions import ParamSpec
@@ -14,10 +13,9 @@
1413
from pyzeebe.function_tools.dict_tools import convert_to_dict_function
1514
from pyzeebe.function_tools.parameter_tools import get_job_parameter_name
1615
from pyzeebe.job.job import JobController
17-
from pyzeebe.task.exception_handler import default_exception_handler
1816
from pyzeebe.task.task import Task
1917
from pyzeebe.task.task_config import TaskConfig
20-
from pyzeebe.task.types import AsyncTaskDecorator, DecoratorRunner, JobHandler
18+
from pyzeebe.task.types import JobHandler
2119
from pyzeebe.types import Variables
2220

2321
P = ParamSpec("P")
@@ -34,20 +32,10 @@ def build_task(task_function: Function[..., Any], task_config: TaskConfig) -> Ta
3432
def build_job_handler(task_function: Function[..., Any], task_config: TaskConfig) -> JobHandler:
3533
prepared_task_function = prepare_task_function(task_function, task_config)
3634

37-
before_decorator_runner = create_decorator_runner(task_config.before)
38-
after_decorator_runner = create_decorator_runner(task_config.after)
39-
4035
@functools.wraps(task_function)
4136
async def job_handler(job: Job, job_controller: JobController) -> Job:
42-
job = await before_decorator_runner(job)
43-
return_variables, succeeded = await run_original_task_function(
44-
prepared_task_function, task_config, job, job_controller
45-
)
37+
return_variables = await run_original_task_function(prepared_task_function, task_config, job)
4638
job.set_task_result(return_variables)
47-
await job_controller.set_running_after_decorators_status()
48-
job = await after_decorator_runner(job)
49-
if succeeded:
50-
await job_controller.set_success_status(variables=return_variables)
5139
return job
5240

5341
return job_handler
@@ -63,38 +51,30 @@ def prepare_task_function(task_function: Function[P, R], task_config: TaskConfig
6351
return task_function # type: ignore[return-value]
6452

6553

66-
async def run_original_task_function(
67-
task_function: DictFunction[...], task_config: TaskConfig, job: Job, job_controller: JobController
68-
) -> tuple[Variables, bool]:
69-
try:
70-
if task_config.variables_to_fetch is None:
71-
variables: dict[str, Any] = {}
72-
elif task_wants_all_variables(task_config):
73-
if only_job_is_required_in_task_function(task_function):
74-
variables = {}
75-
else:
76-
variables = {**job.variables}
54+
async def run_original_task_function(task_function: DictFunction[...], task_config: TaskConfig, job: Job) -> Variables:
55+
if task_config.variables_to_fetch is None:
56+
variables: dict[str, Any] = {}
57+
elif task_wants_all_variables(task_config):
58+
if only_job_is_required_in_task_function(task_function):
59+
variables = {}
7760
else:
78-
variables = {
79-
k: v
80-
for k, v in job.variables.items()
81-
if k in task_config.variables_to_fetch or k == task_config.job_parameter_name
82-
}
61+
variables = {**job.variables}
62+
else:
63+
variables = {
64+
k: v
65+
for k, v in job.variables.items()
66+
if k in task_config.variables_to_fetch or k == task_config.job_parameter_name
67+
}
8368

84-
if task_config.job_parameter_name:
85-
variables[task_config.job_parameter_name] = job
69+
if task_config.job_parameter_name:
70+
variables[task_config.job_parameter_name] = job
8671

87-
returned_value = await task_function(**variables)
72+
returned_value = await task_function(**variables)
8873

89-
if returned_value is None:
90-
returned_value = {}
74+
if returned_value is None:
75+
returned_value = {}
9176

92-
return returned_value, True
93-
except Exception as e:
94-
logger.debug("Failed job: %s. Error: %s.", job, e)
95-
exception_handler = task_config.exception_handler or default_exception_handler
96-
await exception_handler(e, job, job_controller)
97-
return job.variables, False
77+
return returned_value
9878

9979

10080
def only_job_is_required_in_task_function(task_function: DictFunction[...]) -> bool:
@@ -104,20 +84,3 @@ def only_job_is_required_in_task_function(task_function: DictFunction[...]) -> b
10484

10585
def task_wants_all_variables(task_config: TaskConfig) -> bool:
10686
return task_config.variables_to_fetch == []
107-
108-
109-
def create_decorator_runner(decorators: Sequence[AsyncTaskDecorator]) -> DecoratorRunner:
110-
async def decorator_runner(job: Job) -> Job:
111-
for decorator in decorators:
112-
job = await run_decorator(decorator, job)
113-
return job
114-
115-
return decorator_runner
116-
117-
118-
async def run_decorator(decorator: AsyncTaskDecorator, job: Job) -> Job:
119-
try:
120-
return await decorator(job)
121-
except Exception as e:
122-
logger.warning("Failed to run decorator %s. Exception: %s", decorator, e, exc_info=True)
123-
return job

0 commit comments

Comments
 (0)