druid
emit processed bytes metric
Description
Currently there is no way to know how much data a task processes during ingestion. This PR adds the `ingest/events/processedBytes` metric, which reports the number of bytes read since the last emission.
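To illustrate the "bytes read since last emission" semantics, here is a minimal, self-contained sketch. It is not the code from this PR: `ByteCounter` and `DeltaMonitor` are hypothetical stand-ins for a task-level counter holder and a monitor, and the sketch prints the delta instead of sending it through Druid's emitter.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ProcessedBytesDeltaSketch {
    /** Hypothetical task-level counter holder (not the PR's actual InputStats). */
    static final class ByteCounter {
        private final AtomicLong processedBytes = new AtomicLong();

        void incrementProcessedBytes(long n) {
            processedBytes.addAndGet(n);
        }

        long getProcessedBytes() {
            return processedBytes.get();
        }
    }

    /** Hypothetical monitor that reports only the bytes processed since its previous call. */
    static final class DeltaMonitor {
        private final ByteCounter counter;
        private long lastReported = 0L;

        DeltaMonitor(ByteCounter counter) {
            this.counter = counter;
        }

        long nextDelta() {
            long current = counter.getProcessedBytes();
            long delta = current - lastReported;
            lastReported = current; // remember the running total for the next emission
            return delta;
        }
    }

    public static void main(String[] args) {
        ByteCounter counter = new ByteCounter();
        DeltaMonitor monitor = new DeltaMonitor(counter);

        counter.incrementProcessedBytes(100);
        System.out.println(monitor.nextDelta()); // prints 100

        counter.incrementProcessedBytes(50);
        System.out.println(monitor.nextDelta()); // prints 50
    }
}
```

The counter only ever grows; the monitor keeps the last reported total, so each emission carries just the increment for that period.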
- This PR adds the `InputStats` class, which is present in all task types and acts as a holder for task-level counts such as processed bytes. Standardized metrics can therefore be added across task types in the future and emitted using `InputStatsMonitor`, which is automatically initialized for all tasks.
- This PR provides a convenience wrapper class named `CountableInputEntity`, which can wrap any `InputEntity` to count the number of bytes processed through it. New implementations can emit this metric simply by wrapping the base input entity in it when creating an `InputEntityIteratingReader`.
- Since Kafka and Kinesis do not use `InputEntity`, processed bytes are incremented directly in `SeekableStreamIndexTaskRunner`, which has access to `InputStats`.
- This does not support Firehoses.
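The wrapping approach described above can be sketched as follows. This is an illustration under stated assumptions, not the PR's `CountableInputEntity`: the `Entity` interface is a simplified, hypothetical stand-in for an input entity that only exposes `open()`, and the counter is a plain `AtomicLong` rather than `InputStats`.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

public class CountingEntitySketch {
    /** Simplified, hypothetical stand-in for an input entity. */
    interface Entity {
        InputStream open() throws IOException;
    }

    /** Wraps any Entity and adds every byte read through it to a shared counter. */
    static final class CountingEntity implements Entity {
        private final Entity delegate;
        private final AtomicLong processedBytes;

        CountingEntity(Entity delegate, AtomicLong processedBytes) {
            this.delegate = delegate;
            this.processedBytes = processedBytes;
        }

        @Override
        public InputStream open() throws IOException {
            return new FilterInputStream(delegate.open()) {
                @Override
                public int read() throws IOException {
                    int b = super.read();
                    if (b >= 0) {
                        processedBytes.incrementAndGet();
                    }
                    return b;
                }

                @Override
                public int read(byte[] buf, int off, int len) throws IOException {
                    int n = super.read(buf, off, len);
                    if (n > 0) {
                        processedBytes.addAndGet(n);
                    }
                    return n;
                }
            };
        }
    }

    public static void main(String[] args) throws IOException {
        AtomicLong counter = new AtomicLong();
        byte[] data = "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8);
        Entity base = () -> new ByteArrayInputStream(data);

        // Callers read from the wrapped entity exactly as they would from the base one.
        try (InputStream in = new CountingEntity(base, counter).open()) {
            byte[] buf = new byte[4];
            while (in.read(buf) != -1) {
                // drain the stream
            }
        }
        System.out.println(counter.get() + " of " + data.length + " bytes counted");
    }
}
```

Because the counting lives in a decorator, the reader code does not change; only the place where the entity is constructed needs to wrap it. Bytes passed over with `skip()` are not counted in this sketch.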
This PR has:
- [x] been self-reviewed.
- [x] added documentation for new or modified features or behaviors.
- [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [x] been tested in a test Druid cluster.
Key changed/added classes in this PR
- `InputStats`
- `InputStatsMonitor`
- `CountableInputEntity`
- `AbstractBatchIndexTask`
- `SeekableStreamIndexTask`
Hi @pjain1, have you checked https://github.com/apache/druid/issues/10352? This proposal describes some ideas on ingestion metrics.
@jihoonson The proposal looks great. I did this PR because bytes ingested is not available for streaming or batch tasks. I see your proposal only includes metrics for batch tasks; probably we can do another PR for emitting bytes ingested for Kafka/Kinesis directly through `FireDepartment`.
I believe the bytes ingested reported through your proposed changes will cover all types of `InputEntity` and will include the bytes read for determining partitions as well as indexing?
> @jihoonson The proposal looks great. I did this PR because bytes ingested is not available for streaming or batch tasks. I see your proposal only includes metrics for batch tasks; probably we can do another PR for emitting bytes ingested for Kafka/Kinesis directly through `FireDepartment`.
Yeah, it sounds reasonable to me to add bytes ingested for both streaming and batch. My proposal only talks about metrics for batch, but I have also been thinking about the metrics system for ingestion in general. Currently, both batch and streaming ingestion use `FireDepartmentMetrics`, which was designed for `RealtimeIndexTask`, which is now deprecated. I'm not sure if it's good to continue to use it. Rather, I think batch and streaming tasks should use different classes for their own metrics, since you will want to see different metrics depending on your ingestion type (e.g., there is no handoff in batch ingestion, while handoff time could be important in streaming ingestion). To do so, I have been thinking of adding new classes, each of which defines all metrics useful for batch and streaming ingestion, respectively. I'm not sure why we should keep metrics separately in different classes, such as the bytes ingested in `InputStats` added in this PR. Does this make sense?
> I believe the bytes ingested reported through your proposed changes will cover all types of `InputEntity` and will include the bytes read for determining partitions as well as indexing?
Yes, correct. More precisely, most metrics, including both the bytes read and the bytes written, will be available for each individual phase (determining partitions, indexing, etc.) as well as in aggregate across all phases.
> To do so, I have been thinking of adding new classes, each of which defines all metrics useful for batch and streaming ingestion, respectively. I'm not sure why we should keep metrics separately in different classes, such as the bytes ingested in `InputStats` added in this PR. Does this make sense?
I added this class with the vision that more ingestion metrics can be added in the future, as this class is available at the task level and at the InputSource/InputEntity level as well. Also, I don't see `FireDepartmentMetrics` being used in batch tasks apart from `SinglePhaseSubTask`, so I thought a new framework of `InputStatsMonitor`, which is enabled by default with `InputStats`, could be used and expanded to include ingestion stats similar to the ones reported by the Kafka indexing task.
@jihoonson are you actively working on your proposal? Do you think you can reuse the `InputStats` strategy from here?
> @jihoonson are you actively working on your proposal? Do you think you can reuse the `InputStats` strategy from here?
@pjain1 sorry, I forgot about this PR. I could reuse it, but I'm still not sure why they are in a separate class vs. having all those metrics in one place. Are you thinking of a case where you want to selectively disable the new metric? If so, when would you want to do it? Even in that case, I would rather think about another way to selectively enable/disable metrics instead of having each metric in a different class.
In the current implementation (before this PR), what doesn't make sense to me is sharing the same metrics between batch and realtime tasks, because what we want to see for them will be pretty different even though some of them can be shared. So, IMO, it will probably be best to add new classes, each of which has all metrics for batch and realtime tasks, respectively.
As long as we get the metrics about how many raw bytes are processed from the source (including scans for determining shard specs), I think I am OK with any approach you follow. It doesn't necessarily have to be the code from this PR. I am already using this code internally, so I thought that if it's reused there would be fewer conflicts, but it's totally up to you. Thanks
Apologies, I accidentally clicked the button, which published my previous comment incomplete. Updated it now.
Makes sense, so as long as https://github.com/apache/druid/pull/10407#issuecomment-713016099 is satisfied, things seem good to me.
@pjain1 thanks. Yes, per-phase metrics and total metrics will be available for raw input bytes. Other than the issue we have talked about, this PR makes sense to me. I don't think my proposal necessarily blocks this PR or vice versa. I just wanted to make sure which design is best for us. I can probably review this PR this week.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
@pjain1 I have started taking a look at this PR. Deepest apologies that it has not been reviewed yet. Any chance you can help resolve the conflicts?
This issue is no longer marked as stale.
Hey @somu-imply, I can look into resolving conflicts over the weekend. However, @jihoonson was working on something similar iirc, so I'm not sure what the status of it is or whether this is needed anymore.
@pjain1 thanks, I have resolved some conflicts and have made this PR up-to-date. #12750 has the changes. I'd appreciate it if you could help add to it and review it.
Closing this as #13520 is already merged. Thanks a lot for the work on this @somu-imply and @pjain1 !