
Chapter 2: Exercises

geopolitis opened this issue 5 years ago • 8 comments

Please find below the full output of the exercise. Step 10 succeeds, but step 11 fails. The "Loading and Inspecting Parquet Files" section also fails; its output is included below.

=================

Exercises

First count the number of job titles for each executive, then create a pyspark.sql.Row for this result.

In [10]:

records = csv_lines.map(lambda line: line.split(','))
records.collect()

groups = records.groupBy(lambda x: x[0])
counts = groups.map(lambda x: (x[0], len(x[1])))
new_rows = counts.map(lambda x: Row(name=x[0], count=x[1]))
new_rows.collect()

Out[10]:

[Row(count=1, name='Florian Liebert'),
 Row(count=2, name='Russell Jurney'),
 Row(count=1, name='Don Brown'),
 Row(count=1, name='Steve Jobs'),
 Row(count=1, name='Donald Trump')]

In [11]:

rows.toDF().select("name","count").show()

Py4JJavaError                             Traceback (most recent call last)
/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 "An error occurred while calling {0}{1}{2}.\n".
--> 319                 format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o181.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'count' given input columns: [company, name, title];;
'Project [name#1, 'count]
+- LogicalRDD [company#0, name#1, title#2]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2884)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input> in <module>
----> 1 rows.toDF().select("name","count").show()

/home/vagrant/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1038         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1039         """
-> 1040         jdf = self._jdf.select(self._jcols(*cols))
   1041         return DataFrame(jdf, self.sql_ctx)
   1042

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                  e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve 'count' given input columns: [company, name, title];;\n'Project [name#1, 'count]\n+- LogicalRDD [company#0, name#1, title#2]\n"

===================

Loading and Inspecting Parquet Files

Using the SparkSession to load files as DataFrames and inspecting their contents...

In [19]:

# Load the parquet file containing flight delay records
on_time_dataframe = spark.read.parquet('../data/on_time_performance.parquet')

# Register the data for Spark SQL
on_time_dataframe.registerTempTable("on_time_performance")

# Check out the columns
on_time_dataframe.columns

Py4JJavaError                             Traceback (most recent call last)
/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 "An error occurred while calling {0}{1}{2}.\n".
--> 319                 format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o385.parquet.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/home/vagrant/Agile_Data_Code_2/data/on_time_performance.parquet;
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:626)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:349)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:559)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input> in <module>
      1 # Load the parquet file containing flight delay records
----> 2 on_time_dataframe = spark.read.parquet('../data/on_time_performance.parquet')
      3
      4 # Register the data for Spark SQL
      5 on_time_dataframe.registerTempTable("on_time_performance")

/home/vagrant/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths)
    289         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
    290         """
--> 291         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    292
    293     @ignore_unicode_prefix

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                  e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Path does not exist: file:/home/vagrant/Agile_Data_Code_2/data/on_time_performance.parquet;'
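This second failure is simply a missing file: the relative path resolves to file:/home/vagrant/Agile_Data_Code_2/data/on_time_performance.parquet, which has not been downloaded yet. A hedged sketch of a guard that fails fast with a readable hint instead of a deep Py4J traceback (the error message wording is my own):

import os

# Check the path before handing it to Spark; spark.read.parquet raises a
# long AnalysisException when the file is absent.
parquet_path = '../data/on_time_performance.parquet'
if not os.path.exists(parquet_path):
    raise FileNotFoundError(parquet_path + " is missing -- run the repository's download script first")
on_time_dataframe = spark.read.parquet(parquet_path)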

geopolitis · Aug 13 '19 20:08

@geopolitis Looking at this now...

rjurney · Aug 19 '19 19:08

I pushed a fix, the name for count/title wasn't being called right.
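For comparison, the same per-executive title counts can be produced with the DataFrame API alone, skipping the RDD groupBy/map round trip. A hedged sketch, not necessarily the committed fix, assuming rows is the RDD of (company, name, title) Row records shown in the logical plan:

# groupBy("name").count() yields a DataFrame with exactly the name and
# count columns that the failing select expected.
rows.toDF().groupBy("name").count().select("name", "count").show()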

rjurney · Aug 28 '19 01:08

Still fixing...

rjurney · Aug 28 '19 18:08

@geopolitis This is fixed.

rjurney · Sep 05 '19 19:09

@rjurney A few of the issues have been fixed. However, it seems the issue below persists, and a few steps later the file /data/simple_flight_delay_features.jsonl.bz2 is missing in "Building a Classifier Model to Predict Flight Delays: Loading Our Data".

Could you help?

rows.toDF().select("name","count").show()

Py4JJavaError: An error occurred while calling o181.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'count' given input columns: [company, name, title];;
'Project [name#1, 'count]
+- LogicalRDD [company#0, name#1, title#2]

During handling of the above exception, another exception occurred:

AnalysisException: "cannot resolve 'count' given input columns: [company, name, title];;\n'Project [name#1, 'count]\n+- LogicalRDD [company#0, name#1, title#2]\n"

geopolitis · Oct 07 '19 14:10

@rjurney Regarding /data/simple_flight_delay_features.jsonl.bz2 in "Building a Classifier Model to Predict Flight Delays: Loading Our Data":

I found a solution. The script in Chapter 8 (ch08/download_data.sh) downloads those files. However, you need them in Chapter 2.
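Once ch08/download_data.sh has fetched it, the features file loads directly in the Chapter 2 notebooks; Spark reads bzip2-compressed JSON Lines without a separate decompression step. A minimal sketch, assuming the notebook's spark session and the ch02 working directory:

# Spark decompresses .bz2 transparently; each JSON line becomes one row.
features = spark.read.json('../data/simple_flight_delay_features.jsonl.bz2')
features.show(5)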

geopolitis · Oct 07 '19 15:10

I fixed the download issue in https://github.com/rjurney/Agile_Data_Code_2/commit/1a55d930d3a12c6259673ce92706ccd7c4a0d281

Run git pull origin master and you should get the update.

rjurney · Oct 07 '19 18:10

I'm looking at the rest...

rjurney · Oct 07 '19 18:10

@geopolitis I fixed all of these to make the class run in 2021.

rjurney · Aug 10 '22 18:08