Add Tracer to the telemetry lib

This wraps the tracer and allows us to capture keyboard interrupts.
This is based on cros builds chromite_tracer:
https://crrev.com/ff59bf4edd7614a50f27e1cc067e0b6f0af62f1e/lib/telemetry/trace/chromite_tracer.py
except the failed_packages removed (and a custom span is subsequently
not needed)

Bug: 326277821
Change-Id: I0ac60797563c777ee00abac1e5924c45fb59df29
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/5896032
Commit-Queue: Struan Shrimpton <sshrimp@google.com>
Reviewed-by: Terrence Reilly <treilly@google.com>
changes/32/5896032/7
Struan Shrimpton 5 months ago committed by LUCI CQ
parent 2c16d7f80b
commit b5210fdb74

@ -0,0 +1,150 @@
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tracer, Provider and span context."""
import contextlib
from typing import Any, Iterator, Optional, Sequence
import logging
import pathlib
import sys
from opentelemetry import context as otel_context_api
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk import trace as otel_trace_sdk
from opentelemetry.util import types as otel_types
from . import config
from . import clearcut_span_exporter
from . import detector
@contextlib.contextmanager
def use_span(
span: otel_trace_api.Span,
end_on_exit: bool = False,
record_exception: bool = True,
set_status_on_exception: bool = True,
) -> Iterator[otel_trace_api.Span]:
"""Takes a non-active span and activates it in the current context."""
try:
token = otel_context_api.attach(
# This is needed since the key needs to be the same as
# used in the rest of opentelemetry code.
otel_context_api.set_value(otel_trace_api._SPAN_KEY, span))
try:
yield span
finally:
otel_context_api.detach(token)
except KeyboardInterrupt as exc:
if span.is_recording():
if record_exception:
span.record_exception(exc)
if set_status_on_exception:
span.set_status(otel_trace_api.StatusCode.OK)
raise
except BaseException as exc:
if span.is_recording():
# Record the exception as an event
if record_exception:
span.record_exception(exc)
# Set status in case exception was raised
if set_status_on_exception:
span.set_status(
otel_trace_api.Status(
status_code=otel_trace_api.StatusCode.ERROR,
description=f"{type(exc).__name__}: {exc}",
))
raise
finally:
if end_on_exit:
span.end()
class Tracer(otel_trace_api.Tracer):
"""Specific otel tracer."""
def __init__(self, inner: otel_trace_sdk.Tracer) -> None:
self._inner = inner
def start_span(
self,
name: str,
context: Optional[otel_context_api.Context] = None,
kind: otel_trace_api.SpanKind = otel_trace_api.SpanKind.INTERNAL,
attributes: otel_types.Attributes = None,
links: Optional[Sequence[otel_trace_api.Link]] = None,
start_time: Optional[int] = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
) -> otel_trace_api.Span:
return self._inner.start_span(
name,
context=context,
kind=kind,
attributes=attributes,
links=links,
start_time=start_time,
record_exception=record_exception,
set_status_on_exception=set_status_on_exception,
)
@contextlib.contextmanager
def start_as_current_span(
self,
name: str,
context: Optional[otel_context_api.Context] = None,
kind: otel_trace_api.SpanKind = otel_trace_api.SpanKind.INTERNAL,
attributes: otel_types.Attributes = None,
links: Optional[Sequence[otel_trace_api.Link]] = None,
start_time: Optional[int] = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
end_on_exit: bool = True,
) -> Iterator[otel_trace_api.Span]:
span = self.start_span(
name=name,
context=context,
kind=kind,
attributes=attributes,
links=links,
start_time=start_time,
record_exception=record_exception,
set_status_on_exception=set_status_on_exception,
)
with use_span(
span,
end_on_exit=end_on_exit,
record_exception=record_exception,
set_status_on_exception=set_status_on_exception,
) as span_context:
yield span_context
class TracerProvider(otel_trace_api.TracerProvider):
"""Specific otel tracer provider."""
def __init__(self, inner: otel_trace_sdk.TracerProvider) -> None:
self._inner = inner
def get_tracer(
self,
instrumenting_module_name: str,
instrumenting_library_version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> otel_trace_api.Tracer:
tracer = self._inner.get_tracer(
instrumenting_module_name=instrumenting_module_name,
instrumenting_library_version=instrumenting_library_version,
schema_url=schema_url,
)
return Tracer(tracer)
def __getattr__(self, name: str) -> Any:
"""Method allows to delegate method calls."""
return getattr(self._inner, name)

@ -0,0 +1,77 @@
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test the trace implementation."""
import contextlib
from typing import Sequence
from opentelemetry import trace as trace_api
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk import resources
from opentelemetry.sdk.trace import export as export_sdk
from . import trace as trace
class SpanExporterStub(export_sdk.SpanExporter):
"""Stub for SpanExporter."""
def __init__(self) -> None:
self._spans = []
@property
def spans(self):
return self._spans
def export(
self, spans: Sequence[trace_sdk.ReadableSpan]
) -> export_sdk.SpanExportResult:
self._spans.extend(spans)
return export_sdk.SpanExportResult.SUCCESS
def test_span_to_capture_keyboard_interrupt_as_decorator() -> None:
"""Test span can capture KeyboardInterrupt as decorator."""
exporter = SpanExporterStub()
provider = trace.TracerProvider(trace_sdk.TracerProvider())
provider.add_span_processor(
export_sdk.BatchSpanProcessor(span_exporter=exporter))
tracer = provider.get_tracer(__name__)
@tracer.start_as_current_span("test-span")
def test_function() -> None:
raise KeyboardInterrupt()
with contextlib.suppress(KeyboardInterrupt):
test_function()
provider.shutdown()
assert len(exporter.spans) == 1
assert exporter.spans[0].events[0].name == "exception"
assert (exporter.spans[0].events[0].attributes["exception.type"] ==
"KeyboardInterrupt")
assert exporter.spans[0].status.status_code == trace_api.StatusCode.OK
def test_span_to_capture_keyboard_interrupt_in_context() -> None:
"""Test span can capture KeyboardInterrupt in context."""
exporter = SpanExporterStub()
provider = trace.TracerProvider(trace_sdk.TracerProvider())
provider.add_span_processor(
export_sdk.BatchSpanProcessor(span_exporter=exporter))
tracer = provider.get_tracer(__name__)
with contextlib.suppress(KeyboardInterrupt):
with tracer.start_as_current_span("test-span"):
raise KeyboardInterrupt()
provider.shutdown()
assert len(exporter.spans) == 1
assert exporter.spans[0].events[0].name == "exception"
assert (exporter.spans[0].events[0].attributes["exception.type"] ==
"KeyboardInterrupt")
assert exporter.spans[0].status.status_code == trace_api.StatusCode.OK
Loading…
Cancel
Save