Salesforce Object Query Language (SOQL) support
I'd like to suggest adding support for Salesforce Object Query Language (SOQL) as an input/output. I'm willing to take a stab at implementing this if I get the spare time to do it.
Hey @loicalleyne, that sounds useful! I took a stab at using https://github.com/simpleforce/simpleforce for an internal demo, but I don't have much experience with Salesforce, so I didn't spend much time refining it. I'm also not sure what kind of SOQL queries people would like to run, so I hope you have some more familiarity with this to test it. Here's the demo code I put together:
```go
package input

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"

	"github.com/benthosdev/benthos/v4/public/service"
	"github.com/simpleforce/simpleforce"
)

var salesforceInputConfigSpec = service.NewConfigSpec().
	Summary("Creates an input that reads Salesforce data.").
	Field(service.NewStringField("url")).
	Field(service.NewStringField("user")).
	Field(service.NewStringField("password")).
	Field(service.NewStringField("token")).
	Field(service.NewStringField("query"))

func newSalesforceInput(conf *service.ParsedConfig) (service.Input, error) {
	input := salesforceInput{}
	var err error
	if input.url, err = conf.FieldString("url"); err != nil {
		return nil, err
	}
	if _, err = url.ParseRequestURI(input.url); err != nil {
		return nil, fmt.Errorf("invalid URL %q: %w", input.url, err)
	}
	if input.user, err = conf.FieldString("user"); err != nil {
		return nil, err
	}
	if input.password, err = conf.FieldString("password"); err != nil {
		return nil, err
	}
	if input.token, err = conf.FieldString("token"); err != nil {
		return nil, err
	}
	if input.query, err = conf.FieldString("query"); err != nil {
		return nil, err
	}
	// AutoRetryNacks re-delivers nacked messages, so Read can return a no-op ack func.
	return service.AutoRetryNacks(&input), nil
}

func init() {
	err := service.RegisterInput(
		"salesforce_select", salesforceInputConfigSpec,
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
			return newSalesforceInput(conf)
		})
	if err != nil {
		panic(err)
	}
}

//------------------------------------------------------------------------------

type salesforceInput struct {
	url      string
	user     string
	password string
	token    string
	query    string

	client        *simpleforce.Client
	iterator      *simpleforce.QueryResult
	currentRecord int
}

// Connect logs in and runs the SOQL query once, buffering the returned records.
func (si *salesforceInput) Connect(ctx context.Context) error {
	si.client = simpleforce.NewClient(si.url, simpleforce.DefaultClientID, simpleforce.DefaultAPIVersion)
	if si.client == nil {
		return errors.New("failed to create client")
	}
	err := si.client.LoginPassword(si.user, si.password, si.token)
	if err != nil {
		return fmt.Errorf("failed to login: %w", err)
	}
	si.iterator, err = si.client.Query(si.query) // Note: for Tooling API, use client.Tooling().Query(q)
	if err != nil {
		return fmt.Errorf("failed to run SOQL query: %w", err)
	}
	si.currentRecord = 0
	return nil
}

// Read emits one buffered record per message. Pagination is not handled, so only
// the records returned by the initial query call are emitted.
func (si *salesforceInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
	if si.currentRecord >= len(si.iterator.Records) {
		return nil, nil, service.ErrEndOfInput
	}
	record := si.iterator.Records[si.currentRecord]
	b, err := json.Marshal(record)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to marshal record: %w", err)
	}
	si.currentRecord++
	return service.NewMessage(b), func(ctx context.Context, err error) error {
		// Nacks are retried automatically when we use service.AutoRetryNacks
		return nil
	}, nil
}

func (si *salesforceInput) Close(ctx context.Context) error {
	return nil
}
```
Note that the above code is quite crude, and one might want to add an `args_mapping` field similar to the one in the `sql_*` components (see https://www.benthos.dev/docs/components/inputs/sql_select#args_mapping). I guess it's also missing pagination...
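For the pagination part, here's a rough sketch of the loop I'd expect, assuming the REST query response shape Salesforce documents (`done`, `nextRecordsUrl`, `records`). `queryPage`, `fetchFunc` and `queryAll` are hypothetical names, and the fetch hook stands in for whatever actually performs the HTTP call (simpleforce or otherwise), so this doesn't show how it would be wired into `Read`.

```go
package main

import (
	"errors"
	"fmt"
)

// queryPage mirrors the top-level fields of a Salesforce REST query response.
type queryPage struct {
	TotalSize      int              `json:"totalSize"`
	Done           bool             `json:"done"`
	NextRecordsURL string           `json:"nextRecordsUrl"`
	Records        []map[string]any `json:"records"`
}

// fetchFunc is a hypothetical hook: given either the initial SOQL query or a
// nextRecordsUrl, it returns the corresponding page.
type fetchFunc func(queryOrURL string) (*queryPage, error)

// queryAll follows nextRecordsUrl until the API reports done == true,
// passing every record to emit.
func queryAll(fetch fetchFunc, soql string, emit func(map[string]any) error) error {
	page, err := fetch(soql)
	for {
		if err != nil {
			return err
		}
		for _, rec := range page.Records {
			if err := emit(rec); err != nil {
				return err
			}
		}
		if page.Done {
			return nil
		}
		if page.NextRecordsURL == "" {
			return errors.New("response not done but nextRecordsUrl is empty")
		}
		page, err = fetch(page.NextRecordsURL)
	}
}

func main() {
	// Two fake pages standing in for real API responses.
	pages := map[string]*queryPage{
		"SELECT Id FROM Account": {TotalSize: 3, NextRecordsURL: "/next/1",
			Records: []map[string]any{{"Id": "a"}, {"Id": "b"}}},
		"/next/1": {TotalSize: 3, Done: true, Records: []map[string]any{{"Id": "c"}}},
	}
	fetch := func(q string) (*queryPage, error) {
		p, ok := pages[q]
		if !ok {
			return nil, fmt.Errorf("unexpected request %q", q)
		}
		return p, nil
	}
	var ids []string
	err := queryAll(fetch, "SELECT Id FROM Account", func(r map[string]any) error {
		ids = append(ids, r["Id"].(string))
		return nil
	})
	fmt.Println(ids, err) // [a b c] <nil>
}
```

In the input itself, this loop would probably be driven lazily from `Read` (fetching the next page only once the buffered records run out) rather than all at once.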
Also, the simpleforce library doesn't do certain things properly. For example, it emits logs using the standard library's `log` package, which I think can't be suppressed without silencing the global logger for the whole process.
Finally, it would be nice to have a few integration tests for it. The Salesforce API is HTTP-based (I'm not sure whether they offer any alternatives), so maybe the https://pkg.go.dev/net/http/httptest package can help.
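A minimal sketch of what such a test could look like with `httptest`: a fake server returns a canned SOQL query response and a small client decodes it. `runQuery` and `newFakeSalesforce` are hypothetical helpers (not simpleforce's API), `v58.0` is an arbitrary placeholder API version, and the endpoint path and bearer-token header are assumptions based on the Salesforce REST API's query resource.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// queryResponse holds the subset of a Salesforce REST query response used here.
type queryResponse struct {
	TotalSize int              `json:"totalSize"`
	Done      bool             `json:"done"`
	Records   []map[string]any `json:"records"`
}

// runQuery is a hypothetical minimal client:
// GET {baseURL}/services/data/{version}/query?q=... with a bearer token.
func runQuery(baseURL, version, token, soql string) (*queryResponse, error) {
	u := baseURL + "/services/data/" + version + "/query?q=" + url.QueryEscape(soql)
	req, err := http.NewRequest(http.MethodGet, u, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	var out queryResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}

// newFakeSalesforce serves a canned query response after checking the request shape.
func newFakeSalesforce() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/services/data/v58.0/query" || r.URL.Query().Get("q") == "" ||
			r.Header.Get("Authorization") != "Bearer test-token" {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"totalSize":1,"done":true,"records":[{"Id":"001","Name":"Acme"}]}`)
	}))
}

func main() {
	srv := newFakeSalesforce()
	defer srv.Close()
	res, err := runQuery(srv.URL, "v58.0", "test-token", "SELECT Id, Name FROM Account")
	if err != nil {
		panic(err)
	}
	fmt.Println(res.TotalSize, res.Records[0]["Name"]) // 1 Acme
}
```

In an actual test for the input, the server URL would be passed in via the `url` field instead, which also requires faking whatever login exchange the client performs first.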
How should a package be emitting logs in a way that makes it ok to use in Benthos?
Someone submitted an issue last year to add an option to disable logging in that package: https://github.com/simpleforce/simpleforce/issues/35
Have a look at any of the existing implementations under `internal/impl`, for example here. You can see how that `logger` object is hooked up.
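From the library's side, one common pattern is to accept a logger from the caller and stay silent by default, instead of writing to the global `log` package. This is a sketch under assumptions: `Logger`, `Client`, `WithLogger` and `recordingLogger` are hypothetical, and the idea is that Benthos' `*service.Logger` (obtained via `mgr.Logger()` in a plugin constructor) exposes printf-style `Debugf`/`Warnf` methods and could therefore be passed in directly.

```go
package main

import (
	"fmt"
	"strings"
)

// Logger is a hypothetical minimal interface a client library could accept.
type Logger interface {
	Debugf(format string, args ...any)
	Warnf(format string, args ...any)
}

// nopLogger is the default: the library stays silent unless the caller opts in.
type nopLogger struct{}

func (nopLogger) Debugf(string, ...any) {}
func (nopLogger) Warnf(string, ...any)  {}

// Client is a hypothetical library client that only logs through the logger it
// was given, never through the global log package.
type Client struct {
	log Logger
}

func NewClient(opts ...func(*Client)) *Client {
	c := &Client{log: nopLogger{}}
	for _, o := range opts {
		o(c)
	}
	return c
}

// WithLogger lets the caller route library logs into its own logging pipeline.
func WithLogger(l Logger) func(*Client) {
	return func(c *Client) { c.log = l }
}

func (c *Client) Login(user string) error {
	c.log.Debugf("logging in as %s", user)
	return nil
}

// recordingLogger collects messages, standing in for a host application's logger.
type recordingLogger struct{ lines []string }

func (r *recordingLogger) Debugf(f string, a ...any) {
	r.lines = append(r.lines, "DEBUG "+fmt.Sprintf(f, a...))
}

func (r *recordingLogger) Warnf(f string, a ...any) {
	r.lines = append(r.lines, "WARN "+fmt.Sprintf(f, a...))
}

func main() {
	_ = NewClient().Login("alice") // silent by default
	rec := &recordingLogger{}
	_ = NewClient(WithLogger(rec)).Login("alice")
	fmt.Println(strings.Join(rec.lines, "\n")) // DEBUG logging in as alice
}
```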
Yeah, the package doesn't seem to be great or well-maintained. It also doesn't contain that much code, so I wonder if it's really worth importing. It might be better to use the Benthos `http_client` directly and write templates, similar to how the Discord input is implemented.
☹️ I looked at the simpleforce code again: Salesforce has yet another custom auth flow, which doesn't seem to fit the current `http_client` auth options, so templates alone might not make this work out of the box.
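For reference, here's a sketch of one Salesforce auth flow, the OAuth 2.0 username-password flow, which is not necessarily the one simpleforce implements. The client POSTs a form (with the security token appended to the password) to a token endpoint, typically `https://login.salesforce.com/services/oauth2/token`, and gets back both a bearer token and an `instance_url` that later API calls must target. The client ID and secret would come from a Connected App; `passwordLogin` and the fake endpoint are hypothetical helpers for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// tokenResponse holds the fields of the OAuth token response used here.
type tokenResponse struct {
	AccessToken string `json:"access_token"`
	InstanceURL string `json:"instance_url"`
}

// passwordLogin sketches the username-password flow: a form POST whose response
// supplies both the bearer token and the instance URL for subsequent requests.
func passwordLogin(tokenURL, clientID, clientSecret, user, password, securityToken string) (*tokenResponse, error) {
	form := url.Values{
		"grant_type":    {"password"},
		"client_id":     {clientID},
		"client_secret": {clientSecret},
		"username":      {user},
		"password":      {password + securityToken},
	}
	resp, err := http.PostForm(tokenURL, form)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("login failed: %s", resp.Status)
	}
	var tr tokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
		return nil, err
	}
	return &tr, nil
}

// newFakeTokenEndpoint is a hypothetical stand-in for the real token endpoint.
func newFakeTokenEndpoint() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if err := r.ParseForm(); err != nil || r.PostForm.Get("grant_type") != "password" ||
			r.PostForm.Get("password") != "secretTOKEN" {
			http.Error(w, `{"error":"invalid_grant"}`, http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"access_token":"00Dxx!abc","instance_url":"https://example.my.salesforce.com","token_type":"Bearer"}`)
	}))
}

func main() {
	srv := newFakeTokenEndpoint()
	defer srv.Close()
	tr, err := passwordLogin(srv.URL, "id", "secret", "user@example.com", "secret", "TOKEN")
	if err != nil {
		panic(err)
	}
	// Later requests would target tr.InstanceURL with "Authorization: Bearer <token>".
	fmt.Println(tr.InstanceURL) // https://example.my.salesforce.com
}
```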
This is turning into another one of those 'feature request to enable another feature request'...
What about a custom auth option for http_client that lets you define a stream within a stream to go retrieve tokens in a custom auth flow and put them in auth variables accessible to the http_client? Maybe two config options, auth flow and refresh flow. That could remove a lot of the complexity of working with non-standard or rarely used auth flows.
Could that perhaps be done using a separate `http_client` and an in-memory cache to store the auth variables? Before calling the main `http_client`, one would first query this cache for the auth variables in a `branch` processor and place them in the metadata (or, if they don't exist, trigger the auxiliary `http_client`, then place the results in the metadata and cache them), and then call the main `http_client`. If not, I guess some advanced fields could be added, but it would be best to check with @Jeffail before putting in the effort.
Are you thinking of a `sequence` input of `http_client` inputs, where the first one's output gets stored in a variable and then discarded? How would that work with tokens that expire while the stream is still running?
No, it doesn't require a `sequence` input, and you can leverage the `memory` cache's `default_ttl` to expire tokens. Here's how I'd do it:
```yaml
input:
  generate:
    interval: 10s # Trigger the pipeline every 10s
    mapping: root = ""
    count: 0 # Never stop
  processors:
    - cache: # Try to fetch the auth token from the cache
        resource: token_cache
        operator: get
        key: token
    - catch:
        - http: {} # Fetch the auth token from the API
        - catch:
            - log:
                level: WARN
                message: "Failed to get token: ${! error() }"
            - bloblang: root = deleted() # Prevent the rest of the pipeline from running
        - mapping: |
            meta token = ... # Do some processing to extract the actual token and place it in the metadata
        - branch:
            request_map: |
              root = meta("token")
            processors:
              - cache: # Cache the current auth token
                  resource: token_cache
                  operator: set
                  key: token
                  value: "${! content() }"
    - http: {} # Fetch some data using meta("token")

output:
  stdout: {}

cache_resources:
  - label: token_cache
    memory:
      default_ttl: 1h # Expire auth tokens after 1h
```
Note that the above is just a sketch config and it might have bugs...