Skip to content

rs_dpr_service/utils/init_opentelemetry.md

<< Back to index

OpenTelemetry utility

botocore_hook(span, _service_name, _operation_name, api_params)

Callback function invoked by BotocoreInstrumentor and AiobotocoreInstrumentor

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
158
159
160
161
162
163
164
def botocore_hook(span, _service_name, _operation_name, api_params: dict):
    """
    Request hook shared by BotocoreInstrumentor and AiobotocoreInstrumentor.

    Adds a "_path" attribute to the span, built as s3://<Bucket>/<Key> from the
    call parameters. A missing "Bucket" or "Key" contributes an empty string.
    Does nothing when the span is missing or not recording.
    """
    if span and span.is_recording():
        bucket, key = (api_params.get(name, "") for name in ("Bucket", "Key"))
        span.set_attribute("_path", f"s3://{bucket}/{key}")

decode(binary_value)

Try to decode binary value

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
56
57
58
59
60
61
62
63
def decode(binary_value):
    """
    Return the UTF-8 text of a bytes value, or the value itself otherwise.

    Values that are not bytes, and bytes that are not valid UTF-8, are handed
    back unchanged.
    """
    if not isinstance(binary_value, bytes):
        return binary_value
    try:
        return binary_value.decode("utf-8")
    except UnicodeDecodeError:
        # Best effort only: keep the raw bytes when they cannot be decoded
        return binary_value

fastapi_hook(span, scope, message=None)

Callback function invoked by FastAPIInstrumentor. It implements the hooks:

  • server_request_hook: called with the server span and ASGI scope object for every incoming request.
  • client_request_hook: called with the internal span, and ASGI scope and event which are sent as dictionaries for when the method receive is called.
  • client_response_hook: called with the internal span, and ASGI scope and event which are sent as dictionaries for when the method send is called.

See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def fastapi_hook(span: Span, scope: dict[str, Any], message=None):
    """
    Single callback registered for the three FastAPIInstrumentor hooks:

      - server_request_hook: receives the server span and the ASGI scope of every incoming request.
      - client_request_hook: receives the internal span, the ASGI scope and the event (both dictionaries)
                             when the method receive is called.
      - client_response_hook: receives the internal span, the ASGI scope and the event (both dictionaries)
                              when the method send is called.

    See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html
    """
    if not span or not span.is_recording():
        return

    # Duplicate the request path under a '_'-prefixed name: it then shows up
    # at the top of the attribute list in the grafana UI, which is easier to read
    span.set_attribute("_path", str(scope.get("path")))

    # Headers: always from the scope, and from the event when there is one
    if trace_requests_headers():
        header_sources = [("http.scope.headers", scope)]
        if message:
            header_sources.append(("http.message.headers", message))
        for attribute, source in header_sources:
            span.set_attribute(attribute, parse_data(source.get("headers")))

    # Body: only available on the event
    if not trace_requests_body():
        return
    if message:
        span.set_attribute("http.message.body", parse_data(message.get("body")))

init_traces(app, service_name)

Init instrumentation of OpenTelemetry traces.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application

required
service_name str

service name

