DataflowTemplates
PubSub to Elasticsearch error
I'm trying to ingest data into Elasticsearch using GCP Dataflow and the Pub/Sub to Elasticsearch template. So far I've tried a couple of different deployments, all on GCP using the trial option over at elastic.co.
The data I'm trying to ingest consists of metrics from our devices in a simple JSON format. The Dataflow job is configured with the Cloud ID for my deployment and a custom UDF to format the data from Pub/Sub. I leave the data type at the default, so I'm not using the audit option as some guides suggest.
It all seems to go well and some metrics are ingested, but then there is an error; see below:
Error message from worker: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
This Dataflow job is using the default configuration, so I'd really expect it to work out of the box.
I did some investigation on this. I believe the reason is that Elasticsearch deprecated mapping types in 7.0+, which causes a warning like this to be emitted for every bulk flush: request [POST https://xxx.europe-west4.gcp.elastic-cloud.com:443/logs-gcp.pubsub-default/_doc/_bulk] returned 1 warnings: [299 Elasticsearch-7.16.3-4e6e4eab2297e949ec994e688dad46290d018022 "[types removal] Specifying types in bulk requests is deprecated."]
I suppose the correct fix is to either a) make it configurable, b) add ES version detection, c) just support ES 7+, or, last but not least, d) ignore and suppress the warning in checkForErrors()
(the code in question is here).
@Cherepushko I'm trying to build a test version of com.google.cloud.teleport.v2.elasticsearch.templates.PubSubToElasticsearch, but I'm not having much luck with the mvn compile exec:java instructions on the front page. Would you be able to point to a working incantation to build a Dataflow Flex template from modified source?
@rosmo Thank you for looking into this!
@GustafKisi Could you give the main branch here a try: https://github.com/rosmo/DataflowTemplates ? I've attempted to fix a few issues that cropped up during my testing. You should be able to get it built and running via Cloud Shell, for example, with these instructions:
cd DataflowTemplates/v2/
export PROJECT=<MY-PROJECT>
export REGION=<YOUR-GCP-REGION>
export IMAGE_NAME=pubsub-to-elasticsearch
export BUCKET_NAME=gs://<MY-DATAFLOW-BUCKET>
export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
export BASE_CONTAINER_IMAGE_VERSION=latest
export TEMPLATE_MODULE=googlecloud-to-elasticsearch
export APP_ROOT=/template/pubsub-to-elasticsearch
export COMMAND_SPEC=${APP_ROOT}/resources/pubsub-to-elasticsearch-command-spec.json
export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json
mvn clean package -Dimage=${TARGET_GCR_IMAGE} \
-Dbase-container-image=${BASE_CONTAINER_IMAGE} \
-Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
-Dapp-root=${APP_ROOT} \
-Dcommand-spec=${COMMAND_SPEC} \
-am -pl ${TEMPLATE_MODULE}
# Follow the image spec creation process from here:
# https://github.com/rosmo/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#creating-image-spec
export SUBSCRIPTION="projects/$PROJECT/subscriptions/<MY-SUBSCRIPTION>"
export CONNECTION_URL="<MY-ELASTICSEARCH-CLOUD-ID>"
export ERROR_OUTPUT_TOPIC="projects/$PROJECT/topics/<MY-ERROR-TOPIC>"
export API_KEY=<MY-ELASTICSEARCH-API-KEY>
export NETWORK=<MY-NETWORK>
export SUBNETWORK="regions/$REGION/subnetworks/<MY-SUBNETWORK>"
export TEMP_LOCATION="${BUCKET_NAME}/temp"
export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
gcloud beta dataflow flex-template run ${JOB_NAME} \
--project=${PROJECT} --region=$REGION \
--template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
--network=${NETWORK} --subnetwork=${SUBNETWORK} --temp-location=${TEMP_LOCATION} \
--parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC},batchSize=10
@rosmo I can try. Would I run this in my local terminal? Is there anything special I need installed besides gcloud?
@GustafKisi I suggest Cloud Shell, as it seems to have most stuff installed. Otherwise you'll have to have the Java JDK and Maven installed.
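If you do build locally instead of in Cloud Shell, a quick sanity check that both prerequisites are on the PATH might be:
# Both commands should print a version; Cloud Shell ships with them preinstalled.
java -version
mvn -version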
@rosmo The mvn clean package is taking a while, is this normal?
Yeah, it'll take 20-30 minutes or something.
@rosmo This is actually somewhat working. There are errors, but I think I need to provide the UDF for this to work properly. I will continue to try a couple of configs and report back.
I am now getting errors:
Error message from worker: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id CDxMwH4BJq15BT4CFX9w: failed to parse field [jsonPayload.place_id] of type [long] in document with id 'CDxMwH4BJq15BT4CFX9w'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id RjxMwH4BJq15BT4CHn9V: failed to parse field [jsonPayload.place_id] of type [long] in document with id 'RjxMwH4BJq15BT4CHn9V'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id TjxMwH4BJq15BT4CIX85: failed to parse field [jsonPayload.place_id] of type [long] in document with id 'TjxMwH4BJq15BT4CIX85'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)
java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id 9BdMwH4BgcfeaGZUIVvk: failed to parse field [jsonPayload.place_id] of type [long] in document with id '9BdMwH4BgcfeaGZUIVvk'. Preview of field's value: 'unknown' (mapper_parsing_exception)
Caused by: For input string: "unknown" (illegal_argument_exception)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:237)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1518)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1471)
This should be fine; the UDF removes that field.
@rosmo The input parameter javascriptTextTransformGcsPath will not work. Could this be fixed by adding it to the command that creates the image spec?
I made it work!
With this image spec:
{
"image":"'${TARGET_GCR_IMAGE}'",
"metadata":{
"name":"Pub/Sub to Elasticsearch",
"description":"Replicates data from Pub/Sub topic into an Elasticsearch index",
"parameters":[
{
"name":"inputSubscription",
"label":"The Cloud Pub/Sub subscription to consume from.",
"helpText":"The Cloud Pub/Sub subscription to consume from. The name should be in the format of projects/<project-id>/subscriptions/<subscription-name>.",
"paramType":"TEXT",
"isOptional":false
},
{
"name":"connectionUrl",
"label":"Elasticsearch URL in the format https://hostname:[port] or specify CloudID if using Elastic Cloud",
"helpText":"Elasticsearch URL in the format https://hostname:[port] or specify CloudID if using Elastic Cloud",
"paramType":"TEXT",
"isOptional":false
},
{
"name":"apiKey",
"label":"Base64 Encoded API Key for access without requiring basic authentication",
"helpText":"Base64 Encoded API Key for access without requiring basic authentication",
"paramType":"TEXT",
"isOptional":false
},
{
"name":"dataset",
"label":"The type of logs sent via Pub/Sub for which we have out of the box dashboard. Known log types values are audit, vpcflow, and firewall. If no known log type is detected, we default to pubsub",
"helpText":"The type of logs sent via Pub/Sub for which we have out of the box dashboard. Known log types values are audit, vpcflow, and firewall. If no known log type is detected, we default to pubsub",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"namespace",
"label":"The namespace for dataset. Default is default",
"helpText":"An arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit. Default is default",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"errorOutputTopic",
"label":"Error output topic in Pub/Sub for failed inserts",
"helpText":"Error output topic in Pub/Sub for failed inserts",
"paramType":"PUBSUB_TOPIC",
"isOptional":false
},
{
"name":"elasticsearchUsername",
"label":"Username for Elasticsearch endpoint. Overrides ApiKey option if specified.",
"helpText":"Username for Elasticsearch endpoint. Overrides ApiKey option if specified.",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"elasticsearchPassword",
"label":"Password for Elasticsearch endpoint. Overrides ApiKey option if specified.",
"helpText":"Password for Elasticsearch endpoint. Overrides ApiKey option if specified.",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"batchSize",
"label":"Batch size in number of documents",
"helpText":"Batch size in number of documents. Default: 1000",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"batchSizeBytes",
"label":"Batch size in number of bytes",
"helpText":"Batch size in number of bytes. Default: 5242880 (5mb)",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"maxRetryAttempts",
"label":"Max retry attempts",
"helpText":"Max retry attempts, must be > 0. Default: no retries",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"maxRetryDuration",
"label":"Max retry duration in milliseconds",
"helpText":"Max retry duration in milliseconds, must be > 0. Default: no retries",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"javascriptTextTransformGcsPath",
"label":"Path to cloud storage UDF",
"helpText":"Path to cloud storage UDF, must start with 'gs://'",
"paramType":"TEXT",
"isOptional":true
},
{
"name":"javascriptTextTransformFunctionName",
"label":"Function name to be executed",
"helpText":"Function name to be executed",
"paramType":"TEXT",
"isOptional":true
}
]
},
"sdk_info":{"language":"JAVA"}
}
I don't know if this is the proper way to do it, but it seems to work.
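For reference, assuming the JSON above is saved locally as image_spec.json (with ${TARGET_GCR_IMAGE} already expanded), it just needs to be copied to the spec path used at launch time:
# image_spec.json is a placeholder file name; TEMPLATE_IMAGE_SPEC comes from the build steps above.
gsutil cp image_spec.json ${TEMPLATE_IMAGE_SPEC}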
I got a proper stream of logs into ES!
I can't begin to explain my happiness. Thank you, @rosmo!
I'm so grateful. What's left to do?
@GustafKisi Great! Did you add the javascriptTextTransformGcsPath to the image spec? (I think just passing it via the command line would have worked too.) I still want to improve the bulk insert a bit more (move to local ID generation and report errors better).
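For the command-line route, a rough sketch would be the earlier run command with the two UDF parameters appended (the gs:// path and function name below are placeholders, not anything from this thread):
# Untested sketch; udf.js and "process" stand in for your own UDF file and function name.
gcloud beta dataflow flex-template run ${JOB_NAME} \
--project=${PROJECT} --region=$REGION \
--template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
--network=${NETWORK} --subnetwork=${SUBNETWORK} --temp-location=${TEMP_LOCATION} \
--parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC},batchSize=10,javascriptTextTransformGcsPath=gs://<MY-UDF-BUCKET>/udf.js,javascriptTextTransformFunctionName=process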
@rosmo I did, yes!
Sounds good. Is it OK for us to use this until the main template is updated?
Yes, absolutely, but do note it's not an official Google version (the changes are pretty transparent, though).
@rosmo Hi again, do you have any tips for optimizing this Dataflow job?
This is a screenshot from the job:
And here is the config:
This ended up being quite an expensive job. I've looked into the command for creating the job and saw the optional parameter --enable-streaming-engine. Would this help?
Or maybe this one: --worker-machine-type. Which machine type would work? I don't think we need more than a 30 GB disk and 4 GB of RAM.
Hey @GustafKisi, I'm not an expert on Dataflow cost optimization, but the streaming engine can certainly help. You can also look into limiting the number of worker nodes (max workers), using an E2 instance type for the machine type, and reducing the disk size (diskSizeGb). Of course, limiting the amount of logs through log sink configuration or exclusion filters will help as well.
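Roughly, those options might look like this on the run command from earlier (a sketch only; e2-standard-2 and max-workers=2 are example values, not a recommendation):
# Same run command as before, with the cost-tuning flags added.
gcloud beta dataflow flex-template run ${JOB_NAME} \
--project=${PROJECT} --region=$REGION \
--template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
--enable-streaming-engine \
--worker-machine-type=e2-standard-2 \
--max-workers=2 \
--network=${NETWORK} --subnetwork=${SUBNETWORK} --temp-location=${TEMP_LOCATION} \
--parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC},batchSize=10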
Thanks for the suggestions!
It's a little difficult to find documentation for all of the available options.
Just one question:
diskSizeGb does not work:
unrecognized arguments: --diskSizeGb=30
Is there another way to set this?
@rosmo Everything has been running smoothly over the past few days.
The only issue seems to be that some ingestions are timing out:
Error message from worker: org.elasticsearch.client.ResponseException: method [POST], host [https://fcb96653d51c47489551f9298f773817.us-east1.gcp.elastic-cloud.com:443], URI [/logs-gcp.pubsub-default/_bulk], status line [HTTP/1.1 504 Gateway Timeout]
{"ok":false,"message":"Post \"https://10.47.63.204:18016/logs-gcp.pubsub-default/_bulk\": net/http: TLS handshake timeout"}
org.elasticsearch.client.RestClient.convertResponse(RestClient.java:302)
org.elasticsearch.client.RestClient.performRequest(RestClient.java:272)
org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1512)
com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1478)
For the diskSizeGb, I think that argument needs to go into the --parameters part. The ingestion errors seem to be coming from the ES side of things. There are retry settings (maxRetryAttempts and maxRetryDuration) which are set to no retries by default, so you may want to tune those a bit.
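Untested, but that suggestion would look roughly like this, with diskSizeGb and the retry settings added to --parameters (the values are just examples; maxRetryDuration is in milliseconds):
# Sketch only: diskSizeGb passed as a parameter rather than a gcloud flag, plus example retry settings.
gcloud beta dataflow flex-template run ${JOB_NAME} \
--project=${PROJECT} --region=$REGION \
--template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
--network=${NETWORK} --subnetwork=${SUBNETWORK} --temp-location=${TEMP_LOCATION} \
--parameters inputSubscription=${SUBSCRIPTION},connectionUrl=${CONNECTION_URL},apiKey=${API_KEY},errorOutputTopic=${ERROR_OUTPUT_TOPIC},batchSize=10,diskSizeGb=30,maxRetryAttempts=3,maxRetryDuration=30000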
Right! Thanks!
@rosmo Hello! Is there a way to specify an Elasticsearch index?
Hello @rosmo! Never mind the previous question.
Will there be any support for Elasticsearch 8.x?
They are working on Elasticsearch 8.x support here: https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/369. It should be available in the UI soon.
@GustafKisi For me it's working now with Elasticsearch 8.x!