Skip to content

rs_workflows/benchmarking/benchmarking_flow.md

<< Back to index

Benchmarking flows.

benchmark_processor(env, processor_name, processor_version, scenario_name) async

https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-1099 This flow reads one Prefect variable named "benchmarking--settings" associated to the input parameter "processor name". This variable provides a mocked structured information to define the benchmarking context.

This flow will
  • Execute a mock task using the provided input parameters: processor name, version, and scenario.
  • Generate a fake Markdown artifact named "benchmarking-result".
This artifact contains
  • The input parameters passed to the benchmark-processor flow.
  • The settings fetched from the Prefect variable.
Source code in docs/rs-client-libraries/rs_workflows/benchmarking/benchmarking_flow.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@flow(name="benchmark-processor")
async def benchmark_processor(
    env: FlowEnvArgs,
    processor_name: str,
    processor_version: str,
    scenario_name: str,
):
    """
    https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-1099
    This flow reads one Prefect variable named "benchmarking-<processor_name>-settings" associated to the input
    parameter "processor name".
    This variable provides a mocked structured information to define the benchmarking context.

    This flow will:
      - Execute a mock task using the provided input parameters: processor name, version, and scenario.
      - Generate a fake Markdown artifact named "benchmarking-result".

    This artifact contains:
      - The input parameters passed to the benchmark-processor flow.
      - The settings fetched from the Prefect variable.
    """
    # Call the task of the same name
    return benchmark_processor_task.submit(env, processor_name, processor_version, scenario_name).result()

benchmark_processor_task(env, processor_name, processor_version, scenario_name) async

Task called by the flow of the same name.

Source code in docs/rs-client-libraries/rs_workflows/benchmarking/benchmarking_flow.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@task(name="Benchmark Processor")
async def benchmark_processor_task(
    env: FlowEnvArgs,
    processor_name: str,
    processor_version: str,
    scenario_name: str,
):
    """Task called by the flow of the same name."""
    logger = get_run_logger()
    logger.setLevel(logging.DEBUG)

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "benchmark-processor"):

        # Read the prefect variable value
        all_value = await Variable.get(f"benchmarking-{processor_name}-settings")

        # Read contents for the given scenario and processor version
        this_value = all_value["scenarios"][scenario_name]["processor_versions"][processor_version]

        # Markdown report
        parameters = {
            "owner_id": env.owner_id,
            "processor_name": processor_name,
            "processor_version": processor_version,
            "scenario_name": scenario_name,
        }
        report = f"""
# Benchmarking processor report

### Prefect flow run
{f"{os.environ['RSPY_PREFECT_URL']}/runs/flow-run/{flow_run.id}"}

### Called by
{env.called_by}

### Parameters
```json
{json.dumps(parameters, indent=2)}
```

### Settings
```json
{json.dumps(this_value, indent=2)}
```
"""
        # Save as a markdown artifact
        artifact_key_name: str = "benchmarking-result"
        await acreate_markdown_artifact(
            key=artifact_key_name,
            markdown=report,
            description="Benchmarking processor report",
        )
        logger.info(f"📌 Artifact named '{artifact_key_name}' has been linked to this flow.")

        # Sleep a few seconds to simulate a processor runtime
        await asyncio.sleep(10)