[BUG] Unhelpful error message initializing OpenSearch Ingestion, OpenSearch sink

Open • Jon-AtAWS opened this issue 1 year ago • 16 comments

See also: https://github.com/opensearch-project/opensearch-java/issues/473

It looks like we may have fixed this in the Java client but not in the Python client? Or maybe this is a different code path?

I haven't been able to diagnose exactly what's going on or where the failure is. Here's what CloudWatch Logs shows for the OpenSearch sink initialization:

2024-02-27T19:37:23.594 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
	at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.model.sink.AbstractSink.initialize(AbstractSink.java:52) ~[data-prepper-api-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.isReady(Pipeline.java:200) ~[data-prepper-core-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:252) ~[data-prepper-core-2.6.1.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

FWIW, my pipeline role has:

		{
			"Action": [
				"es:DescribeDomain",
				"es:*"
			],
			"Resource": "arn:aws:es:us-west-2:OBSCURED:domain/OBSCURED",
			"Effect": "Allow"
		},

Jon-AtAWS, Feb 27 '24 19:02

Actually, this may not have anything to do with the Python client; it's either the Java client or Data Prepper. Can you re-route?

Jon-AtAWS, Feb 27 '24 19:02

Hello @Jon-AtAWS, can you please update the pipeline role with the following permissions and see whether you still face the issue?

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "es:DescribeDomain",
            "Resource": "arn:aws:es:us-west-2:{your-account-id}:domain/{domain-name}"
        },
        {
            "Effect": "Allow",
            "Action": "es:ESHttp*",
            "Resource": "arn:aws:es:us-west-2:{your-account-id}:domain/{domain-name}/*"
        }
    ]
}

Utkarsh-Aga, Feb 29 '24 06:02

Thanks for the fast response!

My pipeline role has the following (pardon the CDK code):

        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}"],
            "actions": [
                "es:ESHttp*",
                "es:DescribeDomain"
            ]
        }))

And I have verified that the statement is correct in the generated policy. It's not deployed right now, so I can't screenshot the IAM console.

The pipeline works if I map the role to FGAC's all_access role, or if I add "" and "" for the index- and cluster-level permissions of the OpenSearch role I create. Some permission that I'm missing in OpenSearch FGAC is causing this error.

Looking at the code for checkIfIndexExistsOnServer (the failing call), it looks like it's calling HEAD /index_name. I have tried cluster_composite_ops and a variety of other cluster-level permissions for FGAC. At the index level, I have indices_all, and I've also tried crud, read, and write.

My next attempt will be to add indices_all to the cluster-level permissions. That doesn't really make sense, but maybe it will work?

My goal is to find the minimum permissions the FGAC role needs to work with Data Prepper/OpenSearch Ingestion. If you already know them, please let me know!
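For reference, the existence check is a plain HTTP `HEAD /<index>` request: OpenSearch answers 200 when the index exists and 404 when it does not, so any other status (for example a 401 or 403 from the security plugin) is a failure rather than an answer. A minimal sketch of that interpretation, assuming only the status code is available (a HEAD response carries no body); the enum and method names are hypothetical and not part of Data Prepper or opensearch-java:

```java
import java.util.Locale;

// Hypothetical outcome of a HEAD /<index> existence check.
enum IndexExistsOutcome { EXISTS, MISSING, AUTH_FAILURE, OTHER_ERROR }

final class IndexExistsCheck {
    private IndexExistsCheck() {}

    // Maps the HTTP status of HEAD /<index> to an outcome.
    static IndexExistsOutcome classify(final int status) {
        switch (status) {
            case 200: return IndexExistsOutcome.EXISTS;       // index is present
            case 404: return IndexExistsOutcome.MISSING;      // index is absent
            case 401:                                          // not authenticated
            case 403: return IndexExistsOutcome.AUTH_FAILURE; // authenticated but not authorized
            default:  return IndexExistsOutcome.OTHER_ERROR;
        }
    }

    // Names the request and the index so an auth failure is not context-free.
    static String describe(final String index, final int status) {
        return String.format(Locale.ROOT, "HEAD /%s returned %d (%s)", index, status, classify(status));
    }
}
```

The point of separating AUTH_FAILURE from MISSING is that only a 404 should lead to index creation; a 403 means the check itself was denied.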

Jon-AtAWS, Feb 29 '24 19:02
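Since the goal is a minimal FGAC role, one systematic way to search for it is greedy elimination: start from a role that works and try dropping one permission at a time, keeping only the entries whose removal breaks initialization. A sketch, assuming a hypothetical `works` check that applies a candidate permission list and reports whether the sink initializes, and assuming monotonicity (adding a permission never makes initialization fail):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PermissionMinimizer {
    private PermissionMinimizer() {}

    // Greedily drops entries from a known-working permission list. `works` is a
    // hypothetical check such as "apply this role and see whether the sink initializes".
    // Under monotonicity the result is 1-minimal: removing any single entry breaks it.
    static List<String> minimize(final List<String> workingSet, final Predicate<List<String>> works) {
        if (!works.test(workingSet)) {
            throw new IllegalArgumentException("starting set must already work");
        }
        final List<String> current = new ArrayList<>(workingSet);
        int i = 0;
        while (i < current.size()) {
            final List<String> candidate = new ArrayList<>(current);
            candidate.remove(i); // remove by index
            if (works.test(candidate)) {
                current.clear();
                current.addAll(candidate); // entry i was not needed; test the new entry at i
            } else {
                i++; // entry i is required; keep it
            }
        }
        return current;
    }
}
```

For n starting entries this makes exactly n + 1 calls to `works`. The result is only as fine-grained as the starting list: starting from action groups such as indices_all identifies the needed groups, not the individual actions inside them.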

Well, shoot. As I was re-reading that, I realized that my resource is missing a wildcard for the es:ESHttp* action. So I changed it to:

        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}"],
            "actions": [
                "es:DescribeDomain"
            ]
        }))
        pipeline_policy_doc.add_statements(iam.PolicyStatement(**{
            "effect": iam.Effect.ALLOW,
            "resources": [f"{domain.domain_arn}/*"],
            "actions": [
                "es:ESHttp*",
            ]
        }))

And it worked. I still don't understand why it also worked when I switched to all_access for FGAC; I will try that again. I also still think we need a better error message, one that includes the entity presenting the credentials and the API that was called. It would be even better if the error message specified the permissions I need (whether IAM or FGAC).

Jon-AtAWS, Feb 29 '24 21:02
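The missing wildcard matters because IAM authorizes es:ESHttp* calls against a resource made of the domain ARN followed by the request path, so a statement scoped to the bare domain ARN does not match a request such as `HEAD /my-index`. Below is a simplified model of that wildcard matching (`*` matches any run of characters, including `/`; `?` matches one character), assuming that resource format. The account ID and names are placeholders, and this is not the actual IAM policy evaluator:

```java
final class ArnWildcard {
    private ArnWildcard() {}

    // Simplified IAM-style match: '*' matches any run of characters (including '/'),
    // '?' matches exactly one character. This is NOT the full IAM policy evaluator.
    static boolean matches(final String pattern, final String value) {
        int p = 0;
        int v = 0;
        int starP = -1; // position of the most recent '*' in the pattern
        int starV = -1; // value position that '*' was last tried against
        while (v < value.length()) {
            if (p < pattern.length() && pattern.charAt(p) == '*') {
                starP = p++; // first let '*' match the empty string
                starV = v;
            } else if (p < pattern.length()
                    && (pattern.charAt(p) == '?' || pattern.charAt(p) == value.charAt(v))) {
                p++;
                v++;
            } else if (starP >= 0) {
                p = starP + 1; // backtrack: let the last '*' absorb one more character
                v = ++starV;
            } else {
                return false;
            }
        }
        while (p < pattern.length() && pattern.charAt(p) == '*') {
            p++; // trailing stars can match the empty string
        }
        return p == pattern.length();
    }
}
```

Under this model, `arn:aws:es:us-west-2:123456789012:domain/my-domain` does not match `.../domain/my-domain/my-index`, but `.../domain/my-domain/*` does. That is consistent with the policy suggested earlier in the thread, which grants es:DescribeDomain on the domain ARN and es:ESHttp* on the domain ARN plus `/*`. As the thread goes on to show, though, fixing the IAM resource alone did not resolve the FGAC error.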

Nope, something else made it work.

Here's the error:

2024-03-01T17:38:09.531 [Thread-11] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
	at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.model.sink.SinkThread.run(SinkThread.java:25) ~[data-prepper-api-2.6.1.jar:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

Here is the role's trust relationship:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "osis-pipelines.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Here are the role's permissions:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Action": "es:DescribeDomain",
			"Resource": "arn:aws:es:us-west-2:XXXXXXXXXXX:domain/X",
			"Effect": "Allow"
		},
		{
			"Action": "es:ESHttp*",
			"Resource": "arn:aws:es:us-west-2:XXXXXXXXXXX:domain/X/*",
			"Effect": "Allow"
		},
		{
			"Action": "dynamodb:DescribeExport",
			"Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X/export/*",
			"Effect": "Allow"
		},
		{
			"Action": [
				"dynamodb:DescribeStream",
				"dynamodb:GetRecords",
				"dynamodb:GetShardIterator"
			],
			"Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X/stream/*",
			"Effect": "Allow"
		},
		{
			"Action": [
				"dynamodb:DescribeContinuousBackups",
				"dynamodb:DescribeTable",
				"dynamodb:ExportTableToPointInTime"
			],
			"Resource": "arn:aws:dynamodb:us-west-2:XXXXXXXXXXXX:table/X",
			"Effect": "Allow"
		},
		{
			"Action": [
				"s3:AbortMultipartUpload",
				"s3:GetObject",
				"s3:PutObject",
				"s3:PutObjectAcl"
			],
			"Resource": "arn:aws:s3:::XXXXXXXXX/X/export/*",
			"Effect": "Allow"
		}
	]
}

Here are the FGAC permissions:

{
  "pipeline_write_role": {
    "reserved": false,
    "hidden": false,
    "cluster_permissions": [
      "cluster_all",
      "indices_all"
    ],
    "index_permissions": [
      {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "crud",
          "create_index"
        ]
      }
    ],
    "tenant_permissions": [],
    "static": false
  }
}

Jon-AtAWS, Mar 01 '24 17:03

This:

PUT _plugins/_security/api/roles/pipeline_write_role
{
  "cluster_permissions": ["cluster_monitor", "indices_all"],
  "index_permissions": [
  {
    "index_patterns": [ "*" ],
    "dls": "",
    "fls": [],
    "masked_fields": [],
    "allowed_actions": [
      "indices_all"
    ]
  }]
}

allowed the sink to initialize.

To summarize:

  1. It's NOT the permissions on the IAM role.
  2. crud and create_index are not enough for FGAC.

What is the additional permission I need, and can we fix the error message?

Jon-AtAWS, Mar 01 '24 18:03
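For anyone repeating these role experiments from code rather than from Dev Tools, the `PUT _plugins/_security/api/roles/pipeline_write_role` call above is an ordinary HTTP request with a JSON body. A sketch using the JDK's java.net.http client that only builds the request. The endpoint is a placeholder, the role name is used as-is (not URL-encoded), and authentication (basic auth or SigV4 signing, depending on how the domain is configured) is deliberately left out:

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class SecurityRoleRequests {
    private SecurityRoleRequests() {}

    // Builds (does not send) PUT /_plugins/_security/api/roles/<roleName> with a JSON role body.
    // No authentication is attached; add whatever the domain requires before sending.
    static HttpRequest putRole(final URI endpoint, final String roleName, final String roleJson) {
        return HttpRequest.newBuilder(endpoint.resolve("/_plugins/_security/api/roles/" + roleName))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(roleJson))
                .build();
    }
}
```

Once authentication is added, the request can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, which makes it easy to script a series of candidate roles.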

Data Prepper can only show the message provided by OpenSearch.

kkondaka, Mar 05 '24 20:03

It would be good for Data Prepper to add context such as: "called the client's IndexExists method while attempting to validate the non-existence of index 'foo', in order to set the template." Or something like that.

Jon-AtAWS, Mar 05 '24 22:03

@Jon-AtAWS, yes, I agree with this.

@kkondaka, I think we could accomplish this by adding a try-catch around the call. Maybe we could even have a dedicated exception for failed requests?

try {
  final BooleanResponse booleanResponse = openSearchClient.indices().exists(
                new ExistsRequest.Builder().index(indexAlias).build());
} catch(Exception ex) {
  throw new OpenSearchRequestException("checking that index exists.", ex);
}

And

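class OpenSearchRequestException extends RuntimeException {
  private final String operation;

  OpenSearchRequestException(final String operation, final Throwable cause) {
    super(cause);
    this.operation = operation;
  }

  @Override
  public String getMessage() {
    return "Failed while " + operation + " with error: " + getCause().getMessage();
  }
}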

dlvenable, Mar 06 '24 16:03

throw new OpenSearchRequestException("checking that index exists.", ex);

Thanks @dlvenable !

Nitpicking a bit: let's pack as much information into these messages as possible. For instance, in this message we can add the name of the index we're checking. If we know the authenticated entity and its roles, we should add those as well. More information is better!

Jon-AtAWS, Mar 06 '24 16:03
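Following that suggestion, a variant of the proposed OpenSearchRequestException could carry the operation, the index name, and, when it is known, the authenticated principal, and fold them into one message. A sketch; the four-argument constructor and the message format are hypothetical, not Data Prepper's actual API:

```java
import java.util.Optional;

// Hypothetical variant of the proposed exception: records the operation, the index,
// and (when known) the authenticated principal alongside the original cause.
class OpenSearchRequestException extends RuntimeException {
    OpenSearchRequestException(final String operation, final String indexName,
                               final String principal, final Throwable cause) {
        super(buildMessage(operation, indexName, principal, cause), cause);
    }

    private static String buildMessage(final String operation, final String indexName,
                                       final String principal, final Throwable cause) {
        final StringBuilder sb = new StringBuilder("Failed while ")
                .append(operation)
                .append(" (index: '").append(indexName).append('\'');
        // The principal is often unknown on the client side, so it is optional.
        Optional.ofNullable(principal)
                .ifPresent(p -> sb.append(", principal: '").append(p).append('\''));
        return sb.append("): ").append(cause.getMessage()).toString();
    }
}
```

Passing the original exception as the cause keeps the full OpenSearch stack trace in the log, while the message itself now says which request failed and for which index.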

@Jon-AtAWS, this is great feedback. We could easily include the index name. The role name is a bit more challenging, but it may be returned in the OpenSearch response.

dlvenable, Mar 06 '24 17:03

@Jon-AtAWS, based on the stack trace, the failure happened while checking whether the index exists, which requires the indices:admin/exists FGAC permission. I don't see this permission explicitly included in any action group.

krishna-ggk, Mar 07 '24 17:03

I now have this policy:

{
  "pipeline_write_role": {
    "reserved": false,
    "hidden": false,
    "cluster_permissions": [
      "indices:admin/exists",
      "cluster_monitor",
      "cluster_composite_ops"
    ],
    "index_permissions": [
      {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "indices:admin/exists",
          "crud"
        ]
      }
    ],
    "tenant_permissions": [],
    "static": false
  }
}

And receiving this error:

2024-03-07T23:16:55.564 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] authentication/authorization failure
	at org.opensearch.client.transport.aws.AwsSdk2Transport.parseResponse(AwsSdk2Transport.java:473) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.executeSync(AwsSdk2Transport.java:392) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.transport.aws.AwsSdk2Transport.performRequest(AwsSdk2Transport.java:192) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.client.opensearch.indices.OpenSearchIndicesClient.exists(OpenSearchIndicesClient.java:507) ~[opensearch-java-2.8.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.NoIsmPolicyManagement.checkIfIndexExistsOnServer(NoIsmPolicyManagement.java:50) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.checkAndCreateIndex(AbstractIndexManager.java:268) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager.setupIndex(AbstractIndexManager.java:225) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitializeInternal(OpenSearchSink.java:231) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doInitialize(OpenSearchSink.java:193) ~[opensearch-2.6.1.jar:?]
	at org.opensearch.dataprepper.model.sink.AbstractSink.initialize(AbstractSink.java:52) ~[data-prepper-api-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.isReady(Pipeline.java:200) ~[data-prepper-core-2.6.1.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:252) ~[data-prepper-core-2.6.1.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]

Jon-AtAWS, Mar 07 '24 23:03

This policy succeeds (changing to "indices_all" at the index level):

PUT _plugins/_security/api/roles/pipeline_write_role
{
    "cluster_permissions": [
        "indices:admin/exists",
        "cluster_monitor",
        "cluster_composite_ops"
    ],
    "index_permissions": [
    {
        "index_patterns": [
          "*"
        ],
        "dls": "",
        "fls": [],
        "masked_fields": [],
        "allowed_actions": [
          "indices_all",
          "crud"
        ]
    }
    ],
    "tenant_permissions": []
}

The docs (https://opensearch.org/docs/latest/security/access-control/default-action-groups/) are not very helpful here. The indices_all action group is documented as "Grants all permissions on the index. Equates to indices:*". So I can't really tell which of the permissions I added enabled the sink to initialize, and the error doesn't tell me which permission I need.

Jon-AtAWS, Mar 07 '24 23:03
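One way to reason about these results: FGAC checks a concrete action name, and an entry in allowed_actions only helps if it (or the patterns its action group expands to) covers that name. The docs quoted above say indices_all equates to indices:*, which covers every indices: action, whereas an explicit indices:admin/exists entry covers only that one action. A simplified sketch of that check. It supports only a trailing `*` wildcard, and its group table contains only the indices_all expansion quoted from the docs; real action groups would need their actual expansions from the security plugin:

```java
import java.util.List;
import java.util.Map;

final class ActionCoverage {
    private ActionCoverage() {}

    // Only indices_all's expansion is taken from the docs ("Equates to indices:*").
    static final Map<String, List<String>> GROUPS = Map.of("indices_all", List.of("indices:*"));

    // True if `pattern` covers `action`; only a trailing '*' wildcard is supported.
    static boolean covers(final String pattern, final String action) {
        return pattern.endsWith("*")
                ? action.startsWith(pattern.substring(0, pattern.length() - 1))
                : pattern.equals(action);
    }

    // Expands known groups (unknown entries are treated as literal actions), then checks coverage.
    static boolean allowed(final List<String> allowedActions, final String action) {
        return allowedActions.stream()
                .flatMap(a -> GROUPS.getOrDefault(a, List.of(a)).stream())
                .anyMatch(p -> covers(p, action));
    }
}
```

Under this model, the fact that indices_all works while indices:admin/exists does not is consistent with the existence check being authorized as some other indices: action; the log does not say which one.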

If you have indices_all, ideally you should not need crud. Are you trying to break down indices_all further to scope it down for ingestion?

krishna-ggk, Mar 12 '24 08:03

Yes, I am trying to figure out the minimum permissions. I understand that I don't need crud; I just put indices_all in there (in place of indices:admin/exists) to verify that the exists permission alone is not sufficient.

Jon-AtAWS, Mar 12 '24 16:03