DataflowTemplates

PubSub to ElasticSearch Error

Open GustafKisi opened this issue 3 years ago • 24 comments

I'm trying to ingest data into ES using GCP Dataflow and the PubSub to Elasticsearch template. So far I've tried a couple of different deployments, all on GCP, using the trial option over at elastic.co.

The data I'm trying to ingest consists of metrics from our devices in a simple JSON format. The Dataflow job is configured with the Cloud ID for my deployment and a custom UDF to format the data from Pub/Sub. I leave the data type at its default, so I'm not using the audit option as some guides suggest.

It all seems to go well at first; some metrics are ingested, but then there is an error, shown below:

Error message from worker: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)

This Dataflow job uses default configurations, so I'd really expect it to work out of the box.

GustafKisi avatar Jan 12 '22 16:01 GustafKisi

I did some investigation on this. I believe the reason is that Elasticsearch deprecated mapping types in ES 7.0+, which causes a warning like this to be emitted for every bulk flush: request [POST https://xxx.europe-west4.gcp.elastic-cloud.com:443/logs-gcp.pubsub-default/_doc/_bulk] returned 1 warnings: [299 Elasticsearch-7.16.3-4e6e4eab2297e949ec994e688dad46290d018022 "[types removal] Specifying types in bulk requests is deprecated."].

I suppose the correct way is to either a) make it configurable, b) add ES version detection, c) just support ES 7+, or, last but not least, d) ignore and suppress the warning in checkForErrors() (the code in question is here).
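
For context, this is roughly the difference at the HTTP level (placeholder host and API key, a sketch rather than what the template actually sends):

# Typed bulk endpoint that the template currently hits - deprecated in ES 7.x
# and removed in 8.x, so every flush gets the "[types removal]" warning:
#   POST /logs-gcp.pubsub-default/_doc/_bulk

# Typeless bulk endpoint that ES 7+ expects:
curl -s -X POST "https://<MY-DEPLOYMENT>.europe-west4.gcp.elastic-cloud.com:443/logs-gcp.pubsub-default/_bulk" \
  -H "Authorization: ApiKey <MY-API-KEY>" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary $'{"index":{}}\n{"message":"test"}\n'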

rosmo avatar Feb 01 '22 15:02 rosmo

@Cherepushko I'm trying to build a test version of com.google.cloud.teleport.v2.elasticsearch.templates.PubSubToElasticsearch, but I'm not having much luck with the mvn compile exec:java instructions on the front page. Would you be able to point me to a working incantation for building a Dataflow Flex template from modified source?

rosmo avatar Feb 01 '22 16:02 rosmo

@rosmo Thank you for looking into this!

GustafKisi avatar Feb 02 '22 08:02 GustafKisi

@GustafKisi Could you give the main branch here a try: https://github.com/rosmo/DataflowTemplates ? I've attempted to fix a few issues that cropped up during my testing. You should be able to get it built and running, for example via Cloud Shell, with these instructions:

cd DataflowTemplates/v2/
export PROJECT=<MY-PROJECT>
export REGION=<YOUR-GCP-REGION>
export IMAGE_NAME=pubsub-to-elasticsearch
export BUCKET_NAME=gs://<MY-DATAFLOW-BUCKET>
export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
export BASE_CONTAINER_IMAGE_VERSION=latest
export TEMPLATE_MODULE=googlecloud-to-elasticsearch
export APP_ROOT=/template/pubsub-to-elasticsearch
export COMMAND_SPEC=${APP_ROOT}/resources/pubsub-to-elasticsearch-command-spec.json
export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json

mvn clean package -Dimage=${TARGET_GCR_IMAGE} \
                  -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
                  -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
                  -Dapp-root=${APP_ROOT} \
                  -Dcommand-spec=${COMMAND_SPEC} \
                  -am -pl ${TEMPLATE_MODULE}

# Follow the image spec creation process from here:
# https://github.com/rosmo/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#creating-image-spec

export SUBSCRIPTION="projects/$PROJECT/subscriptions/<MY-SUBSCRIPTION>"
export CONNECTION_URL="<MY-ELASTICSEARCH-CLOUD-ID>"
export ERROR_OUTPUT_TOPIC="projects/$PROJECT/topics/<MY-ERROR-TOPIC>"
export API_KEY=<MY-ELASTICSEARCH-API-KEY>
export NETWORK=<MY-NETWORK>
export SUBNETWORK="regions/$REGION/subnetworks/<MY-SUBNETWORK>"
export TEMP_LOCATION="${BUCKET_NAME}/temp"   # BUCKET_NAME already includes the gs:// prefix
export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
gcloud beta dataflow flex-template run ${JOB_NAME} \
        --project=${PROJECT} --region=$REGION \
        --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
        --network=${NETWORK} --subnetwork=${SUBNETWORK} --temp-location=${TEMP_LOCATION} \
        --parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC},batchSize=10

rosmo avatar Feb 03 '22 14:02 rosmo

@rosmo I can try, would I run this on my local terminal? Anything special I need installed except for gcloud?

GustafKisi avatar Feb 03 '22 14:02 GustafKisi

@GustafKisi I suggest Cloud Shell, as it seems to have most stuff installed. Otherwise you'll have to have the Java JDK and Maven installed.
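
If you do run it locally, something like this should cover it on a Debian/Ubuntu machine (package names are an assumption and vary by distro; the template targets Java 8, so a matching JDK is safest):

# Install a JDK and Maven (exact package names vary by distro/version).
sudo apt-get update
sudo apt-get install -y openjdk-8-jdk maven

# Verify the toolchain before building:
java -version && mvn -version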

rosmo avatar Feb 03 '22 14:02 rosmo

@rosmo The mvn clean package is taking a while. Is this normal?

GustafKisi avatar Feb 03 '22 14:02 GustafKisi

Yeah, it'll take 20-30 minutes or something.

rosmo avatar Feb 03 '22 15:02 rosmo

@rosmo This is actually somewhat working. There are errors, but I think I need to provide the UDF for it to work properly. I will keep trying a couple of configs and report back.

Screenshot 2022-02-03 at 16 57 32

I am now getting errors:

Error message from worker: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id CDxMwH4BJq15BT4CFX9w: failed to parse field [jsonPayload.place_id] of type [long] in document with id 'CDxMwH4BJq15BT4CFX9w'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id RjxMwH4BJq15BT4CHn9V: failed to parse field [jsonPayload.place_id] of type [long] in document with id 'RjxMwH4BJq15BT4CHn9V'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id TjxMwH4BJq15BT4CIX85: failed to parse field [jsonPayload.place_id] of type [long] in document with id 'TjxMwH4BJq15BT4CIX85'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id 9BdMwH4BgcfeaGZUIVvk: failed to parse field [jsonPayload.place_id] of type [long] in document with id '9BdMwH4BgcfeaGZUIVvk'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)

This should be fine; the UDF removes that field.
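
For reference, a minimal sketch of what such a UDF could look like (the function name process and the staging path are placeholders, and it assumes the standard Dataflow UDF contract of one JSON string in, one JSON string out; my actual UDF may differ):

cat > udf.js <<'EOF'
/**
 * Dataflow JavaScript UDF: receives one JSON-encoded Pub/Sub message as a
 * string and returns the (possibly modified) JSON string to be indexed.
 */
function process(inJson) {
  var obj = JSON.parse(inJson);
  // Drop place_id when it isn't numeric, so it doesn't clash with the
  // long mapping in the Elasticsearch index.
  if (obj.jsonPayload && isNaN(Number(obj.jsonPayload.place_id))) {
    delete obj.jsonPayload.place_id;
  }
  return JSON.stringify(obj);
}
EOF

# Stage the UDF where the pipeline can read it:
gsutil cp udf.js ${BUCKET_NAME}/udf/udf.js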

GustafKisi avatar Feb 03 '22 15:02 GustafKisi

@rosmo The input parameter javascriptTextTransformGcsPath does not work. Could this be fixed by adding it to the command that creates the image spec?

GustafKisi avatar Feb 03 '22 16:02 GustafKisi

I made it work!

With this image spec:

{
  "image":"'${TARGET_GCR_IMAGE}'",
  "metadata":{
    "name":"Pub/Sub to Elasticsearch",
    "description":"Replicates data from Pub/Sub topic into an Elasticsearch index",
    "parameters":[
        {
            "name":"inputSubscription",
            "label":"The Cloud Pub/Sub subscription to consume from.",
            "helpText":"The Cloud Pub/Sub subscription to consume from. The name should be in the format of projects/<project-id>/subscriptions/<subscription-name>.",
            "paramType":"TEXT",
            "isOptional":false
        },
        {
            "name":"connectionUrl",
            "label":"Elasticsearch URL in the format https://hostname:[port] or specify CloudID if using Elastic Cloud",
            "helpText":"Elasticsearch URL in the format https://hostname:[port] or specify CloudID if using Elastic Cloud",
            "paramType":"TEXT",
            "isOptional":false
        },
        {
            "name":"apiKey",
            "label":"Base64 Encoded API Key for access without requiring basic authentication",
            "helpText":"Base64 Encoded API Key for access without requiring basic authentication",
            "paramType":"TEXT",
            "isOptional":false
        },
        {
            "name":"dataset",
            "label":"The type of logs sent via Pub/Sub for which we have out of the box dashboard. Known log types values are audit, vpcflow, and firewall. If no known log type is detected, we default to pubsub",
            "helpText":"The type of logs sent via Pub/Sub for which we have out of the box dashboard. Known log types values are audit, vpcflow, and firewall. If no known log type is detected, we default to pubsub",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"namespace",
            "label":"The namespace for dataset. Default is default",
            "helpText":"An arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit. Default is default",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"errorOutputTopic",
            "label":"Error output topic in Pub/Sub for failed inserts",
            "helpText":"Error output topic in Pub/Sub for failed inserts",
            "paramType":"PUBSUB_TOPIC",
            "isOptional":false
        },
        {
            "name":"elasticsearchUsername",
            "label":"Username for Elasticsearch endpoint. Overrides ApiKey option if specified.",
            "helpText":"Username for Elasticsearch endpoint. Overrides ApiKey option if specified.",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"elasticsearchPassword",
            "label":"Password for Elasticsearch endpoint. Overrides ApiKey option if specified.",
            "helpText":"Password for Elasticsearch endpoint. Overrides ApiKey option if specified.",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"batchSize",
            "label":"Batch size in number of documents",
            "helpText":"Batch size in number of documents. Default: 1000",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"batchSizeBytes",
            "label":"Batch size in number of bytes",
            "helpText":"Batch size in number of bytes. Default: 5242880 (5mb)",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"maxRetryAttempts",
            "label":"Max retry attempts",
            "helpText":"Max retry attempts, must be > 0. Default: no retries",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"maxRetryDuration",
            "label":"Max retry duration in milliseconds",
            "helpText":"Max retry duration in milliseconds, must be > 0. Default: no retries",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"javascriptTextTransformGcsPath",
            "label":"Path to cloud storage UDF",
            "helpText":"Path to cloud storage UDF, must start with 'gs://'",
            "paramType":"TEXT",
            "isOptional":true
        },
        {
            "name":"javascriptTextTransformFunctionName",
            "label":"Function name to be executed",
            "helpText":"Function name to be executed",
            "paramType":"TEXT",
            "isOptional":true
        }
    ]
  },
  "sdk_info":{"language":"JAVA"}
}
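
Assuming the JSON above is saved (with the shell variables from the build step expanded) as image_spec.json, uploading it to the path the run command expects is just:

# Copy the spec to the GCS location referenced by --template-file-gcs-location.
gsutil cp image_spec.json ${TEMPLATE_IMAGE_SPEC}

# Sanity-check that it landed where the run command will look for it:
gsutil cat ${TEMPLATE_IMAGE_SPEC}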

I don't know if this is the proper way to do it, but it seems to work.

I got a proper stream of logs into ES!

Screenshot 2022-02-03 at 18 26 10

I can't begin to explain my happiness. Thank you, @rosmo!

I'm so grateful. What's left to do?

GustafKisi avatar Feb 03 '22 17:02 GustafKisi

@GustafKisi Great! Did you add the javascriptTextTransformGcsPath to the image spec? (I think just passing it via the command line would have worked too.) I still want to improve the bulk insert a bit (move to local ID generation and report errors better).
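
For the command-line route, roughly something like this should work (parameter names as in the image spec; the UDF path and function name are placeholders):

gcloud beta dataflow flex-template run ${JOB_NAME} \
        --project=${PROJECT} --region=${REGION} \
        --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
        --parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC},javascriptTextTransformGcsPath=${BUCKET_NAME}/udf/udf.js,javascriptTextTransformFunctionName=process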

rosmo avatar Feb 04 '22 08:02 rosmo

@rosmo I did, yes!

Sounds good. Is it OK for us to use this until the official template is updated?

GustafKisi avatar Feb 04 '22 08:02 GustafKisi

Yes, absolutely, but do note it's not an official Google version (the changes are pretty transparent, though).

rosmo avatar Feb 04 '22 09:02 rosmo

@rosmo Hi again, do you have any tips for how to optimize this dataflow?

This is a screenshot from the job:

image

And here is the config:

Screenshot 2022-02-07 at 13 57 02

This ended up being quite an expensive job. I've looked into the command for creating the job and saw the optional parameter

  • --enable-streaming-engine

Would this help?

Or maybe this one:

  • --worker-machine-type

Which machine type would work? I don't think we need more than a 30 GB disk and 4 GB of RAM.

GustafKisi avatar Feb 07 '22 13:02 GustafKisi

Hey @GustafKisi, I'm not an expert on Dataflow cost optimization, but the streaming engine can certainly help. You can also look into limiting the number of worker nodes (max workers), using an E2 instance type for the machine type, and reducing the disk size (diskSizeGb). Of course, limiting the volume of logs through log sink configuration or exclusion filters will help as well.
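
As a rough sketch of how those could look on the run command (machine type and worker count are just examples, and flag availability may depend on your gcloud version):

gcloud beta dataflow flex-template run ${JOB_NAME} \
        --project=${PROJECT} --region=${REGION} \
        --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
        --enable-streaming-engine \
        --worker-machine-type=e2-standard-2 \
        --max-workers=2 \
        --parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC}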

rosmo avatar Feb 07 '22 13:02 rosmo

Thanks for the suggestions!

It's a little difficult to find documentation for all the available options.

Just one question:

diskSizeGb does not work: unrecognized arguments: --diskSizeGb=30

Is there another way to set this?

GustafKisi avatar Feb 07 '22 13:02 GustafKisi

@rosmo Everything has been running smoothly over the past few days.

The only issue seems to be that some ingestions are timing out:

Error message from worker: org.elasticsearch.client.ResponseException: method [POST], host [https://fcb96653d51c47489551f9298f773817.us-east1.gcp.elastic-cloud.com:443], URI [/logs-gcp.pubsub-default/_bulk], status line [HTTP/1.1 504 Gateway Timeout]
{"ok":false,"message":"Post \"https://10.47.63.204:18016/logs-gcp.pubsub-default/_bulk\": net/http: TLS handshake timeout"}

        org.elasticsearch.client.RestClient.convertResponse(RestClient.java:302)
        org.elasticsearch.client.RestClient.performRequest(RestClient.java:272)
        org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1512)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1478)

GustafKisi avatar Feb 14 '22 08:02 GustafKisi

For diskSizeGb, I think that argument needs to go into the --parameters part. The ingestion errors seem to be coming from the Elasticsearch side of things. There are retry settings (maxRetryAttempts and maxRetryDuration, which default to no retries), so you may want to tune those a bit.
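
For example (untested, but this is where I'd expect them to go):

gcloud beta dataflow flex-template run ${JOB_NAME} \
        --project=${PROJECT} --region=${REGION} \
        --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
        --parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC},diskSizeGb=30,maxRetryAttempts=3,maxRetryDuration=30000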

rosmo avatar Feb 14 '22 09:02 rosmo

Right! Thanks!

GustafKisi avatar Feb 14 '22 12:02 GustafKisi

@rosmo Hello! Is there a way to specify an Elasticsearch index?

GustafKisi avatar Mar 17 '22 09:03 GustafKisi

Hello @rosmo! Never mind the previous question.

Will there be any support for Elasticsearch 8.x?

GustafKisi avatar Mar 29 '22 14:03 GustafKisi

They are working on Elasticsearch 8.x support here: https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/369 It should be available in the UI soon.

felix-lessoer avatar May 31 '22 08:05 felix-lessoer

@GustafKisi For me it's working now with Elasticsearch 8.x!

felix-lessoer avatar Jun 15 '22 08:06 felix-lessoer