Skip to content

rs_dpr_service/utils/init_opentelemetry.md

<< Back to index

OpenTelemetry utility

fastapi_hook(span, scope, message=None)

Callback function invoked by FastAPIInstrumentor. It implements the hooks:

  • server_request_hook: called with the server span and ASGI scope object for every incoming request.
  • client_request_hook: called with the internal span, and ASGI scope and event which are sent as dictionaries for when the method receive is called.
  • client_response_hook: called with the internal span, and ASGI scope and event which are sent as dictionaries for when the method send is called.

See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def fastapi_hook(span: Span, scope: dict[str, Any], message=None):
    """
    Single callback registered with FastAPIInstrumentor for its three hooks:

      - server_request_hook: receives the server span and the ASGI scope of each incoming request.
      - client_request_hook: receives the internal span, the ASGI scope and the event (as dictionaries)
                             when the method receive is called.
      - client_response_hook: receives the internal span, the ASGI scope and the event (as dictionaries)
                              when the method send is called.

    The ASGI path is always copied to the span. Headers and body are only added when
    trace_headers() / trace_body() are enabled.

    See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html

    Args:
        span: Span to enrich. Nothing is done if it is missing or not recording.
        scope: ASGI scope dictionary of the request.
        message: ASGI event dictionary, only passed by the client request/response hooks.
    """
    # Nothing to enrich without a live span
    if not span or not span.is_recording():
        return

    # Duplicate the path under a '_'-prefixed name so that it is listed
    # first in the grafana UI, which makes it easier to read
    span.set_attribute("_path", str(scope.get("path")))

    if trace_headers():
        # The scope headers are always traced, the message headers only if there is a message
        header_sources = [("http.scope.headers", scope)]
        if message:
            header_sources.append(("http.message.headers", message))
        for attribute_name, source in header_sources:
            span.set_attribute(attribute_name, parse_data(source.get("headers")))

    if trace_body() and message:
        body_text = parse_data(message.get("body"))
        span.set_attribute("http.message.body", body_text)

init_traces(app, service_name, logger=None)

Init instrumentation of OpenTelemetry traces.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application

required
service_name str

service name

required
logger

non-default logger to use

