agentops
agentops copied to clipboard
Instrument OpenAI responses API
Instrument OpenAI responses API
Description
Add instrumentation for the OpenAI responses API to track API calls, similar to how other OpenAI API endpoints are currently instrumented. This will allow AgentOps to monitor and analyze responses API usage.
Approach
- Create a new file `responses_wrappers.py` in the `third_party/opentelemetry/instrumentation/openai/shared/` directory with wrapper functions for the responses API.
- Add the wrapper functions to the `OpenAIV1Instrumentor._instrument` method in `third_party/opentelemetry/instrumentation/openai/v1/__init__.py`.
- Add tests for the responses API instrumentation in `tests/integration/test_openai_instrumentation.py`.
Implementation Details
1. Create responses_wrappers.py
Create a new file with wrapper functions for both synchronous and asynchronous API calls:
import logging
import time
from opentelemetry import context as context_api
from opentelemetry.metrics import Counter, Histogram
from agentops.semconv import (
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY,
SpanAttributes,
LLMRequestTypeValues,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.openai.utils import (
_with_tracer_wrapper,
dont_throw,
is_openai_v1,
)
from opentelemetry.instrumentation.openai.shared import (
_set_client_attributes,
_set_request_attributes,
_set_span_attribute,
_set_response_attributes,
is_streaming_response,
should_send_prompts,
model_as_dict,
propagate_trace_context,
)
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode
# Span name emitted for every responses-API call.
SPAN_NAME = "openai.response"
# The responses API is recorded as a chat-style LLM request type.
LLM_REQUEST_TYPE = LLMRequestTypeValues.CHAT

logger = logging.getLogger(__name__)
@_with_tracer_wrapper
def responses_wrapper(tracer, wrapped, instance, args, kwargs):
    """Trace a synchronous ``Responses.create`` call.

    Starts a CLIENT span, records request attributes, invokes the wrapped
    OpenAI API call, then records response attributes (or the error status)
    and ends the span. For streaming responses the span is handed off to the
    stream accumulator and stays open until the stream is consumed.
    """
    # Honor both the generic OTel suppression key and the LLM-specific one.
    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
        SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
    ):
        return wrapped(*args, **kwargs)

    span = tracer.start_span(
        SPAN_NAME,
        kind=SpanKind.CLIENT,
        attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
    )
    _handle_request(span, kwargs, instance)
    try:
        response = wrapped(*args, **kwargs)
    except Exception as e:
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.end()
        # Bare raise re-raises the active exception without adding a frame.
        raise

    if is_streaming_response(response):
        # NOTE(review): _build_from_streaming_response is not defined in this
        # module — it must be implemented here or imported (e.g. from the
        # chat wrappers) before the streaming path can work. TODO confirm.
        return _build_from_streaming_response(span, response)

    _handle_response(response, span)
    span.end()
    return response
@_with_tracer_wrapper
async def aresponses_wrapper(tracer, wrapped, instance, args, kwargs):
    """Trace an asynchronous ``AsyncResponses.create`` call.

    Async counterpart of ``responses_wrapper``: starts a CLIENT span,
    records request attributes, awaits the wrapped call, then records
    response attributes (or the error status) and ends the span. Streaming
    responses keep the span open until the stream is consumed.
    """
    # Honor both the generic OTel suppression key and the LLM-specific one.
    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
        SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
    ):
        return await wrapped(*args, **kwargs)

    span = tracer.start_span(
        SPAN_NAME,
        kind=SpanKind.CLIENT,
        attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
    )
    _handle_request(span, kwargs, instance)
    try:
        response = await wrapped(*args, **kwargs)
    except Exception as e:
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.end()
        # Bare raise re-raises the active exception without adding a frame.
        raise

    if is_streaming_response(response):
        # NOTE(review): _abuild_from_streaming_response is not defined in this
        # module — it must be implemented here or imported (e.g. from the
        # chat wrappers) before the streaming path can work. TODO confirm.
        return _abuild_from_streaming_response(span, response)

    _handle_response(response, span)
    span.end()
    return response
