77import json
88
99import osparc
10+ import tenacity
1011from _utils import skip_if_osparc_version
1112from httpx import AsyncClient
1213from packaging .version import Version
1314from uuid import UUID
14-
15- DEFAULT_TIMEOUT_SECONDS = 15 * 60 # 10 min
15+ import pytest
16+ from contextlib import contextmanager
17+ from typing import Callable , Iterator , Set
18+ from tenacity import Retrying
19+
20+ DEFAULT_TIMEOUT_SECONDS = 15 * 60 # 15 min
21+
22+
23+ @pytest .fixture
24+ def create_sleeper_jobs (
25+ api_client : osparc .ApiClient ,
26+ sleeper : osparc .Solver ,
27+ ) -> Callable [[int ], Iterator [Set [UUID | str ]]]:
28+ @contextmanager
29+ def sleeper_jobs (n_jobs : int = 1 ) -> Iterator [Set [UUID | str ]]:
30+ job_ids = set ()
31+ solvers_api = osparc .SolversApi (api_client = api_client )
32+ try :
33+ for _ in range (n_jobs ):
34+ job = solvers_api .create_job (
35+ sleeper .id , sleeper .version , osparc .JobInputs ({"input1" : 1.0 })
36+ )
37+ assert isinstance (job , osparc .Job )
38+ print (job .to_str ())
39+ job_ids .add (job .id )
40+ yield job_ids
41+ finally :
42+ for job_id in job_ids :
43+ for attempt in Retrying (
44+ reraise = True ,
45+ wait = tenacity .wait_fixed (2 ),
46+ stop = tenacity .stop_after_delay (60 ),
47+ ):
48+ with attempt :
49+ solvers_api .stop_job (sleeper .id , sleeper .version , job_id )
50+ solvers_api .delete_job (sleeper .id , sleeper .version , job_id )
51+
52+ return sleeper_jobs
1653
1754
1855@skip_if_osparc_version (at_least = Version ("0.8.3.post0.dev20" ))
19- def test_jobs (api_client : osparc .ApiClient , sleeper : osparc .Solver ):
56+ def test_jobs (
57+ api_client : osparc .ApiClient ,
58+ create_sleeper_jobs : Callable [[int ], Iterator [Set [UUID | str ]]],
59+ sleeper : osparc .Solver ,
60+ ):
2061 """Test the jobs method
2162
2263 Args:
@@ -30,68 +71,56 @@ def test_jobs(api_client: osparc.ApiClient, sleeper: osparc.Solver):
3071 assert n_init_iter >= 0
3172
3273 # create n_jobs jobs
33- created_job_ids = []
34- for _ in range (n_jobs ):
35- job : osparc .Job = solvers_api .create_job (
36- sleeper .id , sleeper .version , osparc .JobInputs ({"input1" : 1.0 })
37- )
38- created_job_ids .append (job .id )
39-
40- tmp_iter = solvers_api .iter_jobs (sleeper .id , sleeper .version )
41- solvers_api .iter_jobs (sleeper .id , sleeper .version )
42-
43- final_iter = solvers_api .iter_jobs (sleeper .id , sleeper .version )
44- assert len (final_iter ) > 0 , "No jobs were available"
45- assert n_init_iter + n_jobs == len (
46- final_iter
47- ), "An incorrect number of jobs was recorded"
48-
49- for ii , elm in enumerate (tmp_iter ):
50- assert isinstance (elm , osparc .Job )
51- if ii > 100 :
52- break
74+ with create_sleeper_jobs (n_jobs ):
75+ tmp_iter = solvers_api .iter_jobs (sleeper .id , sleeper .version )
76+ solvers_api .iter_jobs (sleeper .id , sleeper .version )
77+ final_iter = solvers_api .iter_jobs (sleeper .id , sleeper .version )
78+ assert len (final_iter ) > 0 , "No jobs were available"
79+ assert n_init_iter + n_jobs == len (
80+ final_iter
81+ ), "An incorrect number of jobs was recorded"
5382
54- # cleanup
55- for elm in created_job_ids :
56- solvers_api .delete_job (sleeper .id , sleeper .version , elm )
83+ for ii , elm in enumerate (tmp_iter ):
84+ assert isinstance (elm , osparc .Job )
85+ if ii > 100 :
86+ break
5787
5888
5989@skip_if_osparc_version (at_least = Version ("0.6.5" ))
6090async def test_logstreaming (
61- api_client : osparc .ApiClient , sleeper : osparc .Solver , async_client : AsyncClient
91+ api_client : osparc .ApiClient ,
92+ sleeper : osparc .Solver ,
93+ create_sleeper_jobs : Callable [[int ], Iterator [Set [UUID ]]],
94+ async_client : AsyncClient ,
6295):
6396 """Test log streaming"""
6497 solvers_api : osparc .SolversApi = osparc .SolversApi (api_client )
65- job = solvers_api .create_job (
66- sleeper .id , sleeper .version , osparc .JobInputs ({"input1" : 1.0 })
67- )
68- assert isinstance (job , osparc .Job )
69- print (job .to_str ())
70-
71- solvers_api .start_job (sleeper .id , sleeper .version , job .id )
72-
73- nloglines : int = 0
74- url = f"/v0/solvers/{ sleeper .id } /releases/{ sleeper .version } /jobs/{ job .id } /logstream"
75- print (f"starting logstreaming from { url } ..." )
76-
77- async with async_client .stream (
78- "GET" ,
79- url ,
80- timeout = DEFAULT_TIMEOUT_SECONDS ,
81- ) as response :
82- print (response .headers )
83- async for line in response .aiter_lines ():
84- log = json .loads (line )
85- job_id = log .get ("job_id" )
86- assert job_id
87- assert job_id == (
88- f"{ job .id } " if isinstance (job .id , UUID ) else job .id
89- ) # keep test backwards compatible
90- nloglines += 1
91- print ("\n " .join (log .get ("messages" )))
92- await response .aclose ()
93- break
94-
95- assert nloglines > 0 , f"Could not stream log for { sleeper .id = } , \
96- { sleeper .version = } and { job .id = } " # type: ignore
97- solvers_api .delete_job (sleeper .id , sleeper .version , job .id )
98+ with create_sleeper_jobs () as jobs :
99+ job_id = next (iter (jobs ))
100+
101+ solvers_api .start_job (sleeper .id , sleeper .version , job_id )
102+
103+ nloglines : int = 0
104+ url = f"/v0/solvers/{ sleeper .id } /releases/{ sleeper .version } /jobs/{ job_id } /logstream"
105+ print (f"starting logstreaming from { url } ..." )
106+
107+ async with async_client .stream (
108+ "GET" ,
109+ url ,
110+ timeout = DEFAULT_TIMEOUT_SECONDS ,
111+ ) as response :
112+ print (response .headers )
113+ async for line in response .aiter_lines ():
114+ log = json .loads (line )
115+ log_job_id = log .get ("job_id" )
116+ assert log_job_id
117+ assert log_job_id == (
118+ f"{ job_id } " if isinstance (job_id , UUID ) else job_id
119+ ) # keep test backwards compatible
120+ nloglines += 1
121+ print ("\n " .join (log .get ("messages" )))
122+ await response .aclose ()
123+ break
124+
125+ assert nloglines > 0 , f"Could not stream log for { sleeper .id = } , \
126+ { sleeper .version = } and { job_id = } " # type: ignore
0 commit comments