opa
opa copied to clipboard
Memory leak in topdown due to global compiled regex/glob pattern cache
Topdown contains two global caches for compiled regex and glob match patterns:
- https://github.com/open-policy-agent/opa/blob/main/topdown/regex.go#L19
- https://github.com/open-policy-agent/opa/blob/main/topdown/glob.go#L14
Using a modified version of stress-opa I was able to cause OPA to use multiple GBs of memory after a minute or two. Obviously this is a pathological case but if we assume that users can pass regex patterns in input, leaks will begin to appear eventually. For workloads with lower request frequency (e.g., 100s of RPS), it could be hours or days.
I verified the heap usage by enabling pprof and also tested removing the cache insertion and saw that the memory usage was stable.
I haven't tested the same issue with glob patterns, but presumably we have the same problem there, though I'm not sure what the growth curve looks like (e.g., the per-entry overhead may be higher or lower).
In terms of solutions... we should benchmark a few representative policies with and without caching. I'm not convinced that the cache actually saves that much time for typical patterns. The cache was included in the original implementation of the regex.match
function and there is no accompanying benchmark that I am aware of. This seems like a case of premature optimization.
Here is the trivial policy that I tested with:
package x
import rego.v1
p if {
regex.match(input.patterns[_], "x")
}
Here is the modified version of stress-opa:
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/open-policy-agent/opa/metrics"
)
func pickInput(i int, x int) interface{} {
return map[string]interface{}{
"input": map[string]interface{}{
"patterns": []interface{}{
fmt.Sprintf("thread_%d_%d", i, x),
},
},
}
}
// result holds the measurements for a single request: the client-observed
// round-trip latency plus the server-side metrics OPA returns when queried
// with ?metrics.
type result struct {
Total int64 // client-side latency in ns; assigned locally after the request, never decoded from JSON
Metrics map[string]int64 `json:"metrics,omitempty"` // OPA server metrics, e.g. timer_server_handler_ns
}
// run is one load-generation worker: it POSTs queries to the local OPA
// server in a tight loop, sending a never-before-seen regex pattern each
// iteration (via pickInput) to force growth of topdown's pattern cache,
// and reports per-request timing on ch. It never returns; any request or
// decode failure panics.
func run(i int, ch chan<- result) {
	client := &http.Client{}
	var x int
	for {
		// Each iteration runs in a closure so the deferred Body.Close
		// fires at the end of every request, not at function exit.
		func() {
			var buf bytes.Buffer
			x += 1
			input := pickInput(i, x)
			if err := json.NewEncoder(&buf).Encode(input); err != nil {
				panic(err)
			}
			t0 := time.Now()
			resp, err := client.Post("http://localhost:8181/v1/data/x/p?metrics", "application/json", &buf)
			if err != nil {
				panic(err)
			}
			defer resp.Body.Close()
			if resp.StatusCode != http.StatusOK {
				// BUG FIX: the original panicked with err here, but err is
				// always nil on this path (the error case returned above),
				// yielding an uninformative panic(nil). Report the status.
				panic(fmt.Sprintf("unexpected status code: %d", resp.StatusCode))
			}
			var r result
			if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
				panic(err)
			}
			r.Total = int64(time.Since(t0))
			ch <- r
		}()
	}
}
// printHeader writes a fixed-width table header: one row with the column
// names, then one row of dashes matching each name's length.
func printHeader(keys []string) {
	for _, name := range keys {
		fmt.Printf("%-14s ", name)
	}
	fmt.Print("\n")
	for _, name := range keys {
		underline := strings.Repeat("-", len(name))
		fmt.Printf("%-14s ", underline)
	}
	fmt.Print("\n")
}
// printRow writes one fixed-width table row, emitting row's values in the
// column order given by keys (missing keys print their zero value, <nil>).
func printRow(keys []string, row map[string]interface{}) {
	for i := range keys {
		fmt.Printf("%-14v ", row[keys[i]])
	}
	fmt.Print("\n")
}
// main starts the reporter goroutine and one worker, then blocks forever.
// Every 10 seconds the reporter prints request rate plus client- and
// server-side latency percentiles, then resets its counters for the next
// window.
func main() {
	results := make(chan result)
	cols := []string{
		"rps",
		"cli(mean)",
		"cli(90%)",
		"cli(99%)",
		"cli(99.9%)",
		"cli(99.99%)",
		"opa(mean)",
		"opa(90%)",
		"opa(99%)",
		"opa(99.9%)",
		"opa(99.99%)",
	}
	printHeader(cols)
	go func() {
		const interval = time.Second * 10
		tick := time.NewTicker(interval)
		var count int64
		m := metrics.New()
		prev := time.Now()
		for {
			select {
			case <-tick.C:
				now := time.Now()
				elapsed := int64(now.Sub(prev))
				row := map[string]interface{}{
					"rps": int64((float64(count) / float64(elapsed)) * 1e9),
				}
				for _, name := range []string{"cli", "opa"} {
					snapshot := m.Histogram(name).Value().(map[string]interface{})
					for _, q := range []string{"mean", "90%", "99%", "99.9%", "99.99%"} {
						row[fmt.Sprintf("%v(%v)", name, q)] = time.Duration(snapshot[q].(float64))
					}
				}
				printRow(cols, row)
				// Start a fresh measurement window.
				prev = now
				count = 0
				m = metrics.New()
			case r := <-results:
				m.Histogram("cli").Update(r.Total)
				m.Histogram("opa").Update(r.Metrics["timer_server_handler_ns"])
				count++
			}
		}
	}()
	for w := 0; w < 1; w++ {
		go run(w, results)
	}
	// Block main forever; the workers and reporter run until interrupted.
	forever := make(chan struct{})
	<-forever
}