Data lifecycle management

DataLifecycle

Initializes the data lifecycle management (cleaning of old assets) and runs a periodic task to:

  • Retrieve all expired items (expires field earlier than current_date() and unpublished field not set); see the filter sketch below.

  • For each asset of these items: remove the associated file from the S3 bucket and remove the asset from the item.

  • Set the unpublished and updated fields of the STAC item to the current date using a bulk UPSERT transaction on the catalog.
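
The expired-item query is expressed as a CQL2-JSON filter. A minimal, runnable sketch of the same filter that `periodic_once` builds (see the source below); the timestamp format here is illustrative:

```python
# Sketch of the CQL2-JSON filter used to select expired, still-published items:
# "expires < now AND unpublished IS NULL" (timestamp format is illustrative).
import json
from datetime import datetime

now = datetime.now().isoformat()
expired_filter = {
    "op": "and",
    "args": [
        {"op": "<", "args": [{"property": "expires"}, now]},
        {"op": "isNull", "args": [{"property": "unpublished"}]},
    ],
}
print(json.dumps(expired_filter, indent=2))
```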

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `app` | `FastAPI` | FastAPI application | *required* |
| `client_search` | `CoreCrudClient` | CoreCrudClient instance for searching items | *required* |
| `client_bulk` | | BulkTransactionsClient instance for bulk update | *required* |
| `periodic_task` | | Periodic task | *required* |
| `period` | | Period in seconds between two tasks. If < 0, the task is deactivated. | *required* |
| `cancel` | | Cancel the task | *required* |
| `fake_request` | | Fake HTTP request | *required* |
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_lifecycle.py
class DataLifecycle:
    """
    Initialize the data lifecycle management (cleaning of old assets). Will run a periodic task to:

    - Retrieve all expired items (expires field earlier than current_date() and unpublished field not set).

    - For each asset of these items: remove the associated file from the S3 bucket
        and remove the asset from the item.

    - Set the unpublished and updated fields of the STAC item to the current date
        using a bulk UPSERT transaction on the catalog.

    Args:
        app: FastAPI application
        client_search: CoreCrudClient instance for searching items
        client_bulk: BulkTransactionsClient instance for bulk update
        periodic_task: Periodic task
        period: Period in seconds between two tasks. If <0, the task is deactivated.
        cancel: Cancel the task
        fake_request: Fake HTTP request
    """

    def __init__(self, app: FastAPI, client_search: CoreCrudClient):
        """Constructor"""
        self.logger = Logging.default(__name__)
        self.app: FastAPI = app
        self.client_search: CoreCrudClient = client_search
        self.client_bulk = BulkTransactionsClient()
        self.periodic_task: Task | None = None
        self.period: float = float(os.getenv("RSPY_DATA_LIFECYCLE_PERIOD") or -1)
        self.cancel_flag: bool = False
        self.fake_request = self.get_fake_request()

    def get_fake_request(self, extra_scope: dict | None = None) -> Request:
        """
        Return a fake request instance to work with the database.

        Args:
            extra_scope: Extra scope values
        """
        scope = {
            "app": self.app,
            "type": "http",
            "method": "GET",
            "path": "dummy-path",
            "headers": {},
        } | (extra_scope or {})
        request = Request(scope=scope)
        request._base_url = URL("https://dummy-url")  # pylint: disable=protected-access
        return request

    async def cancel(self):
        """Cancel the periodic task"""
        self.cancel_flag = True
        if not self.periodic_task:
            return

        # See: https://superfastpython.com/asyncio-periodic-task/#How_to_Run_a_Periodic_Task
        self.periodic_task.cancel()
        try:
            await self.periodic_task
        except asyncio.exceptions.CancelledError:  # NOSONAR - see https://github.com/python/cpython/issues/103486
            pass

    def run(self):
        """Trigger the periodic task in a distinct thread and exit."""
        if (self.period >= 0) and (not self.cancel_flag):
            self.periodic_task = asyncio.create_task(self._periodic_loop())

    async def _periodic_loop(self):
        """Run the periodic task in an infinite loop."""
        # Infinite loop
        while not self.cancel_flag:
            start_time = time.time()
            try:
                # Run the task
                with init_opentelemetry.start_span(__name__, "data_lifecycle"):
                    await self.periodic_once()

            # Log any error
            except Exception:  # pylint: disable=broad-exception-caught
                self.logger.error(traceback.format_exc())

            # If the caller cancelled execution, we exit the infinite loop before the sleep.
            if self.cancel_flag:
                return

            # Measure execution time of the task in seconds
            runtime = time.time() - start_time

            # We subtract this execution time from the period in seconds between two tasks,
            # so the tasks run at fixed intervals.
            # If the current task took longer than the period, the missed slot is skipped.
            runtime = runtime % self.period
            sleep_value = self.period - runtime

            # Wait n seconds before next run
            if sleep_value != math.inf:
                self.logger.debug(f"Wait {str(timedelta(seconds=round(sleep_value)))} before next cleaning")
            await asyncio.sleep(sleep_value)

    async def periodic_once(self, genuine_request: Request | None = None):
        """
        Run the periodic task once.

        Args:
            genuine_request: request coming from the http endpoint. Only in local mode and from the pytests.
        """
        # Current datetime
        now: str = datetime.now().strftime(ISO_8601_FORMAT)

        # Filter on expired items that have not already been unpublished
        _filter = {
            "op": "and",
            "args": [
                {"op": "<", "args": [{"property": "expires"}, now]},
                {"op": "isNull", "args": [{"property": "unpublished"}]},
            ],
        }

        # Search the database. We call directly the stac_fastapi layer, not the rs-server-catalog
        # http endpoint, so we don't handle the /catalog prefix, the owner_id, the authentication, ...
        item_collection: ItemCollection = await self.client_search.get_search(
            genuine_request or self.fake_request,
            filter_expr=json.dumps(_filter),
            filter_lang="cql2-json",
            limit=ITEM_LIMIT,
        )
        items: list[Item] = item_collection.get("features", [])

        if items:
            self.logger.debug(f"Clean {len(items)} items")
        else:
            self.logger.debug("No items to clean")
            return

        # Order assets by key=bucket name and value=list of bucket keys
        bucket_info: dict[str, list[str]] = defaultdict(list)

        # Update each item locally and update bucket info
        for item in items:
            self._update_local_item(item, now, bucket_info)

        # Order the items by collection_name
        items_by_collection: dict[str, list[Item]] = defaultdict(list)
        for item in items:
            items_by_collection[item["collection"]].append(item)

        # First, update the items in the stac database using a bulk transaction.
        # We need one transaction by collection name, run in parallel.
        async with asyncio.TaskGroup():
            for col_name, col_items in items_by_collection.items():

                # Convert the items into a dict with key=item id and value=items
                bulk_items = bulk_transactions.Items(
                    items={item["id"]: item for item in col_items},
                    method=bulk_transactions.BulkTransactionMethod.UPSERT,
                )

                # The collection name goes into the fake request endpoint path
                extra_scope = {"path_params": {"collection_id": col_name}}
                if genuine_request:
                    bulk_request = copy.copy(genuine_request)
                    bulk_request.scope = copy.copy(genuine_request.scope)
                    bulk_request.scope.update(extra_scope)
                else:
                    bulk_request = self.get_fake_request(extra_scope)

                # Run the bulk transaction.
                # NOTE: we call directly the stac_fastapi layer, not the rs-server-catalog http endpoint
                self.logger.debug(await self.client_bulk.bulk_item_insert(bulk_items, bulk_request))

        # Then, delete all files from the buckets in parallel.
        # NOTE: if ever this fails, a secondary data lifecycle is set on OVH Object Storage side to clean up
        # automatically the files on the buckets.
        # This is done 24 hours after the expiration delay set on the config map.
        bucket_files = []

        for bucket_name, bucket_keys in bucket_info.items():
            bucket_files.extend([f"s3://{bucket_name}/{key}" for key in bucket_keys])

        await S3StorageHandler().adelete_keys_from_s3(bucket_files)
        self.logger.debug("Finished deleting s3 keys")

    def _update_local_item(self, item: Item, now: str, bucket_info: dict[str, list[str]]):
        """
        Update a single item instance locally and update bucket info.

        Args:
            item: Item to clean
            now: current datetime
            bucket_info: bucket information to be updated
        """
        # Set the updated and unpublished properties to current datetime
        item.setdefault("properties", {})["updated"] = now
        item.setdefault("properties", {})["unpublished"] = now

        # Remove all the assets from the item
        assets = item.get("assets", {})
        item["assets"] = {}

        # Remove the links. We don't need to save them in stac.
        # They are automatically generated at runtime with GET requests.
        item["links"] = []

        # Update bucket info for each existing asset file path
        for asset in assets.values():
            try:
                href = asset["href"]
                parsed = urlparse(href)
                bucket_name = parsed.netloc
                bucket_key = parsed.path.strip("/")
                if (parsed.scheme.lower() != "s3") or (not bucket_name) or (not bucket_key):
                    raise KeyError()
                bucket_info[bucket_name].append(bucket_key)

            except KeyError:
                self.logger.debug(f"Asset has no valid href: {asset}")

__init__(app, client_search)

Constructor

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_lifecycle.py
def __init__(self, app: FastAPI, client_search: CoreCrudClient):
    """Constructor"""
    self.logger = Logging.default(__name__)
    self.app: FastAPI = app
    self.client_search: CoreCrudClient = client_search
    self.client_bulk = BulkTransactionsClient()
    self.periodic_task: Task | None = None
    self.period: float = float(os.getenv("RSPY_DATA_LIFECYCLE_PERIOD") or -1)
    self.cancel_flag: bool = False
    self.fake_request = self.get_fake_request()

cancel() async

Cancel the periodic task

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_lifecycle.py
async def cancel(self):
    """Cancel the periodic task"""
    self.cancel_flag = True
    if not self.periodic_task:
        return

    # See: https://superfastpython.com/asyncio-periodic-task/#How_to_Run_a_Periodic_Task
    self.periodic_task.cancel()
    try:
        await self.periodic_task
    except asyncio.exceptions.CancelledError:  # NOSONAR - see https://github.com/python/cpython/issues/103486
        pass
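
The cancel-then-await pattern above is the standard way to stop an asyncio task cleanly; a self-contained sketch:

```python
# Self-contained sketch of the cancel-then-await pattern used by cancel().
import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(3600))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled cleanly")

asyncio.run(main())
```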

get_fake_request(extra_scope=None)

Return a fake request instance to work with the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `extra_scope` | `dict \| None` | Extra scope values | `None` |
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_lifecycle.py
def get_fake_request(self, extra_scope: dict | None = None) -> Request:
    """
    Return a fake request instance to work with the database.

    Args:
        extra_scope: Extra scope values
    """
    scope = {
        "app": self.app,
        "type": "http",
        "method": "GET",
        "path": "dummy-path",
        "headers": {},
    } | (extra_scope or {})
    request = Request(scope=scope)
    request._base_url = URL("https://dummy-url")  # pylint: disable=protected-access
    return request
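
Building a Starlette `Request` from a hand-made ASGI scope, as done above, can be tried in isolation. A minimal sketch (the scope values here are illustrative, and `headers` uses the list form expected by ASGI):

```python
# Minimal sketch: wrap a hand-made ASGI scope in a Starlette Request.
from starlette.requests import Request

scope = {
    "type": "http",
    "method": "GET",
    "path": "/dummy-path",
    "headers": [],  # ASGI expects a list of (name, value) byte pairs
}
request = Request(scope=scope)
print(request.method, request.url.path)  # GET /dummy-path
```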

periodic_once(genuine_request=None) async

Run the periodic task once.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `genuine_request` | `Request \| None` | Request coming from the HTTP endpoint. Only in local mode and from the pytests. | `None` |
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_lifecycle.py
async def periodic_once(self, genuine_request: Request | None = None):
    """
    Run the periodic task once.

    Args:
        genuine_request: request coming from the http endpoint. Only in local mode and from the pytests.
    """
    # Current datetime
    now: str = datetime.now().strftime(ISO_8601_FORMAT)

    # Filter on expired items that have not already been unpublished
    _filter = {
        "op": "and",
        "args": [
            {"op": "<", "args": [{"property": "expires"}, now]},
            {"op": "isNull", "args": [{"property": "unpublished"}]},
        ],
    }

    # Search the database. We call directly the stac_fastapi layer, not the rs-server-catalog
    # http endpoint, so we don't handle the /catalog prefix, the owner_id, the authentication, ...
    item_collection: ItemCollection = await self.client_search.get_search(
        genuine_request or self.fake_request,
        filter_expr=json.dumps(_filter),
        filter_lang="cql2-json",
        limit=ITEM_LIMIT,
    )
    items: list[Item] = item_collection.get("features", [])

    if items:
        self.logger.debug(f"Clean {len(items)} items")
    else:
        self.logger.debug("No items to clean")
        return

    # Order assets by key=bucket name and value=list of bucket keys
    bucket_info: dict[str, list[str]] = defaultdict(list)

    # Update each item locally and update bucket info
    for item in items:
        self._update_local_item(item, now, bucket_info)

    # Order the items by collection_name
    items_by_collection: dict[str, list[Item]] = defaultdict(list)
    for item in items:
        items_by_collection[item["collection"]].append(item)

    # First, update the items in the stac database using a bulk transaction.
    # We need one transaction by collection name, run in parallel.
    async with asyncio.TaskGroup():
        for col_name, col_items in items_by_collection.items():

            # Convert the items into a dict with key=item id and value=items
            bulk_items = bulk_transactions.Items(
                items={item["id"]: item for item in col_items},
                method=bulk_transactions.BulkTransactionMethod.UPSERT,
            )

            # The collection name goes into the fake request endpoint path
            extra_scope = {"path_params": {"collection_id": col_name}}
            if genuine_request:
                bulk_request = copy.copy(genuine_request)
                bulk_request.scope = copy.copy(genuine_request.scope)
                bulk_request.scope.update(extra_scope)
            else:
                bulk_request = self.get_fake_request(extra_scope)

            # Run the bulk transaction.
            # NOTE: we call directly the stac_fastapi layer, not the rs-server-catalog http endpoint
            self.logger.debug(await self.client_bulk.bulk_item_insert(bulk_items, bulk_request))

    # Then, delete all files from the buckets in parallel.
    # NOTE: if ever this fails, a secondary data lifecycle is set on OVH Object Storage side to clean up
    # automatically the files on the buckets.
    # This is done 24 hours after the expiration delay set on the config map.
    bucket_files = []

    for bucket_name, bucket_keys in bucket_info.items():
        bucket_files.extend([f"s3://{bucket_name}/{key}" for key in bucket_keys])

    await S3StorageHandler().adelete_keys_from_s3(bucket_files)
    self.logger.debug("Finished deleting s3 keys")

run()

Trigger the periodic task as a background asyncio task and return.

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_lifecycle.py
def run(self):
    """Trigger the periodic task in a distinct thread and exit."""
    if (self.period >= 0) and (not self.cancel_flag):
        self.periodic_task = asyncio.create_task(self._periodic_loop())
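
A hypothetical wiring sketch, assuming the application constructs a `CoreCrudClient` instance named `client_search` elsewhere (neither the import path nor the variable names are taken from rs-server): start the lifecycle at application startup and cancel it at shutdown.

```python
# Hypothetical wiring sketch (names and import path are assumptions):
# schedule the lifecycle task at startup and cancel it at shutdown.
from contextlib import asynccontextmanager
from fastapi import FastAPI
from rs_server_catalog.data_lifecycle import DataLifecycle  # assumed import path

@asynccontextmanager
async def lifespan(app: FastAPI):
    lifecycle = DataLifecycle(app, client_search)  # client_search: a CoreCrudClient, assumed
    lifecycle.run()  # schedules the periodic asyncio task and returns immediately
    yield
    await lifecycle.cancel()

app = FastAPI(lifespan=lifespan)
```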