+ import json
from pathlib import Path
+ from typing import Dict
import pytest
from _pytest import fixtures
+ from filelock import FileLock
from tests.e2e.metrics.infrastructure import MetricsStack
- @pytest.fixture(autouse=True)
- def infrastructure(request: fixtures.SubRequest) -> MetricsStack:
+ @pytest.fixture(autouse=True, scope="module")
+ def infrastructure(request: fixtures.SubRequest, tmp_path_factory: pytest.TempPathFactory, worker_id) -> MetricsStack:
"""Setup and teardown logic for E2E test infrastructure
    Parameters
@@ -25,9 +28,25 @@ def infrastructure(request: fixtures.SubRequest) -> MetricsStack:
    Iterator[MetricsStack]
        Deployed Infrastructure
"""
+     stack = MetricsStack(handlers_dir=Path(f"{request.fspath.dirname}/handlers"))
    try:
-         stack = MetricsStack(handlers_dir=Path(f"{request.fspath.dirname}/handlers"))
-         stack.deploy()
-         yield stack
+         if worker_id == "master":
+             # no parallelization, deploy stack and let fixture be cached
+             yield stack.deploy()
+         else:
+             # tmp dir shared by all workers
+             root_tmp_dir = tmp_path_factory.getbasetemp().parent
+
+             cache = root_tmp_dir / "cache.json"
+             with FileLock(f"{cache}.lock"):
+                 # If cache exists, return stack outputs back;
+                 # otherwise it's the first run by the main worker:
+                 # deploy and cache stack outputs so subsequent workers can reuse them
+                 if cache.is_file():
+                     stack_outputs = json.loads(cache.read_text())
+                 else:
+                     stack_outputs: Dict = stack.deploy()
+                     cache.write_text(json.dumps(stack_outputs))
+             yield stack_outputs
    finally:
        stack.delete()
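
For reference, a test in the same module can request the fixture by name to read the deployed outputs (it is autouse, so deployment happens either way). A minimal sketch, assuming MetricsStack.deploy() returns the stack outputs as a plain dict, as the Dict annotation above suggests; no specific output keys are defined in this diff:

def test_stack_outputs_are_available(infrastructure):
    # `infrastructure` is module-scoped: the stack is deployed once per module,
    # and parallel workers reuse the outputs cached in cache.json under a file lock.
    stack_outputs = infrastructure
    assert isinstance(stack_outputs, dict)
    assert stack_outputs, "expected at least one output from the deployed MetricsStack"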