go-crate
go-crate copied to clipboard
Bulkservice proposal
I noticed the bulk services is not implemented, I am not sure if you wanted add support for it or not, but I thought I would toss out a possible implementation.
package crate
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"reflect"
"sync"
"github.com/herenow/go-crate"
)
type (
CrateBulkDriver struct {
Url string // Crate http endpoint url
client *http.Client
lock sync.Mutex
}
CrateService struct {
Statement string `json:"stmt"`
BulkArgs [][]interface{} `json:"bulk_args"`
Fields []string `json:"-"`
Table string `json:"-"`
Driver *CrateBulkDriver `json:"-"`
}
InsertService struct {
CrateService
}
BulkResponse struct {
Error struct {
Message string
Code int
} `json:"error"`
Cols []string `json:"cols"`
Duration float64 `json:"duration"`
Results []BulkResponseResult `json:"results"`
}
BulkResponseResult struct {
Rowcount int `json:"rowcount"`
}
)
// Init a new "Connection" to a Crate Data Storage instance.
// Note that the connection is not tested until the first query.
func NewBulkConnection(crate_url string) (c *CrateBulkDriver, e error) {
c = &CrateBulkDriver{
lock: sync.Mutex{},
}
u, err := url.Parse(crate_url)
if err != nil {
return nil, err
}
sanUrl := fmt.Sprintf("%s://%s", u.Scheme, u.Host)
c.Url = sanUrl
c.client = &http.Client{}
return c, nil
}
// Creates a new insert service for crate, using the table and fields
func (driver *CrateBulkDriver) NewInsertService(table string, fields ...string) (i *InsertService) {
buffer := &bytes.Buffer{}
values := &bytes.Buffer{}
fmt.Fprintf(buffer, "INSERT INTO %s (", table)
fmt.Fprintf(values, "VALUES (")
first := true
for _, field := range fields {
if !first {
buffer.WriteString(", ")
values.WriteString(", ")
} else {
first = false
}
buffer.WriteString(field)
values.WriteString("?")
}
buffer.WriteString(") ")
values.WriteString(") ")
buffer.Write(values.Bytes())
i = &InsertService{
CrateService{
Statement: string(buffer.Bytes()),
Table: table,
Fields: fields,
Driver: driver,
},
}
return i
}
// Add a specfic interface to the bulk args, could blow up if field name does not exist
func (service *InsertService) Add(data interface{}) {
// Simplest addition
orderedValues := make([]interface{}, len(service.Fields))
for i, v := range service.Fields {
orderedValues[i] = reflect.ValueOf(data).Elem().FieldByName(v).Interface()
}
service.BulkArgs = append(service.BulkArgs, orderedValues)
}
// Use the map to add a bulk arguement
func (service *InsertService) AddMap(data map[string]interface{}) {
// Simplest addition
orderedValues := make([]interface{}, len(service.Fields))
for i, v := range service.Fields {
orderedValues[i] = data[v]
}
service.BulkArgs = append(service.BulkArgs, orderedValues)
}
// Perform the bulk operation on this service
func (service *CrateService) Do() (*BulkResponse, error) {
service.Driver.lock.Lock()
defer service.Driver.lock.Unlock()
res := &BulkResponse{}
if len(service.BulkArgs) == 0 {
return res, nil
}
endpoint := service.Driver.Url + "/_sql?pretty"
buf, err := json.Marshal(service)
if err != nil {
return nil, err
}
data := bytes.NewReader(buf)
resp, err := service.Driver.client.Post(endpoint, "application/json", data)
if err != nil {
return nil, err
}
defer resp.Body.Close()
d := json.NewDecoder(resp.Body)
// We need to set this, or long integers will be interpreted as floats
d.UseNumber()
err = d.Decode(res)
//fmt.Printf("\n\n response: %#v \n",res)
if err != nil {
return nil, err
}
// // Check for db errors
if res.Error.Code != 0 {
err = &crate.CrateErr{
Code: res.Error.Code,
Message: res.Error.Message,
}
return nil, err
}
return res, nil
}
I havent done the update or delete services but as you can see they would be pretty easy to add. The only extra thing I could see is adding a function to BulkResponse to check to see if any of the results returned a -1...
thoughts ??
We could add a bulk
package to this project, I was unsure if I wanted to maintain extra packages in this project, such as the blob
package. But I think this is the way to go.
We could probably use the database/sql package and return Rows in the BulkResponse.
I also need this feature.
Any updates on this?