Scalability problem because empty temp dirs are not being deleted
We run two fairly large message ingestion systems with dozens of topics and millions of messages per day in total (about 3 million and 10 million messages per day, respectively). Secor nodes (2-4 nodes on each system) run fine after startup, but their performance degrades progressively, with growing CPU and disk usage, until they crash. (Disk usage only reaches about 60%, yet a disk-full error is sometimes reported even though 40% of the space is still free.)
We found the root cause of this behavior. Secor spools files to the local directory set by the secor.local.path parameter.
The files are purged automatically, but the directories stay behind, reaching millions of entries on each node. This slows down IO and makes the Secor process use more and more CPU for each IO operation, until Secor crashes with "Mkdirs failed to create":
2020-10-29 14:04:07,413 [Thread-10] (com.pinterest.secor.consumer.Consumer:183) ERROR Thread failed
java.lang.RuntimeException: Failed to write message ParsedMessage{topic='some-topic', kafkaPartition=29, offset=11250388, kafkaKey=, payload={"@timestamp":"2020-10-29T14:04:01.157Z","beat":{"hostname":"ip-10-0-28-98","name":"ip-10-0-28-98","version":"5.0.1"},"input_type":"log","message":"{"secor":{"intermediatePath":"PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2"},"s32es":{"input_s3":{"bucket":"rem-secor-prod-eu.intermediate","key":"PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2"},"output_s3":{"bucket":"rem-secor-prod-eu.completed","key":"some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2/drsd_1c98f3c1a4c28caf93c39eacc03486e2.bml"},"elasticsearch":{"msg, timestamp=1603980241157, headers=[], mPartitions=[PATH_START, org, type, 1.3, me8, some_Inc, some_org, 2020-10-29, NaN-NaN-NaN, 2.1.0, 1601182555, 023130120130132, 1c98f3c1a4c28caf93c39eacc03486e2]}
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(
Caused by: Mkdirs failed to create /tmp/secor_data/64_21/some-topic/PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2 (exists=false, cwd=file:/opt/secor)
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
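The pattern described above (spooled files are deleted, their directories are not) can be reproduced in isolation with a small java.nio.file sketch. It is an illustration under simplifying assumptions, not Secor's code: the class, method, and file names are hypothetical. countEmptyDirs can also serve as a quick diagnostic for a local spool directory.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Illustrative sketch only (not Secor code): each "message" gets its own
// directory chain, its spooled file is deleted after "upload", and the
// directories are left behind.
final class EmptyDirLeak {

    /** Simulates one spool -> upload -> delete cycle for a partition path. */
    static void spoolAndDeleteFile(Path localRoot, String partitionPath) throws IOException {
        Path dir = localRoot.resolve(partitionPath);
        Files.createDirectories(dir);                         // directory chain created
        Path file = dir.resolve("1_21_00000000000011256548"); // hypothetical file name
        Files.write(file, "payload".getBytes(StandardCharsets.UTF_8));
        Files.delete(file);                                   // only the file is removed
    }

    /** Counts directories below root (root excluded) that have no entries. */
    static long countEmptyDirs(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(p -> !p.equals(root))
                    .filter(Files::isDirectory)
                    .filter(EmptyDirLeak::isEmptyDir)
                    .count();
        }
    }

    private static boolean isEmptyDir(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        } catch (IOException e) {
            return false; // unreadable or vanished: do not count it
        }
    }
}
```

After two cycles with sibling partition paths, the two leaf directories are empty and every directory in both chains still exists.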
We now delete the empty directories with an external command, find /tmp/secor_data/ -type d -empty -delete, and this stabilizes both CPU usage and disk-space growth.
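For reference, the same cleanup can be done from Java with java.nio.file.Files.walkFileTree. This is a sketch, not part of Secor (the EmptyDirCleaner name is hypothetical): directories are visited bottom-up, as find does with -delete (which implies -depth), so a parent that becomes empty once its children are removed is also removed in the same pass. Unlike the find command, it keeps the root directory itself.

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;

// Sketch of an in-process equivalent of: find ROOT -type d -empty -delete
// (except that ROOT itself is kept). Hypothetical helper, not Secor code.
final class EmptyDirCleaner {

    /** Deletes empty directories under root; returns how many were removed. */
    static int deleteEmptyDirs(Path root) throws IOException {
        final int[] removed = {0};
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                if (!dir.equals(root)) {
                    try {
                        // Files.delete refuses to remove a non-empty directory,
                        // so no separate "is it empty?" check is needed.
                        Files.delete(dir);
                        removed[0]++;
                    } catch (DirectoryNotEmptyException | NoSuchFileException e) {
                        // Still has entries, or already removed elsewhere: skip it.
                    }
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return removed[0];
    }
}
```

Because children are handled before their parent, a chain such as t/p/1/a collapses in one call, while any directory that still holds a file is left in place.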
Is there a Secor parameter that controls this behavior? One system runs version 0.26 and the other 0.29; both show the same symptoms.
Here are our startup parameters:
Launching the secor java command line: java -ea -Xmx1194m -Daws.access.key=XXXXXXXXXX -Daws.secret.key=XXXXXXXXXXXXXXXXX -Daws.region=us-east-1 -Dzookeeper.quorum=0.zk.service.consul:2181,1.zk.service.consul:2181,2.zk.service.consul:2181 -Dkafka.zookeeper.path=/ -Dsecor.s3.bucket=mobileye.secor.intermediate -Dsecor.s3.filesystem=s3n -Dsecor.s3.path=me8 -Dsecor.kafka.topic_filter=.* -Dsecor.message.parser.class=com.pinterest.secor.parser.RsdParser -Dsecor.local.path=/tmp/secor_data -Dsecor.compression.codec= -Dsecor.max.file.age.seconds=30 -Dsecor.max.file.size.bytes=50000 -Dostrich.port=19999 -Dsecor.kafka.topic_blacklist=secor.restore -Dsecor.upload.manager.class=com.pinterest.secor.uploader.S3UploadManager -Dlog4j.configuration=file:///opt/secor/ -cp secor.jar:lib/* com.pinterest.secor.main.ConsumerMain
2020-11-01 21:04:33,522 [Thread-5] (org.apache.kafka.common.config.AbstractConfig:347) INFO ConsumerConfig values: = true = 5000 auto.offset.reset = earliest bootstrap.servers = [kafka.service.consul:9092] check.crcs = true client.dns.lookup = default = ip-10-0-10-97_44_14 client.rack = = 540000 = 60000 = false exclude.internal.topics = true fetch.max.bytes = 52428800 = 500 fetch.min.bytes = 1 = secor_backup = null = 3000 interceptor.classes = [] = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 = 300000 max.poll.records = 500 = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 = 1000 = 50 = 30000 = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Do you want to take a look at these two PRs?
On Sun, Nov 1, 2020 at 1:07 PM David wrote:
2020-10-29 14:04:07,413 [Thread-10] (com.pinterest.secor.consumer.Consumer:183) ERROR Thread failed
java.lang.RuntimeException: Failed to write message ParsedMessage{topic='some-topic', kafkaPartition=29, offset=11250388, kafkaKey=, payload={"@timestamp":"2020-10-29T14:04:01.157Z","beat":{"hostname":"ip-10-0-28-98","name":"ip-10-0-28-98","version":"5.0.1"},"input_type":"log","message":"{"secor":{"intermediatePath":"PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2"},"s32es":{"input_s3":{"bucket":"rem-secor-prod-eu.intermediate","key":"PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2"},"output_s3":{"bucket":"rem-secor-prod-eu.completed","key":"some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2/drsd_1c98f3c1a4c28caf93c39eacc03486e2.bml"},"elasticsearch":{"msg, timestamp=1603980241157, headers=[], mPartitions=[PATH_START, org, type, 1.3, me8, some_Inc, some_org, 2020-10-29, NaN-NaN-NaN, 2.1.0, 1601182555, 023130120130132, 1c98f3c1a4c28caf93c39eacc03486e2]}
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(
at
Caused by: Mkdirs failed to create /tmp/secor_data/64_21/some-topic/PATH_START/some_path/1601182555/023130120130132/1c98f3c1a4c28caf93c39eacc03486e2 (exists=false, cwd=file:/opt/secor)
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at$DelimitedTextFileWriter.(
at
at com.pinterest.secor.util.ReflectionUtil.createFileWriter(
at com.pinterest.secor.common.FileRegistry.getOrCreateWriter(
at com.pinterest.secor.writer.MessageWriter.write(
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(
... 1 more
2020-10-29 14:04:07,417 [Thread-6] (com.pinterest.secor.consumer.Consumer:183) ERROR Thread failed
java.lang.RuntimeException: Failed to write message ParsedMessage{topic='some-topic', kafkaPartition=21, offset=11256596, kafkaKey=, payload={"@timestamp":"2020-10-29T14:04:02.250Z","beat":{"hostname":"ip-10-0-28-98","name":"ip-10-0-28-98","version":"5.0.1"},"input_type":"log","message":"{some message"}
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(
at
Caused by: /tmp/secor_data/64_17/some_path/1601182555/032000001233320/f1bd4a1d9491ae772eafed2373bf1a41/.1_21_00000000000011256548.crc (No space left on device)
at Method)
at
at
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(
at org.apache.hadoop.fs.RawLocalFileSystem.create(
at org.apache.hadoop.fs.RawLocalFileSystem.create(
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.(
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at$DelimitedTextFileWriter.(
at
at com.pinterest.secor.util.ReflectionUtil.createFileWriter(
at com.pinterest.secor.common.FileRegistry.getOrCreateWriter(
at com.pinterest.secor.writer.MessageWriter.write(
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(
... 1 more
Hi Henry - thank you for the answer.
#1443 doesn't look relevant, but #1449 seems to do exactly what is needed: "Deleting whole path, instead of files inside"
The problem is that it was merged to master on July 12, 2020 and released in 0.29 on July 18. We use 0.29 and it still doesn't work (i.e., it deletes only the files, not the folders).
Am I missing something? Thank you for your help! David
Do you see this line in your log file? "Shut down hook with priority {} added to shut down hook registry" (where {} is the hook priority)
I don't think this is related to shutdown hooks, since they are only called when Secor is shutting down. After upload, the deletion method below is called for each LogFilePath:
```java
public void deletePath(LogFilePath path) throws IOException {
    TopicPartitionGroup topicPartition = new TopicPartitionGroup(path.getTopic(),
            /* ... */);
    HashSet<LogFilePath> paths = mFiles.get(topicPartition);
    // ...
    if (paths.isEmpty()) {
        StatsUtil.clearLabel("secor.size." + topicPartition.getTopic() + "." + /* ... */);
        StatsUtil.clearLabel("secor.modification_age_sec." + topicPartition.getTopic() + "." + /* ... */);
    }
    // ... (rest of the method elided in this excerpt)
}
```
It looks like it deletes only the files and keeps the directories. I guess it can't safely delete directories because they are named after partitions, and multiple consumer threads can write to the same partition? Maybe local files shouldn't be partitioned into directories at all, so that no folders remain after deletion?
Re: Henry's question about the log line: there is no "Shut down hook with priority" message in any of the Secor 0.29 log files.
Re: pdambrauskas: this makes complete sense; it is exactly what we observe. As a workaround we run the OS command
find /tmp/secor_data/ -type d -empty -delete
to delete only the empty directories. Can this cause problems? If not, could the equivalent cleanup be run from the deletePath function?
Thank you for your input, guys 👍
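If the cleanup were done inside Secor, as asked here, one option is to prune empty parent directories right after the spooled file itself is deleted. This is a minimal java.nio.file sketch, assuming the file lives under the secor.local.path root; PruningDelete and deleteFileAndEmptyParents are hypothetical names, and this is not Secor's deletePath. Because Files.delete refuses to remove a non-empty directory, the upward walk stops at the first directory another writer is still using. It does not, by itself, protect a writer that has created a directory but not yet its file.

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical helper, not Secor code.
final class PruningDelete {

    /**
     * Deletes a spooled file, then removes its now-empty parent directories,
     * walking upward until a non-empty directory or localRoot is reached.
     * localRoot itself is never removed.
     */
    static void deleteFileAndEmptyParents(Path file, Path localRoot) throws IOException {
        Path root = localRoot.toAbsolutePath().normalize();
        Path target = file.toAbsolutePath().normalize();
        if (!target.startsWith(root) || target.equals(root)) {
            throw new IllegalArgumentException(file + " is not under " + localRoot);
        }
        Files.deleteIfExists(target);
        for (Path dir = target.getParent(); dir != null && !dir.equals(root); dir = dir.getParent()) {
            try {
                Files.delete(dir); // fails if the directory still has entries
            } catch (DirectoryNotEmptyException | NoSuchFileException e) {
                break; // still in use (or already gone): stop pruning here
            }
        }
    }
}
```

Deleting the last file of an hourly partition removes the hour directory and any ancestors it leaves empty, while a sibling partition that still holds a file keeps the shared parents alive.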
I don't think it is 100% safe: one thread could be writing to a partition while another thread deletes that partition's folder at the same time. It is highly unlikely, but possible, because checking whether a folder is empty and deleting it are not a single atomic operation.
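The race is narrower than it may look: Files.delete fails on a non-empty directory, so a cleaner can only remove a directory in the window after a writer has created it but before the writer has created its file. A writer can tolerate that window by retrying. The sketch below assumes plain java.nio.file; Secor itself writes through Hadoop's local filesystem classes (ChecksumFileSystem and RawLocalFileSystem in the stack traces above), so this only illustrates the idea. RaceTolerantCreate is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical helper, not Secor code.
final class RaceTolerantCreate {

    /**
     * Creates the parent directories and then the file, retrying if a
     * concurrent cleaner removes an (empty) directory between the two steps.
     */
    static Path createWithRetry(Path file, int maxAttempts) throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Path parent = file.toAbsolutePath().getParent();
        NoSuchFileException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                Files.createDirectories(parent); // may race with a concurrent cleaner
                return Files.createFile(file);   // once this succeeds, parent is non-empty
            } catch (NoSuchFileException e) {
                last = e; // a directory on the path vanished between the calls; retry
            }
        }
        throw last;
    }
}
```

Once createFile succeeds, the directory holds an entry, and an empty-directory cleaner can no longer remove it.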
My suggestion would be to name local files differently so that no extra folders are created, for example by joining the path components with ___ instead of /, and then replacing ___ with / when uploading. But let's see what @HenryCaiHaiying has to say about this; maybe I'm missing something.
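A sketch of this flat-naming idea, assuming relative paths use / as the separator and ___ as the replacement, as suggested. FlatLocalNames and its methods are hypothetical. The round-trip check rejects paths that would not map back unambiguously, for example a component ending in _ (a_/b flattens to a____b, which maps back to a/_b). One practical caveat: a flattened name must fit in a single file name, which filesystems such as ext4 limit to 255 bytes, so long partition paths could exceed it.

```java
// Hypothetical helper illustrating the "flatten locally, restore on upload" idea.
final class FlatLocalNames {

    // Separator from the suggestion above; "/" in the relative path becomes "___".
    static final String SEP = "___";

    /** Maps a relative partition path to a single local file name. */
    static String toLocalName(String relativePath) {
        String local = relativePath.replace("/", SEP);
        // Reject inputs whose flattened form would not map back to the same path.
        if (!toUploadKey(local).equals(relativePath)) {
            throw new IllegalArgumentException("cannot flatten unambiguously: " + relativePath);
        }
        return local;
    }

    /** Inverse mapping, applied when building the upload key. */
    static String toUploadKey(String localName) {
        return localName.replace(SEP, "/");
    }
}
```

Every spooled file would then sit directly in one directory per consumer, so deleting the file after upload leaves nothing behind.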
It is strange that you don't see the "Shut down hook with priority" log message; it should be logged for each consumer thread. Is your logger configured to log INFO messages? Are you sure you are running version 0.29?
I'm positive we run 0.29. I was logging at WARN level; after switching to INFO, here is the hook message at the start of the log:
2020-11-05 08:20:28,895 [main] (com.pinterest.secor.common.ShutdownHookRegistry:50) INFO Shut down hook with priority 10 added to shut down hook registry
I think we could remove a folder once it is more than X days old, with X configurable depending on how late your late-arriving messages can be. This could be done either in the Java code or in an external cron script.
You must be creating lots of small folders; we usually don't see this problem.
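A sketch of the age-based variant, assuming a directory's last-modified time is a good proxy for "empty since when": deleting the last file in a directory updates the directory's mtime, so an empty directory whose mtime is older than the threshold has had no entries added or removed for at least that long. The threshold is a java.time.Duration, so it can be minutes or days. AgedEmptyDirCleaner and its parameter are hypothetical, not existing Secor options. The mtime is captured in preVisitDirectory, before any child is removed, so a chain of old empty directories is cleared in one pass.

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not a Secor option: removes empty directories under
// root whose mtime is older than minAge. root itself is never removed.
final class AgedEmptyDirCleaner {

    static int deleteEmptyDirsOlderThan(Path root, Duration minAge) throws IOException {
        final Instant cutoff = Instant.now().minus(minAge);
        final Deque<FileTime> mtimes = new ArrayDeque<>(); // one entry per open directory
        final int[] removed = {0};
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                // Record mtime before any child is deleted (deleting a child bumps it).
                mtimes.push(attrs.lastModifiedTime());
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                FileTime mtime = mtimes.pop();
                if (exc != null) {
                    throw exc;
                }
                if (!dir.equals(root) && mtime.toInstant().isBefore(cutoff)) {
                    try {
                        Files.delete(dir); // refuses non-empty directories
                        removed[0]++;
                    } catch (DirectoryNotEmptyException | NoSuchFileException e) {
                        // Still in use, or already removed: leave it alone.
                    }
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return removed[0];
    }
}
```

Called periodically (from cron or a scheduled thread) with, say, Duration.ofMinutes(5) or Duration.ofDays(1), this keeps recently touched directories and removes only those that have been idle and empty for longer than the threshold.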
We currently create about 2,000 folders per minute on each node, and this problem usually crashes a machine within 2-3 days. The time between a publisher posting a message to Kafka and Secor consuming it usually doesn't exceed 2-3 minutes, but it can occasionally be a few hours during outages. Would a 5-minute deletion interval cause problems in that case? My understanding is that Secor takes a message from Kafka and writes it to disk, creating the folder; once the message is uploaded, it is deleted from disk, leaving the folder empty. Am I wrong here? How long can a message sit on the filesystem? Why would we want to keep the empty folder? Is it ever reused? In our naming model, each S3 path (and therefore each tmp path) is unique to a message.
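For scale, a back-of-the-envelope count, assuming the 2,000 folders per minute are created at a steady rate and none are removed:

```math
\begin{aligned}
2000\ \tfrac{\text{dirs}}{\text{min}} \times 1440\ \tfrac{\text{min}}{\text{day}} &= 2{,}880{,}000\ \tfrac{\text{dirs}}{\text{day}} \\
(2 \text{ to } 3)\ \text{days} \times 2{,}880{,}000\ \tfrac{\text{dirs}}{\text{day}} &\approx 5.8 \text{ to } 8.6 \text{ million dirs}
\end{aligned}
```

Each directory uses an inode, and on filesystems such as ext4 the number of inodes is fixed when the filesystem is created; once they run out, writes fail with "No space left on device" even if free blocks remain. That is one possible explanation for the disk-full errors seen at about 60% usage; df -i on the node would show whether inodes are exhausted.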
Given this volume, we need empty folders to expire after minutes, not days (we use 5 minutes now). We use cron right now, but the idea is to hide that complexity so that we don't have to manage it: we would just set a folder-deletion timeout (in whatever units you like, as long as it can be set as low as 5-10 minutes) and forget about it. Of course, this is only a wish; you decide how to implement it. We are just describing what would work for high-volume systems like ours. Thank you for your time; we appreciate the effort you put into creating a great open-source system 👍 Have a great weekend (the rest of it), David
I would like to add some information about this problem.
I've started to get the following error:
java.lang.RuntimeException: Failed to write message max bad :11.0
at com.pinterest.secor.consumer.Consumer.handleWriteError(
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(
at
Caused by: Mkdirs failed to create file:/mnt/secor_data/message_logs/partition/1_29/prod-og-monitoring_agg_impressions_fr/dt=2021-04-04/hr=04 (exists=false, cwd=file:/opt/secor)
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.ChecksumFileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.hadoop.fs.FileSystem.create(
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(
at org.apache.parquet.hadoop.ParquetWriter.<init>(
at org.apache.parquet.hadoop.ParquetWriter$
at$AvroParquetFileWriter.<init>(
at
at com.pinterest.secor.util.ReflectionUtil.createFileWriter(
at com.pinterest.secor.common.FileRegistry.getOrCreateWriter(
at com.pinterest.secor.writer.MessageWriter.write(
at com.pinterest.secor.consumer.Consumer.writeMessage(
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(
It's not related to permissions, but to the fact that Secor doesn't remove some old local files (local files, not the ones on S3). It removes some of them but not all, so the disk fills up after a while. For example, I found some files from January on the file system.
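To see which local files are being left behind, one option is to list files under the local path that have not been modified for longer than some threshold, the Java counterpart of find /mnt/secor_data -type f -mtime +N. This sketch only reports; it deliberately does not delete, since a stale local file may never have been uploaded. StaleLocalFiles is a hypothetical name, and the walk can fail with UncheckedIOException if Secor removes a directory while it runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical diagnostic helper, not Secor code. Reports only; never deletes.
final class StaleLocalFiles {

    /** Regular files under root whose last-modified time is older than maxAge, sorted. */
    static List<Path> findOlderThan(Path root, Duration maxAge) throws IOException {
        Instant cutoff = Instant.now().minus(maxAge);
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> lastModified(p).isBefore(cutoff))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    private static Instant lastModified(Path p) {
        try {
            return Files.getLastModifiedTime(p).toInstant();
        } catch (NoSuchFileException e) {
            return Instant.MAX; // deleted since the walk saw it: not stale
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Comparing the reported files against what is already in S3 would show whether they were never uploaded or were uploaded but not cleaned up.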