
Cannot get expression from valueQuantity

Open lakime opened this issue 2 years ago • 4 comments

Describe the bug
While trying to fetch the valueQuantity element from the Observation resource (data generated via Synthea, running on Databricks), I receive the error below.

Libraries: au.csiro.pathling:library-api:6.3.1, latest Pathling installed from PyPI.

IllegalArgumentException: requirement failed: All input types must be the same except nullable, containsNull, valueContainsNull flags. The expression is: if ((NOT instanceof(assertnotnull(input[0, org.hl7.fhir.r4.model.Observation, true]).getValue, class org.hl7.fhir.r4.model.Quantity) OR isnull(objectcast(assertnotnull(input[0, org.hl7.fhir.r4.model.Observation, true]).getValue, ObjectType(class org.hl7.fhir.r4.model.Quantity))))) null else named_struct(value, staticinvoke(class org.apache.spark.sql.types.Decimal, DecimalType(32,6), apply, if (instanceof(assertnotnull(input[0, org.hl7.fhir.r4.model.Observation, true]).getValue, class org.hl7.fhir.r4.model.Quantity)) objectcast(assertnotnull(input[0, org.hl7.fhir.r4.model.Observation, true]).getValue, ObjectType(class org.hl7.fhir.r4.model.Quantity)) else null.getValueElement.getValue, true, true, true)). The input types found are StructType(StructField(id,StringType,true),StructField(value,DecimalType(32,6),true),StructField(value_scale,IntegerType,true),StructField(comparator,StringType,true),StructField(unit,StringType,true),StructField(system,StringType,true),StructField(code,StringType,true),StructField(_value_canonicalized,StructType(StructField(value,DecimalType(38,0),true),StructField(scale,IntegerType,true)),true),StructField(_code_canonicalized,StringType,true)) StructType(StructField(value,DecimalType(32,6),true)).

If I remove "valueQuantity", it works as expected.

To Reproduce

Observation - the quantities to be checked:

observationfhir = json_resources.extract(
    "Observation",
    columns=[
        exp("id", "Identifier"),
        exp("status", "status"),
        exp("category.first().coding.first().code", "category"),
        exp("code.coding.code", "Observation_Code"),
        exp("code.coding.display", "Observation_Name"),
        exp("code.text", "Observation_Text"),
        exp("subject.reference", "Subject_Reference"),
        exp("encounter.reference", "Encounter_Reference"),
        exp("valueQuantity.value", "Value_Quantity")
    ]
)

observationfhir = (
    observationfhir
    .withColumn('source', lit('payorq'))
    .withColumn('sourceFile', lit(today))
    .withColumn('Value_Quantity', col('Value_Quantity').cast("string"))
)
display(observationfhir)

Expected behavior
The extracted values from the FHIR files.

lakime avatar Oct 20 '23 19:10 lakime

Thanks @lakime, we have reproduced the issue and are working on a fix.

johngrimes avatar Oct 23 '23 07:10 johngrimes

Perhaps it is a wrong construct on my side - my setup was:

from pathling import PathlingContext, Expression as exp
from pyspark.sql.functions import split, explode, col, lit, expr
from datetime import date

today = date.today()
pc = PathlingContext.create()
ndjson_dir = 'dbfs:/mnt/hda/raw/payorq/landing/'
json_resources = pc.read.ndjson(ndjson_dir)

I am able to fetch the data using a SQL query:

%sql
SELECT
  valueQuantity.value AS Value_Quantity_Value,
  valueQuantity.unit AS Value_Quantity_Unit
FROM bronzeraw.observation;

lakime avatar Oct 23 '23 09:10 lakime

Hi @lakime,

We have done a bit of work to figure out what is happening here.

This is essentially caused by a bug in Spark, or an inability of Spark to deal with the expressions that we generate in certain scenarios. We're working on creating a bug report for this.

This behaviour is specific to reading data directly from a raw FHIR source, such as NDJSON or Bundles.

There are two workarounds. The first is to simply cache the datasets involved in the query before running extract:

pc = PathlingContext.create()
ndjson_dir = 'dbfs:/mnt/hda/raw/payorq/landing/'
json_resources = pc.read.ndjson(ndjson_dir)

json_resources.read('Observation').cache()

observationfhir = json_resources.extract(  #...

The other workaround is to set the configuration parameter `spark.sql.optimizer.nestedSchemaPruning.enabled` to `false`:

spark = (
    SparkSession.builder
    .config("spark.sql.optimizer.nestedSchemaPruning.enabled", "false")
    .getOrCreate()
)

pc = PathlingContext.create(spark)
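On platforms such as Databricks, where a SparkSession already exists before your code runs, the same option can likely be applied to the existing session at runtime instead of rebuilding it. A minimal sketch, assuming `nestedSchemaPruning.enabled` is runtime-mutable (it is a SQL conf, so this should generally hold) and that `spark` is the ambient session:

```python
from pathling import PathlingContext

# Hypothetical alternative: flip the SQL conf on the already-running
# session, then hand that session to Pathling.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "false")

pc = PathlingContext.create(spark)
```

This avoids the need to control session construction yourself, at the cost of disabling nested schema pruning for all subsequent queries in the session.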

Perhaps you could try this and let us know if this solves your problem?

johngrimes avatar Oct 27 '23 01:10 johngrimes

Reported to Spark as a bug: SPARK-45766

piotrszul avatar Nov 02 '23 00:11 piotrszul