Skip to content

rs_workflows/storage_configuration.md

<< Back to index

Module for managing storage configuration.

This module defines the StorageConfig class, which loads and parses the storage configuration from a JSON file. It builds lookup tables for product-specific storage, default unit-section storage, and default pipeline-section storage, and resolves storage credentials from the provided secrets.

StorageConfig

A class to load and query the storage_configuration.json file.

Source code in docs/rs-client-libraries/rs_workflows/storage_configuration.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class StorageConfig:
    """
    A class to load and query the storage_configuration.json file.
    """

    def __init__(self, secrets: dict, config_path: str, logger=None):
        with open(config_path, encoding="utf-8") as f:
            self.data = json.load(f)

        # Build quick-lookups for faster access
        self._specific_storage = {item["product_name"]: item["storage"] for item in self.data["product"]["specific"]}

        self._default_unit_storage = {
            item["section"]: item["storage"] for item in self.data["product"]["default"]["unit"]
        }

        self._default_pipeline_storage = {
            item["section"]: item["storage"] for item in self.data["product"]["default"]["pipeline"]
        }
        self._store_params = []

        for conf in self.data["storage_configuration"]:
            if "name" in conf:
                if "storage_options" in conf:
                    try:
                        storage_options = StorageOptions(
                            name=conf["name"],
                            key=secrets[conf["storage_options"]["key"].strip("${}")],
                            secret=secrets[conf["storage_options"]["secret"].strip("${}")],
                            client_kwargs={
                                "endpoint_url": secrets[conf["storage_options"]["endpoint_url"].strip("${}")],
                                "region_name": secrets[conf["storage_options"]["region_name"].strip("${}")],
                            },
                        )
                        self._store_params.append(StoreParams(storage_options=storage_options))
                    except KeyError as ke:
                        logger.warning(
                            f"Secret value for key {ke} not found in the prefect "
                            f"block secrets. This section from template will not be usable.",
                        )
                        continue
                elif conf["name"] in ("shared_disk", "local_disk"):
                    self._store_params.append(
                        StoreParams(
                            storage_path=StoragePath(
                                name=conf["name"],
                                opening_mode=conf["opening_mode"],
                                relative_path=conf["relative_path"],
                            ),
                        ),
                    )

        self.default_adfs_storage = self.data["product"]["default"]["adfs"]["storage"]

    def get_storage_for_specific_product(self, product_name: str) -> str | None:
        """
        Return the storage name for a specific product.
        """
        return self._specific_storage.get(product_name, None)

    def get_storage_for_unit_section(self, section: str) -> str | None:
        """Get storage for a unit section (input_products, output_products, etc.)"""
        return self._default_unit_storage.get(section, None)

    def get_storage_for_pipeline_section(self, section: str) -> str | None:
        """Get storage for a pipeline section (pipeline_input, pipeline_output, other, etc.)"""
        return self._default_pipeline_storage.get(section, None)

    def get_store_params(self, storage_name: str) -> StoreParams | None:
        """
        Return the store parameters for a given storage name
        (e.g., 's3', 'shared_disk', etc.).
        """
        for store_param in self._store_params:
            if store_param.storage_options and store_param.storage_options.name == storage_name:
                return store_param
            if store_param.storage_path and store_param.storage_path.name == storage_name:
                return store_param
        return None

    def get_all_storage_names(self) -> list[StoreParams]:
        """Return a list of all defined storage names."""
        return self._store_params

get_all_storage_names()

Return the list of all configured store parameters (`StoreParams` objects); despite the method name, these are not plain storage names.

Source code in docs/rs-client-libraries/rs_workflows/storage_configuration.py
109
110
111
def get_all_storage_names(self) -> "list[StoreParams]":
    """
    Return the list of all configured store parameters.

    Despite the method name, the items are ``StoreParams`` objects, not plain
    names. The internal list is returned as-is (not a copy).
    """
    return self._store_params

get_storage_for_pipeline_section(section)

Get storage for a pipeline section (pipeline_input, pipeline_output, other, etc.)

Source code in docs/rs-client-libraries/rs_workflows/storage_configuration.py
93
94
95
def get_storage_for_pipeline_section(self, section: str) -> str | None:
    """Get storage for a pipeline section (pipeline_input, pipeline_output, other, etc.)"""
    return self._default_pipeline_storage.get(section, None)

get_storage_for_specific_product(product_name)

Return the storage name for a specific product.

Source code in docs/rs-client-libraries/rs_workflows/storage_configuration.py
83
84
85
86
87
def get_storage_for_specific_product(self, product_name: str) -> str | None:
    """
    Return the storage name for a specific product.
    """
    return self._specific_storage.get(product_name, None)

get_storage_for_unit_section(section)

Get storage for a unit section (input_products, output_products, etc.)

Source code in docs/rs-client-libraries/rs_workflows/storage_configuration.py
89
90
91
def get_storage_for_unit_section(self, section: str) -> str | None:
    """Get storage for a unit section (input_products, output_products, etc.)"""
    return self._default_unit_storage.get(section, None)

get_store_params(storage_name)

Return the store parameters for the given storage name (e.g., 's3' or 'shared_disk'), or None if no storage with that name is configured.

Source code in docs/rs-client-libraries/rs_workflows/storage_configuration.py
 97
 98
 99
100
101
102
103
104
105
106
107
def get_store_params(self, storage_name: str) -> StoreParams | None:
    """
    Find the store parameters registered under ``storage_name``.

    Object-storage entries (``storage_options``) and disk entries
    (``storage_path``) are both checked, in registration order, and the first
    match is returned. None is returned when nothing matches.
    """

    def _is_named(candidate) -> bool:
        if candidate.storage_options and candidate.storage_options.name == storage_name:
            return True
        return bool(candidate.storage_path and candidate.storage_path.name == storage_name)

    return next((candidate for candidate in self._store_params if _is_named(candidate)), None)