                        [Lake] ETL has created all bronze tables
Motivation
The role of data_factory.py is to perform a single-step procedure that gets data written to disk. However, this data may need further manipulation, such as being joined with other data and then aggregated.
To address this, etl.py will be responsible for all the steps required to generate/compute data for analytics/intelligence (see the sketch after this list):
- bronze_step()
- silver_step()
- gold_step()
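A minimal sketch of what such an etl.py could look like. The ETL class, its constructor, and the step bodies are assumptions for illustration, not the shipped implementation:

# etl.py - hypothetical sketch of a bronze => silver => gold pipeline.
# The class shape and step contents are assumed, not the real API.
class ETL:
    def __init__(self, ppss, gql_data_factory):
        self.ppss = ppss
        self.gql_data_factory = gql_data_factory  # supplies bronze inputs

    def do_etl(self):
        """Run the steps in order; each layer only reads tables that
        the previous layer has already written to the lake."""
        self.bronze_step()
        self.silver_step()
        self.gold_step()

    def bronze_step(self):
        """Fetch raw records (predictions, truevals, payouts, ...) and
        write them to the lake as bronze tables."""

    def silver_step(self):
        """Join + enrich bronze tables for time-series analysis."""

    def gold_step(self):
        """Compute metrics, summaries, and lookup tables."""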
To complete this task, we'll use all the bronze tables generated by gql_data_factory:
- pdr_predictions
- pdr_truevals
- pdr_payouts
For now, GQLDataFactory will implement a post_fetch step that will join data from truevals and payouts into pdr_predictions. At the end of this post_fetch step, other tables and summaries can be kicked off.
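Roughly, that join could look like the following polars sketch; the column names and function signature are illustrative assumptions, not the real lake schema:

# Hypothetical post_fetch: enrich predictions with trueval + payout data.
# Column names are illustrative, not the shipped schema.
import polars as pl

def post_fetch(
    predictions: pl.DataFrame,
    truevals: pl.DataFrame,
    payouts: pl.DataFrame,
) -> pl.DataFrame:
    # a trueval is keyed by (contract, slot); a payout by prediction ID
    df = predictions.join(
        truevals.select(["contract", "slot", "truevalue"]),
        on=["contract", "slot"],
        how="left",  # a prediction may not have a trueval yet
    )
    df = df.join(
        payouts.select(["ID", "payout"]),
        on="ID",
        how="left",  # payout events land after truevals
    )
    return df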
DoD:
- [x] data factories should be responsible for generating all bronze records
- [x] GQL data factory can then execute the incremental post_pdr_predictions table #495
- [ ] Continue using GQLDataFactory or build ETL.py that is responsible for building core analytics tables (bronze=>silver=>gold)
Alternatively, expand gql_data_factory so that it can handle:
- Post-fetch processing of bronze tables
- Processing of silver tables (enrich, aggregate for time-series analysis)
- Processing of gold tables (metrics, summaries, lookup tables)
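For example, the expanded factory's __init__ could organize those layers into per-table configs, as in the snippet below from this proposal (the get_*_df fetch functions and schemas it references are assumed helpers):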
    def __init__(self, ppss: PPSS):
        self.ppss = ppss
        # filter by feed contract address
        network = get_sapphire_postfix(ppss.web3_pp.network)
        contract_list = get_all_contract_ids_by_owner(
            owner_address=self.ppss.web3_pp.owner_addrs,
            network=network,
        )
        
        # TODO - make contract_list a constant, filtering happens after full sync
        # 1. configure all tables that will be recorded onto lake
        self.bronze_config = {
            "bronze_pdr_predictions": {
                "fetch_fn": get_pdr_predictions_df,
                "schema": predictions_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
            "bronze_pdr_truevals": {
                "fetch_fn": get_pdr_truevals_df,
                "schema": truevals_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
            "bronze_pdr_payouts": {
                "fetch_fn": get_pdr_payouts_df,
                "schema": payouts_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
            "bronze_pdr_subscriptions": {
                "fetch_fn": get_pdr_subscriptions_df,
                "schema": subscriptions_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
        }
        # 2. configure post_fetch tables that will be created from the raw data
        self.post_bronze_config = {
            "bronze_post_pdr_predictions": {
                "fetch_fn": get_post_predictions_df,
                "schema": bronze_post_predictions_schema,
                "config": {
                    "contract_list": contract_list,
                },
            }
        }
        # 3. configure silver tables that will be created as a result of aggregations
        self.silver_config = {
            "silver_pdr_predictions": {
                "fetch_fn": get_silver_predictions_df,
                "schema": silver_predictions_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
            "silver_pdr_feeds": {
                "fetch_fn": get_silver_feeds_df,
                "schema": silver_feeds_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
            "silver_pdr_predictoors": {
                "fetch_fn": get_silver_predictoors_df,
                "schema": silver_predictoors_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
        }
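Keeping each layer in its own config dict means a runner can iterate bronze, then post-bronze, then silver, writing every table in one layer to the lake before the next layer reads it.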
Motivation
To prevent re-fetching data from the subgraph, we're going to maintain our own view of the final state of each prediction.
This means we'll be joining <truevals, payouts, prediction> to compute a local record representing the final_state of each prediction made. To do this, use some data + tests to write and verify that the logic works as expected and that we're only processing the required data.
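The resulting record could be shaped roughly as follows; the field names and types are a guess for illustration, not the actual bronze_post_pdr_predictions schema:

# Hypothetical shape of a "final state" prediction record after the
# trueval + payout joins. Field names are illustrative assumptions.
from polars import Boolean, Float64, Int64, Utf8

bronze_post_predictions_schema = {
    "ID": Utf8,            # prediction ID from the subgraph
    "contract": Utf8,      # feed contract address
    "slot": Int64,         # epoch slot the prediction targets
    "user": Utf8,          # predictoor address
    "stake": Float64,
    "truevalue": Boolean,  # filled in when the trueval event lands
    "payout": Float64,     # filled in when the payout event lands
    "timestamp": Int64,    # last time this record was updated
}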
Example - Work #1 - Basic Event Flow
- Add a few prediction events
- Test that new trueval events update existing post_prediction records
- Test that new payout events update existing post_prediction records
- Test that new prediction events are appended and that unnecessary updates are minimized
- Test that unnecessary trueval and payout updates are minimized
- Test that post_prediction records and downstream records are updated incrementally
Example - Work #2 - Larger coverage
- Test the system with trueval + payout events backlogged across many predictions (see the test sketch below)
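A hedged sketch of what the first of these tests could look like, reusing the hypothetical post_fetch() join sketched above; all sample data here is made up:

# Sketch of a basic event-flow test; post_fetch() is the hypothetical
# join sketched earlier, not a real pdr-backend function.
import polars as pl

def test_new_trueval_updates_post_prediction():
    predictions = pl.DataFrame({
        "ID": ["c1-100-0xaddy"],
        "contract": ["c1"],
        "slot": [100],
        "user": ["0xaddy"],
        "stake": [1.0],
    })
    no_truevals = pl.DataFrame(
        schema={"contract": pl.Utf8, "slot": pl.Int64, "truevalue": pl.Boolean}
    )
    no_payouts = pl.DataFrame(schema={"ID": pl.Utf8, "payout": pl.Float64})

    # before any trueval event, the joined field is null
    df = post_fetch(predictions, no_truevals, no_payouts)
    assert df["truevalue"][0] is None

    # a trueval event for the same (contract, slot) fills the field in
    truevals = pl.DataFrame({"contract": ["c1"], "slot": [100], "truevalue": [True]})
    df = post_fetch(predictions, truevals, no_payouts)
    assert df["truevalue"][0] is True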
truevals
{
  predictTrueVals(first: 100) {
    id
    slot {
      id
      predictContract {
        id
        secondsPerEpoch
      }
      slot
    }
    trueValue
    block
  }
}
Predictions
{
  predictPredictions(first: 1000, where: {user_starts_with_nocase: "0xaddy"}) {
    id
    timestamp
    user {
      id
    }
    stake
    payout {
      payout
      trueValue
      predictedValue
    }
    slot {
      slot
      trueValues {
        id
        trueValue
      }
    }
  }
}
Payouts
{
  predictPayouts(first: 1000, orderDirection: desc, orderBy: timestamp) {
    id
    prediction {
      id
      user {
        id
      }
      stake
      slot {
        predictContract {
          token {
            id
          }
        }
        slot
      }
    }
    payout
    predictedValue
    trueValue
    aggregatedPredictedValue
    block
    timestamp
  }
}
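For reference, a hedged sketch of running one of these queries against a subgraph endpoint; the URL is a placeholder and the field selection is trimmed:

# Hypothetical fetch: POST a GraphQL query to a Predictoor subgraph.
# SUBGRAPH_URL is a placeholder, not a real endpoint.
import requests

SUBGRAPH_URL = "https://<subgraph-host>/subgraphs/name/<predictoor-subgraph>"

QUERY = """
{
  predictPayouts(first: 1000, orderDirection: desc, orderBy: timestamp) {
    id
    payout
    predictedValue
    trueValue
    timestamp
  }
}
"""

resp = requests.post(SUBGRAPH_URL, json={"query": QUERY}, timeout=30)
resp.raise_for_status()
payouts = resp.json()["data"]["predictPayouts"]
print(f"fetched {len(payouts)} payout records")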
I'm going to close this ticket; it's too broad. We've delivered the first implementation.