Skip to content

rs_workflows/payload_builder.md

<< Back to index

.

TaskTableError

Bases: ValueError

Errors related to Task Table parsing/validation.

Source code in docs/rs-client-libraries/rs_workflows/payload_builder.py (lines 28–29)
class TaskTableError(ValueError):
    """Raised when a task table cannot be parsed or fails validation.

    Derives from ValueError, so existing ``except ValueError`` handlers also catch it.
    """

build_cql2_json(query, values)

Recursively replaces placeholders of the form {var} in a dictionary or list using the mapping from 'values', and returns the substituted "stac" sub-object of the query.

Source code in docs/rs-client-libraries/rs_workflows/payload_builder.py (lines 437–455)
def build_cql2_json(query: dict[str, Any], values: dict[str, Any]) -> Any:
    """
    Recursively replace "{var}" placeholders in the "stac" part of a query, using the mapping from 'values'.

    Only ``query["stac"]`` is processed and returned; the other top-level keys of ``query`` are ignored.
    Each string value that matches ``VARIABLE_PATTERN`` is looked up by its first capture group in ``values``.
    If the name is found, the whole string is replaced by the mapped value, which may be a non-string.
    Unknown placeholders and non-matching strings are kept unchanged. Lists and dict values are walked
    recursively; dict keys are never substituted.

    Args:
        query: query template; must contain a "stac" key.
        values: mapping from placeholder name to replacement value. Replacement values are inserted by
            reference (not copied).

    Returns:
        A new structure equivalent to ``query["stac"]`` with placeholders replaced. ``query`` is not modified.

    Raises:
        KeyError: if ``query`` has no "stac" key.
    """

    def _replace(item: Any) -> Any:
        if isinstance(item, str):
            # NOTE(review): re.match only anchors at the start of the string; if VARIABLE_PATTERN does not end
            # with "$", a string such as "{a}-suffix" is replaced as a whole. TODO confirm the pattern.
            match = VARIABLE_PATTERN.match(item)
            # replace if found, else keep
            return values.get(match.group(1), item) if match else item
        if isinstance(item, list):
            return [_replace(x) for x in item]
        if isinstance(item, dict):
            return {k: _replace(v) for k, v in item.items()}
        return item

    # Only the "stac" subtree is returned, so only that subtree is copied and traversed. The deep copy keeps
    # leaves that are not str/list/dict from being shared with the caller's query.
    return _replace(deepcopy(query["stac"]))

build_unit_list(tasktable, pipeline=None, unit=None, processing_mode=None, external_variables=None)

