Skip to content

rs_server_common/utils/init_opentelemetry.md

<< Back to index

OpenTelemetry utility

init_traces(app, service_name, logger=None)

Init instrumentation of OpenTelemetry traces.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application

required
service_name str

service name

required
logger

non-default logger to use

None
Source code in docs/rs-server/services/common/rs_server_common/utils/init_opentelemetry.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def init_traces(app: fastapi.FastAPI, service_name: str, logger=None) -> None:
    """
    Init instrumentation of OpenTelemetry traces.

    Outside of pytest, the function reads the 'TEMPO_ENDPOINT' environment variable;
    if it is missing, a warning is logged and nothing is instrumented. Otherwise a
    tracer provider tagged with the service name is installed globally, the FastAPI
    application is instrumented, and every instrumentor class found under
    opentelemetry.instrumentation.* (except a few excluded ones) is instrumented too.

    Args:
        app (fastapi.FastAPI): FastAPI application
        service_name (str): service name
        logger: non-default logger to use
    """

    # See: https://github.com/softwarebloat/python-tracing-demo/tree/main

    # Fall back to the module default logger when none is given
    logger = logger or default_logger

    # Don't call this line from pytest because it causes errors:
    # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
    if not FROM_PYTEST:
        tempo_endpoint = os.getenv("TEMPO_ENDPOINT")
        if not tempo_endpoint:
            # Without an endpoint nothing at all is instrumented (not even FastAPI)
            logger.warning("'TEMPO_ENDPOINT' variable is missing, cannot initialize OpenTelemetry")
            return

        # TODO: to avoid errors in local mode:
        # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
        #
        # The below line does not work either but at least we have less error messages.
        # See: https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-221?focusedId=162092&
        # page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-162092
        #
        # Now we have a single line error, which is less bad:
        # Failed to export metrics to tempo:4317, error code: StatusCode.UNIMPLEMENTED
        os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = tempo_endpoint

    # Create the tracer provider, tagged with the service name, and register it globally
    otel_resource = Resource(attributes={"service.name": service_name})
    otel_tracer = TracerProvider(resource=otel_resource)
    trace.set_tracer_provider(otel_tracer)

    # Spans are only exported outside of pytest. 'tempo_endpoint' is always
    # defined here because it is assigned under the same 'not FROM_PYTEST' condition above.
    if not FROM_PYTEST:
        otel_tracer.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=tempo_endpoint)))

    FastAPIInstrumentor.instrument_app(app, tracer_provider=otel_tracer)
    # logger.debug(f"OpenTelemetry instrumentation of 'fastapi.FastAPIInstrumentor'")

    # Instrument all the dependencies under opentelemetry.instrumentation.*
    # NOTE: we need 'poetry run opentelemetry-bootstrap -a install' to install these.

    package = opentelemetry.instrumentation
    prefix = package.__name__ + "."
    # Classes already handled, so that each one is processed only once
    classes = set()

    # We need an empty PYTHONPATH if the env var is missing
    os.environ["PYTHONPATH"] = os.getenv("PYTHONPATH", "")

    # Recursively find all package modules
    for _, module_str, _ in pkgutil.walk_packages(path=package.__path__, prefix=prefix, onerror=None):

        # Don't instrument these modules, they have errors, maybe we should see why
        if module_str in [
            "opentelemetry.instrumentation.tortoiseorm",
            "opentelemetry.instrumentation.auto_instrumentation.sitecustomize",
        ]:
            continue

        # Import and find all module classes
        __import__(module_str)
        for _, _class in inspect.getmembers(sys.modules[module_str]):
            if (not inspect.isclass(_class)) or (_class in classes):
                continue

            # Save the class (classes are found several times when imported by other modules)
            classes.add(_class)

            # Don't instrument these classes, they have errors, maybe we should see why
            if _class in [AsyncioInstrumentor, AwsLambdaInstrumentor, BaseInstrumentor, HTTPXClientInstrumentor]:
                continue

            # If the "instrument" method exists, call it
            _instrument = getattr(_class, "instrument", None)
            if callable(_instrument):

                _class_instance = _class()
                # The 'requests' instrumentor gets the custom hooks when header or body tracing is enabled.
                # NOTE(review): this branch does not check 'is_instrumented_by_opentelemetry',
                # unlike the generic branch below - confirm this is intended.
                if _class == RequestsInstrumentor and (trace_headers() or trace_body()):
                    _class_instance.instrument(
                        tracer_provider=otel_tracer,
                        request_hook=request_hook,
                        response_hook=response_hook,
                    )
                # Any other instrumentor is instrumented only if it is not already
                elif not _class_instance.is_instrumented_by_opentelemetry:
                    _class_instance.instrument(tracer_provider=otel_tracer)

parse_data(data)

Convert data to a string representation

