rs_dpr_service/utils/settings.md

Store miscellaneous objects and values used throughout the application.

ExperimentalConfig

Bases: BaseModel

Experimental configuration, used only for testing.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/settings.py
class ExperimentalConfig(BaseModel):
    """Experimental configuration, used only for testing."""

    class LocalCluster(BaseModel):
        """
        Overwrite the payload file to use a Dask LocalCluster configuration instead of a Dask Gateway.

        If disabled (by default), we use the Dask Gateway cluster and workers that have been initialized by RSPY.

        If enabled in cluster mode, a nested Dask LocalCluster is initialized by EOPF inside the RSPY Dask Gateway.
        The Dask Gateway should be set up with a single worker, otherwise we face unexpected behaviour. The EOPF LocalCluster
        will run inside this single worker.

        If enabled in local mode, the RSPY Dask Gateway is not used: we use only the EOPF LocalCluster. Your local mode
        should use the docker image ghcr.io/rs-python/dask-gateway-server/eopf/localcluster rather than
        ghcr.io/rs-python/rs-dpr-service, because the former contains both the rs-dpr-service and the processor source
        code and dependencies. We use this mode to debug and set breakpoints in the EOPF and processor source code.
        """

        enabled: bool = False  # Use False to disable

        # Dask LocalCluster configuration, see:
        # https://distributed.dask.org/en/latest/api.html#distributed.LocalCluster

        # Number of workers (=processes) to start. Default is CPU_COUNT.
        n_workers: int | None = None

        # Sets the memory limit *per worker (=process)*
        memory_limit: str | float | int | None = "auto"

        # Number of threads per worker (=process).
        # Should always be 1 because the processors are not thread-safe.
        threads_per_worker: int = 1

    class LocalFiles(BaseModel):
        """
        Overwrite the payload file to read/write on the local disk rather than on the S3 bucket.
        Only works with a LocalCluster.
        """

        # Local directory
        local_dir: str | None = None  # Use None to disable

        # Download input files again from the S3 bucket even if they are already present in the local directory?
        overwrite_input: bool = False

        # Upload output files to the S3 bucket?
        upload_output: bool = False

    local_cluster: LocalCluster = LocalCluster()
    local_files: LocalFiles = LocalFiles()
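
For example, in a test you could enable the local cluster like this (a minimal sketch; the field values are illustrative placeholders, not recommended defaults):

config = ExperimentalConfig(
    local_cluster=ExperimentalConfig.LocalCluster(enabled=True, n_workers=2),
    local_files=ExperimentalConfig.LocalFiles(local_dir="/tmp/dpr", upload_output=True),
)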

LocalCluster

Bases: BaseModel

Overwrite the payload file to use a Dask LocalCluster configuration instead of a Dask Gateway.

If disabled (by default), we use the Dask Gateway cluster and workers that have been initialized by RSPY.

If enabled in cluster mode, a nested Dask LocalCluster is initialized by EOPF inside the RSPY Dask Gateway. The Dask Gateway should be set up with a single worker, otherwise we face unexpected behaviour. The EOPF LocalCluster will run inside this single worker.

If enabled in local mode, the RSPY Dask Gateway is not used: we use only the EOPF LocalCluster. Your local mode should use the docker image ghcr.io/rs-python/dask-gateway-server/eopf/localcluster rather than ghcr.io/rs-python/rs-dpr-service, because the former contains both the rs-dpr-service and the processor source code and dependencies. We use this mode to debug and set breakpoints in the EOPF and processor source code.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/settings.py
class LocalCluster(BaseModel):
    """
    Overwrite the payload file to use a Dask LocalCluster configuration instead of a Dask Gateway.

    If disabled (by default), we use the Dask Gateway cluster and workers that have been initialized by RSPY.

    If enabled in cluster mode, a nested Dask LocalCluster is initialized by EOPF inside the RSPY Dask Gateway.
    The Dask Gateway should be set up with a single worker, otherwise we face unexpected behaviour. The EOPF LocalCluster
    will run inside this single worker.

    If enabled in local mode, the RSPY Dask Gateway is not used: we use only the EOPF LocalCluster. Your local mode
    should use the docker image ghcr.io/rs-python/dask-gateway-server/eopf/localcluster rather than
    ghcr.io/rs-python/rs-dpr-service, because the former contains both the rs-dpr-service and the processor source
    code and dependencies. We use this mode to debug and set breakpoints in the EOPF and processor source code.
    """

    enabled: bool = False  # Use False to disable

    # Dask LocalCluster configuration, see:
    # https://distributed.dask.org/en/latest/api.html#distributed.LocalCluster

    # Number of workers (=processes) to start. Default is CPU_COUNT.
    n_workers: int | None = None

    # Sets the memory limit *per worker (=process)*
    memory_limit: str | float | int | None = "auto"

    # Number of threads per worker (=process).
    # Should always be 1 because the processors are not thread-safe.
    threads_per_worker: int = 1
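
To make the mapping concrete, here is a plausible sketch of how these fields could be passed to distributed.LocalCluster (the actual wiring is done by EOPF and may differ):

from distributed import Client, LocalCluster

settings = ExperimentalConfig.LocalCluster(enabled=True, n_workers=2)
if settings.enabled:
    cluster = LocalCluster(
        n_workers=settings.n_workers,                    # None means one worker per CPU
        memory_limit=settings.memory_limit,              # applies per worker process
        threads_per_worker=settings.threads_per_worker,  # keep at 1: processors are not thread-safe
    )
    client = Client(cluster)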

LocalFiles

Bases: BaseModel

Overwrite the payload file to read/write on the local disk rather than on the S3 bucket. Only works with a LocalCluster.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/settings.py
class LocalFiles(BaseModel):
    """
    Overwrite the payload file to read/write on the local disk rather than on the S3 bucket.
    Only works with a LocalCluster.
    """

    # Local directory
    local_dir: str | None = None  # Use None to disable

    # Download input files again from the S3 bucket even if they are already present in the local directory?
    overwrite_input: bool = False

    # Upload output files to the S3 bucket?
    upload_output: bool = False
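
As an illustration, a consumer of these settings could decide whether to (re)download each input file like this (a hypothetical sketch; input_s3_paths and download_from_s3 are illustrative names, not part of this module):

import os

files = ExperimentalConfig.LocalFiles(local_dir="/tmp/dpr-data")
if files.local_dir:  # None disables the local-files mode
    for s3_path in input_s3_paths:
        local_path = os.path.join(files.local_dir, os.path.basename(s3_path))
        if files.overwrite_input or not os.path.exists(local_path):
            download_from_s3(s3_path, local_path)  # hypothetical helper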

env_bool(var, default)

Return True if an environment variable is set to y, yes, t, true, on or 1 (case-insensitive). Return False if set to n, no, f, false, off or 0 (case-insensitive). Return the default value if the variable is not set or is set to any other value.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/settings.py
def env_bool(var: str, default: bool) -> bool:
    """
    Return True if an environment variable is set to y, yes, t, true, on or 1 (case-insensitive).
    Return False if set to n, no, f, false, off or 0 (case-insensitive).
    Return the default value if the variable is not set or is set to any other value.
    """
    val = os.getenv(var, str(default)).lower()
    if val in ("y", "yes", "t", "true", "on", "1"):
        return True
    if val in ("n", "no", "f", "false", "off", "0"):
        return False
    return default
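
For example (the variable name is illustrative):

os.environ["RSPY_LOCAL_MODE"] = "Yes"
env_bool("RSPY_LOCAL_MODE", default=False)  # True: "yes" matches, case-insensitively
env_bool("UNSET_VARIABLE", default=True)    # True: the default is used, since str(True).lower() is "true"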

set_dask_env(host_env)

Pass environment variables to the Dask workers.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/settings.py
def set_dask_env(host_env: dict):
    """Pass environment variables to the dask workers."""
    for name in ["S3_ACCESSKEY", "S3_SECRETKEY", "S3_ENDPOINT", "S3_REGION"]:
        os.environ[name] = host_env[name]

    # Workaround for boto3 to avoid a checksum being added inside the file
    # contents uploaded to the S3 bucket, e.g. x-amz-checksum-crc32:xxx
    # See: https://github.com/boto/boto3/issues/4435
    os.environ["AWS_REQUEST_CHECKSUM_CALCULATION"] = "when_required"
    os.environ["AWS_RESPONSE_CHECKSUM_VALIDATION"] = "when_required"