go-crate icon indicating copy to clipboard operation
go-crate copied to clipboard

Bulkservice proposal

Open notzippy opened this issue 9 years ago • 3 comments

I noticed that the bulk service is not implemented. I am not sure whether you want to add support for it, but I thought I would toss out a possible implementation.

package crate

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "reflect"
    "sync"
    "github.com/herenow/go-crate"
)

// CrateBulkDriver bundles what a bulk request needs: the endpoint base URL,
// the HTTP client that sends the request, and a mutex that CrateService.Do
// holds for the duration of each call.
type CrateBulkDriver struct {
    Url    string // Crate http endpoint url
    client *http.Client
    lock   sync.Mutex
}

// CrateService is the JSON payload of a bulk request. Only Statement and
// BulkArgs are serialized; the remaining fields are bookkeeping and are
// excluded from the JSON encoding by their `json:"-"` tags.
type CrateService struct {
    Statement string           `json:"stmt"`      // SQL statement with ? placeholders
    BulkArgs  [][]interface{}  `json:"bulk_args"` // one inner slice of arguments per row
    Fields    []string         `json:"-"`         // column names, in placeholder order
    Table     string           `json:"-"`         // table the statement targets
    Driver    *CrateBulkDriver `json:"-"`         // driver used by Do to send the request
}

// InsertService is a CrateService whose statement is a bulk INSERT.
type InsertService struct {
    CrateService
}

// BulkResponse is the decoded JSON body of a bulk request.
type BulkResponse struct {
    // Error is filled from the "error" object of the response; a non-zero
    // Code is treated as a database error by CrateService.Do.
    Error struct {
        Message string
        Code    int
    } `json:"error"`
    Cols     []string             `json:"cols"`
    Duration float64              `json:"duration"`
    Results  []BulkResponseResult `json:"results"`
}

// BulkResponseResult is one entry of the "results" array of a BulkResponse.
type BulkResponseResult struct {
    Rowcount int `json:"rowcount"`
}

// NewBulkConnection initialises a new "connection" to a Crate Data Storage
// instance. No request is made here, so the endpoint is not tested until the
// first query.
//
// crate_url must contain at least a scheme and a host (for example
// "http://localhost:4200"); any path, query or fragment is discarded and only
// "scheme://host" is kept as the driver's Url.
//
// An error is returned when crate_url cannot be parsed or when it lacks a
// scheme or host.
func NewBulkConnection(crate_url string) (c *CrateBulkDriver, e error) {
    u, err := url.Parse(crate_url)
    if err != nil {
        return nil, err
    }

    // url.Parse accepts inputs such as "", "localhost" or "localhost:4200"
    // without error, leaving Scheme and/or Host empty. Building an endpoint
    // from those would yield an unusable Url like "://", so reject them here
    // instead of failing later on the first request.
    if u.Scheme == "" || u.Host == "" {
        return nil, fmt.Errorf("crate: invalid url %q: scheme and host are required", crate_url)
    }

    // The zero value of the driver's mutex is ready to use, so it needs no
    // explicit initialisation.
    // NOTE(review): the HTTP client has no timeout configured; confirm whether
    // bulk requests should be bounded.
    return &CrateBulkDriver{
        Url:    fmt.Sprintf("%s://%s", u.Scheme, u.Host),
        client: &http.Client{},
    }, nil
}

// NewInsertService builds an InsertService bound to this driver for the
// given table and fields. The generated statement has the form
//
//     INSERT INTO <table> (<f1>, <f2>, ...) VALUES (?, ?, ...)
//
// followed by a single trailing space, with one placeholder per field. The
// table and field names are written into the statement verbatim.
func (driver *CrateBulkDriver) NewInsertService(table string, fields ...string) (i *InsertService) {
    var stmt, params bytes.Buffer

    fmt.Fprintf(&stmt, "INSERT INTO %s (", table)
    params.WriteString("VALUES (")

    for idx, name := range fields {
        // Every entry after the first is preceded by a separator.
        if idx > 0 {
            stmt.WriteString(", ")
            params.WriteString(", ")
        }
        stmt.WriteString(name)
        params.WriteString("?")
    }

    stmt.WriteString(") ")
    params.WriteString(") ")
    // Append the VALUES clause to the column list to complete the statement.
    stmt.Write(params.Bytes())

    return &InsertService{
        CrateService: CrateService{
            Statement: stmt.String(),
            Table:     table,
            Fields:    fields,
            Driver:    driver,
        },
    }
}