@dont_throw
def _handle_request(span, kwargs, instance):
    """Record request-side attributes on the span before the API call is made.

    Never raises: @dont_throw swallows any error so instrumentation
    failures cannot break the instrumented call.
    """
    _set_request_attributes(span, kwargs)
    _set_client_attributes(span, instance)
    # Prompt content is only captured when the user has opted in.
    if should_send_prompts():
        _set_prompts(span, kwargs)
    # Add trace context propagation if enabled
    propagate_trace_context(span, kwargs)
@dont_throw
def _handle_response(response, span):
    """Record response-side attributes (and completion text, if enabled).

    Never raises: @dont_throw swallows any error so instrumentation
    failures cannot break the instrumented call.
    """
    # openai>=1.0 returns model objects; convert to a plain dict first so the
    # shared attribute setters can index into it uniformly.
    if is_openai_v1():
        response_dict = model_as_dict(response)
    else:
        response_dict = response
    _set_response_attributes(span, response_dict)
    # Completion content is only captured when the user has opted in.
    if should_send_prompts():
        _set_completions(span, response_dict)
def _set_prompts(span, kwargs):
    """Copy the request ``input`` and ``instructions`` onto prompt attributes.

    The user input goes to prompt slot 0 (content only); the system
    instructions, when present, go to prompt slot 1 with role "system".
    """
    if not span.is_recording():
        return

    user_input = kwargs.get("input")
    if user_input:
        # Non-string inputs (e.g. structured message lists) are stringified.
        content = user_input if isinstance(user_input, str) else str(user_input)
        _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.content", content)

    system_instructions = kwargs.get("instructions")
    if system_instructions:
        slot = f"{SpanAttributes.LLM_PROMPTS}.1"
        _set_span_attribute(span, f"{slot}.role", "system")
        _set_span_attribute(span, f"{slot}.content", system_instructions)
@dont_throw
def _set_completions(span, response_dict):
    """Copy the response ``output_text`` onto the first completion attribute."""
    if not span.is_recording():
        return

    text = response_dict.get("output_text")
    if not text:
        return
    _set_span_attribute(span, f"{SpanAttributes.LLM_COMPLETIONS}.0.content", text)
2. Update OpenAIV1Instrumentor._instrument method
Add the following code to the _instrument method in third_party/opentelemetry/instrumentation/openai/v1/__init__.py:
# Import the new wrappers at the top of v1/__init__.py.
from opentelemetry.instrumentation.openai.shared.responses_wrappers import (
    responses_wrapper,
    aresponses_wrapper,
)

# Add this to the _instrument method
# Patch both the sync and async ``create`` methods on the responses resource;
# ``tracer`` is the tracer already created inside _instrument.
wrap_function_wrapper(
    "openai.resources.responses",
    "Responses.create",
    responses_wrapper(tracer),
)
wrap_function_wrapper(
    "openai.resources.responses",
    "AsyncResponses.create",
    aresponses_wrapper(tracer),
)
3. Add tests for responses API instrumentation
Add the following test to tests/integration/test_openai_instrumentation.py:
@pytest.mark.asyncio
async def test_responses_api_tracking(agentops_session):
    """Test that responses API calls are tracked in session context."""
    try:
        client = openai.AsyncOpenAI()
        response = await client.responses.create(
            model="gpt-3.5-turbo",
            input="Write a one-line joke",
        )
        # Verify session tracking. Bug fix: the fixture is named
        # ``agentops_session``; the original body referenced an undefined
        # ``session`` name and would raise NameError.
        assert agentops_session.event_counts["llms"] == 1
        assert agentops_session.event_counts["errors"] == 0
        assert response.output_text is not None
    finally:
        agentops_session.end("SUCCEEDED")
Files to be modified:
- Create new file:
third_party/opentelemetry/instrumentation/openai/shared/responses_wrappers.py - Update:
third_party/opentelemetry/instrumentation/openai/v1/__init__.py - Update:
tests/integration/test_openai_instrumentation.py
Verification Strategy
- Run unit tests to verify that the responses API instrumentation works correctly
- Test both synchronous and asynchronous API calls
- Test with streaming and non-streaming responses
- Verify that the instrumentation correctly captures all relevant attributes
- Verify that the instrumentation works with error handling
Fantastic work done by @tcdent so closing this.