async def process_l0_last_steps(
    mission: str,
    session: str,
    flow_params: Level0FlowParams | None,
    input_products: list[FlowInputProduct],
    verbose: bool,
) -> None:
    """
    Run the final Level-0 processing steps common to all missions.

    Resolves the flow parameters for the given mission, looks up the session
    item in the catalog, derives the datetime window and satellite identifier
    from it, then delegates the actual processing to ``call_dpr_flow``.

    Args:
        mission: Mission number/identifier, interpolated into the satellite
            identifier as ``sentinel-{mission}...``.
        session: Session identifier used for the catalog lookup; its first
            three characters (lower-cased) are appended to the satellite
            identifier.
        flow_params: Flow configuration; a default ``Level0FlowParams()`` is
            used when falsy (e.g. ``None``).
        input_products: Input products forwarded unchanged to the DPR flow.
        verbose: Only logged here; not otherwise used by this function.

    Returns:
        None. Returns early (after logging an error) when the session item
        cannot be found in the catalog.

    Raises:
        ValueError: If the session item has no string ``published`` property,
            which is required to derive the processing datetime window.
    """
    logger = get_run_logger()
    logger.info(f"Mode verbose is set to {verbose}")
    # Resolve parameters (fall back to defaults when no params were provided)
    flow_params = flow_params or Level0FlowParams()
    p = await flow_params.resolve(mission=mission, level="0")
    flow_env = FlowEnv(FlowEnvArgs(owner_id=p.owner_identifier))
    with flow_env.start_span(__name__, f"sentinel{mission}-level0-processing"):
        item_session: Item | None = await get_single_catalog_item(flow_env, session, [p.session_collection])
        if not item_session:
            # Without the session item we have no datetime window to process.
            logger.error("❌ The processing cannot be launched.")
            return
        # Satellite identifier, e.g. "sentinel-1s1a" — built from the mission
        # and the first three characters of the session id.
        satellite_identifier = f"sentinel-{mission}{session[:3].lower()}"
        # Published date — the item's "published" property must be an ISO-8601
        # string (timezone handling depends on what the catalog stores;
        # fromisoformat yields a naive datetime for offset-less input).
        published = item_session.properties.get("published")
        if not isinstance(published, str):
            raise ValueError("Missing or invalid 'published' property in item_session")
        end_datetime = datetime.fromisoformat(published)
        # A zero-length window: start and end coincide at the published date.
        start_datetime = end_datetime
        # Generated products (empty mapping when not configured)
        generated_product = (
            p.generated_product_to_collection_identifier
            if p.generated_product_to_collection_identifier is not None
            else []
        )
        # Auxiliary products (empty mapping when not configured)
        aux_product = (
            p.auxiliary_product_to_collection_identifier
            if p.auxiliary_product_to_collection_identifier is not None
            else []
        )
        # Hand off to the DPR flow with the fully-resolved parameters.
        await call_dpr_flow(
            FlowEnvArgs(owner_id=p.owner_identifier),
            input_products=input_products,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
            satellite_identifier=satellite_identifier,
            dask_cluster_label=p.dask_cluster_label,
            processor_name=p.processor_name,
            processor_version=p.processor_version,
            pipeline=p.pipeline,
            unit=p.unit,
            priority=p.priority,
            processing_mode=p.processing_mode,
            workflow=p.workflow,
            generated_product_to_collection_identifier=generated_product,
            auxiliary_product_to_collection_identifier=aux_product,
        )