Build the list of units needed for the payload from the given task table and the flow mode ("pipeline" or "unit"). In "unit" mode, the returned list contains only the requested unit, with its input and output linked to the external input and output. In "pipeline" mode, the list contains all the ordered units of the pipeline, with their inputs and outputs chained where needed.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `tasktable` | `dict[str, Any]` | dictionary containing the content of the tasktable | *required* |
| `pipeline` | `str \| None` | name of the pipeline to build, if the mode is "pipeline". If `unit` is given, must be `None`. | `None` |
| `unit` | `str \| None` | name of the unit to build, if the mode is "unit". If `pipeline` is given, must be `None`. | `None` |
| `processing_mode` | `Iterable[str] \| None` | list of processing modes used for this flow. Optional. | `None` |
| `external_variables` | `dict[str, Any] \| None` | dictionary of external variable values from the flow's input. Optional. | `None` |
Source code in docs/rs-client-libraries/rs_workflows/payload_builder.py (lines 332–434)
def build_unit_list(
    tasktable: dict[str, Any],
    pipeline: str | None = None,
    unit: str | None = None,
    processing_mode: Iterable[str] | None = None,
    external_variables: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """
    Build the list of units needed for the payload, from the tasktable given and the flow mode (pipeline or unit).
    If the mode is "unit", the list returned will only contain the requested unit, with its input and output linked
    to the external input and output.
    If the mode is "pipeline", the list will contain all the ordered units of the pipeline, with their inputs and
    outputs correctly chained if needed.

    Args:
        tasktable: dictionary containing the content of the tasktable. It must contain "pipelines", "units" and
            "io" lists. Entries of "units" and "io" without a string "name" are ignored.
        pipeline: name of the pipeline to build, if the mode is "pipeline". If "unit" is given, has to be None.
        unit: name of the unit to build, if the mode is "unit". If "pipeline" is given, has to be None.
        processing_mode: list of processing modes used for this flow. Optional
        external_variables: dictionary of external variables values from flow's input. Optional

    Returns:
        The unit details built by ``_build_single_unit_details``. There is one entry in "unit" mode, and one
        entry per element of the pipeline's "steps" list (same order) in "pipeline" mode.

    Raises:
        TaskTableError: if any of the following holds:
            - the task table root is not a dict;
            - the "pipelines", "units" or "io" list is missing or invalid;
            - no valid unit is defined;
            - both or neither of ``pipeline``/``unit`` are given;
            - the requested pipeline does not exist;
            - the pipeline's steps are malformed.
            Errors raised by ``_build_single_unit_details`` propagate unchanged.
    """
    # Validate the overall task table shape before looking anything up in it
    if not isinstance(tasktable, dict):
        raise TaskTableError(f"Task table root must be a JSON object (dict): {tasktable}")
    for section in ("pipelines", "units", "io"):
        if not isinstance(tasktable.get(section), list):
            raise TaskTableError(f"Missing or invalid '{section}' list in task table: {tasktable}")

    # Exactly one of the two flow modes must be selected
    if pipeline and unit:
        raise TaskTableError("Provide either 'pipeline' or 'unit', not both.")
    if not pipeline and not unit:
        raise TaskTableError("One of 'pipeline' or 'unit' must be provided.")

    # Index unit definitions by name (malformed entries are skipped; a later duplicate name overrides earlier ones)
    units_index: dict[str, dict[str, Any]] = {
        u["name"]: u for u in tasktable["units"] if isinstance(u, dict) and isinstance(u.get("name"), str)
    }
    if not units_index:
        raise TaskTableError('No valid unit entries found in "units".')

    # Index input/output definitions by name, with the same filtering rules
    io_index: dict[str, dict[str, Any]] = {
        io["name"]: io for io in tasktable["io"] if isinstance(io, dict) and isinstance(io.get("name"), str)
    }

    if unit:
        # "unit" mode: build only the requested unit
        return [_build_single_unit_details(unit, units_index, io_index, processing_mode, external_variables)]

    # "pipeline" mode: look the pipeline up by name. Non-dict entries are skipped instead of raising AttributeError.
    pipelines = tasktable["pipelines"]
    full_pipeline = next(
        (p for p in pipelines if isinstance(p, dict) and p.get("name") == pipeline),
        None,
    )
    if full_pipeline is None:
        # Single-quoted key inside the f-string keeps this valid on Python < 3.12 (PEP 701)
        existing_names = [p.get('name') for p in pipelines if isinstance(p, dict)]
        raise TaskTableError(
            f"Could not find pipeline named '{pipeline}' in tasktable. Existing pipelines: {existing_names}",
        )

    # Retrieve list of steps from the pipeline
    pipeline_steps = full_pipeline.get("steps")
    if not pipeline_steps or not isinstance(pipeline_steps, list):
        raise TaskTableError(f"Pipeline '{pipeline}' is missing a valid 'steps' list.")

    # The same processing_mode object is passed for every step. A one-shot iterator (e.g. a generator) could be
    # exhausted after the first step, so materialize it once. Re-iterable containers are passed through as-is.
    if processing_mode is not None and hasattr(processing_mode, "__next__"):
        processing_mode = list(processing_mode)

    out_units: list[dict[str, Any]] = []
    for step in pipeline_steps:
        # Validate step structure before building it
        if not isinstance(step, dict):
            raise TaskTableError(
                f"Step in pipeline '{pipeline}' doesn't have the expected format (expected JSON dict): {step}",
            )
        for required_field in ("unit_name", "step_id", "input_products", "output_products"):
            if required_field not in step:
                raise TaskTableError(
                    f"Step in pipeline '{pipeline}' is missing mandatory field '{required_field}': {step}",
                )

        # Build unit details for this step
        out_units.append(
            _build_single_unit_details(
                step["unit_name"],
                units_index,
                io_index,
                processing_mode,
                external_variables,
                full_pipeline=full_pipeline,
                step_id=step["step_id"],
                parameters=step.get("parameters"),
            ),
        )

    return out_units