// Add appends one row of bulk arguments taken from the exported fields of
// data, which must be a struct or a non-nil pointer to a struct. Values are
// read by Go field name, in the order of service.Fields.
//
// A name in service.Fields that does not match a field of data, or that
// matches an unexported field, contributes nil to the row; this mirrors
// AddMap, where a missing key also yields nil.
//
// Add panics with a descriptive message when data is nil, a nil pointer, or
// not a struct, since no row can be derived from it.
func (service *InsertService) Add(data interface{}) {
    // Resolve the struct once, following pointers, instead of re-reflecting
    // on every field.
    v := reflect.ValueOf(data)
    for v.Kind() == reflect.Ptr && !v.IsNil() {
        v = v.Elem()
    }
    if v.Kind() != reflect.Struct {
        panic(fmt.Sprintf("crate: InsertService.Add expects a struct or non-nil pointer to struct, got %T", data))
    }

    row := make([]interface{}, len(service.Fields))
    for i, name := range service.Fields {
        // FieldByName returns the zero Value for unknown names, and
        // Interface panics on unexported fields; leave nil in both cases.
        if f := v.FieldByName(name); f.IsValid() && f.CanInterface() {
            row[i] = f.Interface()
        }
    }
    service.BulkArgs = append(service.BulkArgs, row)
}

// AddMap appends one row of bulk arguments built from data. The row holds
// data's value for each name in service.Fields, in that order; a name with
// no entry in data contributes nil.
func (service *InsertService) AddMap(data map[string]interface{}) {
    row := make([]interface{}, 0, len(service.Fields))
    for _, column := range service.Fields {
        row = append(row, data[column])
    }
    service.BulkArgs = append(service.BulkArgs, row)
}

// Do sends the accumulated bulk arguments to the driver's "/_sql" endpoint
// as a JSON POST and decodes the reply into a BulkResponse.
//
// The driver's lock is held for the whole call. When BulkArgs is empty no
// request is made and an empty BulkResponse is returned. A transport,
// encoding or decoding failure is returned as-is; a reply whose error code
// is non-zero is returned as a *crate.CrateErr carrying that code and
// message.
//
// Do does not clear BulkArgs, so calling it again sends the same rows again.
func (service *CrateService) Do() (*BulkResponse, error) {
    drv := service.Driver
    drv.lock.Lock()
    defer drv.lock.Unlock()

    result := new(BulkResponse)
    if len(service.BulkArgs) == 0 {
        // Nothing queued: skip the round trip.
        return result, nil
    }

    payload, err := json.Marshal(service)
    if err != nil {
        return nil, err
    }

    resp, err := drv.client.Post(drv.Url+"/_sql?pretty", "application/json", bytes.NewReader(payload))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    decoder := json.NewDecoder(resp.Body)
    // Decode numbers that land in interface{} values as json.Number rather
    // than float64, so large integers are not turned into floats.
    decoder.UseNumber()
    if err := decoder.Decode(result); err != nil {
        return nil, err
    }

    // Check for db errors.
    if code := result.Error.Code; code != 0 {
        return nil, &crate.CrateErr{
            Code:    code,
            Message: result.Error.Message,
        }
    }

    return result, nil
}

I haven't done the update or delete services, but as you can see they would be pretty easy to add. The only extra thing I can think of is adding a function to BulkResponse that checks whether any of the results returned a -1...

thoughts ??

notzippy avatar Dec 11 '15 21:12 notzippy

We could add a bulk package to this project. I was unsure whether I wanted to maintain extra packages in this project, such as the blob package, but I think this is the way to go.

We could probably use the database/sql package and return Rows in the BulkResponse.

herenow avatar Dec 14 '15 13:12 herenow

I also need this feature.

0i avatar Aug 01 '17 10:08 0i

Any updates on this?

kiura24metrics avatar Aug 28 '18 16:08 kiura24metrics