Skip to content

rs_workflows/payload_builder.md

<< Back to index

.

TaskTableError

Bases: ValueError

Errors related to Task Table parsing/validation.

Source code in docs/rs-client-libraries/rs_workflows/payload_builder.py (lines 27–28)
class TaskTableError(ValueError):
    """Raised when a Task Table cannot be parsed or fails validation.

    Derives from ``ValueError``, so code that already catches ``ValueError``
    also catches Task Table problems.
    """

build_cql2_json(task_table, query_name, values)

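Look up the query named `query_name` in the Task Table's `queries` list and return a deep copy of it. In that copy, every string that is exactly `{var}` (anywhere in nested dicts and lists) is replaced by `values[var]`. Unknown placeholders are left as-is, and `{}` is returned when no query has that name.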

Source code in docs/rs-client-libraries/rs_workflows/payload_builder.py (lines 285–310)
def build_cql2_json(task_table, query_name, values):
    """
    Return the named CQL2 query from the Task Table with its placeholders filled in.

    The query whose ``"name"`` equals ``query_name`` is taken from
    ``task_table["queries"]``. If several queries share that name, the last one
    wins. The query is deep-copied, so the Task Table itself is never modified.
    Every string in the copy that is *exactly* ``"{var}"`` (at any depth inside
    nested dicts and lists) is replaced by ``values[var]``.

    Left unchanged:
      - placeholders whose ``var`` is not a key of ``values``;
      - strings that merely *contain* a placeholder;
      - dict keys and non-string leaves.

    Args:
        task_table: Parsed Task Table. It must contain a ``"queries"`` list whose
            entries each have a ``"name"`` key.
        query_name: Name of the query to build.
        values: Mapping from placeholder name to replacement value. The value
            is inserted as-is (it is not converted to a string).

    Returns:
        The filled-in copy of the query, or ``{}`` if no query is named
        ``query_name``.
    """
    # Keep "last match wins" semantics, but deep-copy only the chosen query
    # instead of re-copying on every match.
    matches = [query for query in task_table["queries"] if query["name"] == query_name]
    template = deepcopy(matches[-1]) if matches else {}

    # fullmatch (rather than ^...$ with .match) so a string such as "{var}\n"
    # is NOT treated as a placeholder: "$" would also match before a trailing newline.
    pattern = re.compile(r"\{(.*)\}")

    def _replace(item):
        # Rebuild containers recursively; only whole-string placeholders are substituted.
        if isinstance(item, str):
            match = pattern.fullmatch(item)
            if match:
                return values.get(match.group(1), item)  # replace if found, else keep
            return item
        if isinstance(item, list):
            return [_replace(x) for x in item]
        if isinstance(item, dict):
            return {k: _replace(v) for k, v in item.items()}
        return item

    return _replace(template)

build_unit_list(tasktable, pipeline=None, unit=None, processing_mode=None, *, start_datetime=None, end_datetime=None)

STEP 1: Build the list of processing units from the Task Table.

Source code in docs/rs-client-libraries/rs_workflows/payload_builder.py (lines 140–282)
def build_unit_list(
    tasktable: dict[str, Any],
    pipeline: str | None = None,
    unit: str | None = None,
    processing_mode: Iterable[str] | None = None,
    *,
    start_datetime: datetime | None = None,
    end_datetime: datetime | None = None,
) -> dict[str, Any]:
    """
    STEP 1: Build the list of processing units from the Task Table.

    Exactly one of ``pipeline`` or ``unit`` selects which units are emitted.

    - With ``pipeline``: unit names come from ``_select_unit_names``. Origin
      maps come from every step of that pipeline; a later step for the same
      unit overrides an earlier one.
    - With ``unit``: only that unit is emitted. Its origins come from the first
      step, across all pipelines, whose ``unit_name`` is ``unit``.

    Non-dict pipelines and steps are ignored when collecting origins.

    Args:
        tasktable: Parsed Task Table. Must be a dict holding ``"pipelines"``,
            ``"units"`` and ``"io"`` lists.
        pipeline: Name of the pipeline whose units should be built.
        unit: Name of a single entry of ``tasktable["units"]`` to build.
        processing_mode: Processing mode(s), forwarded to ``_build_entries``.
            A one-shot iterator (e.g. a generator) is first materialized into
            a list, because the value is forwarded several times.
        start_datetime: Forwarded to ``_build_entries``.
        end_datetime: Forwarded to ``_build_entries``.

    Returns:
        ``{"units": [...]}`` with one dict per selected unit, in selection
        order. Each dict holds ``"name"``, ``"module"``, ``"input_products"``,
        ``"input_adfs"`` and ``"output_products"``.

    Raises:
        TaskTableError: if any of the following hold:
            - the root, or its ``"pipelines"``/``"units"``/``"io"`` entries,
              are malformed;
            - no unit has a string ``"name"``;
            - both or neither of ``pipeline``/``unit`` are given;
            - ``unit`` is unknown;
            - no pipeline step uses ``unit``;
            - a selected unit lacks a non-empty ``"module"`` string.
            ``_select_unit_names`` and ``_build_entries`` may raise further
            errors, which are not visible here.
    """
    from collections.abc import Iterator  # only needed for the processing_mode check below

    # Validate the top-level shape before indexing into it.
    if not isinstance(tasktable, dict):
        raise TaskTableError("Task table root must be a JSON object (dict).")
    if "pipelines" not in tasktable or not isinstance(tasktable["pipelines"], list):
        raise TaskTableError('Missing or invalid "pipelines" list in task table.')
    if "units" not in tasktable or not isinstance(tasktable["units"], list):
        raise TaskTableError('Missing or invalid "units" list in task table.')
    if "io" not in tasktable or not isinstance(tasktable["io"], list):
        raise TaskTableError('Missing or invalid "io" list in task table.')

    # Name -> definition indices; entries without a string "name" are skipped,
    # and later duplicates override earlier ones.
    units_index: dict[str, dict[str, Any]] = {
        u["name"]: u for u in tasktable["units"] if isinstance(u, dict) and isinstance(u.get("name"), str)
    }
    if not units_index:
        raise TaskTableError('No valid unit entries found in "units".')

    io_index: dict[str, dict[str, Any]] = {
        io["name"]: io for io in tasktable["io"] if isinstance(io, dict) and isinstance(io.get("name"), str)
    }

    if pipeline and unit:
        raise TaskTableError('Provide either "pipeline" or "unit", not both.')
    if not pipeline and not unit:
        raise TaskTableError('One of "pipeline" or "unit" must be provided.')

    # Select unit names from pipeline or explicit unit
    if unit:
        unit_names = [unit]
        if unit not in units_index:
            available = ", ".join(f"'{u}'" for u in units_index)
            raise TaskTableError(f'Unit "{unit}" not found in "units". Available units: {available}.')
    else:
        unit_names = _select_unit_names(tasktable, pipeline=pipeline)

    def _origins(step: dict[str, Any]) -> dict[str, Any]:
        # Input/output origin mappings declared on a pipeline step ({} when absent or null).
        return {
            "in": step.get("input_products") or {},
            "out": step.get("output_products") or {},
        }

    # unit name -> {"in": ..., "out": ...} origin mappings taken from pipeline steps.
    origin_map_by_unit: dict[str, dict[str, dict[str, Any]]] = {}
    if pipeline:
        # Same non-dict guard as the unit branch, so a malformed pipeline entry
        # does not crash with AttributeError.
        selected = next(
            (p for p in tasktable["pipelines"] if isinstance(p, dict) and p.get("name") == pipeline),
            None,
        )
        steps = (selected.get("steps") or []) if selected else []
        for step in steps:
            if isinstance(step, dict) and step.get("unit_name"):
                origin_map_by_unit[step["unit_name"]] = _origins(step)
    else:
        # unit provided: take the first pipeline step (across all pipelines) that uses this unit.
        step = next(
            (
                st
                for p in tasktable["pipelines"]
                if isinstance(p, dict)
                for st in (p.get("steps") or [])
                if isinstance(st, dict) and st.get("unit_name") in unit_names
            ),
            None,
        )
        if step is None:
            raise TaskTableError(f'No pipeline step found for unit "{unit_names[0]}" to derive origins.')
        origin_map_by_unit[step["unit_name"]] = _origins(step)

    # processing_mode is forwarded to _build_entries three times per unit; a
    # one-shot iterator would be exhausted by the first call, so freeze it.
    if isinstance(processing_mode, Iterator):
        processing_mode = list(processing_mode)

    def _entries(
        udef: dict[str, Any],
        uname: str,
        field: str,
        *,
        with_origin: bool,
        origin_kind: str,
    ) -> Any:
        # Build one entry list (products / ADFs) of a unit via the shared _build_entries helper.
        return _build_entries(
            udef.get(field, []),
            io_index,
            processing_mode,
            with_origin=with_origin,
            unit_name=uname,
            origin_kind=origin_kind,
            origin_map_by_unit=origin_map_by_unit,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
        )

    # Build output units (dict literal evaluates its values left to right,
    # so _build_entries is called in products -> ADFs -> outputs order).
    out_units: list[dict[str, Any]] = []
    for uname in unit_names:
        udef = units_index.get(uname)
        if not udef:
            raise TaskTableError(f'Unit "{uname}" not found in "units".')

        module = udef.get("module")
        if not isinstance(module, str) or not module:
            raise TaskTableError(f'Unit "{uname}" is missing a valid "module" string.')

        out_units.append(
            {
                "name": uname,
                "module": module,
                "input_products": _entries(udef, uname, "input_products", with_origin=True, origin_kind="in"),
                "input_adfs": _entries(udef, uname, "input_adfs", with_origin=False, origin_kind="in"),
                "output_products": _entries(udef, uname, "output_products", with_origin=True, origin_kind="out"),
            },
        )

    return {"units": out_units}