feast
ValueError: Stream feature views need a stream source, expected one of {'KafkaSource', 'PushSource'} or CUSTOM_SOURCE, got KinesisSource:
Expected Behavior
`feast apply` deploys infrastructure for the project without error.
Current Behavior
ValueError: Stream feature views need a stream source, expected one of {'PushSource', 'KafkaSource'} or CUSTOM_SOURCE, got KinesisSource: birthdates_streams_source instead
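For context, the check that raises this error appears to look roughly like the following. This is an assumed sketch reconstructed from the error message, not the exact Feast source:

```python
# Assumed sketch of the stream-source validation, reconstructed from the
# error message above; the real check lives in feast/stream_feature_view.py.
SUPPORTED_STREAM_SOURCES = {"KafkaSource", "PushSource"}

def validate_stream_source(source_class_name: str, is_custom: bool = False) -> None:
    """Raise ValueError for any non-custom source class outside the supported set."""
    if not is_custom and source_class_name not in SUPPORTED_STREAM_SOURCES:
        raise ValueError(
            f"Stream feature views need a stream source, expected one of "
            f"{SUPPORTED_STREAM_SOURCES} or CUSTOM_SOURCE, "
            f"got {source_class_name} instead"
        )

validate_stream_source("KafkaSource")  # accepted
# validate_stream_source("KinesisSource")  # raises ValueError as reported
```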
Steps to reproduce
from datetime import timedelta

from feast import (
    FileSource,
    KinesisSource,
)
from feast.data_format import JsonFormat

from feast_poc.config import DATA_CATALOG

candidate_base_features_source = FileSource(
    name="candidates_base_features",
    path=str(DATA_CATALOG.get("candidate_base_features")),
    timestamp_field="updated_at",  # used for ttl
    created_timestamp_column="created_at",
)

# Kinesis
birthdates_streams_source = KinesisSource(
    name="birthdates_streams_source",
    stream_name="job_applications",
    timestamp_field="event_timestamp",
    batch_source=candidate_base_features_source,
    record_format=JsonFormat(
        schema_json="candidate_id integer, event_timestamp timestamp, birthdate timestamp"
    ),
    # watermark_delay_threshold=timedelta(minutes=5),
    region="eu-west-1",
)
from datetime import timedelta

from pyspark.sql import DataFrame

from feast import (
    FeatureView,
    Field,
)
from feast.stream_feature_view import stream_feature_view
from feast.types import Float32, Int32, String, UnixTimestamp

from data_sources import *
from entities import *

# Defining the first set of features
candidate_base_features_view = FeatureView(
    name="candidate_base_features_view",
    description="Birthdate features",
    entities=[candidate],
    ttl=timedelta(days=365),  # how long features in this view should be cached
    schema=[
        Field(name="country_id", dtype=Int32),
        Field(name="lat", dtype=Float32),
        Field(name="lng", dtype=Float32),
        Field(name="birthdate", dtype=UnixTimestamp),
    ],
    source=candidate_base_features_source,
    online=True,
    tags={"team": "data_labs"},
    owner="[email protected]",
)

@stream_feature_view(
    entities=[candidate],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="birthdate", dtype=String),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=birthdates_streams_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from datetime import datetime

    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import DateType

    func = udf(lambda x: datetime.strptime(x, "%Y-%m/%d"), DateType())
    return df.withColumn("birthdate", func(col("first")))
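A side note on the transformation above: the format string `'%Y-%m/%d'` only matches mixed-separator strings like `'1990-05/17'`; if the stream carries ISO dates, `'%Y-%m-%d'` would be needed instead. A quick pure-Python check, no Spark required:

```python
from datetime import datetime

# '%Y-%m/%d' expects a dash then a slash, e.g. '1990-05/17' ...
parsed = datetime.strptime("1990-05/17", "%Y-%m/%d")
assert (parsed.year, parsed.month, parsed.day) == (1990, 5, 17)

# ... and rejects ISO-formatted dates like '1990-05-17'.
try:
    datetime.strptime("1990-05-17", "%Y-%m/%d")
except ValueError:
    pass  # ISO input would need '%Y-%m-%d' instead
```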
Specifications
- Version: Feast SDK 0.24.1
- Platform: Mac M1
- Subsystem:
Possible Solution
The code in the Kinesis documentation doesn't work for me either. Should it be handled here?
https://github.com/feast-dev/feast/blob/175fd256e0d21f6539f68708705bddf1caa3d975/sdk/python/feast/stream_feature_view.py#L30
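If the fix is indeed at that line, the patch might be as small as extending the supported set. This is an untested, hypothetical sketch, and KinesisSource would presumably also need end-to-end support in the stream processor:

```python
# Hypothetical one-line change in sdk/python/feast/stream_feature_view.py
# (untested sketch): add KinesisSource to the accepted stream source classes.
SUPPORTED_STREAM_SOURCES = {"KafkaSource", "PushSource", "KinesisSource"}
```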
Hello! @CarlosVecina @diegofjb Yes, currently we have only tested Kafka and push sources. If you would like to use a Kinesis source, add KinesisSource exactly where Carlos mentioned!