required
Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def init_traces(app: fastapi.FastAPI | None, service_name: str):
    """
    Init instrumentation of OpenTelemetry traces.

    Only the first call has an effect: the module-level INITIALIZED flag is checked
    and set while holding `lock`, so every later call returns immediately.
    Nothing is instrumented when the TEMPO_ENDPOINT environment variable is unset
    or empty.

    Side effects: sets OTEL_SERVICE_NAME, and (when TEMPO_ENDPOINT is set)
    OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_PYTHON_DISABLED_INSTRUMENTATIONS in os.environ.

    Args:
        app (fastapi.FastAPI | None): FastAPI application. When falsy, the FastAPI
            instrumentation step is skipped.
        service_name (str): service name, exported as OTEL_SERVICE_NAME
    """
    # Run-once guard: the flag is read and written under the lock
    with lock:
        global INITIALIZED  # pylint: disable=global-statement
        if INITIALIZED:
            return
        INITIALIZED = True

    # Set the opentelemetry service name
    os.environ["OTEL_SERVICE_NAME"] = service_name

    # Send opentelemetry signals to tempo.
    # NOTE: INITIALIZED is already True here, so returning because TEMPO_ENDPOINT
    # is missing also prevents any later call from initializing the traces.
    if not (tempo_endpoint := os.getenv("TEMPO_ENDPOINT")):
        return
    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = tempo_endpoint

    # We'll use custom instrumentation for these packages (separated by ,)
    # so they are temporarily appended to the disabled list.
    # NOTE: when the variable was unset or empty, the new value starts with a comma.
    org_disabled = os.getenv("OTEL_PYTHON_DISABLED_INSTRUMENTATIONS", "")
    os.environ["OTEL_PYTHON_DISABLED_INSTRUMENTATIONS"] = f"{org_disabled},aiobotocore,botocore,fastapi,requests"

    # Run the opentelemetry auto instrumentation on all packages under opentelemetry.instrumentation.*
    # This is what the command line "opentelemetry-instrumentation" would do.
    # NOTE: we need 'poetry run opentelemetry-bootstrap -a install' to install these packages.
    try:
        auto_instrumentation.initialize()
    finally:
        # Put the previous value back even if the auto instrumentation raised
        # (an originally unset variable ends up set to an empty string)
        os.environ["OTEL_PYTHON_DISABLED_INSTRUMENTATIONS"] = org_disabled

    #
    # Specific opentelemetry instrumentation with custom hooks
    #

    # The same callback serves the three FastAPI hooks
    if app:
        FastAPIInstrumentor.instrument_app(
            app,
            server_request_hook=fastapi_hook,
            client_request_hook=fastapi_hook,
            client_response_hook=fastapi_hook,
        )

    # botocore and aiobotocore share one request hook; requests uses the same
    # callback for both its request and response hooks
    AiobotocoreInstrumentor().instrument(request_hook=botocore_hook)
    BotocoreInstrumentor().instrument(request_hook=botocore_hook)
    RequestsInstrumentor().instrument(request_hook=requests_hook, response_hook=requests_hook)

parse_data(data)

