
ValueError: Stream feature views need a stream source, expected one of {'KafkaSource', 'PushSource'} or CUSTOM_SOURCE, got KinesisSource:

diegofjb opened this issue on Sep 9, 2022 • 2 comments

Expected Behavior

Deploying infrastructure for project

Current Behavior

ValueError: Stream feature views need a stream source, expected one of {'PushSource', 'KafkaSource'} or CUSTOM_SOURCE, got KinesisSource: birthdates_streams_source instead 

Steps to reproduce

from datetime import timedelta

from feast import (
    FileSource,
    KinesisSource,
)
from feast.data_format import JsonFormat
from feast_poc.config import DATA_CATALOG

candidate_base_features_source = FileSource(
    name="candidates_base_features",
    path=str(DATA_CATALOG.get("candidate_base_features")),
    timestamp_field="updated_at", # use for ttl
    created_timestamp_column="created_at",
)

# Kinesis
birthdates_streams_source = KinesisSource(
    name="birthdates_streams_source",
    stream_name="job_applications",
    timestamp_field="event_timestamp",
    batch_source=candidate_base_features_source,
    record_format=JsonFormat(
        schema_json="candidate_id integer, event_timestamp timestamp, birthdate timestamp"
    ),
    # watermark_delay_threshold=timedelta(minutes=5),
    region='eu-west-1'
)

from datetime import timedelta
from pyspark.sql import DataFrame

from feast import (
    FeatureView,
    Field,
)
from feast.stream_feature_view import stream_feature_view
from feast.types import Float32, Int32, UnixTimestamp, String

from data_sources import *
from entities import *

# Defining the first set of features
candidate_base_features_view = FeatureView(
    name="candidate_base_features_view",
    description="Birthdate features",
    entities=[candidate],
    ttl=timedelta(days=365), # The time that the features in the feature view should be cached for
    schema=[
        Field(name="country_id", dtype=Int32),
        Field(name="lat", dtype=Float32),
        Field(name="lng", dtype=Float32),
        Field(name="birthdate", dtype=UnixTimestamp),
    ],
    source=candidate_base_features_source,
    online=True,
    tags={"team": "data_labs"},
    owner="[email protected]"
)


@stream_feature_view(
    entities=[candidate],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="birthdate", dtype=String)
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=birthdates_streams_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from datetime import datetime
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import DateType

    # Parse the incoming string column into a DateType column.
    func = udf(lambda x: datetime.strptime(x, '%Y-%m/%d'), DateType())

    return df.withColumn("birthdate", func(col('first')))
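(Aside: the UDF above expects dates in an unusual `%Y-%m/%d` layout, e.g. `1990-05/21`; if the incoming field is ISO-formatted (`1990-05-21`), the parse will raise. A quick stdlib check of that format string, independent of Spark:)

```python
from datetime import datetime

# The format string used in the UDF above: year-month/day.
FMT = "%Y-%m/%d"

# Parses only if the separator between month and day is a slash.
parsed = datetime.strptime("1990-05/21", FMT)

# An ISO date like "1990-05-21" raises ValueError with this format.
```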

Specifications

  • Version: feast 0.24.1
  • Platform: Mac M1
  • Subsystem:

Possible Solution

diegofjb avatar Sep 09 '22 12:09 diegofjb

The code in the Kinesis documentation doesn't work for me either. Should it be handled here?

https://github.com/feast-dev/feast/blob/175fd256e0d21f6539f68708705bddf1caa3d975/sdk/python/feast/stream_feature_view.py#L30
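For context, the check at that line presumably resembles the following pure-Python sketch. The class definitions and the `SUPPORTED_STREAM_SOURCES` set here are illustrative, reconstructed from the error message, not the actual Feast implementation:

```python
# Illustrative sketch of the stream-source validation that produces the
# ValueError above; names are assumptions based on the error message,
# not the real Feast code.

SUPPORTED_STREAM_SOURCES = {"KafkaSource", "PushSource"}

class KafkaSource:
    pass

class PushSource:
    pass

class KinesisSource:
    def __init__(self, name):
        self.name = name

def validate_stream_source(source):
    # Reject any source whose class name is not in the supported set.
    type_name = type(source).__name__
    if type_name not in SUPPORTED_STREAM_SOURCES:
        raise ValueError(
            f"Stream feature views need a stream source, expected one of "
            f"{SUPPORTED_STREAM_SOURCES} or CUSTOM_SOURCE, got "
            f"{type_name}: {source.name}"
        )
```

Under this sketch, accepting Kinesis would amount to adding `"KinesisSource"` to the supported set (plus whatever ingestion support the stream processor needs).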

CarlosVecina avatar Sep 09 '22 14:09 CarlosVecina

Hello @CarlosVecina @diegofjb! Yes, currently we have only tested Kafka and push sources. If you would like to use a Kinesis source, add a KinesisSource exactly where Carlos mentioned!

kevjumba avatar Sep 09 '22 16:09 kevjumba

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Jan 10 '23 05:01 stale[bot]