
asofJoin extremely slow when result has 99 or more columns

rzsquirrel opened this issue 3 years ago · 7 comments

asofJoins that produce a dataframe with >= 99 columns seem to trigger extra Spark operations that asofJoins producing < 99 columns do not, and those extra operations come with a massive increase in runtime. Below is the exported source of a Databricks notebook that describes and reproduces the issue. With 10,000 rows, the example with 99 columns took more than 10x as long to run as the example with 98 columns. Performing asofJoins on larger datasets (billions of rows, hundreds of columns) is essentially infeasible. Tests were conducted on a Databricks cluster running DBR 9.1 LTS ML | Spark 3.1.2 | Scala 2.12.

# Databricks notebook source
# MAGIC %md
# MAGIC 
# MAGIC # Tempo Inefficiency Bug
# MAGIC 
# MAGIC There seems to be a bug where performing an asofJoin with too many columns runs differently under the hood in a way that is more than 10x slower than an asofJoin with fewer columns. This notebook replicates the bug.

# COMMAND ----------

# MAGIC %md
# MAGIC ### Install tempo

# COMMAND ----------

# MAGIC %pip install pip==20.2.4

# COMMAND ----------

# MAGIC %pip install -e "git+https://github.com/databrickslabs/tempo.git#egg=tempo&subdirectory=python"

# COMMAND ----------

import re

import numpy as np
import pandas as pd

import tempo

# COMMAND ----------

# MAGIC %md
# MAGIC 
# MAGIC ### Define Utility Functions

# COMMAND ----------

def to_tsdf(df):
  # Wrap a Spark dataframe in a tempo TSDF, using event_ts as the time-series column
  # and partition_col as the partition column. (The rename from packet_seq mirrors the
  # schema we typically use; it is a no-op for the synthetic data generated below.)
  return tempo.TSDF(df.withColumnRenamed('packet_seq', 'event_ts'), ts_col="event_ts", partition_cols=["partition_col"])

# COMMAND ----------

def make_df(nrows, ncols, name):
  """
  Utility to make dataframes of arbitrary size.
  We will need partition_col as a partition column and event_ts as the time-series column, mimicking the schema that we typically use.
  """
  mat = np.random.randint(low=0, high=3, size=(nrows, ncols))
  df = pd.DataFrame(mat)
  df = df.rename(columns={0:'event_ts', 1:'partition_col'})
  df = df.rename(columns={x:f"{str(x)}_{name}" for x in range(2,ncols)})
  df = df.sort_values(by=['partition_col', 'event_ts'])
  return spark.createDataFrame(df)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Example 1 (no bug)
# MAGIC We asofJoin a 50-column df with a 49-column df for a result containing 50 + 49 - 1 = 98 columns (both event_ts columns are kept, while the shared partition_col is counted once). Look at the Spark UI to see what a normal asofJoin does under the hood.

# COMMAND ----------

# NUM_ROWS = 10  # runs fast and lets you see the difference in the Spark UI
NUM_ROWS = 10000  # noticeable runtime difference with this many rows

# COMMAND ----------

tsdf1 = to_tsdf(make_df(NUM_ROWS, 50, 'left'))
tsdf2 = to_tsdf(make_df(NUM_ROWS, 49, 'right'))

# COMMAND ----------

tsdf3 = tsdf1.asofJoin(tsdf2)
print(len(tsdf3.df.columns))
df = tsdf3.df.toPandas()

# COMMAND ----------

# MAGIC %md
# MAGIC ### Example 2 (bugged)
# MAGIC Now we try two 50-column dataframes for a result with 99 columns. The runtime for this will be much longer than the previous asofJoin, despite there being only one additional column. (The difference is more noticeable with higher values of NUM_ROWS.)
# MAGIC 
# MAGIC My guess is that tempo hits some threshold of 100 columns under the hood, at which point something dumb happens. The Spark UI should show something quite different from before. (Note the giant stack of `Project` and `RunningWindowFunction` blocks in the second spawned job.)

# COMMAND ----------

tsdf1 = to_tsdf(make_df(NUM_ROWS, 50, 'left'))
tsdf2 = to_tsdf(make_df(NUM_ROWS, 50, 'right'))  # we now have one more feature column

# COMMAND ----------

tsdf3 = tsdf1.asofJoin(tsdf2)
print(len(tsdf3.df.columns))
df = tsdf3.df.toPandas()

# COMMAND ----------

# MAGIC %md
# MAGIC 
# MAGIC A potential workaround for a tempo user might be to split the right dataframe column-wise and perform two asofJoins using only the time-series and partition columns of the left dataframe. That would pick the needed rows out of the right dataframe, after which normal joins could assemble the final result.
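
# COMMAND ----------

# MAGIC %md
# MAGIC A minimal sketch of the keys-only part of that idea (hypothetical code; since the left side contributes just two columns here, no column-wise split of the right dataframe is needed for this small example):

# COMMAND ----------

keys = ['event_ts', 'partition_col']

left_df = make_df(NUM_ROWS, 50, 'left')
right_df = make_df(NUM_ROWS, 50, 'right')

# Step 1: asofJoin using only the key columns of the left dataframe, so the wide
# feature columns never pass through the as-of window logic together (~51 columns).
left_keys_tsdf = to_tsdf(left_df.select(*keys).dropDuplicates())
matched = left_keys_tsdf.asofJoin(to_tsdf(right_df)).df

# Step 2: a standard equi-join to attach the full set of left columns.
result = left_df.join(matched, on=keys, how='left')
print(len(result.columns))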

rzsquirrel · Nov 04 '21

Thanks for the note @rzsquirrel , this is actually due to codegen not being enabled when there are 100+ columns, which is a Spark-level effect. Putting the values in a struct might help, but I'll take a look at your suggestion here as well.
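
For illustration, a minimal sketch of the struct idea (the pack_features helper is hypothetical; it assumes the feature columns can be collapsed into one struct per side before the join and that asofJoin applies its default right_ prefix to right-hand columns):

from pyspark.sql import functions as F

def pack_features(df, keys=("event_ts", "partition_col")):
    # collapse every non-key column into a single struct so the as-of join plan
    # only sees a handful of top-level fields
    features = [c for c in df.columns if c not in keys]
    return df.select(*keys, F.struct(*features).alias("features"))

left_packed = to_tsdf(pack_features(make_df(NUM_ROWS, 50, "left")))
right_packed = to_tsdf(pack_features(make_df(NUM_ROWS, 50, "right")))

joined = left_packed.asofJoin(right_packed).df

# unpack the structs back into flat columns after the join
# ("right_features" assumes asofJoin's default "right_" prefix on right-hand columns)
result = joined.select("event_ts", "partition_col", "features.*", "right_features.*")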

rportilla-databricks · Nov 04 '21

Thanks for the quick response! My suggestion at the end is more of a hacky workaround from the perspective of a tempo user. If there's a fix that can be made to tempo to allow fast asofJoins with larger dataframes, it would be greatly appreciated.

rzsquirrel · Nov 04 '21

@rzsquirrel , as a workaround, can you try setting spark.sql.codegen.maxFields to a value higher than your column count? Its default of 100 is why codegen fails to be enabled. I'm wondering if this works as a short-term workaround while we plan a long-term fix.
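
For example, in a notebook cell (1000 is just an illustrative value; anything above the widest plan you expect should do):

# raise the field-count threshold below which Spark disables whole-stage codegen
spark.conf.set("spark.sql.codegen.maxFields", 1000)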

rportilla-databricks · Nov 04 '21

This works!!! thank you 🙏

rzsquirrel · Nov 04 '21

Hmm. I set spark.sql.codegen.maxFields to 5000 and it seems to fix some cases, but I noticed the following when doing asofJoins:

- 105 cols joined into 95 cols: good
- 99 cols joined into 99 cols: good
- 99 cols joined into 100 cols: bad
- 98 cols joined into 100 cols: bad
- 100 cols joined into 98 cols: good
- 70 cols joined into 100 cols: good

where "bad" means I saw those unnecessary Spark operations going on. Interestingly, the runtimes seem quite a bit better even when those extra operations appear, but I should probably leave this open for visibility until we have the long-term fix.

rzsquirrel · Nov 04 '21

@rzsquirrel , coming back to this issue, can you provide the timings for your tests above? When joining 100 columns to 100 columns, I'm getting a 20s runtime for the AS OF join. We've made some improvements to the internal implementation, and longer term the Databricks Photon engine should improve on the catalyst limitations. Joining 200 columns to 200 columns is also reasonable (59 seconds). Is there an ideal runtime you are benchmarking against?

rportilla-databricks · Apr 06 '22

I remember that it went from a matter of seconds to several minutes when going from 98 to 99 columns on the small 10k-row example. My expectation was only that adding one extra column shouldn't matter much. We have an implementation that performs asofJoins only on the ts column before doing standard left joins on the already asof-joined ts values, and the numbers you quote there seem reasonable.

rzsquirrel · Apr 13 '22

Closing as resolved.

tnixon · Apr 25 '23