infoq-kafka-ksql
Code samples to go with InfoQ article
= InfoQ - Apache Kafka and KSQL article - code samples
Robin Moffatt [email protected]
v0.12, September 5, 2018
== Overview
These are the code samples and snippets that were used in writing this InfoQ article. You can find a related supplemental article, "The Changing Face of ETL", on the Confluent blog.
== Startup
Run the stack:
docker-compose up -d
Deploy connectors etc:
Check that the connectors are running:
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort
Should show:
mysql-source-demo-customers | RUNNING | RUNNING
mysql-source-demo-customers-raw | RUNNING | RUNNING
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort
Should show:
es_sink_ratings-with-customer-data | RUNNING | RUNNING
es_sink_unhappy_platinum_customers | RUNNING | RUNNING
Note that the AWS S3 sink requires further setup, so it will initially be in the FAILED state.
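The same status check can be sketched in Python for anyone who prefers not to chain `curl`, `jq`, and `column`. This is a minimal sketch, not part of the original samples: it assumes the two Kafka Connect REST endpoints used above (`/connectors` and `/connectors/<name>/status`), and the helper names `http_get_json`, `connector_states`, and `format_rows` are hypothetical.

```python
import json
from urllib.request import urlopen


def http_get_json(url):
    """Fetch a URL and decode its JSON response body."""
    with urlopen(url) as resp:
        return json.load(resp)


def connector_states(base_url, fetch=http_get_json):
    """Return sorted (name, connector_state, [task_states]) tuples.

    `fetch` is injectable so the logic can be exercised without a
    running Kafka Connect worker (an assumption made for this sketch).
    """
    rows = []
    for name in fetch(f"{base_url}/connectors"):
        status = fetch(f"{base_url}/connectors/{name}/status")
        rows.append((
            status["name"],
            status["connector"]["state"],
            [task["state"] for task in status["tasks"]],
        ))
    return sorted(rows)


def format_rows(rows):
    """Render rows in the same 'name | state | task state' layout."""
    return "\n".join(
        " | ".join([name, state, *tasks]) for name, state, tasks in rows
    )
```

With the stack running, `print(format_rows(connector_states("http://localhost:8083")))` should list the source connectors, and the same call against `http://localhost:18083` should list the sinks.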
To run the KSQL CLI:
docker run --network infoq-kafka-ksql_default --interactive --tty
== Kafka Console Consumer
To run kafka-console-consumer within the Docker setup, use:
docker-compose exec kafka
--bootstrap-server kafka:29092
== Running the Python Slack/Kafka app
Virtualenv is useful for isolating your Python libraries:
virtualenv slack-kafka
Source a different file depending on your shell
source ./slack-kafka/bin/
Install the libs needed
pip install slackclient confluent_kafka
Get your API token from
Set your API token as an env var:
export SLACK_API_TOKEN=your-token-goes-here
Run the app:
Each message that's processed and sent to Slack is echoed to stdout:
End of partition reached UNHAPPY_PLATINUM_CUSTOMERS/0
Received message: {"CLUB_STATUS":"platinum","EMAIL":"[email protected]","STARS":1,"MESSAGE":"Exceeded all my expectations. Thank you !"}
Sending message "[email protected]
just left a bad review :disappointed:
Exceeded all my expectations. Thank you !
Please contact them immediately and see if we can fix the issue right here, right now" to channel unhappy-customers
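To make the mapping from Kafka record to Slack text concrete, here is a minimal sketch of how such a message could be assembled from the JSON payload shown in the log. It is not the app's actual code: the function name `build_alert` is hypothetical, the line breaks are assumed from the output above, and the Kafka consumer and Slack client calls are deliberately left out.

```python
import json


def build_alert(raw_value):
    """Build alert text from one UNHAPPY_PLATINUM_CUSTOMERS record.

    `raw_value` is the JSON string (or bytes) carried in the Kafka
    message. The field names EMAIL and MESSAGE come from the log output;
    the exact layout of the text is an assumption.
    """
    record = json.loads(raw_value)
    return (
        f"{record['EMAIL']} just left a bad review :disappointed:\n"
        f"{record['MESSAGE']}\n"
        "Please contact them immediately and see if we can fix the issue "
        "right here, right now"
    )
```

Keeping the formatting in a pure function like this makes it easy to check the wording without connecting to Kafka or Slack.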
== Setup Kibana
In a browser, go to Kibana at http://localhost:5601/app/kibana#/management/kibana/indices/ and set an index (any index) as default (click the Star in the top-right)
Import dashboard kibana_objects.json using the Import button at http://localhost:5601/app/kibana#/management/kibana/objects
image::images/kibana_ix01.png[Kibana indexes]
Load the Kibana dashboard http://localhost:5601/app/kibana#/dashboard/02028f30-424c-11e8-af19-1188a9332246
== Streaming data to S3 from Kafka
Create a file called aws_creds.txt in the same folder as the docker-compose.yml. This will be mounted and passed to Kafka Connect on startup. It should look like this:
[default]
aws_access_key_id=xxxxxxxxxxxxxxx
aws_secret_access_key=yyyyyyyyyyyyyy
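Because a malformed credentials file only surfaces later as a FAILED connector, it can help to sanity-check the file before starting the stack. The following is a minimal sketch using Python's standard `configparser`; the helper name `missing_aws_keys` is hypothetical, and it only checks that both keys are present and non-empty, not that they are valid.

```python
import configparser

REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_access_key")


def missing_aws_keys(text, profile="default"):
    """Return the required keys that are absent or empty in `profile`."""
    # Interpolation is disabled so '%' characters in values are read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    if not parser.has_section(profile):
        return list(REQUIRED_KEYS)
    return [
        key for key in REQUIRED_KEYS
        if not parser.get(profile, key, fallback="").strip()
    ]
```

For example, `missing_aws_keys(open("aws_creds.txt").read())` returns an empty list when the file matches the layout above.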
Create the target bucket to write to and update scripts/
accordingly with the region and bucket name.
Deploy the connector:
docker-compose exec kafka-connect-cp bash -c '/scripts/'
You should see the sink successfully running:
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort
es_sink_ratings-with-customer-data | RUNNING | RUNNING
es_sink_unhappy_platinum_customers | RUNNING | RUNNING
s3-sink-ratings | RUNNING | RUNNING
In S3 you should see data:
$ aws s3 ls rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/
2018-07-20 13:26:29 569 ratings-with-customer-data+0+0000000000.json
2018-07-20 13:26:31 615 ratings-with-customer-data+0+0000000003.json
2018-07-20 13:26:32 587 ratings-with-customer-data+0+0000000006.json
2018-07-20 13:26:33 592 ratings-with-customer-data+0+0000000009.json
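The object names in this listing appear to follow a `<topic>+<partition>+<starting offset>.<extension>` pattern, so the numeric suffix tells you which Kafka offset each file begins at. As a minimal sketch under that assumption (the helper `parse_object_name` is hypothetical and not part of the connector), the names can be split apart like this:

```python
import re

# Assumed layout: <topic>+<partition>+<zero-padded start offset>.<extension>
OBJECT_NAME = re.compile(
    r"(?P<topic>.+)\+(?P<partition>\d+)\+(?P<offset>\d+)\.(?P<ext>\w+)$"
)


def parse_object_name(key):
    """Return (topic, partition, start_offset) for one sink object key."""
    name = key.rsplit("/", 1)[-1]  # drop any directory prefix
    match = OBJECT_NAME.match(name)
    if match is None:
        raise ValueError(f"unexpected object name: {name!r}")
    return match["topic"], int(match["partition"]), int(match["offset"])
```

Applied to the listing above, consecutive files start at offsets 0, 3, 6, and 9, which suggests each object holds three records.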
== Streaming data to Google Cloud Storage (GCS) from Kafka
Install the GCS connector:
docker-compose exec kafka-connect-cp bash -c 'confluent-hub install --no-prompt confluentinc/kafka-connect-gcs:5.0.0'
docker-compose restart kafka-connect-cp
Download your service account JSON credentials from GCP to a file called gcp_creds.json. This will be mounted and passed to Kafka Connect on startup.
Create the target bucket to write to and update scripts/
accordingly with the region and bucket name.
Deploy the connector:
docker-compose exec kafka-connect-cp bash -c '/scripts/'
You should see the sink successfully running:
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort
…
gcs-sink-ratings | RUNNING | RUNNING
In GCS you should see data:
$ gsutil ls gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000000.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000064.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000128.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000192.json
You can also send CSV data to GCS for use in Data Studio - see scripts/
. A neater way is probably to go via BigQuery though, since you don't lose the schema that way.
== Streaming data to Google BigQuery from Kafka
Install the BigQuery connector:
docker-compose exec kafka-connect-cp bash -c 'confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.1.0'
docker-compose restart kafka-connect-cp
Download your service account JSON credentials from GCP to a file called gcp_creds.json. This will be mounted and passed to Kafka Connect on startup.
Make sure the dataset exists. If it does not, create it through the Web UI or the CLI (./google-cloud-sdk/bin/bq mk ksql_demo).
Update scripts/
with your BQ project name & dataset.
Deploy the connector:
docker-compose exec kafka-connect-cp bash -c '/scripts/'
You should see the sink successfully running:
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/"//g'| sort
…
gbq-sink-ratings | RUNNING | RUNNING