firestore: bulkwriter does not write all documents to the collection
Client
Firestore
Environment
go 1.23.4
Code and Dependencies
Sample function that can trigger the observed behaviour. Call the method with a slice containing thousands of objects.
func (c *Client) BulkCreate(ctx context.Context, collID string, src any) error {
    if reflect.TypeOf(src).Kind() != reflect.Slice {
        return errors.New("bulk create: src must be a slice")
    }
    w := c.delegate.BulkWriter(ctx)
    defer w.End() // This makes sure the last elements are flushed correctly
    docs := reflect.ValueOf(src)
    collRef := c.delegate.Collection(collID)
    for i := 0; i < docs.Len(); i++ {
        _, err := w.Create(collRef.NewDoc(), docs.Index(i).Interface())
        if err != nil {
            return fmt.Errorf("bulk create: %w", err)
        }
    }
    return nil
}
go.mod

module modname

go 1.23.0

require (
    cloud.google.com/go/firestore v1.17.0
)
Expected behavior
All the items in the slice are persisted to the target collection by the BulkWriter. If a document fails to be inserted, an error is returned.
Actual behavior
Some documents are not created and no error is returned.
Additional context
Changing the body of the function as follows fixes the problem.
bwj, err := w.Create(collRef.NewDoc(), docs.Index(i).Interface())
if err != nil {
    return fmt.Errorf("bulk create: %w", err)
}
// The maxBatchSize for the BulkWriter is 20, after which the BulkWriter tries to write to the database.
// Wait for the write results before continuing. This prevents silent loss of data.
if (i+1)%maxBatchSize == 0 {
    if _, err := bwj.Results(); err != nil {
        return fmt.Errorf("bulk create document: %w", err)
    }
}
My theory is that the Add function of the bundler returns errors, but those are ignored by the BulkWriter's write function (see code screenshots below).
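To make the theory concrete, here is a minimal, self-contained sketch (not the actual firestore source) of how ignoring the error from the support bundler's Add drops items. The tiny buffer limit is artificial, chosen only to trigger bundler.ErrOverflow quickly:

package main

import (
    "fmt"

    "google.golang.org/api/support/bundler"
)

func main() {
    var delivered int
    b := bundler.NewBundler(int(0), func(items interface{}) {
        delivered += len(items.([]int))
    })
    b.BufferedByteLimit = 8 // artificially tiny so Add overflows almost immediately

    dropped := 0
    for i := 0; i < 100; i++ {
        // Discarding this error is the suspected anti-pattern: once the
        // buffer is full, Add returns bundler.ErrOverflow and the item
        // is never handled.
        if err := b.Add(i, 4); err != nil {
            dropped++
        }
    }
    b.Flush() // hands the few buffered items to the handler
    fmt.Printf("delivered %d, dropped %d of 100\n", delivered, dropped)
}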
Hi @bhshkh, did you have a chance to look into this? I really hope I'm wrong about this, because the idea of random entries being dropped without warning is quite scary.
If you can reproduce this on your side, what would be the preferred solution? Returning errors in the middle of the batch would be an option, but it leaves the caller with the burden of buffering writes in the hope of not hitting an error.
I also tried waiting for the results of the job after every write, but unfortunately that blocks until the entry is written to the database. Maybe there could be an option to return a value that blocks only when the maximum available memory has been reached, forcing the producer to stop sending messages until memory is freed. This amounts to applying back-pressure to the producer.
It would be more elegant than doing it only when (i+1)%maxBatchSize == 0, because that relies on internal knowledge of the BulkWriter's implementation.
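To illustrate the back-pressure idea concretely (my own sketch, not an existing BulkWriter option): cap the number of outstanding Results waits with an errgroup, so the producer blocks once the cap is reached. The maxInFlight value is an arbitrary assumption:

package bulk

import (
    "context"
    "fmt"

    "cloud.google.com/go/firestore"
    "golang.org/x/sync/errgroup"
)

// BulkCreateWithBackpressure bounds the number of in-flight result waits.
func BulkCreateWithBackpressure(ctx context.Context, c *firestore.Client, collID string, items []any) error {
    const maxInFlight = 100 // assumed bound, not a library constant

    w := c.BulkWriter(ctx)
    defer w.End()

    var g errgroup.Group
    g.SetLimit(maxInFlight) // g.Go blocks while maxInFlight goroutines are active

    coll := c.Collection(collID)
    for _, item := range items {
        job, err := w.Create(coll.NewDoc(), item)
        if err != nil {
            return fmt.Errorf("bulk create: %w", err)
        }
        g.Go(func() error {
            // Results blocks until the write is committed, surfacing
            // per-document errors that would otherwise be lost.
            _, err := job.Results()
            return err
        })
    }
    return g.Wait() // first write error, if any
}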
STATUS UPDATE: I am not able to reproduce the issue on my end in the 3 tries I have made. I will run the code a few more times to see if the issue reproduces. I am using the code below to reproduce:
package main

import (
    "context"
    "errors"
    "fmt"
    "reflect"
    "time"

    "cloud.google.com/go/firestore"
    "github.com/google/uuid"
)

type Client struct {
    delegate *firestore.Client
}

type Foo struct {
    Bar string
    Baz string
    Qux string
}

func main() {
    ctx := context.Background()
    fc, err := firestore.NewClientWithDatabase(ctx, "golang-firestore", "bulkcreate")
    if err != nil {
        fmt.Printf("failed to create firestore client: %v", err)
        return
    }
    c := Client{
        delegate: fc,
    }
    foos := []Foo{}
    for range 10000000 {
        foos = append(foos, Foo{
            Bar: "foo",
            Baz: "bar",
            Qux: "qux",
        })
    }
    collName := "BulkCreate-" + uuid.New().String()
    fmt.Println("Created foos. Writing "+fmt.Sprint(len(foos))+" documents to collection: ", collName, "...")
    err = c.BulkCreate(ctx, collName, foos)
    if err != nil {
        fmt.Printf("failed to bulk create: %v", err)
        return
    }
    coll := c.delegate.Collection(collName)
    aq := coll.NewAggregationQuery().WithCount("numWritten")
    ar, err := aq.Get(ctx)
    if err != nil {
        fmt.Printf("failed to get aggregation result: %v", err)
        return
    }
    fmt.Printf("Rows written: %v\n", ar["numWritten"])
}

func (c *Client) BulkCreate(ctx context.Context, collID string, src any) error {
    if reflect.TypeOf(src).Kind() != reflect.Slice {
        return errors.New("bulk create: src must be a slice")
    }
    w := c.delegate.BulkWriter(ctx)
    defer func() {
        startTime := time.Now()
        fmt.Println("Starting flush at: " + startTime.String())
        w.End() // This will flush all the remaining documents at the end
        endTime := time.Now()
        fmt.Println("Flush ended at: " + endTime.String() + " Duration: " + endTime.Sub(startTime).String())
    }()
    docs := reflect.ValueOf(src)
    collRef := c.delegate.Collection(collID)
    for i := 0; i < docs.Len(); i++ {
        fmt.Printf(".")
        if i%1000 == 0 {
            fmt.Printf("\n")
        }
        _, err := w.Create(collRef.NewDoc(), docs.Index(i).Interface())
        if err != nil {
            return fmt.Errorf("bulk create: %w", err)
        }
    }
    return nil
}
Hi @bhshkh, thanks for taking the time to look into this. What I saw in the code is that silently dropping rows can happen when memory limits are reached. Is there a way to modify this test to increase the size of the objects and reduce the available memory?
Any update?
I'm having a very similar problem when creating new objects in my collection using BulkWriter. In my case I'm trying to create approximately 200 documents within a commit (bulkWriter.Flush()). The strangest thing is that some documents are simply not persisted, making it impossible to track via logs what went wrong.
One point of attention in my case is that some of the data I store is encrypted and not valid UTF-8; could this lead to problems like this?
go 1.22.1 / Firestore.
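On the UTF-8 question: Firestore string fields must be valid UTF-8, while []byte fields are written as Firestore's bytes type, which has no such requirement. A minimal sketch with a hypothetical document type:

package bulk

import (
    "fmt"
    "unicode/utf8"
)

// Record is a hypothetical document type; the point is the field types.
type Record struct {
    ID         string // string fields must be valid UTF-8
    Ciphertext []byte // stored as Firestore bytes, no UTF-8 requirement
}

// ValidateStrings is a sketch of a pre-write check for string fields.
func ValidateStrings(r Record) error {
    if !utf8.ValidString(r.ID) {
        return fmt.Errorf("record ID is not valid UTF-8")
    }
    return nil
}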
I tried to reduce the memory using GOMEMLIMIT, but that is only a soft limit. I will figure out some other way to reduce the available memory.
We're facing this issue as well - we ingest CSV data into Firestore using Cloud Run jobs.
@bhshkh did you have a chance to limit the available memory to try to force the described behaviour? Or maybe use really large objects in the test to fill up the memory?
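One way the repro above could be adapted along those lines (a sketch; the payload size and document count are my assumptions): give each document a large payload so the BulkWriter queues far more bytes per write:

package bulk

import (
    "fmt"
    "strings"
)

// BigFoo is a variant of the repro's Foo with a large payload field.
type BigFoo struct {
    Bar     string
    Payload string
}

// MakeBigFoos builds n documents of roughly 100 KiB each, comfortably under
// Firestore's ~1 MiB document limit but large enough that a few thousand
// queued writes consume gigabytes.
func MakeBigFoos(n int) []BigFoo {
    foos := make([]BigFoo, 0, n)
    for i := 0; i < n; i++ {
        foos = append(foos, BigFoo{
            Bar:     fmt.Sprint(i),
            Payload: strings.Repeat("x", 100*1024), // fresh allocation per doc
        })
    }
    return foos
}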
Does a code analysis already show why this error could be happening?
As I wrote previously, I assume the problem is caused by the line below, which ignores the errors returned by the Add function; I didn't see any other obvious explanation for the dropped rows.