amazon-kinesis-producer
Upgrade to "Release 0.10.0" - Instability?
We were running our app using release 0.9.0 smoothly for weeks without issues. After upgrading the KPL to release 0.10.0 last week, our app started to fail with out-of-memory errors after some hours of running fine. We noticed high CPU and memory usage on the KPL native process itself. We rolled back to version 0.9.0 and it started working flawlessly again. Is there a way to generate a log file for the KPL library so that we can retrieve more info?
Hi Andre,
Unfortunately the native process's logs are fairly sparse at the moment. I typically use valgrind's massif to debug memory issues.
Can you provide some additional details about the problem?
- Which OS is this on, and what kind of hardware?
- How much CPU and memory usage were you seeing?
- Is the java process also experiencing high CPU and memory usage?
- What throughput does the app operate at in terms of user records per second, and how big are the records?
- What is your partition key scheme like? Is it random? Do you use explicit hash keys?
- How many shards do you have in the stream?
- Which configurations did you change from the default, and what values did you set them to?
Thanks!
Hi Kevin, please find below the answers:
Which OS is this on, and what kind of hardware? R: Amazon Linux, t2.medium
How much CPU and memory usage were you seeing? R: 100% CPU and memory usage.
Is the java process also experiencing high CPU and memory usage? R: Yes.
What throughput does the app operate at in terms of user records per second, and how big are the records? R: records are small, 1~2 kB, at about 3000 per second.
What is your partition key scheme like? Is it random? Do you use explicit hash keys? R: we use random partition keys
How many shards do you have in the stream? R: we are using one KPL instance to publish to 4 different streams with 3 shards each.
Which configurations did you change from the default, and what values did you set them to? R: here are our settings on the KPL instance: setMetricsLevel("none"); setRequestTimeout(60000); setRecordTtl(120000); setRecordMaxBufferedTime(1000); setAggregationEnabled(true);
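For reference, a minimal sketch of that configuration in code (class and setter names are from the KPL's Java API; the values are the ones listed above):
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setMetricsLevel("none");          // CloudWatch metrics disabled
config.setRequestTimeout(60000);         // 60 s per request
config.setRecordTtl(120000);             // fail records buffered longer than 2 min
config.setRecordMaxBufferedTime(1000);   // flush buffered records after at most 1 s
config.setAggregationEnabled(true);      // aggregate user records into Kinesis records
KinesisProducer kinesis = new KinesisProducer(config);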
We could upgrade our app again to 0.10.0 to gather debug information if you need, just tell me how to do so. Thanks!
Thanks Andre. Let me try to reproduce this. I'll let you know once I have results.
Hi Andre,
I have been unable to reproduce the problem so far. Some follow-up questions:
- Do you have a number on how much memory the kinesis_producer process was consuming? How much memory does it consume when operating normally with 0.9.0?
- The 3000 records per second, is that per stream or total?
- Does your application respond to back pressure in the record queue? I.e. does it back off from calling addUserRecord() based on the value of getOutstandingRecordsCount()?
Also, I see that you have disabled metrics. It might be helpful to enable them for a test run to see if there is an anomalous number of errors.
1 - Memory kinesis_producer (0.9.0): VmSize: 366300 kB VmPeak: 404944 kB
2 - 3000 records total divided among 4 streams.
3 - We don't have a back-off strategy for now, since our shards seem sufficient for the throughput we are observing. We are going to log getOutstandingRecordsCount() once per minute to check whether this is actually an issue (see the sketch below).
This afternoon we are going to test version 0.10.0 in our test environment to see the outstanding records metric and the KPL stability. I'll keep you informed.
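A minimal sketch of that per-minute logging (assuming a KinesisProducer instance named kinesis and a logger named log):
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(
        () -> log.info("OutstandingRecords Count: " + kinesis.getOutstandingRecordsCount()),
        0, 1, TimeUnit.MINUTES);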
Hi Kevin, we started testing 0.10.0 in a parallel environment with exactly the same data as our production version (0.9.0).
We started to see an increasing number of outstanding records in our "debug" environment; successive readings of OutstandingRecords Count: 12828, 295887, 291059, 531731.
Right now, memory consumption is at 100% and CPU is also at 100%. We are going to log the outstanding records in our production environment to see how it behaves there. Meanwhile, the production environment is running smoothly with the same code; the only difference is the KPL version. It is also important to note that in the debug environment the producer stopped publishing. We'll keep you updated on our future tests. If you have any suggested tests, feel free to tell us.
Thanks for the updates Andre.
Did this happen right away or after some time? Was the debug app behaving correctly in the beginning before it started having problems?
Did you enable CW metrics for the debug app? Do you see errors? You can also get the metrics directly with the metrics methods on KinesisProducer itself.
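For example, something along these lines (a sketch; it assumes a KinesisProducer instance named kinesis and a logger named log, and that the getMetrics(windowSeconds) overload and the Metric accessors shown are available in your KPL version):
import java.util.List;
import com.amazonaws.services.kinesis.producer.Metric;

// Metrics accumulated locally over the last 60 seconds
// (getMetrics may throw InterruptedException / ExecutionException)
List<Metric> metrics = kinesis.getMetrics(60);
for (Metric m : metrics) {
    log.info(String.format("%s %s: mean=%s, samples=%s",
            m.getName(), m.getDimensions(), m.getMean(), m.getSampleCount()));
}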
Can you log the put attempts by adding callbacks to the futures returned by addUserRecord? See below. You may need to change some of the code if you're not using Java 8.
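(The snippet below uses Guava's ListenableFuture, Futures, and FutureCallback, the KPL's UserRecordResult and UserRecordFailedException, and Apache Commons Lang's StringUtils, so those need to be on the classpath.)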
ClickEvent event = inputQueue.take();
String partitionKey = event.getSessionId();
String payload = event.getPayload();
ByteBuffer data = ByteBuffer.wrap(payload.getBytes("UTF-8"));

while (kinesis.getOutstandingRecordsCount() > 1e4) {
    Thread.sleep(1);
}
recordsPut.getAndIncrement();

ListenableFuture<UserRecordResult> f =
        kinesis.addUserRecord(STREAM_NAME, partitionKey, data);

Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
        long totalTime = result.getAttempts().stream()
                .mapToLong(a -> a.getDelay() + a.getDuration())
                .sum();
        // Only log with a small probability, otherwise it'll be very spammy
        if (RANDOM.nextDouble() < 1e-5) {
            log.info(String.format(
                    "Succesfully put record, partitionKey=%s, payload=%s, " +
                    "sequenceNumber=%s, shardId=%s, took %d attempts, totalling %s ms",
                    partitionKey, payload, result.getSequenceNumber(),
                    result.getShardId(), result.getAttempts().size(), totalTime));
        }
    }

    @Override
    public void onFailure(Throwable t) {
        if (t instanceof UserRecordFailedException) {
            UserRecordFailedException e = (UserRecordFailedException) t;
            UserRecordResult result = e.getResult();
            String errorList =
                    StringUtils.join(result.getAttempts().stream()
                            .map(a -> String.format(
                                    "Delay after prev attempt: %d ms, Duration: %d ms, Code: %s, Message: %s",
                                    a.getDelay(), a.getDuration(), a.getErrorCode(), a.getErrorMessage()))
                            .collect(Collectors.toList()), "\n");
            log.error(String.format(
                    "Record failed to put, partitionKey=%s, payload=%s, attempts:\n%s",
                    partitionKey, payload, errorList));
        }
    }
});
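The while loop near the top of the snippet is the back-pressure mechanism asked about earlier: it stops calling addUserRecord() while getOutstandingRecordsCount() is above 10,000, so the KPL's internal buffer cannot grow without bound.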
We are planning to debug this problem this week. I'll get in touch soon.
Hi everyone!
I think we may have run into the same problem, even though it manifests a bit differently. We are running producer version 0.10.0. CPU usage remains low, but at some point after several days the Docker container in which we run the service restarted.
After investigating, we found that it had slowly consumed all available memory until it died, and it wasn't even under much load. Even though the Java heap remained mostly free, we couldn't identify exactly what was eating the memory; but since nothing else runs in the container and Java looked fine, I think the KPL child process was to blame. The point at which the memory consumption started coincided with a short deviation in otherwise stable Kinesis statistics. Perhaps a memory leak in the C++ process?
How can we get more information/logs/debug?
Hi everyone!
I work with @andrepintorj and we ran some tests last week.
We ran our test application logging the put attempts as suggested by @kevincdeng. The application ran successfully for 2 days, logging messages like the following:
INFO: Succesfully put record, partitionKey=13708797-fa50-4209-8c87-252179480e01, payload={<payload>}, sequenceNumber=49552870972749729350142165647149956523225437925766332418, shardId=shardId-000000000000, took 1 attempts, totalling 189 ms
So we decided to stress the shards by writing to them massively: we stopped the application for a few hours and started it again so that we could publish the retained messages in a short period of time. After some hours, the application started to emit log messages like the following:
SEVERE: Record failed to put, partitionKey=cb19916b-88ff-41c3-81a5-c9c73d5f823e, payload={<payload>}, attempts:
And at some point, we observed the same situation described by @pkopac: the application consumed all available memory until it died.
Hey guys,
Thanks for looking into this. Would it be possible for you to share the code that was in use when the problem happened? That would help greatly with reproducing the issue. You can email me directly.
Some additional questions about your setup to help me repro:
- Which OS did you use?
- Were you running in EC2? What instance type, which region?
- How many streams were you using, how many shards in each?
- How big were your records, what rate did you put at?
- How did you choose your partition keys? Was it random?
- What configurations did you set for the KPL?
- Does your application respond to back pressure in the record queue? I.e. does it back off from calling addUserRecord() based on the value of getOutstandingRecordsCount()?
@lucasrodcosta: Was there additional text in the second log message? It should show error messages related to each attempt made for the failed record.
@pkopac: What was the deviation you mentioned? For debugging, you can enable the KPL's CloudWatch metrics. You can also get detailed error messages using the Java code snippet earlier in this thread.
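Something like the following should turn them back on for a debug run (a sketch; setMetricsLevel is the same setter used above, and setMetricsGranularity is assumed to be available in your KPL version):
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setMetricsLevel("detailed");     // instead of "none"; "summary" is a middle ground
config.setMetricsGranularity("shard");  // break metrics down per stream and shard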
Thanks for your replies!
Well, our application uses just one shard and has a really low data flow, so it's probably hard to replicate (a once-a-week error?). The memory consumption took about 10 hours to eat all 1.5 GB, so something broke and it then consumed a bit more memory every 5 minutes or so. I'll answer your questions tomorrow, including screenshots of the statistics with the deviation.
Hi everyone.
@kevincdeng: There is additional text in the error messages. See below:
SEVERE: Delay after prev attempt: 11707 ms, Duration: 0 ms, Code: Expired, Message: Expiration reached while waiting in limiter
SEVERE: Delay after prev attempt: -7376166 ms, Duration: 7394085 ms, Code: Exception, Message: Expired while waiting in HttpClient queue
SEVERE: Delay after prev attempt: 4028 ms, Duration: 0 ms, Code: Expired, Message: Record has reached expiration
The appearance of these messages coincides with the increase in OutstandingRecords.
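(For context: with setRecordTtl(120000) as configured above, the KPL fails any user record that has been buffered for longer than 120 seconds, so "Record has reached expiration" errors are expected once the outstanding backlog grows faster than it can be drained.)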
The negative value in the second log message is unexpected. Do you see many of them?
Related (barely): @kevincdeng, any insight into why "Code: Expired, Message: Record has reached expiration" exceptions occur? I can see this being an issue on the consumer side, but why would a producer say a message has expired?
Is this still a problem with the 0.12.x release?
We are using the Maxwell daemon together with the KPL and we are getting constant 'Record failed to put - Expired : Record has reached expiration' exceptions being thrown.
06:52:14,770 ERROR KinesisCallback - Record failed to put - Expired : Record has reached expiration
06:52:14,770 ERROR KinesisCallback - Exception during put
We are still seeing this exception with the 0.13.1 release as of now.
Logs:
Failed to forward kpl Logs to Kinesis: Record failed to put - Expired : Record has reached expiration
Is there any stable version available as of now?