Create a mechanism to write parquet files into a specific hard drive folder and merge them periodically

Open akvlad opened this issue 6 months ago • 2 comments

How

Create a service implementing the following interface and helper types

type Table struct {
	Name    string
	Path    string
	Fields  [][2]string // field name and type
	OrderBy []string
}

type IMergeTree interface {
	Store(table *Table, columns []string, data []any) error
	Merge(table *Table) error
}

Arguments of the Store(table *Table, columns []string, data []any) error method

  • table - table to store
  • columns - column names for the data
  • data - one array per column, in the same order as columns:
    • UInt64 column type should be []uint64 in the data arg
    • Int64 column type should be []int64 in the data arg
    • String column type should be []string or [][]byte in the data arg
    • Float64 column type should be []float64 in the data arg

The Store method should

  • check that columns length, data length and table.Fields length are the same
  • check that columns enumerate all the table.Fields entries
  • check that the data entry types are valid according to the description above
  • check if all the data entries have the same size
  • save the data as a .parquet file into the table.Path/data folder.
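
The validation part of Store could be sketched roughly as follows. This is only an illustration, not the required implementation: validateStore and columnLen are hypothetical helper names, and the sketch assumes that a Float64 column arrives as []float64, as in the Testing example. Writing the .parquet file itself is left out.

```go
package main

import "fmt"

// Table mirrors the helper type from this issue.
type Table struct {
	Name    string
	Path    string
	Fields  [][2]string // field name and type
	OrderBy []string
}

// columnLen (hypothetical helper) returns the number of rows in col when its
// Go type is allowed for fieldType, and an error otherwise.
func columnLen(fieldType string, col any) (int, error) {
	switch v := col.(type) {
	case []uint64:
		if fieldType == "UInt64" {
			return len(v), nil
		}
	case []int64:
		if fieldType == "Int64" {
			return len(v), nil
		}
	case []string:
		if fieldType == "String" {
			return len(v), nil
		}
	case [][]byte:
		if fieldType == "String" {
			return len(v), nil
		}
	case []float64: // assumption: Float64 data is passed as []float64
		if fieldType == "Float64" {
			return len(v), nil
		}
	}
	return 0, fmt.Errorf("field type %s does not accept %T", fieldType, col)
}

// validateStore (hypothetical helper) runs the checks listed above and
// returns the first problem it finds.
func validateStore(table *Table, columns []string, data []any) error {
	if len(columns) != len(table.Fields) || len(data) != len(table.Fields) {
		return fmt.Errorf("got %d columns and %d data entries for %d fields",
			len(columns), len(data), len(table.Fields))
	}
	// Map each declared field name to its type.
	types := make(map[string]string, len(table.Fields))
	for _, f := range table.Fields {
		types[f[0]] = f[1]
	}
	seen := make(map[string]bool, len(columns))
	rows := -1 // row count of the first column, -1 until known
	for i, name := range columns {
		fieldType, ok := types[name]
		if !ok || seen[name] {
			return fmt.Errorf("column %q is unknown or duplicated", name)
		}
		seen[name] = true
		n, err := columnLen(fieldType, data[i])
		if err != nil {
			return fmt.Errorf("column %q: %w", name, err)
		}
		if rows >= 0 && n != rows {
			return fmt.Errorf("column %q has %d rows, expected %d", name, n, rows)
		}
		rows = n
	}
	return nil
}
```

Because the lengths are equal and every column must be a known, non-repeated field name, passing these checks implies that columns covers every table.Fields entry.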

The Merge method should

  • enumerate all the files in the directory
  • select the files to merge so the resulting file size is (approximately) less than 4G
  • generate a DuckDB request to merge the planned .parquet files into one .parquet file inside table.Path/tmp folder
  • the request should have OrderBy expression according to the table.OrderBy field
  • delete the source .parquet files and move the resulting .parquet file into the table.Path/data folder

Testing

The following call should create a parquet file

	var mt IMergeTree = impl // impl is a placeholder for the service under test
	mt.Store(&Table{
		Name:    "example",
		Path:    "/tmp/example",
		Fields:  [][2]string{{"timestamp", "UInt64"}, {"str", "String"}, {"value", "Float64"}},
		OrderBy: []string{"timestamp"},
	}, []string{"timestamp", "str", "value"}, []any{
		[]uint64{1628596000, 1628596001, 1628596002},
		[]string{"a", "b", "c"},
		[]float64{1.1, 2.2, 3.3},
	})

Create a set of unit tests for the positive scenario and several negative scenarios:

  • data entries have an invalid type
  • data entries are not of the same size
  • data size is less than columns size
  • columns size is not equal to the table.Fields size

akvlad, Aug 13 '24 10:08