diff --git a/infra_lib/telemetry/anonymization_unittest.py b/infra_lib/telemetry/anonymization_unittest.py index 877290af6..4bcc81604 100644 --- a/infra_lib/telemetry/anonymization_unittest.py +++ b/infra_lib/telemetry/anonymization_unittest.py @@ -7,9 +7,22 @@ import getpass import re import pytest +from .proto import trace_span_pb2 from . import anonymization +def test_anonymizing_filter_to_redact_info_from_msg() -> None: + """Test AnonymizingFilter to apply the passed anonymizer to msg.""" + msg = trace_span_pb2.TraceSpan() + msg.name = "log-user-user1234" + + anonymizer = anonymization.Anonymizer([(re.escape("user1234"), "")]) + f = anonymization.AnonymizingFilter(anonymizer) + + filtered_msg = f(msg) + assert filtered_msg.name == "log-user-" + + def test_default_anonymizer_to_remove_username_from_path(monkeypatch) -> None: """Test that default Anonymizer redacts username.""" monkeypatch.setattr(getpass, "getuser", lambda: "user") diff --git a/infra_lib/telemetry/clearcut_span_exporter.py b/infra_lib/telemetry/clearcut_span_exporter.py new file mode 100644 index 000000000..81fbbb7ca --- /dev/null +++ b/infra_lib/telemetry/clearcut_span_exporter.py @@ -0,0 +1,320 @@ +# Copyright 2024 The Chromium Authors +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Defines the telemetry exporter for exporting to ClearCut.""" + +import datetime +import logging +import time +import urllib.error +import urllib.request + +from typing import Callable, Dict, Optional, Pattern, Sequence, Tuple +from google.protobuf import ( + json_format, + message as proto_msg, + struct_pb2, +) +from opentelemetry import trace as otel_trace_api +from opentelemetry.sdk import ( + trace as otel_trace_sdk, + resources as otel_resources, +) +from opentelemetry.sdk.trace import export as otel_export +from opentelemetry.util import types as otel_types + +from . import anonymization +from . import detector +from .proto import trace_span_pb2 +from .proto import clientanalytics_pb2 + +_DEFAULT_ENDPOINT = 'https://play.googleapis.com/log' +_DEFAULT_TIMEOUT = 15 +_DEFAULT_FLUSH_TIMEOUT_MILLIS = 30000 +_DEAULT_MAX_WAIT_SECS = 60 +# Preallocated in Clearcut proto to cros Build. +_LOG_SOURCE = 2044 +# Preallocated in Clearcut proto to Python clients. +_CLIENT_TYPE = 33 +_DEFAULT_MAX_QUEUE_SIZE = 1000 + + +class ClearcutSpanExporter(otel_export.SpanExporter): + """Exports the spans to google http endpoint.""" + + def __init__( + self, + endpoint: str = _DEFAULT_ENDPOINT, + timeout: int = _DEFAULT_TIMEOUT, + max_wait_secs: int = _DEAULT_MAX_WAIT_SECS, + max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE, + prefilter: Optional[Callable[[trace_span_pb2.TraceSpan], + trace_span_pb2.TraceSpan]] = None, + ) -> None: + self._endpoint = endpoint + self._timeout = timeout + self._prefilter = prefilter or anonymization.AnonymizingFilter( + anonymization.Anonymizer()) + self._log_source = _LOG_SOURCE + self._next_request_dt = datetime.datetime.now() + self._max_wait_secs = max_wait_secs + self._queue = [] + self._max_queue_size = max_queue_size + + def export( + self, spans: Sequence[otel_trace_sdk.ReadableSpan] + ) -> otel_export.SpanExportResult: + spans = (self._prefilter(self._translate_span(s)) for s in spans) + self._queue.extend(spans) + + if len(self._queue) >= self._max_queue_size: + return (otel_export.SpanExportResult.SUCCESS + if self._export_batch() else + otel_export.SpanExportResult.FAILURE) + + return otel_export.SpanExportResult.SUCCESS + + def shutdown(self) -> None: + self.force_flush() + + def force_flush(self, + timeout_millis: int = _DEFAULT_FLUSH_TIMEOUT_MILLIS + ) -> bool: + if self._queue: + return self._export_batch(timeout=timeout_millis / 1000) + + return True + + def _translate_context( + self, data: otel_trace_api.SpanContext + ) -> trace_span_pb2.TraceSpan.Context: + ctx = trace_span_pb2.TraceSpan.Context() + ctx.trace_id = f'0x{otel_trace_api.format_trace_id(data.trace_id)}' + ctx.span_id = f'0x{otel_trace_api.format_span_id(data.span_id)}' + ctx.trace_state = repr(data.trace_state) + return ctx + + def _translate_attributes(self, + data: otel_types.Attributes) -> struct_pb2.Struct: + patch = {} + for key, value in data.items(): + if isinstance(value, tuple): + value = list(value) + patch[key] = value + + struct = struct_pb2.Struct() + try: + struct.update(patch) + except Exception as exception: + logging.debug('Set attribute failed: %s', exception) + return struct + + def _translate_span_attributes( + self, data: otel_trace_sdk.ReadableSpan) -> struct_pb2.Struct: + return self._translate_attributes(data.attributes) + + def _translate_links( + self, + data: otel_trace_sdk.ReadableSpan) -> trace_span_pb2.TraceSpan.Link: + links = [] + + for link_data in data.links: + link = trace_span_pb2.TraceSpan.Link() + link.context.MergeFrom(self._translate_context(link_data.context)) + link.attributes.MergeFrom( + self._translate_attributes(link_data.attributes)) + links.append(link) + + return links + + def _translate_events( + self, data: otel_trace_sdk.ReadableSpan + ) -> trace_span_pb2.TraceSpan.Event: + events = [] + for event_data in data.events: + event = trace_span_pb2.TraceSpan.Event() + event.event_time_millis = int(event_data.timestamp / 1e6) + event.name = event_data.name + event.attributes.MergeFrom( + self._translate_attributes(event_data.attributes)) + events.append(event) + return events + + def _translate_instrumentation_scope( + self, data: otel_trace_sdk.ReadableSpan + ) -> trace_span_pb2.TraceSpan.InstrumentationScope: + instrumentation_scope = data.instrumentation_scope + scope = trace_span_pb2.TraceSpan.InstrumentationScope() + scope.name = instrumentation_scope.name + scope.version = instrumentation_scope.version + return scope + + def _translate_env(self, data: Dict[str, str]) -> Dict[str, str]: + environ = {} + for key, value in data.items(): + if key.startswith('process.env.'): + key = key.split('process.env.')[1] + environ[key] = value + return environ + + def _translate_resource( + self, data: otel_trace_sdk.ReadableSpan + ) -> trace_span_pb2.TraceSpan.Resource: + attrs = dict(data.resource.attributes) + resource = trace_span_pb2.TraceSpan.Resource() + resource.system.cpu = attrs.pop(detector.CPU_NAME, '') + resource.system.host_architecture = attrs.pop(detector.CPU_ARCHITECTURE, + '') + resource.system.os_name = attrs.pop(detector.OS_NAME, '') + resource.system.os_version = attrs.pop(otel_resources.OS_DESCRIPTION, + '') + resource.system.os_type = attrs.pop(otel_resources.OS_TYPE, '') + resource.process.pid = str(attrs.pop(otel_resources.PROCESS_PID, '')) + resource.process.executable_name = attrs.pop( + otel_resources.PROCESS_EXECUTABLE_NAME, '') + resource.process.executable_path = attrs.pop( + otel_resources.PROCESS_EXECUTABLE_PATH, '') + resource.process.command = attrs.pop(otel_resources.PROCESS_COMMAND, '') + resource.process.command_args.extend( + attrs.pop(otel_resources.PROCESS_COMMAND_ARGS, [])) + resource.process.owner_is_root = (attrs.pop( + otel_resources.PROCESS_OWNER, 9999) == 0) + resource.process.runtime_name = attrs.pop( + otel_resources.PROCESS_RUNTIME_NAME, '') + resource.process.runtime_version = attrs.pop( + otel_resources.PROCESS_RUNTIME_VERSION, '') + resource.process.runtime_description = attrs.pop( + otel_resources.PROCESS_RUNTIME_DESCRIPTION, '') + resource.process.api_version = str( + attrs.pop(detector.PROCESS_RUNTIME_API_VERSION, '')) + resource.process.env.update(self._translate_env(attrs)) + resource.attributes.MergeFrom(self._translate_attributes(attrs)) + return resource + + def _translate_status( + self, data: otel_trace_sdk.ReadableSpan + ) -> trace_span_pb2.TraceSpan.Status: + status = trace_span_pb2.TraceSpan.Status() + + if data.status.status_code == otel_trace_sdk.StatusCode.ERROR: + status.status_code = ( + trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_ERROR) + else: + status.status_code = ( + trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_OK) + + if data.status.description: + status.message = data.status.description + + return status + + def _translate_sdk( + self, data: otel_trace_sdk.ReadableSpan + ) -> trace_span_pb2.TraceSpan.TelemetrySdk: + attrs = data.resource.attributes + sdk = trace_span_pb2.TraceSpan.TelemetrySdk() + sdk.name = attrs.get(otel_resources.TELEMETRY_SDK_NAME) + sdk.version = attrs.get(otel_resources.TELEMETRY_SDK_VERSION) + sdk.language = attrs.get(otel_resources.TELEMETRY_SDK_LANGUAGE) + return sdk + + def _translate_kind( + self, + data: otel_trace_api.SpanKind) -> trace_span_pb2.TraceSpan.SpanKind: + if data == otel_trace_api.SpanKind.INTERNAL: + return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_INTERNAL + elif data == otel_trace_api.SpanKind.CLIENT: + return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_CLIENT + elif data == otel_trace_api.SpanKind.SERVER: + return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_SERVER + return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_UNSPECIFIED + + def _translate_span( + self, + data: otel_trace_sdk.ReadableSpan) -> trace_span_pb2.TraceSpan: + span = trace_span_pb2.TraceSpan() + span.name = data.name + span.context.MergeFrom(self._translate_context(data.get_span_context())) + + if isinstance(data.parent, otel_trace_api.Span): + ctx = data.parent.context + span.parent_span_id = ( + f'0x{otel_trace_api.format_span_id(ctx.span_id)}') + elif isinstance(data.parent, otel_trace_api.SpanContext): + span.parent_span_id = ( + f'0x{otel_trace_api.format_span_id(data.parent.span_id)}') + + span.start_time_millis = int(data.start_time / 1e6) + span.end_time_millis = int(data.end_time / 1e6) + span.span_kind = self._translate_kind(data.kind) + span.instrumentation_scope.MergeFrom( + self._translate_instrumentation_scope(data)) + span.events.extend(self._translate_events(data)) + span.links.extend(self._translate_links(data)) + span.attributes.MergeFrom(self._translate_span_attributes(data)) + span.status.MergeFrom(self._translate_status(data)) + span.resource.MergeFrom(self._translate_resource(data)) + span.telemetry_sdk.MergeFrom(self._translate_sdk(data)) + + return span + + def _export_batch(self, timeout: Optional[int] = None) -> bool: + """Export the spans to clearcut via http api.""" + + spans = self._queue[:self._max_queue_size] + self._queue = self._queue[self._max_queue_size:] + + wait_delta = self._next_request_dt - datetime.datetime.now() + wait_time = wait_delta.total_seconds() + + # Drop the packets if wait time is more than threshold. + if wait_time > self._max_wait_secs: + logging.warning( + 'dropping %d spans for large wait: %d', + len(spans), + wait_time, + ) + return True + + if wait_time > 0: + time.sleep(wait_time) + + logrequest = self._prepare_request_body(spans) + + req = urllib.request.Request( + self._endpoint, + data=logrequest.SerializeToString(), + method='POST', + ) + logresponse = clientanalytics_pb2.LogResponse() + + try: + with urllib.request.urlopen(req, timeout=timeout + or self._timeout) as f: + logresponse.ParseFromString(f.read()) + except urllib.error.URLError as url_exception: + logging.warning(url_exception) + return False + except proto_msg.DecodeError as decode_error: + logging.warning('could not decode data into proto: %s', + decode_error) + return False + + now = datetime.datetime.now() + delta = datetime.timedelta( + milliseconds=logresponse.next_request_wait_millis) + self._next_request_dt = now + delta + return True + + def _prepare_request_body(self, spans) -> clientanalytics_pb2.LogRequest: + log_request = clientanalytics_pb2.LogRequest() + log_request.request_time_ms = int(time.time() * 1000) + log_request.client_info.client_type = _CLIENT_TYPE + log_request.log_source = self._log_source + + for span in spans: + log_event = log_request.log_event.add() + log_event.event_time_ms = int(time.time() * 1000) + log_event.source_extension = span.SerializeToString() + + return log_request diff --git a/infra_lib/telemetry/clearcut_span_exporter_unittest.py b/infra_lib/telemetry/clearcut_span_exporter_unittest.py new file mode 100644 index 000000000..b895785e9 --- /dev/null +++ b/infra_lib/telemetry/clearcut_span_exporter_unittest.py @@ -0,0 +1,250 @@ +# Copyright 2024 The Chromium Authors +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Unittests for SpanExporter classes.""" + +import datetime +import re +import time +import urllib.request + +from opentelemetry.sdk import trace +from opentelemetry.sdk.trace import export + +from .proto import clientanalytics_pb2 +from .proto import trace_span_pb2 +from . import anonymization +from . import clearcut_span_exporter + + +class MockResponse: + """Mock requests.Response.""" + + def __init__(self, status, text) -> None: + self._status = status + self._text = text + + def __enter__(self): + return self + + def __exit__(self, *args) -> None: + pass + + def read(self): + return self._text + + +tracer = trace.TracerProvider().get_tracer(__name__) + + +def test_otel_span_translation(monkeypatch) -> None: + """Test ClearcutSpanExporter to translate otel spans to TraceSpan.""" + requests = [] + + def mock_urlopen(request, timeout=0): + requests.append((request, timeout)) + resp = clientanalytics_pb2.LogResponse() + resp.next_request_wait_millis = 1 + body = resp.SerializeToString() + return MockResponse(200, body) + + monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen) + + span = tracer.start_span("name") + span.end() + + e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=1) + + assert e.export([span]) == export.SpanExportResult.SUCCESS + req, _ = requests[0] + log_request = clientanalytics_pb2.LogRequest() + log_request.ParseFromString(req.data) + + assert log_request.request_time_ms <= int(time.time() * 1000) + assert len(log_request.log_event) == 1 + + # The following constants are defined in clearcut_span_exporter + # as _CLIENT_TYPE and _LOG_SOURCE respectively. + assert log_request.client_info.client_type == 33 + assert log_request.log_source == 2044 + + tspan = trace_span_pb2.TraceSpan() + tspan.ParseFromString(log_request.log_event[0].source_extension) + + assert tspan.name == span.name + assert tspan.start_time_millis == int(span.start_time / 1e6) + assert tspan.end_time_millis == int(span.end_time / 1e6) + + +def test_otel_span_translation_with_anonymization(monkeypatch) -> None: + """Test ClearcutSpanExporter to anonymize spans to before export.""" + requests = [] + + def mock_urlopen(request, timeout=0): + requests.append((request, timeout)) + resp = clientanalytics_pb2.LogResponse() + resp.next_request_wait_millis = 1 + body = resp.SerializeToString() + return MockResponse(200, body) + + monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen) + + span = tracer.start_span("span-user4321") + span.set_attributes({"username": "user4321"}) + span.add_event("event-for-user4321") + span.end() + + anonymizer = anonymization.Anonymizer([(re.escape("user4321"), "")]) + f = anonymization.AnonymizingFilter(anonymizer) + e = clearcut_span_exporter.ClearcutSpanExporter(prefilter=f, + max_queue_size=1) + + assert e.export([span]) == export.SpanExportResult.SUCCESS + req, _ = requests[0] + log_request = clientanalytics_pb2.LogRequest() + log_request.ParseFromString(req.data) + + tspan = trace_span_pb2.TraceSpan() + tspan.ParseFromString(log_request.log_event[0].source_extension) + + assert tspan.name == "span-" + assert tspan.events[0].name == "event-for-" + assert tspan.attributes["username"] == "" + + +def test_export_to_http_api(monkeypatch) -> None: + """Test ClearcutSpanExporter to export spans over http.""" + requests = [] + + def mock_urlopen(request, timeout=0): + requests.append((request, timeout)) + resp = clientanalytics_pb2.LogResponse() + resp.next_request_wait_millis = 1 + body = resp.SerializeToString() + return MockResponse(200, body) + + monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen) + + span = tracer.start_span("name") + span.end() + endpoint = "http://domain.com/path" + + e = clearcut_span_exporter.ClearcutSpanExporter(endpoint=endpoint, + timeout=7, + max_queue_size=1) + + assert e.export([span]) + req, timeout = requests[0] + assert req.full_url == endpoint + assert timeout == 7 + + +def test_export_to_http_api_throttle(monkeypatch) -> None: + """Test ClearcutSpanExporter to throttle based on prev response.""" + mock_open_times = [] + + def mock_urlopen(request, timeout=0): + mock_open_times.append(datetime.datetime.now()) + resp = clientanalytics_pb2.LogResponse() + resp.next_request_wait_millis = 1000 + body = resp.SerializeToString() + return MockResponse(200, body) + + monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen) + + span = tracer.start_span("name") + span.end() + + e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=1) + + assert e.export([span]) + assert e.export([span]) + + # We've called export() on the same exporter instance twice, so we expect + # the following things to be true: + # 1. The request.urlopen() function has been called exactly twice, and + # 2. The calls to urlopen() are more than 1000 ms apart (due to the + # value in the mock_urlopen response). + # The mock_open_times list is a proxy for observing this behavior directly. + assert len(mock_open_times) == 2 + assert (mock_open_times[1] - mock_open_times[0]).total_seconds() > 1 + + +def test_export_to_drop_spans_if_wait_more_than_threshold(monkeypatch) -> None: + """Test ClearcutSpanExporter to drop span if wait is more than threshold.""" + mock_open_times = [] + + def mock_urlopen(request, timeout=0): + nonlocal mock_open_times + mock_open_times.append(datetime.datetime.now()) + resp = clientanalytics_pb2.LogResponse() + resp.next_request_wait_millis = 900000 + body = resp.SerializeToString() + return MockResponse(200, body) + + monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen) + + span = tracer.start_span("name") + span.end() + + e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=1) + + assert e.export([span]) + assert e.export([span]) + + # We've called export() on the same exporter instance twice, so we expect + # the following things to be true: + # 1. The request.urlopen() function has been called exactly once + assert len(mock_open_times) == 1 + + +def test_flush_to_clear_export_queue_to_http_api(monkeypatch) -> None: + """Test ClearcutSpanExporter to export spans on flush.""" + requests = [] + + def mock_urlopen(request, timeout=0): + requests.append((request, timeout)) + resp = clientanalytics_pb2.LogResponse() + resp.next_request_wait_millis = 1 + body = resp.SerializeToString() + return MockResponse(200, body) + + monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen) + + span = tracer.start_span("name") + span.end() + + e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=3) + + assert e.export([span]) + assert e.export([span]) + assert len(requests) == 0 + + assert e.force_flush() + assert len(requests) == 1 + + +def test_shutdown_to_clear_export_queue_to_http_api(monkeypatch) -> None: + """Test ClearcutSpanExporter to export spans on shutdown.""" + requests = [] + + def mock_urlopen(request, timeout=0): + requests.append((request, timeout)) + resp = clientanalytics_pb2.LogResponse() + resp.next_request_wait_millis = 1 + body = resp.SerializeToString() + return MockResponse(200, body) + + monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen) + + span = tracer.start_span("name") + span.end() + + e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=3) + + assert e.export([span]) + assert e.export([span]) + assert len(requests) == 0 + + e.shutdown() + assert len(requests) == 1