File and fallback exporters
I wrote some exporters for internal use. The use case was to fall back to writing to a file if we encountered a network failure. I'm leaving this here to see if there's any appetite for upstreaming it.
from __future__ import annotations
from typing import Sequence
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
class FallbackSpanExporter(SpanExporter):
def __init__(
self,
exporter: SpanExporter,
fallback: SpanExporter,
) -> None:
self.exporter = exporter
self.fallback = fallback
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
try:
res = self.exporter.export(spans)
except Exception:
self.fallback.export(spans)
raise
if res is not SpanExportResult.SUCCESS:
self.fallback.export(spans)
return res
def force_flush(self, timeout_millis: int = 30000) -> bool:
return self.exporter.force_flush(timeout_millis) and self.fallback.force_flush(timeout_millis)
def shutdown(self) -> None:
self.exporter.shutdown()
self.fallback.shutdown()
from __future__ import annotations
import threading
from pathlib import Path
from typing import IO, Sequence
from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
encode_spans,
)
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
HEADER = b'OTEL BACKUP FILE\n'
VERSION = b'VERSION 1\n'
class FileSpanExporter(SpanExporter):
def __init__(
self,
file_path: str | Path | IO[bytes],
) -> None:
self.file_path = Path(file_path) if isinstance(file_path, str) else file_path
self._lock = threading.Lock()
self._file: IO[bytes] | None = None
self._wrote_header = False
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
with self._lock:
if not self._file:
if isinstance(self.file_path, Path):
self._file = self.file_path.open('ab')
else:
self._file = self.file_path
if self._file.tell() == 0:
self._file.write(HEADER)
self._file.write(VERSION)
encoded_spans = encode_spans(spans)
size = encoded_spans.ByteSize()
# we can represent up to a 4GB message
self._file.write(size.to_bytes(4, 'big'))
self._file.write(encoded_spans.SerializeToString())
self._file.flush()
return SpanExportResult.SUCCESS
def force_flush(self, timeout_millis: int = 30000) -> bool:
return True
def shutdown(self) -> None:
with self._lock:
if self._file:
self._file.flush()
if self._file is not self.file_path:
# don't close the file if it was passed in
self._file.close()
Would be nice if this could export to the JSON file format, as specified in https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md
Thanks for this, @adriangb!
This seems pretty close to the feature mentioned by @hterik above. Since there is already an experimental spec document, we could add this to our code while prefixing it with an underscore to keep it private while that spec document gets stabilized. Would you open a PR with these changes, please, @adriangb?
I don’t think there’s much similarity given that my implementation explicitly chooses to export to a binary format whereas the spec above chooses JSON. I also wrote a JSON converter to be able to introspect the files but I don’t know why you’d choose that by default given the massive size and performance overhead.
Seems like the easiest is to use otel collector with file exporter support such as: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/fileexporter/README.md