
ADLS/WASB/S3 enabled configurations are not honored

Open · xiaoyongzhu opened this issue · 4 comments

Currently we have settings in the YAML file that control whether ADLS/WASB/S3 are used for offline stores, ingestion, etc. However, they are not honored (i.e. Feathr will still try to read the S3 section even if it's set to False). We'd better fix this.
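For context, what "honored" should mean is roughly this (a minimal sketch, not Feathr's actual implementation; the S3 variable names are made up for illustration, while the ADLS/WASB ones come from the comments in the default config): only the stores whose *_enabled flag is true should require their environment variables.

import yaml

def required_offline_store_env_vars(yaml_path):
    # Sketch only: collect env vars for the *enabled* offline stores and ignore
    # any section that is disabled or missing entirely.
    with open(yaml_path) as f:
        conf = yaml.safe_load(f) or {}
    offline = conf.get("offline_store") or {}
    required = []
    if (offline.get("adls") or {}).get("adls_enabled", False):
        required += ["ADLS_ACCOUNT", "ADLS_KEY"]
    if (offline.get("wasb") or {}).get("wasb_enabled", False):
        required += ["WASB_ACCOUNT", "WASB_KEY"]
    if (offline.get("s3") or {}).get("s3_enabled", False):
        required += ["S3_ACCESS_KEY", "S3_SECRET_KEY"]  # illustrative names
    return required

Today the disabled (or absent) sections are still read, which is exactly what this issue is about.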

xiaoyongzhu · Jun 09 '22 21:06

Can you add more details to the ticket please?

jainr · Jul 14 '22 00:07

Ideally all the offline stores, online stores, etc. should be pluggable (i.e. if they are not configured, Feathr should just ignore them).

However, this is not the case. For example, if I run the following code:


import os
import tempfile

from feathr import (INT32, AvroJsonSchema, FeathrClient, Feature, FeatureAnchor,
                    KafKaSource, KafkaConfig, MaterializationSettings, RedisSink,
                    TypedKey, ValueType)

yaml_config = """
# DO NOT MOVE OR DELETE THIS FILE

# This file contains the configurations that are used by Feathr
# All the configurations can be overwritten by environment variables with concatenation of `__` for different layers of this config file.
# For example, `feathr_runtime_location` for databricks can be overwritten by setting this environment variable:
# SPARK_CONFIG__DATABRICKS__FEATHR_RUNTIME_LOCATION
# Another example would be overwriting Redis host with this config: `ONLINE_STORE__REDIS__HOST`
# For example if you want to override this setting in a shell environment:
# export ONLINE_STORE__REDIS__HOST=feathrazure.redis.cache.windows.net

# version of API settings
api_version: 1
project_config:
  project_name: 'project_feathr_integration_test'
  # Information that is required to be set via environment variables.

offline_store:
  # paths that start with abfss:// or abfs://
  # ADLS_ACCOUNT and ADLS_KEY should be set in environment variable if this is set to true
  adls:
    adls_enabled: true

  # paths that start with wasb:// or wasbs://
  # WASB_ACCOUNT and WASB_KEY should be set in environment variable
  wasb:
    wasb_enabled: true


  # jdbc endpoint
  jdbc:
    jdbc_enabled: true
    jdbc_database: 'feathrtestdb'
    jdbc_table: 'feathrtesttable'

  # snowflake endpoint
  snowflake:
    url: "dqllago-ol19457.snowflakecomputing.com"
    user: "feathrintegration"
    role: "ACCOUNTADMIN"

spark_config:
  # choice of Spark runtime. Currently supported: azure_synapse, databricks
  # The `databricks` configs will be ignored if `azure_synapse` is set and vice versa.
  spark_cluster: 'databricks'
  # configure number of parts for the spark output for feature generation job
  spark_result_output_parts: '1'

  azure_synapse:
    dev_url: 'https://feathrazuretest3synapse.dev.azuresynapse.net'
    pool_name: 'spark3'
    # workspace dir for storing all the required configuration files and the jar resources
    workspace_dir: 'abfss://[email protected]/feathr_test_workspace'
    executor_size: 'Small'
    executor_num: 4
    # Feathr job configuration. Supports local paths, paths starting with http(s)://, and paths starting with abfs(s)://
    # this is the default location so end users don't have to compile the runtime again.
    # feathr_runtime_location: wasbs://[email protected]/feathr-assembly-0.5.0-SNAPSHOT.jar
    feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.5.0.jar"
  databricks:
    # workspace instance
    workspace_instance_url: 'https://adb-2474129336842816.16.azuredatabricks.net/'
    workspace_token_value: ''
    # config string including run time information, spark version, machine size, etc.
    # the config follows the format in the databricks documentation: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs
    config_template: {"run_name":"FEATHR_FILL_IN","new_cluster":{"spark_version":"9.1.x-scala2.12","num_workers":2,"spark_conf":{"FEATHR_FILL_IN":"FEATHR_FILL_IN"},"instance_pool_id":"0403-214809-inlet434-pool-l9dj3kwz"},"libraries":[{"jar":"FEATHR_FILL_IN"}],"spark_jar_task":{"main_class_name":"FEATHR_FILL_IN","parameters":["FEATHR_FILL_IN"]}}
    # Feathr job location. Supports local paths, paths starting with http(s)://, and paths starting with dbfs:/
    work_dir: 'dbfs:/feathr_getting_started'
    # this is the default location so end users don't have to compile the runtime again.
    feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.5.0.jar"

online_store:
  redis:
    # Redis configs to access Redis cluster
    host: 'feathrazuretest3redis.redis.cache.windows.net'
    port: 6380
    ssl_enabled: True

feature_registry:
  # The API endpoint of the registry service
  api_endpoint: "https://feathr-sql-registry.azurewebsites.net/api/v1"

monitoring:
  database:
    sql:
      url: 'jdbc:postgresql://featuremonitoring.postgres.database.azure.com:5432/postgres'
      user: "demo"


"""

os.environ['REDIS_PASSWORD'] = '='
os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = ''
os.environ['KAFKA_SASL_JAAS_CONFIG'] = 'Endpoint=sb://feathrazureci.servicebus.windows.net/;SharedAccessKeyName=feathrcipolicy;SharedAccessKey=\"=\"'



# write this configuration string to a temporary location and load it to Feathr
tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)
with open(tmp.name, "w") as text_file:
    text_file.write(yaml_config)

client = FeathrClient(config_path=tmp.name)
schema = AvroJsonSchema(schemaStr="""
{
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"}
        }
    ]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
                            kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
                                                    topics=["feathrcieventhub"],
                                                    schema=schema)
                            )

driver_id = TypedKey(key_column="driver_id",
                        key_column_type=ValueType.INT64,
                        description="driver id",
                        full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
                                    source=stream_source,
                                    features=[Feature(name="f_modified_streaming_count",
                                                    feature_type=INT32,
                                                    transform="trips_today + 1",
                                                    key=driver_id),
                                            Feature(name="f_modified_streaming_count2",
                                                    feature_type=INT32,
                                                    transform="trips_today + 2",
                                                    key=driver_id)]
                                    )
client.build_features(anchor_list=[kafkaAnchor])


redisSink = RedisSink(table_name="kafkaSampleDemoFeature", streaming=True, streamingTimeoutMs=10000)
settings = MaterializationSettings(name="kafkaSampleDemo",
                                sinks=[redisSink],
                                feature_names=['f_modified_streaming_count']
                                )
client.materialize_features(settings)
client.wait_job_to_finish(timeout_sec=600)

Feathr will still try to look for offline_store__snowflake__url, offline_store__snowflake__user, etc., although I didn't configure that at all.

We should honor this config:

  wasb:
    wasb_enabled: true

and if it is set to false, we should not look for the corresponding environment variables.
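One way the check could work, given that every setting here can also be overridden by a `__`-concatenated environment variable (see the comments at the top of the YAML above), is to resolve the flag itself first and only then decide whether to look for WASB_ACCOUNT / WASB_KEY. This is just a sketch with a hypothetical helper name, not Feathr's actual code:

import os
import yaml

def wasb_enabled(yaml_path):
    # An environment override such as OFFLINE_STORE__WASB__WASB_ENABLED (following
    # the documented `__` naming convention) takes precedence over the YAML value.
    env_override = os.environ.get("OFFLINE_STORE__WASB__WASB_ENABLED")
    if env_override is not None:
        return env_override.strip().lower() in ("true", "1", "yes")
    with open(yaml_path) as f:
        conf = yaml.safe_load(f) or {}
    return bool(((conf.get("offline_store") or {}).get("wasb") or {}).get("wasb_enabled", False))

# Only when this returns True should WASB_ACCOUNT / WASB_KEY be read and the
# corresponding Spark configs be appended to the job; otherwise the section is skipped.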

xiaoyongzhu · Jul 14 '22 08:07

A good example is the monitoring config:

monitoring:
  database:
    sql:
      url: 'jdbc:postgresql://featuremonitoring.postgres.database.azure.com:5432/postgres'
      user: "demo"

If the end user is not using the monitoring config, those settings are not appended to the Spark job:

https://github.com/linkedin/feathr/blob/main/feathr_project/feathr/client.py#L609-L612
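The pattern there is roughly the following (a simplified sketch with illustrative names, not the actual client.py code): the monitoring settings are only appended to the Spark submission configs when they are present. The same gating could be applied to the ADLS/WASB/S3/Snowflake sections based on their enabled flags.

from typing import Dict, Optional

def append_monitoring_confs(spark_confs: Dict[str, str],
                            monitoring_url: Optional[str],
                            monitoring_user: Optional[str]) -> None:
    # Nothing is appended when monitoring is not configured, so the Spark job
    # never requires monitoring credentials it does not use.
    if monitoring_url:
        spark_confs["monitoring_database_url"] = monitoring_url    # illustrative key
    if monitoring_user:
        spark_confs["monitoring_database_user"] = monitoring_user  # illustrative key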

xiaoyongzhu · Jul 14 '22 08:07

Solved by PR: https://github.com/feathr-ai/feathr/pull/545

enya-yx · Sep 21 '22 07:09