Source code in docs/rs-server/services/common/rs_server_common/utils/init_opentelemetry.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def parse_data(data) -> str:
    """
    Convert data to a string representation.

    Best-effort formatting used to record headers and bodies as span attributes:
    mappings, sequences of pairs and JSON object strings are rendered as indented
    JSON, anything else is rendered as plain text. This function always returns
    a string and never raises on undecodable bytes or non-serializable values.

    Args:
        data: value to format (bytes, str, mapping, iterable of pairs, ...)

    Returns:
        str: the formatted value, or an empty string if data is empty/None.
    """

    if not data:
        return ""

    # Decode bytes. Bodies may be binary (images, archives, ...): replace the
    # invalid sequences instead of raising from inside the tracing hook.
    if isinstance(data, (bytes, bytearray)):
        data = data.decode("utf-8", errors="replace")

    if isinstance(data, str):
        # Try to parse the text as JSON, but only keep the result when it is an
        # object: scalars and arrays stay as their original text so that e.g.
        # "123" is not turned into an int and "0"/"null" are not lost.
        try:
            parsed = json.loads(data)
        except (ValueError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            data = parsed
    else:
        # Try to convert mappings (e.g. headers) or sequences of pairs to a dict
        try:
            data = dict(data)
        except (TypeError, ValueError):
            pass

    # If we have a dict, try to format it as json
    if isinstance(data, dict):
        try:
            return json.dumps(data, indent=2)
        except (TypeError, ValueError):
            # Some keys/values are not JSON-serializable: use the plain representation
            return str(data)

    # Honor the declared return type whatever the input was
    return data if isinstance(data, str) else str(data)

request_hook(span, request)

HTTP requests instrumentation

Source code in docs/rs-server/services/common/rs_server_common/utils/init_opentelemetry.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def request_hook(span, request):
    """
    HTTP requests instrumentation.

    Hook passed to the 'requests' instrumentor: enriches the span of an outgoing
    HTTP request with a readable url and, when enabled, the request headers and body.

    Args:
        span: span created for the outgoing request (may be None)
        request: outgoing request, read through its 'headers' and 'body' attributes
    """
    # Nothing to do without a span. A span that is not recording drops every
    # attribute and is not guaranteed to expose 'attributes', so skip it too.
    if not span or not span.is_recording():
        return

    # Copy the http.url attribute into _url so it appears at the
    # top in the grafana UI, it's more readable.
    # None is not a valid attribute value, so only copy the url when it exists.
    url = span.attributes.get("http.url")
    if url is not None:
        span.set_attribute("_url", url)

    if trace_headers():
        span.set_attribute("http.request.headers", parse_data(request.headers))

    if trace_body():
        span.set_attribute("http.request.body", parse_data(request.body))

response_hook(span, request, response)

HTTP responses instrumentation

Source code in docs/rs-server/services/common/rs_server_common/utils/init_opentelemetry.py
107
108
109
110
111
112
113
114
115
116
117
118
def response_hook(span, request, response):  # pylint: disable=W0613
    """
    HTTP responses instrumentation.

    Hook passed to the 'requests' instrumentor: when enabled, records the response
    headers and content as attributes of the span of an outgoing HTTP request.

    Args:
        span: span created for the outgoing request (may be None)
        request: outgoing request (unused)
        response: received response, read through its 'headers' and 'content' attributes
    """
    # Nothing to do without a span. A span that is not recording drops every
    # attribute, so don't read and format the response content for nothing.
    if not span or not span.is_recording():
        return

    if trace_headers():
        span.set_attribute("http.response.headers", parse_data(response.headers))

    if trace_body():
        span.set_attribute("http.response.content", parse_data(response.content))

start_span(instrumenting_module_name, name, span_context=None)

Context manager for creating a new main or child OpenTelemetry span and set it as the current span in this tracer's context.

Parameters:

Name Type Description Default
instrumenting_module_name str

Caller module name, just pass `__name__`

required
name str

The name of the span to be created (use a custom name)

required
span_context SpanContext | None

Parent span context. Only to create a child span.

None

Yields:

Type Description
Span

The newly-created span.

Source code in docs/rs-server/services/common/rs_server_common/utils/init_opentelemetry.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
@_agnosticcontextmanager
def start_span(
    instrumenting_module_name: str,
    name: str,
    span_context: SpanContext | None = None,
) -> Iterator[Span]:
    """
    Open a new OpenTelemetry span and make it the current span of the tracer's
    context for the duration of the ``with`` block.

    Without a parent span context, a main span is created. With one, a child
    span is created under a parent rebuilt from the given trace and span ids.

    Args:
        instrumenting_module_name: Caller module name, just pass __name__
        name: The name of the span to be created (use a custom name)
        span_context: Parent span context. Only to create a child span.

    Yields:
        The newly-created span.
    """
    tracer = trace.get_tracer(instrumenting_module_name)

    # Child span: rebuild the parent from its ids, flagged as remote and sampled,
    # and wrap it in a non-recording span that only serves as the current parent.
    if span_context:
        remote_parent = NonRecordingSpan(
            SpanContext(
                trace_id=span_context.trace_id,
                span_id=span_context.span_id,
                is_remote=True,
                trace_flags=TraceFlags(TraceFlags.SAMPLED),
            ),
        )
        # Optionally, we could use the parent span itself instead of creating
        # a new one, to be discussed.
        with (
            trace.use_span(remote_parent),  # pylint: disable=not-context-manager
            tracer.start_as_current_span(name) as child_span,
        ):
            yield child_span
        return

    # Main span: no parent was given
    with tracer.start_as_current_span(name) as root_span:
        yield root_span

trace_body()

Trace request bodies and response contents ?

Source code in docs/rs-server/services/common/rs_server_common/utils/init_opentelemetry.py
55
56
57
def trace_body():
    """
    Tell if request bodies and response contents should be traced.

    The answer is read with env_bool from the 'OTEL_PYTHON_REQUESTS_TRACE_BODY'
    variable, with False as the default.
    """
    body_tracing_enabled = env_bool("OTEL_PYTHON_REQUESTS_TRACE_BODY", default=False)
    return body_tracing_enabled

trace_headers()

Trace request headers ?

Source code in docs/rs-server/services/common/rs_server_common/utils/init_opentelemetry.py
50
51
52
def trace_headers():
    """
    Tell if request headers should be traced.

    The answer is read with env_bool from the 'OTEL_PYTHON_REQUESTS_TRACE_HEADERS'
    variable, with False as the default.
    """
    headers_tracing_enabled = env_bool("OTEL_PYTHON_REQUESTS_TRACE_HEADERS", default=False)
    return headers_tracing_enabled