ADLS/WASB/S3 enabled configurations are not honored
Currently we have configurations in the YAML file to configure whether users use ADLS/WASB/S3 to ingest offline stores, etc. However, they are not honored (i.e. Feathr will still try to read the S3 section even if it is set to False). We should fix this.
Can you add more details to the ticket please?
Ideally all the offline stores, online stores, etc. should be pluggable (i.e. if they are not configured, Feathr should just ignore them).
However, this is not the case. For example, if I'm using the following code:
from feathr import AvroJsonSchema
from feathr import KafKaSource
from feathr import KafkaConfig
from typing import List
import os
import random
from datetime import datetime, timedelta
from feathr import (BOOLEAN, FLOAT, INPUT_CONTEXT, INT32, STRING,
                    DerivedFeature, Feature, FeatureAnchor, HdfsSource,
                    TypedKey, ValueType, WindowAggTransformation)
from feathr import FeathrClient
from pyspark.sql import DataFrame
import os
from pathlib import Path
import pytest
from feathr import MaterializationSettings, RedisSink
import tempfile
yaml_config = """
# DO NOT MOVE OR DELETE THIS FILE

# This file contains the configurations that are used by Feathr
# All the configurations can be overwritten by environment variables with concatenation of `__` for different layers of this config file.
# For example, `feathr_runtime_location` for databricks can be overwritten by setting this environment variable:
# SPARK_CONFIG__DATABRICKS__FEATHR_RUNTIME_LOCATION
# Another example would be overwriting Redis host with this config: `ONLINE_STORE__REDIS__HOST`
# For example if you want to override this setting in a shell environment:
# export ONLINE_STORE__REDIS__HOST=feathrazure.redis.cache.windows.net

# version of API settings
api_version: 1

project_config:
  project_name: 'project_feathr_integration_test'
  # Information that are required to be set via environment variables.

offline_store:
  # paths starts with abfss:// or abfs://
  # ADLS_ACCOUNT and ADLS_KEY should be set in environment variable if this is set to true
  adls:
    adls_enabled: true
  # paths starts with wasb:// or wasbs://
  # WASB_ACCOUNT and WASB_KEY should be set in environment variable
  wasb:
    wasb_enabled: true
  # jdbc endpoint
  jdbc:
    jdbc_enabled: true
    jdbc_database: 'feathrtestdb'
    jdbc_table: 'feathrtesttable'
  # snowflake endpoint
  snowflake:
    url: "dqllago-ol19457.snowflakecomputing.com"
    user: "feathrintegration"
    role: "ACCOUNTADMIN"

spark_config:
  # choice for spark runtime. Currently support: azure_synapse, databricks
  # The `databricks` configs will be ignored if `azure_synapse` is set and vice versa.
  spark_cluster: 'databricks'
  # configure number of parts for the spark output for feature generation job
  spark_result_output_parts: '1'
  azure_synapse:
    dev_url: 'https://feathrazuretest3synapse.dev.azuresynapse.net'
    pool_name: 'spark3'
    # workspace dir for storing all the required configuration files and the jar resources
    workspace_dir: 'abfss://[email protected]/feathr_test_workspace'
    executor_size: 'Small'
    executor_num: 4
    # Feathr Job configuration. Support local paths, path start with http(s)://, and paths start with abfs(s)://
    # this is the default location so end users don't have to compile the runtime again.
    # feathr_runtime_location: wasbs://[email protected]/feathr-assembly-0.5.0-SNAPSHOT.jar
    feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.5.0.jar"
  databricks:
    # workspace instance
    workspace_instance_url: 'https://adb-2474129336842816.16.azuredatabricks.net/'
    workspace_token_value: ''
    # config string including run time information, spark version, machine size, etc.
    # the config follows the format in the databricks documentation: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs
    config_template: {"run_name":"FEATHR_FILL_IN","new_cluster":{"spark_version":"9.1.x-scala2.12","num_workers":2,"spark_conf":{"FEATHR_FILL_IN":"FEATHR_FILL_IN"},"instance_pool_id":"0403-214809-inlet434-pool-l9dj3kwz"},"libraries":[{"jar":"FEATHR_FILL_IN"}],"spark_jar_task":{"main_class_name":"FEATHR_FILL_IN","parameters":["FEATHR_FILL_IN"]}}
    # Feathr Job location. Support local paths, path start with http(s)://, and paths start with dbfs:/
    work_dir: 'dbfs:/feathr_getting_started'
    # this is the default location so end users don't have to compile the runtime again.
    feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.5.0.jar"

online_store:
  redis:
    # Redis configs to access Redis cluster
    host: 'feathrazuretest3redis.redis.cache.windows.net'
    port: 6380
    ssl_enabled: True

feature_registry:
  # The API endpoint of the registry service
  api_endpoint: "https://feathr-sql-registry.azurewebsites.net/api/v1"

monitoring:
  database:
    sql:
      url: 'jdbc:postgresql://featuremonitoring.postgres.database.azure.com:5432/postgres'
      user: "demo"
"""
import os
os.environ['REDIS_PASSWORD'] = '='
os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = ''
os.environ['KAFKA_SASL_JAAS_CONFIG'] = 'Endpoint=sb://feathrazureci.servicebus.windows.net/;SharedAccessKeyName=feathrcipolicy;SharedAccessKey=\"=\"'
# write this configuration string to a temporary location and load it to Feathr
tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)
with open(tmp.name, "w") as text_file:
    text_file.write(yaml_config)

client = FeathrClient(config_path=tmp.name)
schema = AvroJsonSchema(schemaStr="""
{
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"}
        }
    ]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
                            kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
                                                    topics=["feathrcieventhub"],
                                                    schema=schema)
                            )

driver_id = TypedKey(key_column="driver_id",
                     key_column_type=ValueType.INT64,
                     description="driver id",
                     full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
                            source=stream_source,
                            features=[Feature(name="f_modified_streaming_count",
                                              feature_type=INT32,
                                              transform="trips_today + 1",
                                              key=driver_id),
                                      Feature(name="f_modified_streaming_count2",
                                              feature_type=INT32,
                                              transform="trips_today + 2",
                                              key=driver_id)]
                            )
client.build_features(anchor_list=[kafkaAnchor])
redisSink = RedisSink(table_name="kafkaSampleDemoFeature", streaming=True, streamingTimeoutMs=10000)
settings = MaterializationSettings(name="kafkaSampleDemo",
                                   sinks=[redisSink],
                                   feature_names=['f_modified_streaming_count'])
client.materialize_features(settings)
client.wait_job_to_finish(timeout_sec=600)
Feathr will still try to look for `offline_store__snowflake__url`, `offline_store__snowflake__user`, etc., even though I didn't configure those at all.
We should use this config:

wasb:
  wasb_enabled: true

and if it is set to false, we should not look for those environment variables.
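To illustrate, here is a minimal sketch (my own, not Feathr's implementation) of the gating I would expect, assuming the config is loaded with PyYAML; the helper name and the S3 environment-variable names are placeholders, while the ADLS/WASB names come from the comments in the config above:

import os
import yaml  # PyYAML, assumed to be available

# Hypothetical mapping from offline-store section to the env vars it needs.
REQUIRED_ENV_VARS = {
    "adls": ["ADLS_ACCOUNT", "ADLS_KEY"],      # from the YAML comments above
    "wasb": ["WASB_ACCOUNT", "WASB_KEY"],      # from the YAML comments above
    "s3": ["S3_ACCESS_KEY", "S3_SECRET_KEY"],  # placeholder names for illustration
}

def resolve_offline_store_envs(config_path: str) -> dict:
    """Resolve credentials only for stores whose `*_enabled` flag is true."""
    with open(config_path) as f:
        conf = yaml.safe_load(f) or {}
    offline = conf.get("offline_store") or {}
    resolved = {}
    for store, env_names in REQUIRED_ENV_VARS.items():
        section = offline.get(store) or {}
        # Skip the store entirely unless e.g. `wasb_enabled: true` is present.
        if not section.get(f"{store}_enabled", False):
            continue
        for name in env_names:
            # Only enabled stores may fail loudly on missing credentials.
            resolved[name] = os.environ[name]
    return resolved

The same kind of guard could cover the snowflake section, so that `offline_store__snowflake__url` and `offline_store__snowflake__user` are only resolved when Snowflake is actually configured.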
A good example is the monitoring section:

monitoring:
  database:
    sql:
      url: 'jdbc:postgresql://featuremonitoring.postgres.database.azure.com:5432/postgres'
      user: "demo"
If the end user is not using the monitoring config, those configs are not appended to the Spark job:
https://github.com/linkedin/feathr/blob/main/feathr_project/feathr/client.py#L609-L612
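As a hedged sketch of that conditional-append pattern (hypothetical names, not the exact code behind the link):

def append_monitoring_confs(execution_configs: dict, conf: dict) -> dict:
    # Monitoring-related Spark properties are added only when the
    # monitoring section is actually present in the loaded config.
    sql_conf = (conf.get("monitoring") or {}).get("database", {}).get("sql", {})
    if sql_conf.get("url"):
        # Only now do monitoring settings (and their credentials) get touched.
        execution_configs["monitoring.database.url"] = sql_conf["url"]
        execution_configs["monitoring.database.user"] = sql_conf.get("user", "")
    return execution_configs

Applying the same guard to each offline_store section would make them pluggable as described above.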
Solved by PR: https://github.com/feathr-ai/feathr/pull/545