= InfoQ - Apache Kafka and KSQL article - code samples
Robin Moffatt <[email protected]>
v0.12, September 5, 2018
== Overview
These are the code samples and snippets that were used in writing link:https://www.infoq.com/articles/democratizing-stream-processing-kafka-ksql-part2[this InfoQ article]. You can find a related supplemental article on the Confluent blog, https://www.confluent.io/blog/changing-face-etl["The Changing Face of ETL"].
== Startup
Run the stack:
[source,bash]
docker-compose up -d
Deploy connectors etc:
[source,bash]
./scripts/setup.sh
Check that the connectors are running:
[source,bash]
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort
Should show:
[source,bash]
mysql-source-demo-customers      |  RUNNING  |  RUNNING
mysql-source-demo-customers-raw  |  RUNNING  |  RUNNING
[source,bash]
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort
Should show:
[source,bash]
es_sink_ratings-with-customer-data  |  RUNNING  |  RUNNING
es_sink_unhappy_platinum_customers  |  RUNNING  |  RUNNING
Note that the AWS S3 sink requires further setup, so it will initially be in the FAILED state.
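The curl/jq one-liners above work fine; if you'd rather script the check, here is a minimal stdlib-only Python sketch that does the same thing. The worker URLs are assumptions based on the ports used above; adjust them if your port mappings differ.

[source,python]
# connector_status.py -- same check as the curl/jq pipelines above, stdlib only.
# The worker URLs are assumptions based on the ports used in this README.
import json
import urllib.request
WORKERS = ["http://localhost:8083", "http://localhost:18083"]
for worker in WORKERS:
    with urllib.request.urlopen(f"{worker}/connectors") as resp:
        connectors = json.load(resp)
    for name in connectors:
        with urllib.request.urlopen(f"{worker}/connectors/{name}/status") as resp:
            status = json.load(resp)
        states = [status["connector"]["state"]] + [t["state"] for t in status["tasks"]]
        print(f"{worker}  {name:40} {' | '.join(states)}")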
== KSQL
To run the KSQL CLI:
[source,bash]
docker run --network infoq-kafka-ksql_default --interactive --tty \
  confluentinc/cp-ksql-cli:5.0.0-beta180702222458 \
  http://ksql-server:8088
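You can also talk to the KSQL server's REST API directly. Below is a minimal sketch that runs SHOW TOPICS; against the /ksql endpoint. It assumes docker-compose publishes port 8088 on localhost; if not, run it from a container attached to the infoq-kafka-ksql_default network and point it at http://ksql-server:8088 instead.

[source,python]
# ksql_show_topics.py -- run a KSQL statement over the server's REST API.
# Assumes port 8088 is published on localhost by docker-compose.
import json
import urllib.request
payload = json.dumps({"ksql": "SHOW TOPICS;", "streamsProperties": {}}).encode("utf-8")
req = urllib.request.Request(
    "http://localhost:8088/ksql",
    data=payload,
    headers={"Content-Type": "application/vnd.ksql.v1+json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.dumps(json.load(resp), indent=2))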
== Kafka Console Consumer
To run kafka-console-consumer within the Docker setup, use:
[source,bash]
docker-compose exec kafka \
  kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic UNHAPPY_PLATINUM_CUSTOMERS
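If you prefer to consume programmatically rather than via kafka-console-consumer, here is a minimal sketch using the confluent_kafka client installed in the next section. The group id is arbitrary, and localhost:9092 assumes the broker also publishes a listener for clients outside the Docker network.

[source,python]
# consume_unhappy.py -- programmatic alternative to kafka-console-consumer.
# Group id is arbitrary; localhost:9092 assumes a listener published outside Docker.
from confluent_kafka import Consumer, KafkaError
c = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "infoq-demo-console-alternative",
    "default.topic.config": {"auto.offset.reset": "earliest"},
})
c.subscribe(["UNHAPPY_PLATINUM_CUSTOMERS"])
try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # partition EOF just means we're caught up; anything else is a real error
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f"Consumer error: {msg.error()}")
            continue
        print(msg.value().decode("utf-8"))
finally:
    c.close()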
== Running the Python Slack/Kafka app
Virtualenv is useful for isolating your Python libraries:
[source,bash]
virtualenv slack-kafka
Activate it by sourcing the file that matches your shell, e.g. for fish:
[source,bash]
source ./slack-kafka/bin/activate.fish
Install the required libraries:
[source,bash]
pip install slackclient confluent_kafka
Get your API token from https://api.slack.com/custom-integrations/legacy-tokens and set it as an environment variable:
[source,bash]
export SLACK_API_TOKEN=your-token-goes-here
Run the app:
[source,bash]
python python_kafka_notify.py
Each message that's processed and sent to Slack is echoed to stdout:
[source,bash]
End of partition reached UNHAPPY_PLATINUM_CUSTOMERS/0
Received message: {"CLUB_STATUS":"platinum","EMAIL":"[email protected]","STARS":1,"MESSAGE":"Exceeded all my expectations. Thank you !"}
Sending message "[email protected] just left a bad review :disappointed:
Exceeded all my expectations. Thank you !
Please contact them immediately and see if we can fix the issue right here, right now" to channel unhappy-customers
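For orientation, the Slack side of that pattern boils down to something like the sketch below. This is not the repo's python_kafka_notify.py, just a minimal illustration using the slackclient 1.x API installed above; wire notify() into a consumer loop like the one sketched earlier.

[source,python]
# slack_notify_sketch.py -- the Slack side of the pattern only. Not the repo's
# python_kafka_notify.py; a minimal illustration using the slackclient 1.x API.
import json
import os
from slackclient import SlackClient
sc = SlackClient(os.environ["SLACK_API_TOKEN"])
def notify(raw_value):
    # raw_value is the JSON message body from UNHAPPY_PLATINUM_CUSTOMERS
    record = json.loads(raw_value)
    text = (f"{record['EMAIL']} just left a bad review :disappointed:\n"
            f"{record['MESSAGE']}\n"
            "Please contact them immediately and see if we can fix the issue right here, right now")
    sc.api_call("chat.postMessage", channel="unhappy-customers", text=text)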
image::images/slack01.png[]
== Setup Kibana
In a browser, go to Kibana at http://localhost:5601/app/kibana#/management/kibana/indices/ and set an index (any index) as default (click the Star in the top-right)
Import the dashboard kibana_objects.json using the Import button at http://localhost:5601/app/kibana#/management/kibana/objects
image::images/kibana_ix01.png[Kibana indexes]
Load the Kibana dashboard http://localhost:5601/app/kibana#/dashboard/02028f30-424c-11e8-af19-1188a9332246
== Streaming data to S3 from Kafka
Create a file called aws_creds.txt in the same folder as the docker-compose.yml. This will be mounted and passed to Kafka Connect on startup. It should look like this:
[source,bash]
[default]
aws_access_key_id=xxxxxxxxxxxxxxx
aws_secret_access_key=yyyyyyyyyyyyyy
Create the target bucket to write to, and update scripts/create-s3-sink.sh accordingly with the region and bucket name.
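For orientation, what that script submits is a connector definition POSTed to the second Connect worker, along these lines. The repo's scripts/create-s3-sink.sh is the source of truth; the region and flush.size below are placeholders, and converter settings are omitted.

[source,python]
# register_s3_sink.py -- rough shape of what scripts/create-s3-sink.sh submits to
# the Connect worker on localhost:18083. Region and flush.size are placeholders.
import json
import urllib.request
connector = {
    "name": "s3-sink-ratings",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "ratings-with-customer-data",
        "s3.bucket.name": "rmoff-demo-ratings",
        "s3.region": "us-east-1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "flush.size": "3",
    },
}
req = urllib.request.Request(
    "http://localhost:18083/connectors",
    data=json.dumps(connector).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
print(urllib.request.urlopen(req).read().decode("utf-8"))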
Deploy the connector:
[source,bash]
docker-compose exec kafka-connect-cp bash -c '/scripts/create-s3-sink.sh'
You should see the sink successfully running:
[source,bash]
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort es_sink_ratings-with-customer-data | RUNNING | RUNNING es_sink_unhappy_platinum_customers | RUNNING | RUNNING s3-sink-ratings | RUNNING | RUNNING
In S3 you should see data:
[source,bash]
$ aws s3 ls rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/
2018-07-20 13:26:29        569 ratings-with-customer-data+0+0000000000.json
2018-07-20 13:26:31        615 ratings-with-customer-data+0+0000000003.json
2018-07-20 13:26:32        587 ratings-with-customer-data+0+0000000006.json
2018-07-20 13:26:33        592 ratings-with-customer-data+0+0000000009.json
image::images/s3_bucket_ratings.png[]
== Streaming data to Google Cloud Storage (GCS) from Kafka
Install the GCS connector:
[source,bash]
docker-compose exec kafka-connect-cp bash -c 'confluent-hub install --no-prompt confluentinc/kafka-connect-gcs:5.0.0'
docker-compose restart kafka-connect-cp
Download your service account JSON credentials https://console.cloud.google.com/apis/credentials[from GCP] to a file called gcp_creds.json. This will be mounted and passed to Kafka Connect on startup.
Create the target bucket to write to, and update scripts/create-gcs-sink.sh accordingly with the region and bucket name.
Deploy the connector:
[source,bash]
docker-compose exec kafka-connect-cp bash -c '/scripts/create-gcs-sink.sh'
You should see the sink successfully running:
[source,bash]
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort … gcs-sink-ratings | RUNNING | RUNNING
In GCS you should see data:
[source,bash]
$ gsutil ls gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000000.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000064.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000128.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000192.json
image::images/gcs_bucket_ratings.png[]
You can also send CSV data to GCS for use in Data Studio (see scripts/create-gcs-sink-csv.sh). A neater route is probably via BigQuery, since that way you don't lose the schema.
image::images/gcp_datastudio.png[]
== Streaming data to Google BigQuery from Kafka
Install the BigQuery connector:
[source,bash]
docker-compose exec kafka-connect-cp bash -c 'confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.1.0'
docker-compose restart kafka-connect-cp
Download your service account JSON credentials https://console.cloud.google.com/apis/credentials[from GCP] to a file called gcp_creds.json. This will be mounted and passed to Kafka Connect on startup.
Make sure the dataset exists; create it through the web UI or the CLI (./google-cloud-sdk/bin/bq mk ksql_demo).
Update scripts/create-gbq-sink.sh with your BQ project name and dataset.
Deploy the connector:
[source,bash]
docker-compose exec kafka-connect-cp bash -c '/scripts/create-gbq-sink.sh'
You should see the sink successfully running:
[source,bash]
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort … gbq-sink-ratings | RUNNING | RUNNING
image::images/gbq01.png[]