opentelemetry-python-contrib
Instrument HttpResponse body stream.
Is your feature request related to a problem?
Yes. We want to instrument the botocore "bedrock-runtime" service response. The returned result contains a "body" that is a StreamingBody object whose self._raw_stream is an HttpResponse object. Here is an example response content:
{
    "ResponseMetadata": {
        "RequestId": "b9358492-e165-420e-bd54-d29f87deff69",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
            "date": "Tue, 11 Jun 2024 03:42:14 GMT",
            "content-type": "application/json",
            "content-length": "1508",
            "connection": "keep-alive",
            "x-amzn-requestid": "b9358492-e165-420e-bd54-d29f87deff69",
            "x-amzn-bedrock-invocation-latency": "8620",
            "x-amzn-bedrock-output-token-count": "300",
            "x-amzn-bedrock-input-token-count": "17"
        },
        "RetryAttempts": 0
    },
    "contentType": "application/json",
    "body": {
        "type": "completion",
        "completion": " Here is a simple explanation of black holes for 8th graders:\n\n- A black hole is a region in space where gravity is so strong that nothing can escape from it, even light. This region is called the event horizon.\n\n- Black holes form when very massive stars collapse under their own gravity at the end of their life cycle. This collapse crushes the mass of the star into a very small, dense region forming the black hole. \n\n- The gravity of a black hole is so intense because all of the mass of the original star is concentrated into an extremely small space inside the event horizon. This creates a gravitational field so strong that not even light can escape past the event horizon.\n\n- Because no light can get out, black holes are invisible. We can only detect them by the effect their gravitational field has on nearby stars and gas clouds. As matter gets pulled toward a black hole, it forms an accretion disk that heats up and emits X-rays that we can detect.\n\n- Black holes continue to grow in mass as they suck in more matter from their surroundings. Supermassive black holes are thought to exist at the center of most large galaxies, including our own Milky Way galaxy.\n\n- If you fell into a black hole, you would be ripped apart by tidal forces before reaching the center. From an outside perspective, you would appear frozen at the event horizon forever due to time dilation from the extreme gravity.",
        "stop_reason": "max_tokens",
        "stop": null
    }
}
We want to extract "stop_reason" from it. But since the body is a stream connected directly to the socket, and the StreamingBody class has no seek() method, it can only be read once. If we read it for instrumentation purposes, the customer won't be able to read it again. Is there a way to get the "stop_reason" without consuming the HttpResponse?
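Note that "stop_reason" itself only appears in the body, but some attributes (token counts, invocation latency) are present in the response headers shown above and can be recorded without touching the stream at all. A minimal sketch, using a plain dict as a stand-in for the botocore response and for span.set_attribute calls (the gen_ai attribute names here are illustrative assumptions):

```python
# Hedged sketch: record header-only attributes without consuming the body.
# "result" is a stand-in for the invoke_model response dict; the header
# names come from the example response in this issue.
result = {
    "ResponseMetadata": {
        "HTTPHeaders": {
            "x-amzn-bedrock-invocation-latency": "8620",
            "x-amzn-bedrock-output-token-count": "300",
            "x-amzn-bedrock-input-token-count": "17",
        }
    }
}

attributes = {}  # stand-in for span.set_attribute calls

headers = result["ResponseMetadata"]["HTTPHeaders"]
header_to_attr = {
    # attribute names are illustrative, not taken from a published spec
    "x-amzn-bedrock-input-token-count": "gen_ai.usage.input_tokens",
    "x-amzn-bedrock-output-token-count": "gen_ai.usage.output_tokens",
}
for header, attr in header_to_attr.items():
    value = headers.get(header)
    if value is not None:
        attributes[attr] = int(value)

print(attributes)
```

This covers the header-derived attributes cheaply; the finish reason still requires looking at the body.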
Describe the solution you'd like
We're looking for an alternative way to achieve our instrumentation goals without incurring excessive memory usage.
Describe alternatives you've considered
I attempted to read the stream, then create a new in-memory stream and put it back into the response body:
import io
import json

raw_stream = result["body"]._raw_stream.read()
response = json.loads(raw_stream.decode("utf-8"))
# gen_ai.response.finish_reason: completion_reason = result.body.results.completionReason
results = response.get("results") or [{}]
finish_reason = results[0].get("completionReason")
if finish_reason:
    span.set_attribute(
        "gen_ai.response.finish_reason",
        finish_reason,
    )
stream_csv = io.BytesIO(raw_stream)
stream_csv.seek(0)
result["body"]._raw_stream = stream_csv
It was successful. However, this approach caused memory usage issues, especially for large bodies.
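One pattern that avoids the eager read and the extra BytesIO copy is to wrap the raw stream so instrumentation observes bytes as the customer reads them, and record the attribute only once the stream is exhausted. The ObservedStream class below is a hypothetical sketch, not an existing botocore or opentelemetry API; it still accumulates what passes through for the final JSON parse, so the saving is the removed up-front read and duplicate buffer, not the buffer itself:

```python
import io
import json


class ObservedStream:
    """Hypothetical wrapper: tee bytes to instrumentation as the customer
    reads them, then invoke a callback with the parsed JSON body at EOF."""

    def __init__(self, raw_stream, on_complete):
        self._raw = raw_stream
        self._buffer = bytearray()
        self._on_complete = on_complete
        self._done = False

    def read(self, amt=None):
        chunk = self._raw.read() if amt is None else self._raw.read(amt)
        if chunk:
            self._buffer.extend(chunk)
        else:
            self._finish()  # empty read means the stream is exhausted
        if amt is None:
            self._finish()  # read() with no size consumes everything
        return chunk

    def _finish(self):
        if self._done:
            return
        self._done = True
        try:
            self._on_complete(json.loads(bytes(self._buffer)))
        except ValueError:
            pass  # body was not valid JSON; skip instrumentation

    def __getattr__(self, name):
        # delegate everything else (headers, close, ...) to the raw stream
        return getattr(self._raw, name)


# Usage sketch with an in-memory stand-in for the socket stream.
attributes = {}  # stand-in for span.set_attribute calls

def record(payload):
    finish_reason = payload.get("stop_reason")
    if finish_reason:
        attributes["gen_ai.response.finish_reason"] = finish_reason

fake_raw = io.BytesIO(b'{"completion": "...", "stop_reason": "max_tokens"}')
wrapped = ObservedStream(fake_raw, record)

body = wrapped.read()  # the customer's read still returns the full body
print(body, attributes)
```

In the real integration the wrapper would be installed as result["body"]._raw_stream = ObservedStream(result["body"]._raw_stream, callback); the caveat is that the span must stay open (or the attribute must be attached another way) until the customer finishes reading.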
Additional context
We tested using the "amazon.titan-text-premier-v1:0" bedrock-runtime model with an InvokeModel API call:
import json

import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")
model_id = "amazon.titan-text-premier-v1:0"

user_message = "Describe the purpose of a 'hello world' program in one line."
prompt = f"<s>[INST] {user_message} [/INST]"  # Prompt Token

body = json.dumps({
    "inputText": prompt,
    "textGenerationConfig": {
        "maxTokenCount": 3072,  # max_tokens
        "stopSequences": [],
        "temperature": 0.7,  # temperature
        "topP": 0.9  # top_p
    }
})

accept = "application/json"
content_type = "application/json"

response = client.invoke_model(
    body=body, modelId=model_id, accept=accept, contentType=content_type
)

# This read() consumes the stream; if instrumentation already read the
# body, this call would return nothing.
response_body = json.loads(response.get("body").read())