Custom bucket increments
In our new and fancy world of LLMs and OpenAI, we have rate limits by number of message-tokens per minute, which means the bucket needs to be increased by a somewhat arbitrary number. Obviously, I could just call "check_rate" sequentially in a loop a few hundred times, but that seems... A little silly.
Would it be possible to expand the public API a little, and allow us to pass in an optional number-by-which-to-increase-the-number-of-tokens-in-the-bucket?
Just following up with @Trevoke's question above. I also think this would be valuable 🙂
👋 Hi all! For anyone looking for a zero-dependency rate limiter, `:ets` has everything you need.
Here’s a minimal and efficient example (adapted from plug_attack) that uses the same fixed window counter algorithm as ExRated.
defmodule MyApp.RateLimit do
  @moduledoc """
  Thin wrapper around `:ets.update_counter/4` and a clean-up process to act as
  a fixed window rate limiter.

  Counters live in a public, named ETS table. Each row has the shape
  `{{key, window}, count, expires_at}`, where `window` is the current system
  time in milliseconds divided by `scale` and `expires_at` is the millisecond
  timestamp at which that window ends. The process started by `start_link/1`
  owns the table and periodically deletes rows whose window has ended.
  """
  use GenServer

  @table __MODULE__

  @type key :: any
  @type scale :: pos_integer
  @type limit :: pos_integer
  @type increment :: pos_integer
  @type count :: non_neg_integer

  @doc """
  Starts the process that creates and cleans the ETS table.

  Accepts the following options:

    - `:clean_period` (required) for how often, in milliseconds, to perform
      garbage collection. Pass `:infinity` to disable periodic clean-up.
  """
  @spec start_link([{:clean_period, timeout}]) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Checks the rate-limit for a key.

  Adds `increment` (default `1`) to the counter of `key` in the current
  `scale`-millisecond window. Returns `{:allow, count}` with the new count
  when it does not exceed `limit`, and `{:deny, limit}` otherwise.

  The counter is incremented before it is compared with `limit`, so denied
  calls still add to the count of the current window.
  """
  @spec check_rate(key, scale, limit, increment) :: {:allow, count} | {:deny, limit}
  def check_rate(key, scale, limit, increment \\ 1) do
    count = hit(key, scale, increment)
    if count <= limit, do: {:allow, count}, else: {:deny, limit}
  end

  @doc """
  Increments the current count for the key and returns the new count.

  The row for the current window is created with a count of `0` if it does
  not exist yet, and then incremented by `increment` (default `1`).
  """
  @spec hit(key, scale, increment) :: count
  def hit(key, scale, increment \\ 1) do
    window = window(scale)
    full_key = {key, window}
    # First millisecond of the next window; the cleaner removes the row after it.
    expires_at = (window + 1) * scale
    :ets.update_counter(@table, full_key, increment, {full_key, 0, expires_at})
  end

  # The rest of the API is optional so it's commented out.

  # @doc "Sets the new count for the key"
  # @spec set(key, scale, count) :: count
  # def set(key, scale, count) do
  #   window = window(scale)
  #   full_key = {key, window}
  #   expires_at = (window + 1) * scale
  #   :ets.update_counter(@table, full_key, {2, 1, 0, count}, {full_key, count, expires_at})
  # end

  # @doc "Gets the current count for the key, capped at the limit"
  # @spec get(key, scale, limit) :: count
  # def get(key, scale, limit) do
  #   window = window(scale)
  #   full_key = {key, window}
  #
  #   case :ets.lookup(@table, full_key) do
  #     # Denied hits keep incrementing the counter, so clamp it to `limit`.
  #     [{_full_key, count, _expires_at}] -> min(count, limit)
  #     [] -> 0
  #   end
  # end

  # @doc "Sets the current count for the key to zero"
  # @spec reset(key, scale) :: count
  # def reset(key, scale), do: set(key, scale, 0)

  @impl true
  def init(opts) do
    clean_period = Keyword.fetch!(opts, :clean_period)

    # Public so that callers update counters directly from their own process
    # instead of going through this GenServer.
    :ets.new(@table, [
      :named_table,
      :set,
      :public,
      {:read_concurrency, true},
      {:write_concurrency, true},
      {:decentralized_counters, true}
    ])

    schedule(clean_period)
    {:ok, %{clean_period: clean_period}}
  end

  @impl true
  def handle_info(:clean, state) do
    # Delete every row whose `expires_at` (third element) is in the past.
    :ets.select_delete(@table, [{{{:_, :_}, :_, :"$1"}, [{:<, :"$1", now()}], [true]}])
    schedule(state.clean_period)
    {:noreply, state}
  end

  # Arms the timer for the next clean-up. `:infinity` is a valid `timeout`
  # but is rejected by `Process.send_after/3`, so it means "never clean".
  defp schedule(:infinity), do: :ok

  defp schedule(clean_period) do
    Process.send_after(self(), :clean, clean_period)
  end

  @compile inline: [now: 0]
  defp now do
    System.system_time(:millisecond)
  end

  # Index of the fixed window that the current time falls into.
  @compile inline: [window: 1]
  defp window(scale) do
    div(now(), scale)
  end
end
And a demo.
iex> MyApp.RateLimit.start_link(clean_period: :timer.minutes(10))
#==> {:ok, #PID<0.114.0>}
iex> MyApp.RateLimit.check_rate("openai:123", :timer.minutes(1), _limit = 100000, _tokens = 1000)
#==> {:allow, 1000}
iex> MyApp.RateLimit.check_rate("openai:123", :timer.minutes(1), _limit = 100000, _tokens = 1000)
#==> {:allow, 2000}
iex> MyApp.RateLimit.check_rate("openai:123", :timer.minutes(1), _limit = 100000, _tokens = 1000)
#==> {:allow, 3000}