None
Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def init_traces(app: fastapi.FastAPI | None, service_name: str, logger=None) -> None:  # pylint: disable=too-many-branches
    """
    Init instrumentation of OpenTelemetry traces.

    Steps, in order:
      1. Outside pytest, read the 'TEMPO_ENDPOINT' environment variable. If it is missing, log a warning
         and return without initializing anything. Otherwise copy it to 'OTEL_EXPORTER_OTLP_ENDPOINT'.
      2. Create a TracerProvider whose resource carries the service name and register it as the
         global tracer provider. Outside pytest, spans are exported to the tempo endpoint.
      3. Instrument the FastAPI application, if one is given.
      4. Import every module found under 'opentelemetry.instrumentation' and call 'instrument()' on
         every class that exposes such a method, except the modules and classes excluded below.

    The fastapi and requests hooks are only registered when trace_headers() or trace_body() is enabled.

    Args:
        app (fastapi.FastAPI | None): FastAPI application to instrument, or None to skip this step
        service_name (str): service name, saved as the "service.name" resource attribute
        logger: non-default logger to use
    """

    # See: https://github.com/softwarebloat/python-tracing-demo/tree/main

    logger = logger or default_logger

    # Don't run this block from pytest because it causes errors:
    # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
    if not FROM_PYTEST:
        tempo_endpoint = os.getenv("TEMPO_ENDPOINT")
        if not tempo_endpoint:
            logger.warning("'TEMPO_ENDPOINT' variable is missing, cannot initialize OpenTelemetry")
            return

        # TODO: avoid these errors in local mode:
        # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
        #
        # The line below does not fix them either, but at least we get fewer error messages.
        # See: https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-221?focusedId=162092&
        # page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-162092
        #
        # Now we get a single-line error, which is not as bad:
        # Failed to export metrics to tempo:4317, error code: StatusCode.UNIMPLEMENTED
        os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = tempo_endpoint

    # Global tracer provider, tagged with the service name
    otel_resource = Resource(attributes={"service.name": service_name})
    otel_tracer = TracerProvider(resource=otel_resource)
    trace.set_tracer_provider(otel_tracer)

    # Export the spans to tempo. 'tempo_endpoint' is only defined outside pytest (see above).
    if not FROM_PYTEST:
        otel_tracer.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=tempo_endpoint)))

    # Instrument the given FastAPI application, with our hooks only if headers or bodies must be traced
    if app:
        if trace_headers() or trace_body():
            FastAPIInstrumentor.instrument_app(
                app,
                tracer_provider=otel_tracer,
                server_request_hook=fastapi_hook,
                client_request_hook=fastapi_hook,
                client_response_hook=fastapi_hook,
            )
        else:
            FastAPIInstrumentor.instrument_app(app, tracer_provider=otel_tracer)
        # logger.debug(f"OpenTelemetry instrumentation of 'fastapi.FastAPIInstrumentor'")

    # Instrument all the dependencies under opentelemetry.instrumentation.*
    # NOTE: we need 'poetry run opentelemetry-bootstrap -a install' to install these.

    package = opentelemetry.instrumentation
    prefix = package.__name__ + "."
    classes = set()  # classes already inspected, so that each one is handled only once

    # We need an empty PYTHONPATH if the env var is missing
    os.environ["PYTHONPATH"] = os.getenv("PYTHONPATH", "")

    # Recursively find all package modules
    for _, module_str, _ in pkgutil.walk_packages(path=package.__path__, prefix=prefix, onerror=None):

        # Don't instrument these modules: they are known to cause errors (TODO: investigate why)
        if module_str in [
            "opentelemetry.instrumentation.tortoiseorm",
            "opentelemetry.instrumentation.auto_instrumentation.sitecustomize",
        ]:
            continue

        # Import the module and inspect all of its classes
        __import__(module_str)
        for _, _class in inspect.getmembers(sys.modules[module_str]):
            if (not inspect.isclass(_class)) or (_class in classes):
                continue

            # Save the class (classes are found several times when imported by other modules)
            classes.add(_class)

            # Don't instrument these classes: they are known to cause errors (TODO: investigate why)
            if _class in [AsyncioInstrumentor, BaseInstrumentor, HTTPXClientInstrumentor]:
                continue

            # If the "instrument" method exists, call it
            _instrument = getattr(_class, "instrument", None)
            if callable(_instrument):
                _class_instance = _class()
                # Skip the instrumentors that report being already instrumented
                if _class_instance.is_instrumented_by_opentelemetry:
                    continue
                # name = f"{module_str}.{_class.__name__}".removeprefix(prefix)
                # logger.debug(f"OpenTelemetry instrumentation of {name!r}")

                # Handle specific hooks (only registered if headers or bodies must be traced)
                if _class == RequestsInstrumentor and (trace_headers() or trace_body()):
                    _class_instance.instrument(
                        tracer_provider=otel_tracer,
                        request_hook=requests_hook,
                        response_hook=requests_hook,
                    )
                elif _class == FastAPIInstrumentor and (trace_headers() or trace_body()):
                    _class_instance.instrument(
                        tracer_provider=otel_tracer,
                        server_request_hook=fastapi_hook,
                        client_request_hook=fastapi_hook,
                        client_response_hook=fastapi_hook,
                    )

                # General case (no hooks)
                else:
                    _class_instance.instrument(tracer_provider=otel_tracer)

parse_data(data)

Convert data to a string representation

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def parse_data(data) -> str:
    """
    Convert data to a string representation.

    Mapping-like input is rendered as indented JSON in which every key and value is converted
    to a string. Mapping-like means: a dict, an iterable of (key, value) pairs (e.g. ASGI
    headers), or text/bytes containing a JSON object. Any other input is returned as plain text.

    This function is called from tracing hooks, so it is best-effort: it always returns a
    string, and undecodable bytes are replaced rather than raising.

    Args:
        data: Value to convert (bytes, str, dict, iterable of pairs, None, ...)

    Returns:
        str: String representation of the data, or "" if the data is empty or missing.
    """

    def as_text(item) -> str:
        """Convert a single key or value to text, decoding raw bytes leniently."""
        if isinstance(item, (bytes, bytearray)):
            return bytes(item).decode("utf-8", errors="replace")
        return str(item)

    if not data:
        return ""

    # Decode raw bytes. Binary (non UTF-8) payloads must not raise from a tracing hook,
    # so invalid sequences are replaced by U+FFFD instead.
    if isinstance(data, (bytes, bytearray)):
        data = bytes(data).decode("utf-8", errors="replace")

    mapping = None
    if isinstance(data, str):
        # Text: only a JSON object is rendered as a mapping. Other JSON values (numbers,
        # lists, null, ...) keep their original text, so that we always return a string.
        try:
            decoded = json.loads(data)
        except (ValueError, RecursionError):
            decoded = None
        if isinstance(decoded, dict):
            mapping = decoded
    else:
        # Dict, or iterable of (key, value) pairs
        try:
            mapping = dict(data)
        except (TypeError, ValueError):
            mapping = None

    # Not mapping-like: fall back to the plain text representation
    if mapping is None:
        return data if isinstance(data, str) else str(data)

    # Convert keys and values to strings, then apply json formatting
    return json.dumps({as_text(key): as_text(value) for key, value in mapping.items()}, indent=2)

record_error(span, e)

Record an exception in the span and mark it as ERROR if recording.

Parameters:

Name Type Description Default
span Span

