
Command for spark-submit

Open stephenllh opened this issue 2 years ago • 27 comments

I have a question regarding the command for spark-submit:

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
    stream_all_events.py

What is the meaning of the packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2?

stephenllh avatar Apr 21 '22 13:04 stephenllh

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying

It is a required dependency for Kafka and Spark Streaming to work.

ankurchavda avatar Apr 21 '22 14:04 ankurchavda

I see! Can you explain what the colon symbol (:) means?

stephenllh avatar Apr 21 '22 14:04 stephenllh

The colons separate the Maven coordinates: group, artifact, and version. It means we want the spark-sql-kafka-0-10_2.12 package (the _2.12 part is the Scala version it is built for) at version 3.1.2, which should match your Spark version. So let's say you are running Spark 3.1.1. Then the last value after the : will change to 3.1.1.

ankurchavda avatar Apr 21 '22 14:04 ankurchavda
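
As a quick sanity check (a minimal sketch, not part of the project), you can print the Spark version your environment is actually running so the last part of the coordinate matches it:

# Minimal sketch: print the running Spark version so the --packages coordinate
# (org.apache.spark:spark-sql-kafka-0-10_2.12:<version>) can be matched to it.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("version-check").getOrCreate()
print(spark.version)  # e.g. "3.1.2"
spark.stop()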

Thanks a lot! I actually have another unrelated question. I have been struggling to find out how EventSim and Kafka work together to create the 4 topics: listen_events, page_view_events, auth_events, status_change_events. Is that hard-coded and only works for the Million Song Dataset? I also saw that each event has different columns, so you assign different Spark schemas for them.

stephenllh avatar Apr 21 '22 14:04 stephenllh

Right. Eventsim is currently coded to generate events for a music streaming website. You can change that, but Eventsim is written in Scala, so you'd need some experience with it. The topics are also currently hardcoded. You can check the codebase here

ankurchavda avatar Apr 21 '22 14:04 ankurchavda

I see. How did you check the columns and the data types of each Kafka event before you defined the schemas?

stephenllh avatar Apr 21 '22 14:04 stephenllh

From the Confluent Control Center UI. It should be available on port 9021 of your VM. You will have to forward that port to your local machine and open it in the browser.

ankurchavda avatar Apr 21 '22 14:04 ankurchavda
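
For reference, here is a minimal sketch (not the project's actual stream_all_events.py) of how a schema observed in Control Center could be declared and applied to a Kafka topic in PySpark; the field names below are illustrative assumptions for listen_events:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

# Illustrative schema; field names and types are assumptions, check Control Center for the real ones.
listen_events_schema = StructType([
    StructField("artist", StringType()),
    StructField("song", StringType()),
    StructField("duration", DoubleType()),
    StructField("ts", LongType()),
    StructField("userId", StringType()),
])

spark = SparkSession.builder.appName("schema-sketch").getOrCreate()

listen_events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # or <KAFKA_ADDRESS>:9092 from another VM
    .option("subscribe", "listen_events")
    .load()
    # Kafka values arrive as bytes; cast to string and parse the JSON with the schema above.
    .select(from_json(col("value").cast("string"), listen_events_schema).alias("event"))
    .select("event.*")
)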

I didn't know you could check those out from the Control Center. Thanks for being helpful.

stephenllh avatar Apr 21 '22 15:04 stephenllh

No worries. If you check the YouTube video I put out a few days back, you should get an idea of how to navigate the project. You'll find the link in the README.

ankurchavda avatar Apr 21 '22 15:04 ankurchavda

Hi. Do you mind answering another question of mine? I hope this is not bothering you.

When I set the environment variable KAFKA_ADDRESS to the external IP, Eventsim somehow couldn't create all 4 topics. The logs suggest it struggles to find the correct IP address. But when I don't set the environment variable (which defaults it to localhost as per your code), it works. Do you have any idea why?

stephenllh avatar Apr 26 '22 04:04 stephenllh

Hey, no worries, happy to answer your questions.

I am not sure why this would happen. Eventsim is running on the same VM as Kafka, so it should connect directly. Eventsim doesn't refer to the KAFKA_ADDRESS variable as such. Have you specified the correct IP, and do you have port 9092 open?

ankurchavda avatar Apr 26 '22 05:04 ankurchavda

Your Kafka broker will be available at that port for other applications to connect.

ankurchavda avatar Apr 26 '22 06:04 ankurchavda
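
One quick way to verify that from another machine (just a sketch, not part of the project) is a plain TCP check against the broker port:

# Plain TCP reachability check for the Kafka broker port from another machine.
# The host below is a placeholder; replace it with your VM's external IP.
# Raises an error if the port is closed or filtered by the firewall.
import socket

host, port = "<kafka-vm-external-ip>", 9092
with socket.create_connection((host, port), timeout=5):
    print(f"Port {port} on {host} is reachable")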

My mistake. It was opened automatically. (Sorry I deleted the comment after I realized that). Now it works. But let me try again after setting the env variable.

stephenllh avatar Apr 26 '22 06:04 stephenllh

Yeah it doesn't work if I set the KAFKA_ADDRESS to the external IP of the VM. It works when I unset the env variable.

This is what I see in the terminal:

06:42:20.083 [kafka-producer-network-thread | producer-3] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-3] Initiating connection to node 34.124.249.114:9092 (id: 1 rack: null) using address /34.124.249.114
06:43:20.086 [main] DEBUG o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-3] Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic page_view_events not present in metadata after 60000 ms.

stephenllh avatar Apr 26 '22 06:04 stephenllh

By the way I just saw this in the Kafka docker compose yaml file

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://${KAFKA_ADDRESS:-localhost}:9092

So the KAFKA_ADDRESS env variable does affect stuff.

stephenllh avatar Apr 26 '22 06:04 stephenllh

Yes, it does. If you want to connect from an external VM, like the Spark cluster in our case, you need to write to the external IP address instead of localhost. Hence I added the KAFKA_ADDRESS variable in the docker-compose.yml.

ankurchavda avatar Apr 26 '22 07:04 ankurchavda
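
For illustration (this uses the kafka-python package, which is an assumption and not a project dependency), an external client only connects successfully when it uses the address the broker advertises in PLAINTEXT_HOST, i.e. the VM's external IP once KAFKA_ADDRESS is set:

# Hedged illustration with kafka-python: produce a test message from outside the Kafka VM.
# The bootstrap address is a placeholder for the broker's advertised external IP.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="<kafka-vm-external-ip>:9092")
producer.send("page_view_events", b'{"ping": "test"}')
producer.flush()  # raises if the advertised address is unreachable from this machine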

I just noticed this in the terraform config:

resource "google_compute_firewall" "port_rules" {
  project     = var.project
  name        = "kafka-broker-port"
  network     = var.network
  description = "Opens port 9092 in the Kafka VM for Spark cluster to connect"

  allow {
    protocol = "tcp"
    ports    = ["9092"]
  }

  source_ranges = ["0.0.0.0/0"]
  target_tags   = ["kafka"]

}

I didn't follow everything in your project, so I missed this part. Is this the reason why it doesn't work? But even without this step, VS Code does open up port 9092 when I SSH into the Kafka VM. Or are those totally different things?

stephenllh avatar Apr 26 '22 09:04 stephenllh

Hey, when I set the port rules, it worked! It turns out this step is important.

stephenllh avatar Apr 28 '22 10:04 stephenllh

So on VS Code, you are forwarding the port, which is not the same as opening the port. You are able to forward the port to your local computer because you already authenticated when you did the SSH. Opening a port, on the other hand, allows connections from other VMs in the network. By default, no ports are accessible to any other VMs inside or outside the network; you need to specify which ports you would like to open to accept connections.

ankurchavda avatar Apr 29 '22 05:04 ankurchavda

Ah I see! This explanation is helpful.

stephenllh avatar Apr 30 '22 05:04 stephenllh

I have another question. I see that you set up a Dataproc cluster for Spark Streaming. Do you think it makes sense if I do everything on a single VM and hence serve it locally? I was thinking everything would involve "localhost" and the master would be "local[*]", etc.

stephenllh avatar May 02 '22 10:05 stephenllh

You can. However, in the real world, Spark would always be running on some kind of cluster. So making things run on a cluster would be a wiser choice if you plan to showcase the infra choices in an interview or somewhere else.

ankurchavda avatar May 03 '22 05:05 ankurchavda
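
If you do go the single-VM route, a minimal sketch (assuming the rest of the job stays the same) is to point the session at local[*] instead of relying on a cluster master:

# Minimal sketch: run the streaming job on a single VM using all local cores.
# On Dataproc/YARN you would omit .master(...) and let spark-submit decide.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("streamify-local")
    .getOrCreate()
)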

I see. Yeah you made a great point.

stephenllh avatar May 03 '22 06:05 stephenllh

By the way, I am not sure this is a mistake on my behalf, but I think I might have found some potential bugs:

  1. The KAFKA_ADDRESS for the Kafka docker-compose YAML file is unnecessary since both Kafka and the EventSim container are in the same virtual machine. Kindly refer to this link.

The relevant change I made is removing the environment variable reference, so the line is just:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092

Otherwise, I couldn't make it work. (Ignore my reply a few days ago, I too thought it worked when I set the env variable to the external IP of the VM, but it actually did not work. My bad.)

  2. For this command,
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 stream_all_events.py

I think there is a mistake in the Spark version. In the Spark README, version 3.0.3 is installed. When I changed the command to:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3 stream_all_events.py

it finally worked.

stephenllh avatar May 03 '22 13:05 stephenllh

Hey, apologies for the late reply. There's a lot going on at this point and I kinda missed this comment.

  1. I had to explicitly write the messages to the external IP instead of localhost so that my Spark Streaming process could connect and stream them. Earlier I had the same line as yours in the YAML file; I then had to add the env variable for this to work. So I am not sure why it's the opposite in your case.
  2. Yes, I used the Dataproc cluster, which has Spark 3.1.2 installed by default. Hence my command was for that version.

ankurchavda avatar May 07 '22 05:05 ankurchavda

No worries. But I wonder if it's fine if we talk somewhere other than here? I also miss comments here sometimes. Perhaps Slack is good, but I am fine with anything.

I now have questions regarding the dbt + Airflow part. I am very new to SQL, so I really need your help to understand the logic behind the SQL queries you wrote.

  1. In airflow/dags/sql/auth_events.sql, you use a lot of COALESCE calls. I assume this is to handle null values. But what about the 9999999999999 for registration? What does it mean?

  2. I think I understand what is going on in the SQL query below. But the SELECT part at the bottom looks kinda random to me. Can you explain the meaning of the string with multiple Ns, two '0's, and two 'NA's?

{{ config(materialized = 'table') }}

SELECT {{ dbt_utils.surrogate_key(['artistId']) }} AS artistKey,
    *
FROM (
        SELECT 
            MAX(artist_id) AS artistId,
            MAX(artist_latitude) AS latitude,
            MAX(artist_longitude) AS longitude,
            MAX(artist_location) AS location,
            REPLACE(REPLACE(artist_name, '"', ''), '\\', '') AS name
        FROM {{ source('staging', 'songs') }}
        GROUP BY artist_name

        UNION ALL

        SELECT 'NNNNNNNNNNNNNNN',
            0,
            0,
            'NA',
            'NA'
    )

stephenllh avatar May 07 '22 14:05 stephenllh

Sure, feel free to reach out over Slack. I assume you must be in the DataTalks Slack workspace.

For 1 and 2, both are techniques to handle dimensions where the value is null or unknown. It's kinda hard to explain over chat, but the idea is that there should not be any null dimension keys in your fact table. A null artist represents nothing and also messes up aggregates and visualizations. Instead, we add a record in the artists dimension so that all the null artist ids are now represented by NNNNNNNNNNNNNNN. It just adds a little clarity to your data and helps you track stuff. I hope this makes sense. You'll come across this when you read about dimensional modelling; it's a fairly common topic.

Here's a short explanation on the Kimball group blog about the same.

ankurchavda avatar May 07 '22 17:05 ankurchavda
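
To make the idea concrete, here is a small PySpark illustration (not the project's dbt code) of why the unknown-member row matters: facts with a null artist id are coalesced to the sentinel key, so a join to the dimension never drops or nulls them out.

# Illustration only (not the project's dbt model): the "unknown member" pattern.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("unknown-member-demo").getOrCreate()
UNKNOWN_KEY = "NNNNNNNNNNNNNNN"

artists = spark.createDataFrame(
    [("AR123", "Radiohead"), (UNKNOWN_KEY, "NA")],  # second row is the unknown member
    ["artist_id", "name"],
)
listens = spark.createDataFrame(
    [("AR123", 1), (None, 2)],  # second event has no artist id
    ["artist_id", "user_id"],
)

# Coalesce null keys to the unknown member before joining, so no fact rows are lost
# and aggregates/visualizations have a real "NA" artist to group on.
joined = (
    listens
    .withColumn("artist_id", F.coalesce("artist_id", F.lit(UNKNOWN_KEY)))
    .join(artists, "artist_id", "left")
)
joined.show()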