
bigquery:storage write client doesn't work for arrow format

Open · tushartg opened this issue 8 months ago · 4 comments

Client

bigquery/storage/apiv1 (BigQuery Storage Write API)

Environment

go version go1.23.4 darwin/arm64

Tried with Arrow v12 through v18.

Code and Dependencies

  • The following is the code I ran, which isn't working:
import (
	"bytes"
	"context"
	"fmt"
	"io"

	bqstorage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	arrow15 "github.com/apache/arrow/go/v15/arrow"
	arrow15array "github.com/apache/arrow/go/v15/arrow/array"
	arrow15ipc "github.com/apache/arrow/go/v15/arrow/ipc"
	arrow15memory "github.com/apache/arrow/go/v15/arrow/memory"
	// The log and errors packages used below are internal to my codebase.
)

func writeToBQClient(record arrow15.Record, writeStreamName string, appendStream storagepb.BigQueryWrite_AppendRowsClient) error {

	// memory.GoAllocator has no Close method; it relies on Go's garbage collector.
	mem := arrow15memory.NewGoAllocator()

	// Serialize schema using IPC format (just the schema, no data)
	var schemaBuf bytes.Buffer
	schemaWriter := arrow15ipc.NewWriter(&schemaBuf, arrow15ipc.WithSchema(record.Schema()), arrow15ipc.WithAllocator(mem))
	err := schemaWriter.Close() // This writes just the schema without any records
	if err != nil {
		log.Logger().WithError(err).Error("Failed to serialize schema")
		return err
	}
	schemaData := schemaBuf.Bytes()

	// Serialize record batch using IPC format
	var recordBuf bytes.Buffer
	recordWriter := arrow15ipc.NewWriter(&recordBuf, arrow15ipc.WithSchema(record.Schema()), arrow15ipc.WithAllocator(mem))

	err = recordWriter.Write(record)
	if err != nil {
		log.Logger().WithError(err).Error("Failed to write Arrow record")
		recordWriter.Close()
		return err
	}

	err = recordWriter.Close()
	if err != nil {
		log.Logger().WithError(err).Error("Failed to close Arrow writer")
		return err
	}

	recordData := recordBuf.Bytes()

	log.Logger().Infof("Schema IPC data size: %d bytes", len(schemaData))
	log.Logger().Infof("Record IPC data size: %d bytes", len(recordData))

	request := &storagepb.AppendRowsRequest{
		WriteStream: writeStreamName,
		Rows: &storagepb.AppendRowsRequest_ArrowRows{
			ArrowRows: &storagepb.AppendRowsRequest_ArrowData{
				WriterSchema: &storagepb.ArrowSchema{
					SerializedSchema: schemaData,
				},
				Rows: &storagepb.ArrowRecordBatch{
					SerializedRecordBatch: recordData,
				},
			},
		},
	}

	// Send the request
	err = appendStream.Send(request)
	if err != nil {
		log.Logger().WithError(err).Error("Failed to send AppendRows request")
		return err
	}

	err = appendStream.CloseSend()
	if err != nil {
		log.Logger().WithError(err).Error("Failed to close AppendRows request")
		return err
	}

	return nil
}

func Main() error {
	ctx := context.Background()

	// Create an Arrow record with a single column c0 of type string and value "tushartg"
	pool := arrow15memory.NewGoAllocator()

	// Create schema with single string column
	schema := arrow15.NewSchema([]arrow15.Field{
		{Name: "c0", Type: arrow15.BinaryTypes.String, Nullable: true},
	}, nil)

	// Create record builder
	builder := arrow15array.NewRecordBuilder(pool, schema)
	defer builder.Release()

	// Add single row with value "tushartg"
	builder.Field(0).(*arrow15array.StringBuilder).Append("tushartg")

	// Build the record
	record := builder.NewRecord()
	defer record.Release()

	storageWriteClient, err := bqstorage.NewBigQueryWriteClient(ctx, c.authOption)
	if err != nil {
		log.Logger().WithError(err).Error("Failed to create fresh BigQuery Storage write client")
		return err
	}
	defer storageWriteClient.Close()

	stream, err := storageWriteClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		// Note: the dataset ID belongs here, not the Arrow schema variable.
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", c.projectID, c.datasetID, tableName),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		return err
	}

	appendStream, err := storageWriteClient.AppendRows(ctx)
	if err != nil {
		log.Logger().WithError(err).Error("Failed to create AppendRows stream")
		return err
	}

	// Write to BigQuery and handle response synchronously
	err = writeToBQClient(record, stream.Name, appendStream)
	if err != nil {
		log.Logger().WithError(err).Error("Failed to write to BigQuery client")
		return err
	}

	// Receive responses
	for {
		resp, err := appendStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Logger().WithError(err).Error("Failed to receive AppendRows response")
			return err
		}
		if resp.GetError() != nil {
			log.Logger().Errorf("BigQuery error: %v", resp.GetError())
			return errors.NewErrorf(errors.Internal, "BigQuery error: %v", resp.GetError())
		}
	}
	return nil
}

Error received

ERROR: BigQuery error: code:3  message:"Header-type of flatbuffer-encoded Message is not RecordBatch. Entity: projects/PROJECT/datasets/DATASET/tables/TABLENAME/streams/_default"  details:{[type.googleapis.com/google.rpc.DebugInfo]:{detail:"INVALID_ARGUMENT: Header-type of flatbuffer-encoded Message is not RecordBatch. [type.googleapis.com/util.MessageSetPayload='[cloud.helix.vortex.VortexStatus] { error_code: CONVERTER_PARSE_ERROR component: CONVERTER is_public_error: true }']"}}

Expected behavior

Data gets ingested into table

Actual behavior

Error: Header-type of flatbuffer-encoded Message is not RecordBatch

Additional context

Basically, I want to know how to serialize the Arrow records. Could you please help improve the documentation?

tushartg · Jun 21 '25 16:06

Thanks for the report @tushartg. I'll work on writing a quick sample for that; I haven't yet tried using the Arrow format with the Go SDK. Will keep you posted.

alvarowolfx · Jun 23 '25 17:06

@tushartg I found the issue: the arrow-go ipc.Writer always serializes the schema data into the buffer, which is not expected on the BigQuery side. BigQuery expects only RecordBatches, so it fails when it sees a Schema in the first message. Currently the Arrow Go library doesn't provide a way to skip writing this header; I did some modifications locally, but I might need to send a PR to the Arrow library to add support for that.
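
To see this concretely, here's a minimal self-contained sketch (using the same arrow v15 packages as your snippet) showing that the stream produced by ipc.Writer always starts with the same Schema message that a schema-only serialization produces, up to the 8-byte end-of-stream marker:

package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/ipc"
	"github.com/apache/arrow/go/v15/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "c0", Type: arrow.BinaryTypes.String, Nullable: true},
	}, nil)
	b := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
	defer b.Release()
	b.Field(0).(*array.StringBuilder).Append("x")
	rec := b.NewRecord()
	defer rec.Release()

	// Schema-only stream: Schema message + 8-byte end-of-stream marker.
	var schemaBuf bytes.Buffer
	sw := ipc.NewWriter(&schemaBuf, ipc.WithSchema(schema))
	_ = sw.Close()

	// Full stream: Schema message + RecordBatch message + EOS marker.
	var streamBuf bytes.Buffer
	w := ipc.NewWriter(&streamBuf, ipc.WithSchema(schema))
	_ = w.Write(rec)
	_ = w.Close()

	// The full stream begins with the Schema message, which is what
	// BigQuery rejects: it expects the payload to start with a RecordBatch.
	n := schemaBuf.Len() - 8 // drop the EOS marker
	fmt.Println(bytes.Equal(streamBuf.Bytes()[:n], schemaBuf.Bytes()[:n])) // prints: true
}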

In the meantime, there is a not-so-nice solution, which is to skip the initial payload that contains the schema information. Here is my full test code that successfully writes data using Arrow. It's a slight modification of your code; I might write another sample to add to our samples repository.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"log/slog"

	"cloud.google.com/go/bigquery"
	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/ipc"
	"github.com/apache/arrow/go/v15/arrow/memory"
)

// table configurations
var (
	projectID = "<YOUR_PROJECT_ID>"
	datasetID = "<YOUR_DATASET_ID>"
	tableID   = "<YOUR_TABLE_ID>"
)

func writeToBQClient(schema *arrow.Schema, records []arrow.Record, writeStreamName string, appendStream storagepb.BigQueryWrite_AppendRowsClient) error {
	// Serialize schema using IPC format (just the schema, no data)
	var schemaBuf bytes.Buffer
	schemaWriter := ipc.NewWriter(&schemaBuf, ipc.WithSchema(schema))
	err := schemaWriter.Close() // This writes just the schema without any records
	if err != nil {
		slog.Error("Failed to serialize schema", "error", err)
		return err
	}
	schemaData := schemaBuf.Bytes()

	// Serialize record batch using IPC format
	var recordBuf bytes.Buffer
	recordWriter := ipc.NewWriter(&recordBuf, ipc.WithSchema(schema))
	for _, record := range records {
		err = recordWriter.Write(record)
		if err != nil {
			slog.Error("Failed to write Arrow record", "error", err)
			return err
		}
	}

	err = recordWriter.Close()
	if err != nil {
		slog.Error("Failed to close Arrow writer", "error", err)
		return err
	}

	recordData := recordBuf.Bytes()
	// Workaround to skip the schema data:
	// currently, ipc.Writer writes the schema message at the start of the stream,
	// which makes the BigQuery Write API reject it, as it wants just RecordBatches.
	// The schema-only payload ends with an 8-byte end-of-stream marker, so slicing
	// off the first len(schemaData)-8 bytes leaves just the RecordBatch message.
	recordData = recordData[len(schemaData)-8:]

	slog.Info("Schema IPC data size", "bytes", len(schemaData))
	slog.Info("Record IPC data size", "bytes", len(recordData))

	request := &storagepb.AppendRowsRequest{
		WriteStream: writeStreamName,
		Rows: &storagepb.AppendRowsRequest_ArrowRows{
			ArrowRows: &storagepb.AppendRowsRequest_ArrowData{
				WriterSchema: &storagepb.ArrowSchema{
					SerializedSchema: schemaData,
				},
				Rows: &storagepb.ArrowRecordBatch{
					SerializedRecordBatch: recordData,
				},
			},
		},
	}

	// Send the request
	err = appendStream.Send(request)
	if err != nil {
		slog.Error("failed to send AppendRows request", "error", err)
		return err
	}

	err = appendStream.CloseSend()
	if err != nil {
		slog.Error("failed to close AppendRows request: %v")
		return err
	}

	return nil
}

func insertData(ctx context.Context, wc *storage.BigQueryWriteClient, projectID, datasetID, tableID string) error {
	// Create Arrow record with single column c0 of type string and value "bar"
	pool := memory.NewGoAllocator()

	// Create schema with single string column
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "c0", Type: arrow.BinaryTypes.String, Nullable: true},
	}, nil)

	// Create record builder
	builder := array.NewRecordBuilder(pool, schema)
	defer builder.Release()

	// Add single row with value "bar"
	builder.Field(0).(*array.StringBuilder).Append("bar")

	// Build the record
	record := builder.NewRecord()
	defer record.Release()

	stream, err := wc.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to create write stream: %v", err)
	}

	appendStream, err := wc.AppendRows(ctx)
	if err != nil {
		slog.Error("Failed to create AppendRows stream", "error", err)
		return fmt.Errorf("failed to create AppendRows stream: %v", err)
	}

	// Write to BigQuery and handle response synchronously
	err = writeToBQClient(schema, []arrow.Record{record}, stream.Name, appendStream)
	if err != nil {
		slog.Error("failed to write to BigQuery client", "error", err)
		return fmt.Errorf("failed to write to BigQuery client: %v", err)
	}

	// Receive responses
	for {
		resp, err := appendStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			slog.Error("Failed to receive AppendRows response", "error", err)
			return err
		}
		if resp.GetError() != nil {
			slog.Error("BigQuery error", "error", resp.GetError())
			return fmt.Errorf("bigquery error: %v", resp.GetError())
		}
	}
	return nil
}

func setupTable(ctx context.Context, bc *bigquery.Client) error {
	schema := bigquery.Schema{
		{Name: "c0", Type: bigquery.StringFieldType},
	}

	// The Create error is deliberately discarded so re-runs don't fail
	// when the table already exists.
	_ = bc.Dataset(datasetID).Table(tableID).Create(ctx, &bigquery.TableMetadata{
		Schema: schema,
	})
	return nil
}

func main() {
	ctx := context.Background()

	bc, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("failed to setup bigquery: %v", err)
	}

	err = setupTable(ctx, bc)
	if err != nil {
		log.Fatalf("failed to setup table: %v", err)
	}

	wc, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatalf("failed to setup bigquery write API client: %v", err)
	}

	err = insertData(ctx, wc, projectID, datasetID, tableID)
	if err != nil {
		log.Fatalf("failed to insert data: %v", err)
	}
}

The important code with the workaround is here:

	recordData := recordBuf.Bytes()
	// Workaround to skip the schema data:
	// currently, ipc.Writer writes the schema message at the start of the stream,
	// which makes the BigQuery Write API reject it, as it wants just RecordBatches.
	// The schema-only payload ends with an 8-byte end-of-stream marker, so slicing
	// off the first len(schemaData)-8 bytes leaves just the RecordBatch message.
	recordData = recordData[len(schemaData)-8:]

	slog.Info("Schema IPC data size", "bytes", len(schemaData))
	slog.Info("Record IPC data size", "bytes", len(recordData))

I'll follow up by opening a discussion and/or PR on the arrow-go repository about having the option to skip serializing the schema.

alvarowolfx · Jun 23 '25 20:06

Draft PR on arrow-go; it needs more work, but it can at least start some discussions: https://github.com/apache/arrow-go/pull/421

alvarowolfx · Jun 23 '25 21:06

Thanks a lot @alvarowolfx ❤️. I'm sorry, I didn't get a notification for this. I will give it a try.

tushartg · Jul 20 '25 06:07