The tracing span.

required
e Exception

The exception to record.

required
Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
316
317
318
319
320
321
322
323
324
325
326
def record_error(span: Span, e: Exception):
    """
    Attach an exception to a span and flag the span as failed.

    Nothing happens if the span is not recording or if no exception is given.

    Args:
        span (Span): The tracing span.
        e (Exception): The exception to record.
    """
    # Guard clause: only live spans and real exceptions are handled
    if not span.is_recording() or e is None:
        return

    span.record_exception(e)

    # The exception message is used as the status description
    error_status = Status(StatusCode.ERROR, str(e))
    span.set_status(error_status)

requests_hook(span, request, response=None)

Callback function invoked by RequestsInstrumentor. It implements the hooks:

  • request_hook: invoked right after a span is created.
  • response_hook: invoked right before the span has finished processing a response.

See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/requests/requests.html

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def requests_hook(span: Span, request: requests.PreparedRequest, response: requests.Response | None = None):
    """
    Callback function invoked by RequestsInstrumentor. It implements the hooks:

      - request_hook: invoked right after a span is created.
      - response_hook: invoked right before the span has finished processing a response.

    The URL is always copied to the span. Headers and bodies are only added when
    trace_headers() / trace_body() are enabled.

    See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/requests/requests.html

    Args:
        span: Span to enrich. Nothing is done if it is missing or not recording.
        request: The prepared request being sent.
        response: The received response, only passed by the response hook.
    """
    if not (span and span.is_recording()):
        return

    # Copy this attribute by adding a '_' prefix to the name,
    # so it appears at the top in the grafana UI, it's more readable
    span.set_attribute("_url", span.attributes.get("http.url"))  # type: ignore

    # NOTE: test the response with 'is not None', not by truthiness: requests.Response.__bool__
    # returns 'response.ok', which is False for 4xx/5xx status codes. Error responses are
    # precisely the ones we want to see in the traces.

    if trace_headers():
        span.set_attribute("http.request.headers", parse_data(request.headers))
        if response is not None:
            span.set_attribute("http.response.headers", parse_data(response.headers))

    if trace_body():
        span.set_attribute("http.request.body", parse_data(request.body))
        if response is not None:
            span.set_attribute("http.response.content", parse_data(response.content))

start_span(instrumenting_module_name, name, span_context=None)

Context manager for creating a new main or child OpenTelemetry span and set it as the current span in this tracer's context.

Parameters:

Name Type Description Default
instrumenting_module_name str

Caller module name, just pass `__name__`

required
name str

The name of the span to be created (use a custom name)

required
span_context SpanContext | None

Parent span context. Only to create a child span.

None

Yields:

Type Description
Span

The newly-created span.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
@_agnosticcontextmanager
def start_span(
    instrumenting_module_name: str,
    name: str,
    span_context: SpanContext | None = None,
) -> Iterator[Span]:
    """
    Context manager that opens a new OpenTelemetry span (main or child) and makes it
    the current span in this tracer's context.

    Args:
        instrumenting_module_name: Caller module name, just pass __name__
        name: The name of the span to be created (use a custom name)
        span_context: Parent span context. Only to create a child span.

    Yields:
        The newly-created span.
    """
    tracer = trace.get_tracer(instrumenting_module_name)

    if span_context:
        # Child span: rebuild the parent from its trace and span IDs, as a remote and sampled
        # context, and wrap it into a non-recording span that is only used as the parent.
        parent_context = SpanContext(
            trace_id=span_context.trace_id,
            span_id=span_context.span_id,
            is_remote=True,
            trace_flags=TraceFlags(TraceFlags.SAMPLED),
        )
        parent_span = NonRecordingSpan(parent_context)

        # Optionally, we could use the parent span itself instead of
        # creating a new one, to be discussed.
        with trace.use_span(parent_span), tracer.start_as_current_span(  # pylint: disable=not-context-manager
            name,
        ) as child_span:
            yield child_span

    else:
        # Main span: no parent was given
        with tracer.start_as_current_span(name) as main_span:
            yield main_span

trace_body()

Trace request bodies and response contents ?

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
58
59
60
def trace_body():
    """
    Should request bodies and response contents be traced?

    The answer is read with env_bool from 'OTEL_PYTHON_REQUESTS_TRACE_BODY', with False as default.
    """
    variable_name = "OTEL_PYTHON_REQUESTS_TRACE_BODY"
    return env_bool(variable_name, default=False)

trace_headers()

Trace request headers ?

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
53
54
55
def trace_headers():
    """
    Should request headers be traced?

    The answer is read with env_bool from 'OTEL_PYTHON_REQUESTS_TRACE_HEADERS', with False as default.
    """
    variable_name = "OTEL_PYTHON_REQUESTS_TRACE_HEADERS"
    return env_bool(variable_name, default=False)