Convert data to a string representation

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def parse_data(data) -> str:
    """
    Convert data to a string representation.

    This is called from tracing hooks, so it never raises on undecodable
    payloads and always returns a str.

    Args:
        data: Headers or body to render. Supported inputs are bytes/bytearray,
            text, mappings and iterables of (key, value) pairs.

    Returns:
        - "" for empty or missing data,
        - an indented JSON object when data is a mapping, an iterable of pairs
          or a text holding a JSON object (keys and values are decoded from
          bytes when possible, then converted to strings),
        - the text itself otherwise (str(data) for any other object).
    """
    if not data:
        return ""

    # Binary payloads (images, archives, ...) are not valid UTF-8: substitute
    # the undecodable bytes instead of raising from inside a tracing hook.
    if isinstance(data, (bytes, bytearray)):
        data = data.decode("utf-8", errors="replace")

    mapping = None
    if isinstance(data, str):
        # A text is only pretty-printed when it holds a JSON object. Other JSON
        # values (numbers, arrays, booleans, ...) are kept as the original text.
        try:
            parsed = json.loads(data)
        except (ValueError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            mapping = parsed
    else:
        # Header containers: mappings or iterables of (key, value) pairs
        try:
            mapping = dict(data)
        except (TypeError, ValueError):
            mapping = None

    if mapping is None:
        return str(data)

    # Decode bytes, convert everything to strings, then apply json formatting
    readable = {str(decode(key)): str(decode(value)) for key, value in mapping.items()}
    return json.dumps(readable, indent=2)

record_error(span, e)

Record an exception in the span and mark it as ERROR if recording.

Parameters:

Name Type Description Default
span Span

The tracing span.

required
e Exception

The exception to record.

required
Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
259
260
261
262
263
264
265
266
267
268
269
def record_error(span: Span, e: Exception):
    """
    Attach an exception to a span and flag the span as failed.

    Nothing is done when the span is not recording or when no exception is given.

    Args:
        span (Span): The tracing span.
        e (Exception): The exception to record.
    """
    if not span.is_recording() or e is None:
        return

    span.record_exception(e)
    # The exception message becomes the description of the ERROR status
    error_status = Status(StatusCode.ERROR, str(e))
    span.set_status(error_status)

requests_hook(span, request, response=None)

Callback function invoked by RequestsInstrumentor. It implements the hooks:

  • request_hook: invoked right after a span is created.
  • response_hook: invoked right before the span has finished processing a response.

See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/requests/requests.html

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def requests_hook(span: Span, request: requests.PreparedRequest, response: requests.Response | None = None):
    """
    Callback function invoked by RequestsInstrumentor. It implements the hooks:

      - request_hook: invoked right after a span is created.
      - response_hook: invoked right before the span has finished processing a response.

    Depending on trace_requests_headers() and trace_requests_body(), the request
    (and, when given, the response) headers and bodies are added to the span.
    Nothing is done when the span is missing or not recording.

    See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/requests/requests.html

    Args:
        span (Span): The span created for the HTTP call.
        request (requests.PreparedRequest): The outgoing request.
        response (requests.Response | None): The response, only passed to the response hook.
    """
    if not (span and span.is_recording()):
        return

    # A requests.Response is falsy for 4xx/5xx status codes (its truth value is
    # response.ok), so test against None: error responses must be traced too.
    has_response = response is not None

    # Copy this attribute by adding a '_' prefix to the name,
    # so it appears at the top in the grafana UI, it's more readable
    span.set_attribute("_url", span.attributes.get("http.url"))  # type: ignore

    if trace_requests_headers():
        span.set_attribute("http.request.headers", parse_data(request.headers))
        if has_response:
            span.set_attribute("http.response.headers", parse_data(response.headers))

    if trace_requests_body():
        span.set_attribute("http.request.body", parse_data(request.body))
        if has_response:
            span.set_attribute("http.response.content", parse_data(response.content))

start_span(instrumenting_module_name, name, span_context=None)

Context manager for creating a new main or child OpenTelemetry span and set it as the current span in this tracer's context.

Parameters:

Name Type Description Default
instrumenting_module_name str

Caller module name, just pass `__name__`

required
name str

The name of the span to be created (use a custom name)

required
span_context SpanContext | None

Parent span context. Only to create a child span.

None

Yields:

Type Description
Span

The newly-created span.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
@_agnosticcontextmanager
def start_span(
    instrumenting_module_name: str,
    name: str,
    span_context: SpanContext | None = None,
) -> Iterator[Span]:
    """
    Context manager that opens an OpenTelemetry span and makes it the current
    span of the tracer's context, either as a main span or as a child span.

    Args:
        instrumenting_module_name: Caller module name, just pass __name__
        name: The name of the span to be created (use a custom name)
        span_context: Parent span context. Only to create a child span.

    Yields:
        The newly-created span.
    """
    tracer = trace.get_tracer(instrumenting_module_name)

    if span_context:
        # Child span: rebuild the parent context from its trace and span ids,
        # and wrap it in a non-recording span that acts as the current parent
        parent_context = SpanContext(
            trace_id=span_context.trace_id,
            span_id=span_context.span_id,
            is_remote=True,
            trace_flags=TraceFlags(TraceFlags.SAMPLED),
        )
        parent_span = NonRecordingSpan(parent_context)
        # Optionally, the parent span could be used directly instead of
        # creating a new one, to be discussed.
        with trace.use_span(parent_span), tracer.start_as_current_span(name) as span:  # pylint: disable=not-context-manager
            yield span
    else:
        # Main span: no parent was given
        with tracer.start_as_current_span(name) as span:
            yield span

trace_requests_body()

Whether to trace request bodies and response contents.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
51
52
53
def trace_requests_body():
    """
    Tell whether request bodies and response contents must be traced.

    Read from the OTEL_PYTHON_REQUESTS_TRACE_BODY environment variable, off by default.
    """
    body_tracing = env_bool("OTEL_PYTHON_REQUESTS_TRACE_BODY", default=False)
    return body_tracing

trace_requests_headers()

Whether to trace request headers.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/init_opentelemetry.py
46
47
48
def trace_requests_headers():
    """
    Tell whether request headers must be traced.

    Read from the OTEL_PYTHON_REQUESTS_TRACE_HEADERS environment variable, off by default.
    """
    header_tracing = env_bool("OTEL_PYTHON_REQUESTS_TRACE_HEADERS", default=False)
    return header_tracing