140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def build_unit_list(
    tasktable: dict[str, Any],
    pipeline: str | None = None,
    unit: str | None = None,
    processing_mode: Iterable[str] | None = None,
    *,
    start_datetime: datetime | None = None,
    end_datetime: datetime | None = None,
) -> dict[str, Any]:
    """
    STEP 1: Build the list of processing units from the Task Table.

    Exactly one of ``pipeline`` or ``unit`` must be given:

    * ``pipeline``: the unit names come from
      ``_select_unit_names(tasktable, pipeline=pipeline)`` and the origin maps
      are collected from every step of the pipeline with that name.
    * ``unit``: only that unit is built, and its origin maps are taken from
      the first pipeline step (in task-table order) whose ``unit_name``
      matches it.

    Args:
        tasktable: Parsed task table. Must be a dict whose ``"pipelines"``,
            ``"units"`` and ``"io"`` values are lists. Entries of ``"units"``
            and ``"io"`` that are not dicts with a string ``"name"`` are
            ignored.
        pipeline: Name of the pipeline to expand into units.
        unit: Name of a single unit to build.
        processing_mode: Forwarded unchanged to ``_build_entries`` for every
            entry list. A one-shot iterator is materialised into a tuple first
            because it is reused for several calls.
        start_datetime: Forwarded unchanged to ``_build_entries``.
        end_datetime: Forwarded unchanged to ``_build_entries``.

    Returns:
        ``{"units": [...]}`` where each item is a dict with the keys
        ``"name"``, ``"module"``, ``"input_products"``, ``"input_adfs"`` and
        ``"output_products"`` (the last three as returned by
        ``_build_entries``).

    Raises:
        TaskTableError: If the task table is malformed, if both or neither of
            ``pipeline``/``unit`` are given, if a requested unit is unknown or
            has no valid ``"module"`` string, or if no pipeline step
            references the requested ``unit``.
    """

    def step_origins(step: dict[str, Any]) -> dict[str, Any]:
        """Return the step's "in"/"out" origin maps, treating null/missing as empty."""
        return {
            "in": step.get("input_products") or {},
            "out": step.get("output_products") or {},
        }

    # Validate the top-level shape before indexing into the table.
    if not isinstance(tasktable, dict):
        raise TaskTableError("Task table root must be a JSON object (dict).")
    for section in ("pipelines", "units", "io"):
        if not isinstance(tasktable.get(section), list):
            raise TaskTableError(f'Missing or invalid "{section}" list in task table.')

    # Build name -> definition indices; malformed entries are skipped, and a
    # later entry with a duplicate name replaces the earlier one.
    units_index: dict[str, dict[str, Any]] = {
        entry["name"]: entry
        for entry in tasktable["units"]
        if isinstance(entry, dict) and isinstance(entry.get("name"), str)
    }
    if not units_index:
        raise TaskTableError('No valid unit entries found in "units".')
    io_index: dict[str, dict[str, Any]] = {
        entry["name"]: entry
        for entry in tasktable["io"]
        if isinstance(entry, dict) and isinstance(entry.get("name"), str)
    }

    if pipeline and unit:
        raise TaskTableError('Provide either "pipeline" or "unit", not both.')
    if not pipeline and not unit:
        raise TaskTableError('One of "pipeline" or "unit" must be provided.')

    # The same processing_mode object is handed to _build_entries several
    # times; a one-shot iterator would be empty after the first call.
    if processing_mode is not None and hasattr(processing_mode, "__next__"):
        processing_mode = tuple(processing_mode)

    # Select unit names from the pipeline or from the explicit unit.
    if unit:
        if unit not in units_index:
            available = ", ".join(f"'{name}'" for name in units_index)
            raise TaskTableError(f'Unit "{unit}" not found in "units". Available units: {available}.')
        unit_names = [unit]
    else:
        unit_names = _select_unit_names(tasktable, pipeline=pipeline)

    # Per-unit origin maps, keyed by unit name then by "in"/"out".
    origin_map_by_unit: dict[str, dict[str, dict[str, Any]]] = {}
    if pipeline:
        # Non-dict pipeline entries and a null "steps" value are tolerated
        # here exactly as they are in the single-unit branch below.
        selected = next(
            (p for p in tasktable["pipelines"] if isinstance(p, dict) and p.get("name") == pipeline),
            None,
        )
        steps = (selected.get("steps") or []) if selected is not None else []
        for step in steps:
            if not isinstance(step, dict):
                continue
            step_unit = step.get("unit_name")
            if not step_unit:
                continue
            # A later step for the same unit overrides an earlier one.
            origin_map_by_unit[step_unit] = step_origins(step)
    else:
        # Unit provided: take the first pipeline step that references it.
        first_step = next(
            (
                step
                for p in tasktable["pipelines"]
                if isinstance(p, dict)
                for step in p.get("steps") or []
                if isinstance(step, dict) and step.get("unit_name") == unit
            ),
            None,
        )
        if first_step is None:
            raise TaskTableError(f'No pipeline step found for unit "{unit}" to derive origins.')
        origin_map_by_unit[first_step["unit_name"]] = step_origins(first_step)

    # (unit-definition key, with_origin, origin_kind) for each entry list, in
    # the order the keys appear in the output dict.
    entry_specs = (
        ("input_products", True, "in"),
        ("input_adfs", False, "in"),
        ("output_products", True, "out"),
    )

    # Build output units.
    out_units: list[dict[str, Any]] = []
    for uname in unit_names:
        udef = units_index.get(uname)
        if udef is None:
            raise TaskTableError(f'Unit "{uname}" not found in "units".')
        module = udef.get("module")
        if not isinstance(module, str) or not module:
            raise TaskTableError(f'Unit "{uname}" is missing a valid "module" string.')
        built: dict[str, Any] = {"name": uname, "module": module}
        for key, with_origin, origin_kind in entry_specs:
            built[key] = _build_entries(
                udef.get(key, []),
                io_index,
                processing_mode,
                with_origin=with_origin,
                unit_name=uname,
                origin_kind=origin_kind,
                origin_map_by_unit=origin_map_by_unit,
                start_datetime=start_datetime,
                end_datetime=end_datetime,
            )
        out_units.append(built)
    return {"units": out_units}
|