
Support of vertical fold and scan

Open rongcuid opened this issue 2 years ago • 30 comments

Description

Like .map(), I would like Polars to support similar row-wise fold and scan operations: https://stackoverflow.com/questions/17408880/reduce-fold-or-scan-left-right

As an example, assume the table below:

| A | B |
|---|---|
| 1 | 4 |
| 2 | 5 |
| 3 | 6 |

Here's one example:

def f(acc, r):
    return acc + r["A"] + r["B"]

df.select(pl.foldTop(0, f))     # returns 21
df.select(pl.scanTop(0, f))     # returns [5, 12, 21]
df.select(pl.foldBottom(0, f))  # returns 21
df.select(pl.scanBottom(0, f))  # returns [21, 16, 9]

Of course, I made no attempt at refining this API. Ideally it should be able to map multiple columns to multiple new columns. What I included is similar to the pl.reduce function, but that one folds horizontally (across columns), not vertically (down rows).

rongcuid avatar Nov 01 '23 13:11 rongcuid

thanks for the request!

any chance you could please include an example of a non-trivial function you'd like to apply, which can't easily be done using the existing API? (e.g. df.select(pl.sum_horizontal('*').cumsum()))

MarcoGorelli avatar Nov 01 '23 13:11 MarcoGorelli

I would be hesitant in adding this as its performance will inherently be horrible when using Python callbacks. As @MarcoGorelli says, is there a particular example of a fold expression you'd want?

orlp avatar Nov 01 '23 14:11 orlp

performance will inherently be horrible when using Python callbacks

true, but it wouldn't necessarily need to be Python callbacks, right? like list.eval provides a way to define custom functions, so long as one stays within the expressions api

but this sounds like potentially quite a lot of effort, which is why I'm suggesting we first check for a non-trivial use case

MarcoGorelli avatar Nov 01 '23 14:11 MarcoGorelli

Here's an example: I am trying to implement a recurrence relation of the form `y_t = x_t + alpha * y_{t-1}`. Normally I would use a fold to do this. I got it to work by (ab)using ewm_mean:

df.select(
    y=(pl.col("x") / (1 - alpha))
    .ewm_mean(alpha=(1 - alpha), adjust=False)
)

after multiplying the first element (only) by `1 - alpha`, but that is deeply horrible! Maybe there is a better way of doing this without resorting to mapping over the series with Python, but I couldn't work one out.

FWIW: Here's a gist with an implementation as a Rust expression plugin for forward and reverse recurrence relations of this form.

Dris101 avatar Nov 08 '23 18:11 Dris101

My use case is a financial simulation in which each row's columns depend on the results accumulated up to that point. A simple example: I have a portfolio that earns a normalvariate(mu, sigma) amount each period. I am also making contributions to this fund, and the fund is taxed based on how much it earned or lost. When a row is tallied, the final tally is carried forward as the starting principal in the next row, and the earning, taxing, and contributing continue. After N periods the simulation yields a final dataframe and resulting tally. You cannot fill all the columns up front, because each row depends on the tally from the previous row.

buckleyc avatar Dec 14 '23 14:12 buckleyc

thanks all

I think Polars plugins may be the way to go here to be honest...I'll try to put a simple example together

MarcoGorelli avatar Dec 14 '23 16:12 MarcoGorelli

today I learned that numba ufuncs work in lazy mode - somehow, I'd missed this, and was under the misguided impression that they required you to collect values first

so, you could reimplement a cumulative sum like this:

import polars as pl
from numba import guvectorize, int64

df = pl.DataFrame({'a': [1, 2, 3, 4]})

@guvectorize([(int64[:], int64[:])], '(n)->(n)')
def g(x, res):
    # Cumulative sum: carry the running total forward element by element.
    res[0] = x[0]
    for i in range(1, len(x)):
        res[i] = x[i] + res[i-1]

df.with_columns(c=pl.col('a').map_batches(g))

Do guvectorize and/or Polars plugins cover your needs @rongcuid / @buckleyc? If so, I think a good solution would be to just document how to use them effectively

MarcoGorelli avatar Dec 14 '23 19:12 MarcoGorelli

@MarcoGorelli, your post was educational and informative, as I was unfamiliar with @guvectorize. (Thanks.) However, I still could not find a way to make this help. The df is still dependent on a tally (e.g., sum_horizontal()) of one row to have a starting value for the next row. Your example pre-populates the 'a' column, whereas a financial simulation starts with a beginning balance (a[0]) but no predefined values below it. a[0] is used to calculate interest earnings, taxes deducted, fees, etc. Then this row is tallied, and the result is used as the starting balance in the next row (a[1]). Rinse, repeat. With this in mind, do you have an idea of how best to proceed?

buckleyc avatar Dec 15 '23 14:12 buckleyc

could you show an example input with expected output please?

MarcoGorelli avatar Dec 15 '23 15:12 MarcoGorelli

@MarcoGorelli, here is some example code I cobbled together from a larger project. It uses a for loop for the logic it would be nice to incorporate (within a fold?). I hope this shows the semi-complex interrelation of supplied inputs and variable results, with the rolling tally (i.e., the closing balance of one row becomes the opening balance of the following row).

from datetime import date

import numpy as np
import polars as pl

number_of_years: int = 40
year_start: int = 2021
balance_start: float = 1_000_000.00
yearly_expenses_start: float = 60_000.00

cola_mu_precomputed: float = 0.02657464810181409
cola_sigma_precomputed: float = 0.017510460689663887

yearly_incomes = np.round(np.random.normal(100_000, 15_000, number_of_years), 2)
cost_of_living_adjustment = np.random.normal(cola_mu_precomputed, cola_sigma_precomputed, number_of_years)
yearly_gain_rate = np.random.normal(0.06, 0.0333, number_of_years)
investment_contribution = np.round(np.random.normal(5_000, 1_000, number_of_years), 2)

federal_tax_bracket_by_year: dict = {
    2024: {
        'rate':   [0.10, 0.12, 0.22, 0.24, 0.32, 0.35, 0.37],
        'joint':  [0, 23_200, 94_300, 201_050, 383_900, 487_450, 731_200],
        'single': [0, 11_600, 47_150, 100_525, 191_950, 243_725, 609_350],
    },
}
federal_standard_deduction_by_year: dict = {  # joint, single, household, elderly/blind joint, elderly/blind single
    2024: {'joint': 29_200, 'single': 14_600, },
}

filing_status = 'single'

def income_tax_liability_by_bracket(
        income: float, filing_status: str = 'single', year: int = None,
        cumulative_cola: float = 1.0, ) -> np.ndarray:
    latest_year: int = next(iter(federal_tax_bracket_by_year))
    if not year:
        if date.today().year in federal_tax_bracket_by_year.keys():
            year = date.today().year
        else:
            year = latest_year  # use the latest year in federal_tax_bracket dict
    if year not in federal_tax_bracket_by_year.keys():
        year = latest_year
    rates = np.array(federal_tax_bracket_by_year[year]['rate'], dtype=float)
    brackets = np.array(federal_tax_bracket_by_year[year][filing_status], dtype=float)
    brackets *= cumulative_cola
    brackets = np.append(brackets, np.inf)
    income_fill: np.ndarray = np.broadcast_to(np.array([income]), (brackets.shape[0] - 1, 1))
    amount_per_bracket: np.ndarray = np.clip(income_fill.transpose(), brackets[:-1], brackets[1:]) - brackets[:-1]
    taxes_per_bracket: np.ndarray = rates * amount_per_bracket
    return taxes_per_bracket

journal_entries = []
balance_opening = balance_start
cumulative_cola = 1.0
for i, y in enumerate(range(year_start, year_start + number_of_years)):
    investment_gain = np.round(balance_opening * yearly_gain_rate[i], 2)
    yearly_expenses = np.round(yearly_expenses_start * cumulative_cola, 2)
    investment_withdrawal = max(0, yearly_expenses - yearly_incomes[i])
    yearly_income_tax = np.round(np.sum(income_tax_liability_by_bracket(yearly_incomes[i] + investment_withdrawal, filing_status, None, cumulative_cola)),2)
    balance_closing = balance_opening + investment_gain + investment_contribution[i] - investment_withdrawal - yearly_income_tax
    journal_entries.append(
        [
            date(year_start + i, 1, 1),
            balance_opening,
            investment_gain,
            yearly_incomes[i],
            investment_contribution[i],
            yearly_expenses,
            investment_withdrawal,
            yearly_income_tax,
            balance_closing,
        ]
    )
    balance_opening = balance_closing
    cumulative_cola *= (1 + cost_of_living_adjustment[i])
    
schema = {'year': pl.Date, 
          'opening balance': pl.Float64, 'investment gain': pl.Float64, 'incomes': pl.Float64, 'contributions': pl.Float64, 
          'expenses': pl.Float64, 'investment withdrawal': pl.Float64, 'income tax': pl.Float64, 'closing balance': pl.Float64, 
         }
df = pl.DataFrame(data=journal_entries, schema=schema)
df
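As an aside, the balance carry at the core of the loop above is a linear recurrence, so it can be rewritten with cumulative ops alone. A simplified NumPy sketch that ignores taxes and COLA, in the same spirit as the ewm_mean trick earlier in the thread (all names here are illustrative):

```python
import numpy as np

# Linear recurrence b_t = b_{t-1} * (1 + r_t) + c_t rewritten with cumulative
# ops only: with P_t = prod_{i<=t}(1 + r_i),  b_t = P_t * (b_0 + sum_{i<=t} c_i / P_i).
rates = np.array([0.10, 0.00, -0.05])      # per-period gain rates
contrib = np.array([100.0, 100.0, 100.0])  # per-period contributions
start = 1000.0                             # opening balance b_0

P = np.cumprod(1.0 + rates)
balance = P * (start + np.cumsum(contrib / P))
print(balance)  # [1200. 1300. 1335.]
```

This only works because the recurrence is linear in the balance; the full simulation's tax step breaks that linearity, which is why a loop, guvectorize kernel, or plugin is still needed there.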

buckleyc avatar Dec 20 '23 01:12 buckleyc

Thanks - I don't have time to try rewriting that at the moment, but I've added a page on cumulative operations to a Polars Plugins tutorial I'm writing: https://marcogorelli.github.io/polars-plugins-tutorial/cum_sum/

The example there uses scan:

    let out: Int64Chunked = ca
        .into_iter()
        .scan(None, |state: &mut Option<i64>, current: Option<i64>| {
            let sum = match (*state, current) {
                (Some(state_inner), Some(current)) => {
                    *state = Some(state_inner + current);
                    *state
                }
                (None, Some(current)) => {
                    *state = Some(current);
                    *state
                }
                (_, None) => None,
            };
            Some(sum)
        })
        .collect_trusted();

Is this enough for you to customise according to your needs?

I think any solution is probably going to require a plugin, if it's meant to be flexible enough to solve the custom cases people here are asking about

MarcoGorelli avatar Jan 15 '24 18:01 MarcoGorelli