This is the generic l0 processing flow.
It performs common L0 tasks, such as retrieving the session from the catalog and staging it from CADIP if needed.
It calls process_s1l0, process_s2l0 or process_s3l0.
Only the session parameter is mandatory.
All other parameters get their default values from Prefect variables but can be overridden on demand.
Source code in docs/rs-client-libraries/rs_workflows/on_demand/common/l0.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 | @flow(name="process level-0")
async def process_l0(
session: str,
flow_params: Level0FlowParams | None = None,
verbose: bool = False,
) -> None:
"""
This is the generic l0 processing flow.
It performs common L0 task like retrieving session from catalog an staging it from cadip if needed.
It call process_s1l0, process_s2l0 or process_s3l0
Only session parameter is mandatory.
All other parameters get their default values from Prefect variable but can be overriden on demand.
"""
logger = get_run_logger()
logger.info(f"Mode verbose is set to {verbose}")
# Check session name format
pattern = re.compile(r"^S[123]._")
if not pattern.match(session):
logger.error("❌ Bad Sentinel-1,2,3 session name.")
raise ValueError(f"Invalid session name: '{session}'")
# We detect the mission
mission: str = session[1]
logger.info(f"✔️ Sentinel-{mission} session name is correct.")
# Override of some parameters with default configuration
if flow_params is None:
flow_params = Level0FlowParams()
p: Level0FlowParams = await flow_params.resolve(mission, level="0")
flow_env = FlowEnv(FlowEnvArgs(owner_id=p.owner_identifier))
with flow_env.start_span(__name__, "level0-processing"):
found = False
# Check that the chosen dask_cluster_label is deployed
if await is_dask_cluster_running(p.dask_cluster_label) is False:
raise ValueError(f"❌ '{p.dask_cluster_label}' is unknown or not ready.")
# Try to retrieve the session on the collection
item_session: Item | None = await get_single_catalog_item(flow_env, session, [p.session_collection])
# If the session is not on the rs-catalog, we will try to stage it
if item_session:
found = True
evicted, eviction_date = is_evicted(item_session)
if evicted:
logger.error(f"❌ The session '{session}' has been evicted (eviction date = {eviction_date}) ")
raise ValueError(f"'{session}' has been evicted")
if is_published(item_session) is False:
logger.error(f"❌ The session '{session}' has not been published yet")
raise ValueError(f"'{session}' has not been publised")
else:
logger.info(f"Try to stage session {session} from {mission} stations :{p.cadip_collections}")
station = await get_cadip_station(
flow_env,
session,
p.cadip_collections,
)
if station is not None:
found = await stage_session_common(flow_env, station, session)
# The session is stagged at this step.
# We can call the flow
logger.info(f"We start Sentinel-{mission} processing.")
if found:
match int(mission):
case 1:
await process_s1l0_task(session=session, flow_params=p, verbose=verbose)
case 3:
await process_s3l0_task(session=session, flow_params=p, verbose=verbose)
|