dlt-meta

No tables are defined by the library of this pipeline. This error usually occurs when there are view definitions

Open bhavnagupta opened this issue 2 years ago • 4 comments

Hello,

I am getting this error while initializing the DLT pipeline for the bronze table. Please suggest what the cause might be. My source is Eventhub.

"no tables are defined by the library of this pipeline. This error usually occurs when there are view definitions"

Thanks

bhavnagupta avatar Sep 01 '23 21:09 bhavnagupta

Hi, I'm running into the same error. I isolated it to the point where I add an "expect_or_quarantine" rule to my bronze_data_quality_expectations_json_dev. If I remove the quarantine part, everything works as expected. How did you solve the error?

pewoInspari avatar Aug 15 '24 13:08 pewoInspari

@pewoInspari can you share your onboarding.json and the DQE for the bronze tables? There are integration tests that cover this scenario; take a look at this dqe, which is mapped to bronze_data_quality_expectations. Also note that you need to provide the DQE table details along with the DQE JSON rule:

"bronze_data_quality_expectations_json_it": "{dbfs_path}/integration_tests/conf/dqe/customers/bronze_data_quality_expectations.json",
 "bronze_database_quarantine_it": "{uc_catalog_name}.{bronze_schema}",
 "bronze_quarantine_table": "customers_quarantine",
 "bronze_quarantine_table_properties": {
       "pipelines.reset.allowed": "false",
       "pipelines.autoOptimize.zOrderCols": "id, email"
 }

bronze_quarantine_table_properties is optional. You can run the integration tests by following the steps here.
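For intuition on why the quarantine target details matter, here is a rough, simplified sketch (not dlt-meta's actual code; table names and paths are placeholders) of how an expect_or_quarantine rule conceptually maps onto DLT datasets: the drop expectations guard the main bronze table, while the quarantine rule needs its own target table, which is why its database/table must be supplied at onboarding.

import dlt
from pyspark.sql import functions as F

# Illustrative rules in the same shape as the DQE JSON.
expect_or_drop = {
    "no_rescued_data": "_rescued_data IS NULL",
    "Valid_RECID": "RECID IS NOT NULL"
}
quarantine_rule = "_rescued_data IS NOT NULL OR RECID IS NULL"

@dlt.view(name="customers_raw")
def customers_raw():
    # Raw Auto Loader stream; the path is a placeholder.
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .load("/Volumes/example/raw/customers/"))

@dlt.table(name="customers")
@dlt.expect_all_or_drop(expect_or_drop)   # rows violating these are dropped
def customers():
    return dlt.read_stream("customers_raw")

@dlt.table(name="customers_quarantine")   # the table the quarantine details point at
def customers_quarantine():
    # Keep only the rows matching the quarantine rule.
    return dlt.read_stream("customers_raw").where(F.expr(quarantine_rule))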

@bhavnagupta it would be great if you could comment on your issue and the possible fix.

ravi-databricks avatar Aug 15 '24 14:08 ravi-databricks

@ravi-databricks Thanks for the quick reply. Please see my onboarding.json and my bronze_data_quality_expectations_json_dev below. I'm using Unity Catalog. onboarding:

[
  {
    "data_flow_id": "100",
    "data_flow_group": "A1",
    "source_system": "FO_Serverless",
    "source_format": "cloudFiles",
    "source_details": {
      "source_database": "dbo",
      "source_table": "GeneralJournalAccountEntry",
      "source_path_dev": "/Volumes/boumatic_d365fo/raw/d365fo/dbo_GeneralJournalAccountEntry/",
      "source_metadata": {
        "include_autoloader_metadata_column": "True",
        "autoloader_metadata_col_name": "source_metadata",
        "select_metadata_cols": {
          "input_file_name": "_metadata.file_name",
          "input_file_path": "_metadata.file_path"
        }
      }
    },
    "bronze_database_dev": "boumatic_d365fo.bronze_dev",
    "bronze_table": "general_journal_account_entry",
    "bronze_partition_columns": "",
    "bronze_reader_options": {
      "cloudFiles.format": "parquet",
      "cloudFiles.inferColumnTypes": "true",
      "cloudFiles.rescuedDataColumn": "_rescued_data"
    },
    "bronze_table_properties": {
      "pipelines.autoOptimize.managed": "true",
      "pipelines.autoOptimize.zOrderCols": "RECID,LSN"
    },
    "bronze_data_quality_expectations_json_dev": "/Volumes/boumatic_d365fo/raw/d365fo/bronze_data_quality_expectations.json",
    "bronze_database_quarantine_dev": "boumatic_d365fo.dev",
    "bronze_quarantine_table": "journal_entry_quarantine",
    "silver_database_dev": "boumatic_d365fo.silver_dev",
    "silver_table": "general_journal_account_entry",
    "silver_cdc_apply_changes": {
      "keys": [
        "RECID"
      ],
      "sequence_by": "LSN,LastProcessedChange_DateTime",
      "scd_type": "1"
    },
    "silver_transformation_json_dev": "/Volumes/boumatic_d365fo/raw/d365fo/silver_transformation.json"
  }
]

bronze_data_quality_expectations_json_dev:

[
  {
    "expect_or_drop": {
      "no_rescued_data": "_rescued_data IS NULL",
      "Valid_RECID": "RECID IS NOT NULL",
      "Valid_LastProcessedChange_DateTime": "LastProcessedChange_DateTime IS NOT NULL"
    },
    "expect_or_quarantine": {
      "quarantine_rule": "_rescued_data IS NOT NULL OR RECID IS NULL OR LastProcessedChange_DateTime IS NULL"
    }
  }
]

When triggering the DLT bronze pipeline in my Databricks UI, I get the following error: com.databricks.pipelines.execution.core.ExecutionFailedException: [DLT ERROR CODE: NO_TABLES_IN_PIPELINE] No tables are defined by the libraries of this pipeline. This error usually occurs when flows defined without defining the table they target, or when all top-level definitions are views. at com.databricks.pipelines.execution.extensions.workspace.WorkspacePipelineGraphLoader.loadGraph(WorkspacePipelineGraphLoader.scala:190) at com.databricks.pipelines.execution.extensions.workspace.WorkspacePipelineGraphLoader.loadGraph(WorkspacePipelineGraphLoader.scala:54) at com.databricks.pipelines.execution.extensions.workspace.WorkspacePipelineExecutionExtension$.loadGraph(WorkspacePipelineExecutionExtension.scala:18) at com.databricks.pipelines.execution.service.DLTComputeRunnableContext.loadGraph(DLTComputeRunnableContext.scala:124) at com.databricks.pipelines.execution.core.UpdateExecution.initializationForUpdates(UpdateExecution.scala:581) at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$initializeAndLoadGraphForRegularUpdate$1(UpdateExecution.scala:697) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$3(DeltaPipelinesUsageLogging.scala:123) at com.databricks.pipelines.common.monitoring.OperationStatusReporter.executeWithPeriodicReporting(OperationStatusReporter.scala:120) at com.databricks.pipelines.common.monitoring.OperationStatusReporter$.executeWithPeriodicReporting(OperationStatusReporter.scala:160) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$6(DeltaPipelinesUsageLogging.scala:143) at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:573) at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:669) at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:687) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionContext(DeltaPipelinesUsageLogging.scala:24) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionTags(DeltaPipelinesUsageLogging.scala:24) at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:664) at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:582) at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperationWithResultTags(DeltaPipelinesUsageLogging.scala:24) at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:573) at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:542) at 
com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation(DeltaPipelinesUsageLogging.scala:24) at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation0(DeltaPipelinesUsageLogging.scala:67) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$1(DeltaPipelinesUsageLogging.scala:135) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation(DeltaPipelinesUsageLogging.scala:113) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation$(DeltaPipelinesUsageLogging.scala:109) at com.databricks.pipelines.execution.core.UpdateExecution.recordPipelinesOperation(UpdateExecution.scala:70) at com.databricks.pipelines.execution.core.UpdateExecution.executeStage(UpdateExecution.scala:425) at com.databricks.pipelines.execution.core.UpdateExecution.initializeAndLoadGraphForRegularUpdate(UpdateExecution.scala:697) at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$executeUpdate$1(UpdateExecution.scala:539) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at com.databricks.pipelines.execution.core.UpdateExecution.executeUpdate(UpdateExecution.scala:538) at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$3(UpdateExecution.scala:238) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionContext(DeltaPipelinesUsageLogging.scala:24) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionTags(DeltaPipelinesUsageLogging.scala:24) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging$$anon$1.runWithAttributionTags(DeltaPipelinesUsageLogging.scala:85) at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.withDbAttributionTags(DeltaPipelinesUsageLogging.scala:92) at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.withDbAttributionTags$(DeltaPipelinesUsageLogging.scala:91) at com.databricks.pipelines.execution.core.UpdateExecution.withDbAttributionTags(UpdateExecution.scala:70) at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$1(UpdateExecution.scala:214) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
com.databricks.pipelines.execution.core.BaseUCContext.$anonfun$runWithNewUCS$1(BaseUCContext.scala:562) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103) at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108) at scala.util.Using$.resource(Using.scala:269) at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107) at com.databricks.pipelines.execution.core.BaseUCContext.runWithNewUCS(BaseUCContext.scala:556) at com.databricks.pipelines.execution.core.UCContextCompanion$OptionUCContextHelper.runWithNewUCSIfAvailable(BaseUCContext.scala:1463) at com.databricks.pipelines.execution.core.UpdateExecution.start(UpdateExecution.scala:201) at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.$anonfun$run$2(ExecutionBackend.scala:697) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.pipelines.execution.core.CommandContextUtils$.withCommandContext(CommandContextUtils.scala:99) at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.run(ExecutionBackend.scala:693) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:118) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:81) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:80) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:66) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:115) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:118) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

I tried to look into the source code but I cannot see why the DQE table is not found. When querying the bronze_dataflowspec_table, I can see the quarantineTargetDetails as specified in the onboarding.json:

{"database":"boumatic_d365fo.dev","table":"journal_entry_quarantine","partition_columns":""}

Removing all quarantine-related configs from the onboarding file and bronze_data_quality_expectations_json_dev removes the error, and I can trigger the DLT pipeline as expected.

Any help is appreciated. Thank you.

pewoInspari avatar Aug 16 '24 06:08 pewoInspari

@pewoInspari please correct your bronze_data_quality_expectations.json as shown below by removing the square brackets [], then rerun onboarding. After onboarding, check the dataQualityExpectations column of your bronze dataflowspec table to confirm it contains the entry below:

{
  "expect_or_drop": {
    "no_rescued_data": "_rescued_data IS NULL",
    "Valid_RECID": "RECID IS NOT NULL",
    "Valid_LastProcessedChange_DateTime": "LastProcessedChange_DateTime IS NOT NULL"
  },
  "expect_or_quarantine": {
    "quarantine_rule": "_rescued_data IS NOT NULL OR RECID IS NULL OR LastProcessedChange_DateTime IS NULL"
  }
}
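As a quick sanity check before rerunning onboarding, here is a minimal sketch (reusing the volume path from the onboarding config above; adjust if yours differs) to confirm the file now parses as a single JSON object rather than an array:

import json

# Path taken from the onboarding config above.
dqe_path = "/Volumes/boumatic_d365fo/raw/d365fo/bronze_data_quality_expectations.json"

with open(dqe_path) as f:
    dqe = json.load(f)

# The file should contain one JSON object, not a list of objects.
assert isinstance(dqe, dict), f"Expected a JSON object, got {type(dqe).__name__}"
print(dqe["expect_or_drop"])
print(dqe["expect_or_quarantine"])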

ravi-databricks avatar Aug 16 '24 16:08 ravi-databricks