bigquery: storage write client doesn't work for arrow format
Client
BigQuery Storage Write client (cloud.google.com/go/bigquery/storage/apiv1)
Environment
go version go1.23.4 darwin/arm64; tried with Arrow Go v12 through v18
Code and Dependencies
- Following is the code I ran, which isn't working:
import (
arrow15 "github.com/apache/arrow/go/v15/arrow"
arrow15array "github.com/apache/arrow/go/v15/arrow/array"
arrow15ipc "github.com/apache/arrow/go/v15/arrow/ipc"
arrow15memory "github.com/apache/arrow/go/v15/arrow/memory"
)
func writeToBQClient(record arrow15.Record, writeStreamName string, appendStream storagepb.BigQueryWrite_AppendRowsClient) error {
mem := arrow15memory.NewGoAllocator()
// Serialize schema using IPC format (just the schema, no data)
var schemaBuf bytes.Buffer
schemaWriter := arrow15ipc.NewWriter(&schemaBuf, arrow15ipc.WithSchema(record.Schema()), arrow15ipc.WithAllocator(mem))
err := schemaWriter.Close() // This writes just the schema without any records
if err != nil {
log.Logger().WithError(err).Error("Failed to serialize schema")
return err
}
schemaData := schemaBuf.Bytes()
// Serialize record batch using IPC format
var recordBuf bytes.Buffer
recordWriter := arrow15ipc.NewWriter(&recordBuf, arrow15ipc.WithSchema(record.Schema()), arrow15ipc.WithAllocator(mem))
err = recordWriter.Write(record)
if err != nil {
log.Logger().WithError(err).Error("Failed to write Arrow record")
recordWriter.Close()
return err
}
err = recordWriter.Close()
if err != nil {
log.Logger().WithError(err).Error("Failed to close Arrow writer")
return err
}
recordData := recordBuf.Bytes()
log.Logger().Infof("Schema IPC data size: %d bytes", len(schemaData))
log.Logger().Infof("Record IPC data size: %d bytes", len(recordData))
request := &storagepb.AppendRowsRequest{
WriteStream: writeStreamName,
Rows: &storagepb.AppendRowsRequest_ArrowRows{
ArrowRows: &storagepb.AppendRowsRequest_ArrowData{
WriterSchema: &storagepb.ArrowSchema{
SerializedSchema: schemaData,
},
Rows: &storagepb.ArrowRecordBatch{
SerializedRecordBatch: recordData,
},
},
},
}
// Send the request
err = appendStream.Send(request)
if err != nil {
log.Logger().WithError(err).Error("Failed to send AppendRows request")
return err
}
err = appendStream.CloseSend()
if err != nil {
log.Logger().WithError(err).Error("Failed to close AppendRows request")
return err
}
return nil
}
func Main() error {
// Create an Arrow record with a single string column c0 and the value "tushartg"
pool := arrow15memory.NewGoAllocator()
// Create schema with single string column
schema := arrow15.NewSchema([]arrow15.Field{
{Name: "c0", Type: arrow15.BinaryTypes.String, Nullable: true},
}, nil)
// Create record builder
builder := arrow15array.NewRecordBuilder(pool, schema)
defer builder.Release()
// Add single row with value "tushartg"
builder.Field(0).(*arrow15array.StringBuilder).Append("tushartg")
// Build the record
record := builder.NewRecord()
defer record.Release()
storageWriteClient, err := bqstorage.NewBigQueryWriteClient(ctx, c.authOption)
if err != nil {
log.Logger().WithError(err).Error("Failed to create fresh BigQuery Storage write client")
return err
}
defer storageWriteClient.Close()
stream, err := storageWriteClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", c.projectID, datasetName, tableName),
WriteStream: &storagepb.WriteStream{
Type: storagepb.WriteStream_COMMITTED,
},
})
if err != nil {
return err
}
appendStream, err := storageWriteClient.AppendRows(ctx)
if err != nil {
log.Logger().WithError(err).Error("Failed to create AppendRows stream")
return err
}
// Write to BigQuery and handle response synchronously
err = writeToBQClient(record, stream.Name, appendStream)
if err != nil {
log.Logger().WithError(err).Error("Failed to write to BigQuery client")
return err
}
// Receive responses
for {
resp, err := appendStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Logger().WithError(err).Error("Failed to receive AppendRows response")
return err
}
if resp.GetError() != nil {
log.Logger().Errorf("BigQuery error: %v", resp.GetError())
return errors.NewErrorf(errors.Internal, "BigQuery error: %v", resp.GetError())
}
}
return nil
}
Error received
ERROR: BigQuery error: code:3 message:"Header-type of flatbuffer-encoded Message is not RecordBatch. Entity: projects/PROJECT/datasets/DATASET/tables/TABLENAME/streams/_default" details:{[type.googleapis.com/google.rpc.DebugInfo]:{detail:"INVALID_ARGUMENT: Header-type of flatbuffer-encoded Message is not RecordBatch. [type.googleapis.com/util.MessageSetPayload='[cloud.helix.vortex.VortexStatus] { error_code: CONVERTER_PARSE_ERROR component: CONVERTER is_public_error: true }']"}}
Expected behavior
Data gets ingested into table
Actual behavior
Error: Header-type of flatbuffer-encoded Message is not RecordBatch
Additional context
Basically, I want to know how to serialize the Arrow records for the Write API. Can you please help improve the documentation?
Thanks for the report @tushartg. I'll work on writing a quick sample for that; I haven't yet tried using the Arrow format with the Go SDK. Will keep you posted.
@tushartg I found the issue: the arrow-go ipc.Writer always serializes the schema data into the buffer, which is not what BigQuery expects. BigQuery expects only RecordBatches, so it fails when it sees a Schema in the first message. Currently the Arrow library for Go doesn't provide a way to skip writing that header. I made some modifications locally, but I might need to send a PR to the Arrow library to add support for that.
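To make that concrete, here is a small, self-contained sketch (separate from the sample below) that walks the bytes produced by ipc.NewWriter message by message. It assumes the ipc.NewMessageReader / MessageType API from the same arrow v15 module; running it prints a Schema message first and only then the RecordBatch, which is the header BigQuery complains about:

package main

import (
    "bytes"
    "fmt"
    "io"
    "log"

    "github.com/apache/arrow/go/v15/arrow"
    "github.com/apache/arrow/go/v15/arrow/array"
    "github.com/apache/arrow/go/v15/arrow/ipc"
    "github.com/apache/arrow/go/v15/arrow/memory"
)

func main() {
    // Build a one-row record with the same shape as in the report.
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "c0", Type: arrow.BinaryTypes.String, Nullable: true},
    }, nil)
    builder := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
    defer builder.Release()
    builder.Field(0).(*array.StringBuilder).Append("bar")
    record := builder.NewRecord()
    defer record.Release()

    // Serialize it the same way writeToBQClient does.
    var buf bytes.Buffer
    w := ipc.NewWriter(&buf, ipc.WithSchema(schema))
    if err := w.Write(record); err != nil {
        log.Fatal(err)
    }
    if err := w.Close(); err != nil {
        log.Fatal(err)
    }

    // Walk the stream message by message: the first message is a Schema,
    // and only the second is the RecordBatch that BigQuery expects in
    // serialized_record_batch.
    rdr := ipc.NewMessageReader(bytes.NewReader(buf.Bytes()))
    defer rdr.Release()
    for {
        msg, err := rdr.Message()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("IPC message type:", msg.Type())
    }
}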
In the meantime, there is a not-so-nice workaround, which is to skip the initial payload that contains the schema information. Here is my full test code that successfully writes data using Arrow. It's a slight modification of your code; I might write another sample to add to our samples repository.
package main
import (
"bytes"
"context"
"fmt"
"io"
"log"
"log/slog"
"cloud.google.com/go/bigquery"
storage "cloud.google.com/go/bigquery/storage/apiv1"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/ipc"
"github.com/apache/arrow/go/v15/arrow/memory"
)
// table configurations
var (
projectID = "<YOUR_PROJECT_ID>"
datasetID = "<YOUR_DATASET_ID>"
tableID = "<YOUR_TABLE_ID>"
)
func writeToBQClient(schema *arrow.Schema, records []arrow.Record, writeStreamName string, appendStream storagepb.BigQueryWrite_AppendRowsClient) error {
// Serialize schema using IPC format (just the schema, no data)
var schemaBuf bytes.Buffer
schemaWriter := ipc.NewWriter(&schemaBuf, ipc.WithSchema(schema))
err := schemaWriter.Close() // This writes just the schema without any records
if err != nil {
slog.Error("Failed to serialize schema", "error", err)
return err
}
schemaData := schemaBuf.Bytes()
// Serialize record batch using IPC format
var recordBuf bytes.Buffer
recordWriter := ipc.NewWriter(&recordBuf, ipc.WithSchema(schema))
for _, record := range records {
err = recordWriter.Write(record)
if err != nil {
slog.Error("Failed to write Arrow record", "error", err)
return err
}
}
err = recordWriter.Close()
if err != nil {
slog.Error("Failed to close Arrow writer", "error", err)
return err
}
recordData := recordBuf.Bytes()
// Workaround to skip the schema data.
// Currently, ipc.Writer always writes the schema message at the start of the
// stream, which makes the BigQuery Write API reject the payload, as it wants
// just RecordBatches. The schema-only buffer is that same schema message plus
// the 8-byte end-of-stream marker appended by Close(), so skipping
// len(schemaData)-8 bytes leaves only the RecordBatch message.
recordData = recordData[len(schemaData)-8:]
slog.Info("Schema IPC data size", "bytes", len(schemaData))
slog.Info("Record IPC data size", "bytes", len(recordData))
request := &storagepb.AppendRowsRequest{
WriteStream: writeStreamName,
Rows: &storagepb.AppendRowsRequest_ArrowRows{
ArrowRows: &storagepb.AppendRowsRequest_ArrowData{
WriterSchema: &storagepb.ArrowSchema{
SerializedSchema: schemaData,
},
Rows: &storagepb.ArrowRecordBatch{
SerializedRecordBatch: recordData,
},
},
},
}
// Send the request
err = appendStream.Send(request)
if err != nil {
slog.Error("failed to send AppendRows request", "error", err)
return err
}
err = appendStream.CloseSend()
if err != nil {
slog.Error("failed to close AppendRows request: %v")
return err
}
return nil
}
func insertData(ctx context.Context, wc *storage.BigQueryWriteClient, projectID, datasetID, tableID string) error {
// Create Arrow record with single column c0 of type string and value "bar"
pool := memory.NewGoAllocator()
// Create schema with single string column
schema := arrow.NewSchema([]arrow.Field{
{Name: "c0", Type: arrow.BinaryTypes.String, Nullable: true},
}, nil)
// Create record builder
builder := array.NewRecordBuilder(pool, schema)
defer builder.Release()
// Add single row with value "bar"
builder.Field(0).(*array.StringBuilder).Append("bar")
// Build the record
record := builder.NewRecord()
defer record.Release()
stream, err := wc.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
WriteStream: &storagepb.WriteStream{
Type: storagepb.WriteStream_COMMITTED,
},
})
if err != nil {
return fmt.Errorf("failed to create write stream: %v", err)
}
appendStream, err := wc.AppendRows(ctx)
if err != nil {
slog.Error("Failed to create AppendRows stream", "error", err)
return fmt.Errorf("failed to create AppendRows stream: %v", err)
}
// Write to BigQuery and handle response synchronously
err = writeToBQClient(schema, []arrow.Record{record}, stream.Name, appendStream)
if err != nil {
slog.Error("failed to write to BigQuery client", "error", err)
return fmt.Errorf("failed to write to BigQuery client: %v", err)
}
// Receive responses
for {
resp, err := appendStream.Recv()
if err == io.EOF {
break
}
if err != nil {
slog.Error("Failed to receive AppendRows response", "error", err)
return err
}
if resp.GetError() != nil {
slog.Error("BigQuery error", "error", resp.GetError())
return fmt.Errorf("bigquery error: %v", resp.GetError())
}
}
return nil
}
func setupTable(ctx context.Context, bc *bigquery.Client) error {
schema := bigquery.Schema{
{Name: "c0", Type: bigquery.StringFieldType},
}
_ = bc.Dataset(datasetID).Table(tableID).Create(ctx, &bigquery.TableMetadata{
Schema: schema,
})
return nil
}
func main() {
ctx := context.Background()
bc, err := bigquery.NewClient(ctx, projectID)
if err != nil {
log.Fatalf("failed to setup bigquery: %v", err)
}
err = setupTable(ctx, bc)
if err != nil {
log.Fatalf("failed to setup table: %v", err)
}
wc, err := storage.NewBigQueryWriteClient(ctx)
if err != nil {
log.Fatalf("failed to setup bigquery write API client: %v", err)
}
err = insertData(ctx, wc, projectID, datasetID, tableID)
if err != nil {
log.Fatalf("failed to insert data: %v", err)
}
}
The important code with the workaround is here:
recordData := recordBuf.Bytes()
// Workaround to skip the schema data.
// Currently, ipc.Writer always writes the schema message at the start of the
// stream, which makes the BigQuery Write API reject the payload, as it wants
// just RecordBatches. The schema-only buffer is that same schema message plus
// the 8-byte end-of-stream marker appended by Close(), so skipping
// len(schemaData)-8 bytes leaves only the RecordBatch message.
recordData = recordData[len(schemaData)-8:]
slog.Info("Schema IPC data size", "bytes", len(schemaData))
slog.Info("Record IPC data size", "bytes", len(recordData))
I'll follow up by opening a discussion and/or PR on the arrow-go repository about having the option to skip serializing the schema.
Draft PR on arrow-go; it needs more work, but it can at least start some discussion: https://github.com/apache/arrow-go/pull/421
Thanks a lot @alvarowolfx ❤️, I'm sorry I didn't get a notification for this. I will give it a try.