
[Transform] We might need a patch transform

Open machadoum opened this issue 1 year ago • 4 comments

Description

TLDR

We need flexible field-level customizable logic when ingesting documents. Could we achieve that with a painless script that receives the old and new documents as parameters?

Context

The Security Entity Analytics team is working on creating entity-based views inside the Security Solution. We are using the entity-manager framework developed by the Obs team: https://github.com/elastic/kibana/pull/183205.

The framework creates two indices. The first is populated by a pivot transform that runs several time-series aggregations (every x minutes). The second contains one document per entity and is generated by another pivot transform that queries the first index.
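
For readers unfamiliar with the setup, a minimal sketch of what the first transform might look like is below; the index names, fields, and intervals are illustrative only and are not the actual entity-manager configuration:

PUT _transform/entity-history-example
{
  "source": { "index": ["logs-*"] },
  "dest": { "index": "entity-history-example" },
  "sync": { "time": { "field": "@timestamp", "delay": "60s" } },
  "frequency": "5m",
  "pivot": {
    "group_by": {
      "host.name": { "terms": { "field": "host.name" } },
      "@timestamp": { "date_histogram": { "field": "@timestamp", "fixed_interval": "5m" } }
    },
    "aggregations": {
      "last_seen": { "max": { "field": "@timestamp" } },
      "host.ip": { "terms": { "field": "host.ip" } }
    }
  }
}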



The goal of the security solution entity store is to have a single index with several entity properties that can be sorted and filtered. Some fields in the Security Solution entity store come from events that may have been emitted only once, so we must preserve their value in the store forever. Therefore, we must generate the document based on the entity's entire history.

The problem with this solution is that the time series index will grow over time and slow down the second transform. Discussions are happening about ILM policies and using a time-based filter for the second transform, but that wouldn't work for the security solution team.

The proposal

We believe that the current Transform implementation can't solve our problem and needs to change. In simple terms, we need a pivot transform that adds fields to the existing document instead of overwriting the entity document every time it runs. Different fields may require different patch logic: some fields should always be replaced, others should preserve the oldest value permanently, and others could be accumulated into an array.

That way, the security solution entity store could be simplified like:

The POC

To exemplify one possible solution, we implemented a transform POC. It performs poorly and could be significantly improved, but it does the job of illustrating what we need. These are the changes we made:

  • Add a new script field to the pivot transform config
  • Query the destination index by ID to retrieve the current value of the document
  • Execute a Painless script that receives the old and new documents as parameters (a rough sketch follows below)
  • Store the script result in the index
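
To make the per-field patch logic concrete, here is a rough sketch of the kind of merge script we have in mind. The parameter names (oldDoc, newDoc) and the fields used are hypothetical and are not taken from the POC code:

// A rough, hypothetical merge script. params.oldDoc is the current destination
// document (null on the first run); params.newDoc is the document the pivot
// transform has just generated. Field names are examples only.
Map result = new HashMap();
if (params.oldDoc != null) {
  result.putAll(params.oldDoc);
}

// Always replace: take the latest risk score.
result['host.risk.calculated_score'] = params.newDoc['host.risk.calculated_score'];

// Preserve the oldest value: only write criticality if it was never set before.
if (result['criticality_level'] == null) {
  result['criticality_level'] = params.newDoc['criticality_level'];
}

// Accumulate as an array: keep every IP ever observed for this entity.
List ips = new ArrayList();
if (result['host.ip'] != null) {
  ips.addAll(result['host.ip']);
}
if (params.newDoc['host.ip'] != null) {
  for (def ip : params.newDoc['host.ip']) {
    if (!ips.contains(ip)) {
      ips.add(ip);
    }
  }
}
result['host.ip'] = ips;

return result;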

You can see the POC code here:

Please keep in mind that it was just a viability exercise. A production-ready implementation would require several improvements, such as batching the search by ID and caching the Painless script compilation.
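
As one illustration of the batching idea (the index name and document IDs below are hypothetical), the per-document lookups against the destination index could be collapsed into a single _mget request per page of transform output:

GET entity-store-example/_mget
{
  "ids": [ "host:my-host", "host:other-host", "user:some-user" ]
}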


Questions

Could something like this be implemented? Would the performance be good enough? Is there a better way to solve the problem that we are missing?

machadoum avatar Aug 26 '24 11:08 machadoum

Pinging @elastic/ml-core (Team:ML)

elasticsearchmachine avatar Aug 26 '24 11:08 elasticsearchmachine

Do you have any examples of input and expected output? What data in the 3 indices (logs, risk score, asset score) are you trying to extract, and what does the resulting structure look like in the Entity index? I believe I saw some in the onweek demo, are they safe to post here?

FYI, it appears Scripted Metric Aggregation is on a deprecation path in serverless - so at the very least we'd need to come up with an alternative to scripting (or accept that this feature won't work in serverless).

The problem with this solution is that the time series index will grow over time and slow down the second transform. Discussions are happening about ILM policies and using a time-based filter for the second transform, but that wouldn't work for the security solution team.

Do we have any observed growth rates written down anywhere? How fast are the source indices growing, how fast is the interim History index growing, and at what point does that start to slow down the second transform?

prwhelan avatar Aug 26 '24 13:08 prwhelan

@prwhelan regarding Scripted Metric Aggregations, my understanding is that the solution proposed does not actually use scripted metrics, it just happens to use the scripted metric compiler to execute the script: https://github.com/machadoum/elasticsearch/blob/on-week/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java#L235

jaredburgettelastic avatar Aug 26 '24 15:08 jaredburgettelastic

Do you have any examples of input and expected output? What data in the 3 indices (logs, risk score, asset score) are you trying to extract, and what does the resulting structure look like in the Entity index? I believe I saw some in the onweek demo, are they safe to post here?

  • Logs -> We expect to query all ECS-compatible events inside the security solution data view.
  • Risk Score -> The output document of the Risk Engine. The engine produces documents every hour.
  • Asset Criticality -> A document that represents an asset criticality assignment made by a user. Documents are created when users interact with the UI.

All of these indices contain host.name and user.name fields, which is how we will aggregate them to create entities.

Here, you can find simplified examples of documents:

// Asset Criticality
{
  "host": {
    "name": "my-host"
  },
  "criticality_level": "low_impact",
  "@timestamp": "2024-08-27T12:51:55.537Z"
}

// Risk Score
{
  "host": {
    "name": "my-host",
    "risk": {
      "calculated_level": "Low",
      "calculated_score": 70
    }
  },
  ...
}

// logs*
{
  "host": {
    "name": "my-host",
    "os": {
      "kernel": "5.15.0-1062-gcp",
      "codename": "focal",
      "name": "fedora",
      "family": "debian",
      "type": "linux",
      "version": "20.04.6 LTS (Focal Fossa)",
      "platform": "fedora"
    },
    "ip": [ "10.142.3.13" ],
    ...
  },
  ...
}

The final entity document will aggregate all host fields into one single document.
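
For illustration, the resulting entity document for my-host might look roughly like the sketch below; the exact field placement (for example, where asset criticality ends up) has not been decided and is only an assumption here:

// Entity store (sketch)
{
  "host": {
    "name": "my-host",
    "risk": {
      "calculated_level": "Low",
      "calculated_score": 70
    },
    "os": {
      "name": "fedora",
      "type": "linux",
      "version": "20.04.6 LTS (Focal Fossa)"
    },
    "ip": [ "10.142.3.13" ]
  },
  "criticality_level": "low_impact",
  "@timestamp": "2024-08-27T12:51:55.537Z"
}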

Do we have any observed growth rates written down anywhere? How fast are the source indices growing, how fast is the interim History index growing, and at what point does that start to slow down the second transform?

The first transform will have a time filter of approximately 5 minutes, so the amount of data it queries should stay roughly constant. The second transform's query, however, has no such filter. Because of that, every time the first transform runs it creates one document per entity, and those documents dramatically increase the index size over time. Assume a medium-sized cluster has 10,000 entities and we run the transform once a minute: after one year, the second transform has to query and aggregate over 10,000 * 60 * 24 * 365 ≈ 5.26 billion documents every time it runs.

machadoum avatar Aug 27 '24 13:08 machadoum

@dgieselaar gave an idea that could solve the problem described above in a different way: adding an option to run scripted upserts within the pivot transform. Below is my proposal for how this could be done; I would like feedback on its feasibility.

Today, Pivot Transforms use deterministic IDs for the documents they write and index them through the Bulk API with index operations, so each run of a transform overwrites the destination documents.

Because the IDs are deterministic, we can introduce a new configurable option for Pivot transforms called upsert_script. When provided, the transform would issue update operations in the Bulk API as scripted upserts (which the API already supports) instead of index operations.
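
As a sketch of where such an option might live (the name upsert_script comes from this proposal; its exact placement in the config is an open question and purely an assumption here), the transform definition could look something like:

PUT _transform/entity-store-example
{
  "source": { "index": ["entity-history-example"] },
  "dest": {
    "index": "entity-store-example",
    "upsert_script": {
      "source": "... merge logic; see the update body example below ..."
    }
  },
  "pivot": {
    "group_by": { "host.name": { "terms": { "field": "host.name" } } },
    "aggregations": { "last_seen": { "max": { "field": "@timestamp" } } }
  }
}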

The Pivot transform could then perform an associated update operation when indexing documents, which could then look something like the following, as an example:

{
  "scripted_upsert": true,
  "script": {
    "source": """
        if (params.newDocument["user.asset.criticality"] != null) {
          ctx._source["user.asset.criticality"] = params.newDocument["user.asset.criticality"]
        }
    """,
    "params": {
      "newDocument": {
        "user.asset.criticality": "high_impact"
      }
    }
  },
  "upsert": {}
}

Some notes on the above:

  • The newDocument param that is passed in would be the document the transform would normally be indexing. In other words, the transform needs to pass in what the pivot transform generated for that particular run as a param.
  • Logically, what the script above means is "if a user.asset.criticality value is passed in as a param from the transform, store it on my document. If no value is provided, do nothing, which keeps the old value stored." The latter effectively means leaving the existing value for user.asset.criticality on the destination document, which is a primary use case of this approach.
  • We would need scripted_upsert to be set to true because we of course want the initial run of a transform to generate a document. This means that, if you provide an upsert_script to a pivot transform, the script must also correctly handle the initial creation of the document, not just update scenarios.
  • As shown in the Elastic docs, the upsert field is not necessary (set to an empty object), because scripted_upsert is set to true.
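
For concreteness, the transform would then emit bulk update actions rather than index actions. Assuming scripted_upsert is honored in bulk update items the same way it is in the Update API (the index name and _id below are illustrative), a single item could look like:

POST _bulk
{"update":{"_index":"entity-store-example","_id":"deterministic-id-from-the-pivot"}}
{"scripted_upsert":true,"script":{"source":"if (params.newDocument['user.asset.criticality'] != null) { ctx._source['user.asset.criticality'] = params.newDocument['user.asset.criticality'] }","params":{"newDocument":{"user.asset.criticality":"high_impact"}}},"upsert":{}}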

Some potential concerns:

  • Currently, I believe that the largest concern with such an approach is that the Transform may no longer be able to do its "change detection" appropriately. I don't believe this affects checkpoints, but because a script is updating the destination index document, I believe this would affect any field-level change detection performed in subsequent queries after the checkpoint query. To be clear, I don't think this will cause a functional problem, but it will likely have some effect on performance; I do not know how large that impact would be.
  • In general, update operations are more costly than index operations. However, it should be noted that this proposal simply adds the option to perform update operations. Most transforms don't need the flexibility proposed here, but when this functionality is required, it will be more costly than index operations.

@dgieselaar and @prwhelan , please provide your thoughts. This does presuppose a solution to a performance problem, but I want to be clear that it's not just performance problems that such a solution would solve. This upsert-script-based approach gives functional flexibility that we simply don't have with Transforms today, and it would be very valuable for Solutions even outside of this particular problem/use case. This is a concrete feature request that would solve more than one challenge we currently face in using transforms.

cc @tommyers-elastic , this is the current GitHub issue tracking this, unless we create a specific issue to implement the scripted upsert functionality.

jaredburgettelastic avatar Sep 04 '24 20:09 jaredburgettelastic

Currently, I believe that the largest concern with such an approach is that the Transform may no longer be able to do its "change detection" appropriately. I don't believe this affects checkpoints, but because a script is updating the destination index document, I believe this would affect any field-level change detection performed in subsequent queries after the checkpoint query.

I'm not sure what this means, can you elaborate?

Other than that, this makes sense to me. I'm unsure about the performance implications of a scripted upsert. I can imagine that doing this for > 10k events would become a performance concern, but I will defer to the Elasticsearch folks.

dgieselaar avatar Sep 05 '24 07:09 dgieselaar

@dgieselaar I don't know enough about the internals of transforms to speak to it very well; that part was simply a thought after reading this part of the Transform docs about Transform checkpoints:

Identifies which entities and/or time buckets have changed.

The transform searches to see which entities or time buckets have changed between the last and the new checkpoint. The transform uses the values to synchronize the source and destination indices with fewer operations than a full re-run.

jaredburgettelastic avatar Sep 10 '24 16:09 jaredburgettelastic

Just thinking out loud - it may be possible to make use of this (and assess performance) by adding the scripted_upsert in an ingest processor which would be applied to the dest index, and operate on new fields. @jaredburgettelastic have you explored this?

sophiec20 avatar Oct 10 '24 16:10 sophiec20

👋 Hey there @sophiec20 !

I'm not sure I fully understand what is available today, so forgive any ignorance on my part.

A primary challenge raised above is that transforms always perform index operations (not update), and therefore I don't believe an ingest pipeline can make use of any previous values on a given document, as the ingest pipeline is only working with the newly generated data coming from the transform.

Are you thinking there is some other way that an ingest pipeline can use a scripted_upsert, instead of the proposal above where the transform would perform an update operation?

jaredburgettelastic avatar Oct 14 '24 18:10 jaredburgettelastic