[SUPPORT] RFC 63 Functional Index Hudi 1.0.0-beta
"Hello, I am currently exploring the new Functional Index features as outlined in the RFC.
Below is an example of my code, but I am unsure whether I am implementing the functional index correctly. I'm not familiar with the exact syntax and would greatly appreciate your assistance.
My Hudi tables have been created, and I need guidance on how to properly create a functional index on the 'ts' (timestamp) column. Any help you can provide would be highly appreciated.
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    print("Imports loaded")
except Exception as e:
    print("error", e)
HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
faker = Faker()
class DataGenerator(object):

    @staticmethod
    def get_data(samples):
        return [
            (
                str(uuid.uuid4()),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.state(),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                random.choice(["2021", "2022", "2023"]),
                faker.month()
            ) for _ in range(samples)
        ]

    @staticmethod
    def get_eCommerce_data(samples):
        return [
            (
                str(uuid.uuid4()),
                faker.uuid4(),
                str(faker.random_int(min=1, max=10)),
                str(faker.random_int(min=10, max=100)),
                str(faker.unix_time())
            ) for _ in range(samples)
        ]
ecommerce_data = DataGenerator.get_eCommerce_data(samples=500)
ecommerce_columns = ["transaction_id", "product_id", "quantity", "price", "ts"]
ecommerce_df = spark.createDataFrame(data=ecommerce_data, schema=ecommerce_columns)
ecommerce_df.show(2)
ecommerce_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(path)
functional_index_sql = """
CREATE FUNCTION INDEX ts_functional_idx
ON ecommerce_table
USING COLUMN_STATS (ts)
"""
spark.sql(functional_index_sql)
Not sure how to use it in Python. Error:
ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near 'ts_functional_idx'.(line 2, pos 26)
== SQL ==
CREATE FUNCTION INDEX ts_functional_idx
--------------------------^^^
ON ecommerce_table
USING COLUMN_STATS (ts)
@yihua @codope
Hi @soumilshah1995, thanks for giving it a try! Currently, the FUNCTION keyword is not integrated. I need to update the RFC with the exact syntax, which can be found in the SQL DDL docs: https://hudi.apache.org/docs/next/sql_ddl#create-index-experimental
We are tracking an issue to simplify the syntax. Ideally, we want users to be able to just say CREATE INDEX func_index_abc ON xyz_hudi_table USING column_stats(hour(ts)) without the FUNCTION keyword or extra options to specify the function. We will have it in 1.0 GA. Feel free to reach out to me directly on Hudi Slack if you're more interested in this feature.
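For reference, a minimal PySpark sketch of the documented syntax (assuming an existing Hudi table named hudi_table with a BIGINT ts column; the index name idx_datestr is purely illustrative):

# per the SQL DDL docs linked above: plain CREATE INDEX with func/format options, no FUNCTION keyword
spark.sql("CREATE INDEX idx_datestr ON hudi_table USING column_stats(ts) OPTIONS(func='from_unixtime', format='yyyy-MM-dd')")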
Would it work with Spark SQL on Hudi '1.0.0-beta1', or do we need to wait for '1.0.0-beta2'? Can you please provide an example in PySpark of how to use it? I cannot seem to get it to work.
Tried
+--------------------+--------------------+--------+-----+----------+
| transaction_id| product_id|quantity|price| ts|
+--------------------+--------------------+--------+-----+----------+
|8ad35260-10ce-468...|977d50db-9970-41c...| 2| 24|1121221702|
|2cbe47c6-c7fc-472...|1f2bceff-8056-44e...| 1| 62|1527868645|
+--------------------+--------------------+--------+-----+----------+
Code
path = 'file:///Users/soumilnitinshah/Downloads/hudidb/ecommerce_table'
spark.read.format("hudi").load(path).createOrReplaceTempView("ecommerce_table")
# Assuming the functional index syntax follows the proposed RFC syntax
functional_index_sql = """
CREATE INDEX IF NOT EXISTS ts_functional_idx
ON TABLE ecommerce_table
USING column_stats(ts)
OPTIONS (func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(functional_index_sql)
Error Logs
Py4JJavaError: An error occurred while calling o34.sql.
: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias cannot be cast to class org.apache.spark.sql.catalyst.analysis.ResolvedTable (org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias and org.apache.spark.sql.catalyst.analysis.ResolvedTable are in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$$anonfun$apply$58.applyOrElse(Analyzer.scala:3671)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$$anonfun$apply$58.applyOrElse(Analyzer.scala:3668)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$.apply(Analyzer.scala:3668)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$.apply(Analyzer.scala:3667)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Here is the full code for reference:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Sample data
data = [
    ['2023-09-20 03:58:59', '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    ['2023-09-19 08:46:34', 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location'
}

df.write.format("hudi").options(**hudi_options).mode("append").save(path)

# Register the Hudi table
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_table_func_index")

# Create the functional index
functional_index_sql = """
CREATE INDEX ts_hour
ON hudi_table_func_index
USING column_stats(ts)
OPTIONS (func='hour');
"""
spark.sql(functional_index_sql)
Error
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
Cell In[4], line 12
4 # Create the functional index
5 functional_index_sql = """
6 CREATE INDEX ts_hour
7 ON hudi_table_func_index
8 USING column_stats(ts)
9 OPTIONS (func='hour');
10 """
---> 12 spark.sql(functional_index_sql)
File ~/anaconda3/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
1441 finally:
1442 if len(kwargs) > 0:
File ~/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File ~/anaconda3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173 # Hide where the exception came from that shows a non-Pythonic
174 # JVM exception message.
--> 175 raise converted from None
176 else:
177 raise
AnalysisException: hudi_table_func_index is a temp view. 'CREATE INDEX' expects a table..; line 3 pos 8
It should work with 1.0.0-beta1 with Spark SQL. On the DDL page, we have provided an example with the hour function; you can replace it with from_unixtime. This is what I tried and it works:
DROP TABLE IF EXISTS hudi_table;
CREATE TABLE hudi_table (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI
tblproperties (primaryKey = 'uuid')
PARTITIONED BY (city)
location 'file:///tmp/hudi_func_index';
-- disable small file handling so that each insert creates a new file --
set hoodie.parquet.small.file.limit=0;
-- records with ts in [2023-09-18, 2023-09-24] --
INSERT INTO hudi_table VALUES (1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco');
INSERT INTO hudi_table VALUES (1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco');
INSERT INTO hudi_table VALUES (1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco');
INSERT INTO hudi_table VALUES (1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco');
INSERT INTO hudi_table VALUES (1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo');
INSERT INTO hudi_table VALUES (1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo');
INSERT INTO hudi_table VALUES (1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai');
INSERT INTO hudi_table VALUES (1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
SELECT city, fare, rider, driver, from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM hudi_table;
san_francisco 33.9 rider-D driver-L 2023-09-18
san_francisco 27.7 rider-C driver-M 2023-09-19
san_francisco 93.5 rider-E driver-O 2023-09-22
san_francisco 19.1 rider-A driver-K 2023-09-20
sao_paulo 43.4 rider-G driver-Q 2023-09-22
sao_paulo 34.15 rider-F driver-P 2023-09-24
chennai 17.85 rider-J driver-T 2023-09-19
chennai 41.06 rider-I driver-S 2023-09-20
-- create index --
CREATE INDEX datestr ON hudi_table USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd');
SELECT city, fare, rider, driver FROM hudi_table WHERE from_unixtime(ts, 'yyyy-MM-dd') > '2023-09-20';
san_francisco 93.5 rider-E driver-O
sao_paulo 43.4 rider-G driver-Q
sao_paulo 34.15 rider-F driver-P
Time taken: 0.428 seconds, Fetched 3 row(s)
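For a PySpark equivalent, the same statements can be issued through spark.sql(); below is a minimal sketch, assuming a SparkSession configured with the Hudi Spark session extension as in the earlier snippets (the table location is illustrative):

spark.sql("DROP TABLE IF EXISTS hudi_table")
spark.sql("""
CREATE TABLE hudi_table (
    ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING
) USING HUDI
TBLPROPERTIES (primaryKey = 'uuid')
PARTITIONED BY (city)
LOCATION 'file:///tmp/hudi_func_index'
""")

# disable small file handling so that each insert creates a new file
spark.sql("SET hoodie.parquet.small.file.limit=0")

spark.sql("INSERT INTO hudi_table VALUES (1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco')")
# ...insert the remaining sample rows the same way...

# create the functional index, then query on the derived date string
spark.sql("CREATE INDEX datestr ON hudi_table USING column_stats(ts) OPTIONS(func='from_unixtime', format='yyyy-MM-dd')")
spark.sql("SELECT city, fare, rider, driver FROM hudi_table WHERE from_unixtime(ts, 'yyyy-MM-dd') > '2023-09-20'").show()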
Here is the entirety of my code, and I'm encountering persistent issues. I attempted to contact you through a personal message on Slack, hoping to expedite the resolution. Please let me know if I've made any errors, as I'm still in the early stages of learning and might be overlooking certain aspects. I'm eager to learn, make progress, and eventually contribute my knowledge back to the Hudi community.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Sample data
data = [
    ['2023-09-20 03:58:59', '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    ['2023-09-19 08:46:34', 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location'
}

df.write.format("hudi").options(**hudi_options).mode("append").save(path)

# Register the Hudi table
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_table_func_index")

# Create the functional index
functional_index_sql = """
CREATE INDEX ts_functional_index
ON hudi_table_func_index(ts)
USING column_stats
OPTIONS (func='from_unixtime', format='yyyy-MM-dd HH:mm:ss');
"""
spark.sql(functional_index_sql)
Error
---------------------------------------------------------------------------
ParseException Traceback (most recent call last)
Cell In[2], line 12
4 # Create the functional index
5 functional_index_sql = """
6 CREATE INDEX ts_functional_index
7 ON hudi_table_func_index(ts)
8 USING column_stats
9 OPTIONS (func='from_unixtime', format='yyyy-MM-dd HH:mm:ss');
10 """
---> 12 spark.sql(functional_index_sql)
File ~/anaconda3/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
1441 finally:
1442 if len(kwargs) > 0:
File ~/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File ~/anaconda3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173 # Hide where the exception came from that shows a non-Pythonic
174 # JVM exception message.
--> 175 raise converted from None
176 else:
177 raise
ParseException:
Operation not allowed: CREATE INDEX.(line 2, pos 4)
== SQL ==
CREATE INDEX ts_functional_index
----^^^
ON hudi_table_func_index(ts)
USING column_stats
OPTIONS (func='from_unixtime', format='yyyy-MM-dd HH:mm:ss');
@soumilshah1995 There is a bit of an issue with your query, I guess. I tried the code below, for which the parsing works, although I'm facing another problem: No plan for CreateIndex hudi_func_index_01a19d91_699f_4ead_afe1_82a232f91efa_datestr, column_stats, false, ...
Can you try it once?
# Sample data
data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", LongType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

hudi_options = {
    'hoodie.table.name': TABLE_NAME,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}

df.write.format("hudi").options(**hudi_options).mode("append").save(PATH)

spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME)

spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()

spark.sql(f"""CREATE INDEX {TABLE_NAME}_datestr ON {TABLE_NAME} USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')""")
Code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}

df.write.format("hudi").options(**hudi_options).mode("append").save(path)

PATH = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'
TABLE_NAME = "hudi_table_func_index"

spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME)

spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()

spark.sql(f"""CREATE INDEX {TABLE_NAME}_datestr ON {TABLE_NAME} USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')""")
Error
+----------+
| datestr|
+----------+
|2023-09-19|
|2023-09-19|
+----------+
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[7], line 7
4 spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME)
6 spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()
----> 7 spark.sql(f"""CREATE INDEX {TABLE_NAME}_datestr ON {TABLE_NAME} USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')""")
File ~/anaconda3/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
1441 finally:
1442 if len(kwargs) > 0:
File ~/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File ~/anaconda3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File ~/anaconda3/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o34.sql.
: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias cannot be cast to class org.apache.spark.sql.catalyst.analysis.ResolvedTable (org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias and org.apache.spark.sql.catalyst.analysis.ResolvedTable are in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$$anonfun$apply$58.applyOrElse(Analyzer.scala:3671)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$$anonfun$apply$58.applyOrElse(Analyzer.scala:3668)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$.apply(Analyzer.scala:3668)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$.apply(Analyzer.scala:3667)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
@soumilshah1995 @codope Created a JIRA to track this issue - https://issues.apache.org/jira/browse/HUDI-7117
Thanks
@danny0405 @codope When will version 1.0.0 be released? It has not been updated for a long time since the beta version.
@zyclove we'll do a beta2 release in Feb, which will include a fix for this issue.
Great!
Is this release out?
This is likely not an issue, but a gap in understanding the feature. The problem is that spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME) creates a temporary view (similar to one created using CREATE TEMPORARY VIEW ...), which is neither a table nor a Hudi-managed table. Hence the subsequent CREATE INDEX ... statement to create a functional index fails, because the object on which the index is being created is not a Hudi-managed table.
Instead of creating a temporary view, one can use the saveAsTable(...) method on the DataFrameWriter object to create a Hudi-managed table and then create the functional index on that table. Please see if this works for you:
df.write.format("hudi").options(**hudi_options).option("path", "/external/table/path").mode("append").saveAsTable("table_name")
spark.sql("CREATE INDEX hudi_table_func_index_datestr ON table_name USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
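To confirm that the object being indexed is a catalog table rather than a temporary view, a quick diagnostic sketch using the standard Spark catalog API (assuming "table_name" is the name passed to saveAsTable above):

# a temp view is listed with isTemporary=True; a table created via saveAsTable is not
for t in spark.catalog.listTables():
    print(t.name, t.tableType, t.isTemporary)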
Let me go ahead and try this.
Still getting an error. Here is the full code and error message; please let me know what I am missing.
Code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/hudi/'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}

df.write.format("hudi").options(**hudi_options).option("path", path).mode("append").saveAsTable("table_name")

query = """
CREATE INDEX hudi_table_func_index_datestr
ON table_name
USING column_stats(ts)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query)
Error
24/03/27 08:26:00 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/27 08:26:00 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/hudi/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/072fb7a1-bbd2-481a-9c7a-3ab3e3a70e68-0_11-31-0_20240327082222863.parquet.
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[13], line 8
1 query = """
2 CREATE INDEX hudi_table_func_index_datestr
3 ON table_name
4 USING column_stats(ts)
5 OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
6 """
----> 8 spark.sql(query)
File /opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
1441 finally:
1442 if len(kwargs) > 0:
File /opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /opt/homebrew/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File /opt/homebrew/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o34.sql.
: org.apache.hudi.exception.HoodieMetadataException: Failed to index partition [func_index_hudi_table_func_index_datestr]
at org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:179)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.index(HoodieSparkCopyOnWriteTable.java:308)
at org.apache.hudi.client.BaseHoodieWriteClient.index(BaseHoodieWriteClient.java:1003)
at org.apache.hudi.HoodieSparkFunctionalIndexClient.create(HoodieSparkFunctionalIndexClient.java:100)
at org.apache.spark.sql.hudi.command.CreateIndexCommand.run(IndexCommands.scala:53)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieMetadataException: func_index_ bootstrap failed for file:/Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/hudi/.hoodie/metadata
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:453)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:293)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:192)
at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:109)
at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:95)
at org.apache.hudi.table.HoodieSparkTable.getMetadataWriter(HoodieSparkTable.java:103)
at org.apache.hudi.table.HoodieTable.getIndexingMetadataWriter(HoodieTable.java:915)
at org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:147)
... 47 more
Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/072fb7a1-bbd2-481a-9c7a-3ab3e3a70e68-0_11-31-0_20240327082222863.parquet.
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Thanks for trying it out. This seems like a different issue (rather than the Spark analysis error that you were seeing earlier).
The error reported now is that the base files for the table are not found in the specified location/path, i.e. "file:/072fb7a1-bbd2-481a-9c7a-3ab3e3a70e68-0_11-31-0_20240327082222863.parquet" is not being found to build the index. I think you should give the correct path for the external table, something like:
df.write.format("hudi").options(**hudi_options).option("path", "/full/path/to/external/table/location/").mode("append").saveAsTable("table_name")
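It may also help to check which location the catalog has recorded for the table before running CREATE INDEX; a diagnostic sketch using standard Spark SQL (assuming "table_name" is the name used with saveAsTable):

# the 'Location' row should point at the intended Hudi base path rather than a relative path
spark.sql("DESCRIBE TABLE EXTENDED table_name").show(100, truncate=False)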
I did give a path:
path = 'file:///Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/hudi/'
Do you want me to change the path and try?
Here is the full code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilshah/Desktop/hudidemo/'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}

df.write.format("hudi").options(**hudi_options).option("path", path).mode("append").saveAsTable("table_name")

spark.read.format("hudi").load(path)

TABLE_NAME = "table_name"
spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()
"""
+----------+
|   datestr|
+----------+
|2023-09-19|
|2023-09-19|
+----------+
"""

query = """
CREATE INDEX hudi_table_func_index_datestr
ON table_name
USING column_stats(ts)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query)
Error
24/03/27 14:14:10 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/27 14:14:10 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[14], line 8
1 query = """
2 CREATE INDEX hudi_table_func_index_datestr
3 ON table_name
4 USING column_stats(ts)
5 OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
6 """
----> 8 spark.sql(query)
File /opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
1441 finally:
1442 if len(kwargs) > 0:
File /opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /opt/homebrew/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File /opt/homebrew/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o34.sql.
: org.apache.hudi.exception.HoodieMetadataException: Failed to index partition [func_index_hudi_table_func_index_datestr]
at org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:179)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.index(HoodieSparkCopyOnWriteTable.java:308)
at org.apache.hudi.client.BaseHoodieWriteClient.index(BaseHoodieWriteClient.java:1003)
at org.apache.hudi.HoodieSparkFunctionalIndexClient.create(HoodieSparkFunctionalIndexClient.java:100)
at org.apache.spark.sql.hudi.command.CreateIndexCommand.run(IndexCommands.scala:53)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieMetadataException: func_index_ bootstrap failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:453)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:293)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:192)
at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:109)
at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:95)
at org.apache.hudi.table.HoodieSparkTable.getMetadataWriter(HoodieSparkTable.java:103)
at org.apache.hudi.table.HoodieTable.getIndexingMetadataWriter(HoodieTable.java:915)
at org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:147)
... 47 more
Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
24/03/27 14:14:10 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
The lines above indicate that the base path for the table was created under "file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata", but the functional-index build process is trying to read a data file from "file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet", i.e. with the parent directory stripped. Please try removing the 'file:///' prefix from the path and retry; the path should be /Users/soumilshah/Desktop/hudidemo/
If you still see the problem, please open a new issue. As I mentioned earlier, the issue you are seeing now is not the same as the original issue you reported (trying to build an index on a temporary view).
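For reference, a minimal sketch of the suggestion above in PySpark, assuming an existing SparkSession, a DataFrame df, and illustrative write options (the record key and precombine fields here are placeholders, not the exact ones from this report):
base_path = "/Users/soumilshah/Desktop/hudidemo"  # plain local path, no file:/// prefix
# Hypothetical write options -- adjust to the actual table definition.
hudi_options = {
    "hoodie.table.name": "table_name",
    "hoodie.datasource.write.recordkey.field": "transaction_id",  # placeholder
    "hoodie.datasource.write.precombine.field": "ts",             # placeholder
}
# Write with the scheme-less path, per the suggestion above, then create the index on the same table.
df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)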
Hi, I am still seeing the same issue even after removing file:///.
Error
24/03/28 07:36:06 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/28 07:36:06 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/2c1b72c3-5de3-4f19-b7dc-e67b3a345c53-0_5-25-0_20240328073553477.parquet.
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[4], line 8
1 query = """
2 CREATE INDEX hudi_table_func_index_datestr
3 ON table_name
4 USING column_stats(ts)
5 OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
6 """
----> 8 spark.sql(query)
File /opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
1441 finally:
1442 if len(kwargs) > 0:
File /opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /opt/homebrew/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File /opt/homebrew/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o34.sql.
: org.apache.hudi.exception.HoodieMetadataException: Failed to index partition [func_index_hudi_table_func_index_datestr]
at org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:179)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.index(HoodieSparkCopyOnWriteTable.java:308)
at org.apache.hudi.client.BaseHoodieWriteClient.index(BaseHoodieWriteClient.java:1003)
at org.apache.hudi.HoodieSparkFunctionalIndexClient.create(HoodieSparkFunctionalIndexClient.java:100)
at org.apache.spark.sql.hudi.command.CreateIndexCommand.run(IndexCommands.scala:53)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieMetadataException: func_index_ bootstrap failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:453)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:293)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:192)
at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:109)
at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:95)
at org.apache.hudi.table.HoodieSparkTable.getMetadataWriter(HoodieSparkTable.java:103)
at org.apache.hudi.table.HoodieTable.getIndexingMetadataWriter(HoodieTable.java:915)
at org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:147)
... 47 more
Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/2c1b72c3-5de3-4f19-b7dc-e67b3a345c53-0_5-25-0_20240328073553477.parquet.
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
I believe there's no need to create a new GitHub issue. The title clearly indicates that this is an issue with beta1, which Sagar and Aditya have confirmed previously. I'd suggest keeping the existing issue open for easier tracking, so that all conversation threads remain consolidated in this GitHub thread.
Thanks for trying all the suggestions. I am unable to reproduce this in my environment. I will spend some time next week to set up the exact environment/versions you are using. In the meantime, would it be possible for you to run your tests against the latest master branch of Hudi and reproduce this? From the stack trace below, I am not sure why the index build process is stripping the full path of the data file in your environment.
24/03/28 07:36:06 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/28 07:36:06 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/2c1b72c3-5de3-4f19-b7dc-e67b3a345c53-0_5-25-0_20240328073553477.parquet.
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
Vinay, the problem I'm facing is that I struggle to build the JAR myself due to Java version conflicts. That's why I've resorted to using the packaged version to incorporate Hudi 1.0.0-beta. I recall Sagar mentioning that there were some issues with it. He assured me they were resolved in the master branch, and he also hinted that the official release would be coming soon. Given these remarks, I'm inclined to think that this version still has some bugs.
I've attached the chats for your reference.
Regarding our options:
Option A) I could wait for the beta 2 release.
Option B) If it's possible, could you send me the JAR files for Spark 3.4? That way, I can give it a try.
I'm fine with waiting, but if you can provide the JAR files, that works too. Let me know what works best for you.
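For what it's worth, a rough sketch of how a locally built bundle could be wired into this setup from Python. The Maven profile flags follow the standard Hudi build, but the repo location and the exact jar filename/version are assumptions and may differ:
import os
import sys
import subprocess
# Hypothetical local clone of the Hudi repo -- adjust the path to your environment.
HUDI_REPO = "/Users/soumilshah/hudi"
# Build the Spark 3.4 / Scala 2.12 bundle from the master branch (assumed standard profile flags).
subprocess.run(
    ["mvn", "clean", "package", "-DskipTests", "-Dspark3.4", "-Dscala-2.12"],
    cwd=HUDI_REPO,
    check=True,
)
# Point PySpark at the locally built bundle instead of the published package.
# The jar name/version below is an assumption; check your actual build output.
bundle_jar = f"{HUDI_REPO}/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0-SNAPSHOT.jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars {bundle_jar} pyspark-shell"
os.environ["PYSPARK_PYTHON"] = sys.executable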
Code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from pyspark.sql.functions import hour, col
from datetime import datetime, timedelta
import os
import sys
import random
# Configuration
HUDI_VERSION = '1.0.0-beta2'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
# Generate mock event data
def generate_event_data(num_events):
event_types = ["click", "view", "purchase", "signup"]
start_time = datetime(2023, 1, 1)
data = []
for i in range(num_events):
event = {
"event_id": i + 1,
"user_id": random.randint(1, 100),
"event_type": random.choice(event_types),
"timestamp": (start_time + timedelta(hours=random.randint(0, 5000))).strftime("%Y-%m-%d %H:%M:%S")
}
data.append(event)
return data
# Create DataFrame
num_events = 10000
events_data = generate_event_data(num_events)
df = spark.createDataFrame(events_data)
df.show()
# Write DataFrame to Hudi table
table_name = "web_events"
path = f'file:///Users/soumilshah/Desktop/{table_name}/'
df.write.format("hudi") \
.option("hoodie.table.name", table_name) \
.option("hoodie.datasource.write.recordkey.field", "event_id") \
.option("hoodie.datasource.write.partitionpath.field", "") \
.option("hoodie.datasource.write.precombine.field", "timestamp") \
.option("hoodie.table.metadata.enable", "true") \
.option("hoodie.metadata.index.column.stats.enable", "true") \
.option("path", path) \
.mode("overwrite") \
.saveAsTable(table_name)
# Create functional index on timestamp column
query_create_ts_datestr = """
CREATE INDEX IF NOT EXISTS ts_datestr ON web_events
USING column_stats(timestamp)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query_create_ts_datestr).show()
# Query data for a specific date
spark.sql("""
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show()
# Explain query plan for date-based query
spark.sql("""
EXPLAIN
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)
# Create functional index on hour of timestamp
query_create_ts_hour = """
CREATE INDEX ts_hour ON web_events
USING column_stats(timestamp)
OPTIONS(func='hour')
"""
spark.sql(query_create_ts_hour)
# Query data aggregated by hour
spark.sql("""
SELECT hour(timestamp) AS hour_of_day, COUNT(*) AS event_count
FROM web_events
GROUP BY hour(timestamp)
""").show()
Questions: How do I verify that the query is using the functional index?
spark.sql("""
EXPLAIN
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|== Physical Plan ==\n*(1) Project [event_type#126, user_id#128L, event_id#125L]\n+- *(1) Filter (isnotnull(timestamp#127) AND (date_format(cast(timestamp#127 as timestamp), yyyy-MM-dd, Some(America/New_York)) = 2023-06-17))\n +- *(1) ColumnarToRow\n +- FileScan parquet spark_catalog.default.web_events[event_id#125L,event_type#126,timestamp#127,user_id#128L] Batched: true, DataFilters: [isnotnull(timestamp#127), (date_format(cast(timestamp#127 as timestamp), yyyy-MM-dd, Some(Americ..., Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/Users/soumilshah/Desktop/web_events], PartitionFilters: [], PushedFilters: [IsNotNull(timestamp)], ReadSchema: struct<event_id:bigint,event_type:string,timestamp:string,user_id:bigint>\n\n|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
spark.sql("""
EXPLAIN
SELECT hour(timestamp) AS hour_of_day, COUNT(*) AS event_count
FROM web_events
GROUP BY hour(timestamp)
""").show(truncate=False)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|== Physical Plan ==\nAdaptiveSparkPlan isFinalPlan=false\n+- HashAggregate(keys=[_groupingexpression#242], functions=[count(1)])\n +- Exchange hashpartitioning(_groupingexpression#242, 200), ENSURE_REQUIREMENTS, [plan_id=240]\n +- HashAggregate(keys=[_groupingexpression#242], functions=[partial_count(1)])\n +- Project [hour(cast(timestamp#217 as timestamp), Some(America/New_York)) AS _groupingexpression#242]\n +- FileScan parquet spark_catalog.default.web_events[timestamp#217] Batched: true, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/Users/soumilshah/Desktop/web_events], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:string>\n\n|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
If someone could help me understand how to verify that functional indexes are working, I would greatly appreciate it. I'm not an expert in Spark UI, so any guidance or pointers would be very helpful.
@ad1happy2go
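One quick sanity check that may help here (a sketch on my part, not official guidance): confirm that the functional index partition actually exists under the metadata table, and, if the SHOW INDEXES DDL is available in this build, list the indexes through SQL. The local path below is the web_events path used earlier in this thread:
import os
# The functional index is materialized as a metadata-table partition named
# func_index_<index_name> (as seen in the error messages earlier in this thread).
metadata_path = "/Users/soumilshah/Desktop/web_events/.hoodie/metadata"
print(os.listdir(metadata_path))
# If the SHOW INDEXES statement is supported by this Hudi build (assumption),
# it should list ts_datestr / ts_hour as well.
try:
    spark.sql("SHOW INDEXES FROM web_events").show(truncate=False)
except Exception as e:
    print("SHOW INDEXES not supported in this build:", e)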
Just updating this thread: I ran a small test.
Before Index Creation
spark.read.format("hudi") \
.option("hoodie.enable.data.skipping", "true") \
.option("hoodie.metadata.enable", "true") \
.option("hoodie.metadata.index.column.stats.enable", "true") \
.load(path) \
.createOrReplaceTempView("snapshots")
spark.sql("""
SELECT * FROM snapshots
""").printSchema()
result = spark.sql("""
SELECT
event_type, user_id, event_id
FROM
snapshots
WHERE
date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""")
result.show()
result.explain(True)
After creating the index
query_create_ts_datestr = f"""
CREATE INDEX IF NOT EXISTS ts_datestr
ON
web_events
USING
column_stats(timestamp)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
result = spark.sql(query_create_ts_datestr)
result = spark.sql("""
SELECT
event_type, user_id, event_id
FROM
web_events
WHERE
date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""")
result.show()
result.explain(True)
I do see a difference in query time; it's faster. What else should I check to ensure this is working as expected?
Sagar, could you guide me on which metrics I should focus on? Specifically, should I be looking at query execution time or the total amount of data scanned? What are the key metrics I should pay attention to?
Hi @soumilshah1995 , there are two things you can look at. First, when you do EXPLAIN ANALYZE, the plan should show that it is using HoodieFileIndex
. Second, the "number of files read" in the Spark UI should show a smaller number of files if any files were skipped using the functional index (or any index, for that matter). Another way is to enable debug logs for the org.apache.hudi package when you launch spark-sql; then, upon executing the query, you will see something called the "skipping ratio", which tells you the percentage of files skipped. Note that if files are already pruned due to partition pruning, and all of those remaining files need to be scanned as per the query predicate, then the skipping ratio can be 0. Only when there is additional file pruning on top of partition pruning will you see a positive skipping ratio.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from pyspark.sql.functions import hour, col
from datetime import datetime, timedelta
import os
import sys
import random
# Configuration
HUDI_VERSION = '1.0.0-beta2'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
# Generate mock event data
def generate_event_data(num_events):
event_types = ["click", "view", "purchase", "signup"]
start_time = datetime(2023, 1, 1)
data = []
for i in range(num_events):
event = {
"event_id": i + 1,
"user_id": random.randint(1, 100),
"event_type": random.choice(event_types),
"timestamp": (start_time + timedelta(hours=random.randint(0, 5000))).strftime("%Y-%m-%d %H:%M:%S")
}
data.append(event)
return data
# Create DataFrame
num_events = 10000
events_data = generate_event_data(num_events)
df = spark.createDataFrame(events_data)
df.show()
# Write DataFrame to Hudi table
table_name = "web_events"
path = f'file:///Users/soumilshah/Desktop/{table_name}/'
df.write.format("hudi") \
.option("hoodie.table.name", table_name) \
.option("hoodie.datasource.write.recordkey.field", "event_id") \
.option("hoodie.datasource.write.partitionpath.field", "") \
.option("hoodie.datasource.write.precombine.field", "timestamp") \
.option("hoodie.table.metadata.enable", "true") \
.option("hoodie.metadata.index.column.stats.enable", "true") \
.option("path", path) \
.mode("overwrite") \
.saveAsTable(table_name)
# Create functional index on timestamp column
query_create_ts_datestr = """
CREATE INDEX IF NOT EXISTS ts_datestr ON web_events
USING column_stats(timestamp)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query_create_ts_datestr).show()
# Query data for a specific date
spark.sql("""
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show()
# Explain query plan for date-based query
spark.sql("""
EXPLAIN
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)
# Create functional index on hour of timestamp
query_create_ts_hour = """
CREATE INDEX ts_hour ON web_events
USING column_stats(timestamp)
OPTIONS(func='hour')
"""
spark.sql(query_create_ts_hour)
# Query data aggregated by hour
spark.sql("""
SELECT hour(timestamp) AS hour_of_day, COUNT(*) AS event_count
FROM web_events
GROUP BY hour(timestamp)
""").show()
Is this a good example to observe a reduction in the number of files read before and after creating a functional index? If not, what changes should I make to the dataset or my query method to clearly see a decrease in the number of files read? I look forward to your guidance on this.
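One way to make the effect easier to observe (my own suggestion, not guidance from the maintainers): force the write to produce several small file groups so there is actually something to skip, then run a narrow date predicate and compare "number of files read". The size values below are purely illustrative:
# Sketch: cap the parquet file size so the 10k events land in multiple file groups.
df.write.format("hudi") \
    .option("hoodie.table.name", table_name) \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.precombine.field", "timestamp") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .option("hoodie.parquet.max.file.size", str(128 * 1024)) \
    .option("hoodie.parquet.small.file.limit", "0") \
    .option("path", path) \
    .mode("overwrite") \
    .saveAsTable(table_name)
# With many file groups, a narrow predicate should read fewer files once the index is in place.
spark.sql("""
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show()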
I spoke to Sudha and she gave me some nice feedback; I will try those items.