PoC for Async streams support in Tyk (Kafka / MQTT / AMQP etc.) [DO NOT MERGE]
User description
Description
This PoC embeds the rather excellent Benthos streams server into Tyk and wraps its control API into Tyk's control API.
Benthos is a highly robust stream processor that can use Kafka, AMQP, MQTT, WebSockets, Unix sockets, HTTP endpoints and much more as inputs and outputs. It also has a highly capable, pluggable middleware system that enables transformations, rate limiting, introspection, etc.
When run, this PoC spins up Tyk as normal, along with a Benthos server in streams mode (all part of the same binary, no separate processes here). It then scans the streams/active folder for stream configurations and loads them into the server.
You can use the Tyk API endpoints under /tyk/streams/ to add, update, and remove streams dynamically. They take as input the same YAML config format that Benthos uses (it's a straight pass-through). All configs are written to disk, so if you restart, they are reloaded, just like apiDefs.
It's worth noting that Tyk talks to Benthos via its REST API, which has fairly weak security: it's bound to localhost, and it does support basic auth, but I didn't have time to implement or experiment with that fully.
In this shape, the Benthos and Tyk capabilities are still very decoupled. I think we need some kind of Benthos plugin that hooks Tyk APIs in as an input to the Benthos server (so you can do HTTP -> event processing), and potentially the other way around (event -> WebSocket or event -> SSE), so that you can apply traditional APIM to those endpoints.
Also required: a dashboard loader (should not be too hard).
Because this server handles streams dynamically, it doesn't need hot-reload functionality, so that complexity is avoided.
Short video of it working: https://www.loom.com/share/9a1fb36840ca40059ab129a41e1bf68c?sid=9218a202-d5bc-4e02-91e3-2652d8e3ceae
API usage:
Create a stream (like in the video):
(Use PUT to update an existing stream, and GET on this URI to fetch a stream)
curl --request POST \
--url http://localhost:80/tyk/streams/test \
--header 'content-type: text/plain' \
--header 'x-tyk-authorization: 352d20ee67be67f6340b4c0605b044b7' \
--data '---
input:
  kafka:
    consumer_group: "main"
    addresses:
      - localhost:9092
    topics:
      - my_topic
pipeline:
  threads: 1
  processors:
    - mapping: root = content().uppercase()
output:
  label: "socket"
  socket:
    network: "unix"
    address: /tmp/benthos1.sock
    codec: lines
'
List Streams:
curl --request GET \
--url http://localhost:80/tyk/streams \
--header 'x-tyk-authorization: 352d20ee67be67f6340b4c0605b044b7'
Delete Stream:
curl --request DELETE \
--url http://localhost:80/tyk/streams/test \
--header 'content-type: text/plain' \
--header 'x-tyk-authorization: 352d20ee67be67f6340b4c0605b044b7'
PR Description updated to latest commit (https://github.com/TykTechnologies/tyk/commit/e38312551d2c7cac4b741d41700e1d02e0b7342e)
API Changes
--- prev.txt 2024-05-16 15:34:54.530972937 +0000
+++ current.txt 2024-05-16 15:34:47.158905916 +0000
@@ -3154,12 +3154,6 @@
// TransformResponseHeaders contains the configurations related to API level response header transformation.
// Tyk classic API definition: `global_response_headers`/`global_response_headers_remove`.
TransformResponseHeaders *TransformHeaders `bson:"transformResponseHeaders,omitempty" json:"transformResponseHeaders,omitempty"`
-
- // ContextVariables contains the configuration related to Tyk context variables.
- ContextVariables *ContextVariables `bson:"contextVariables,omitempty" json:"contextVariables,omitempty"`
-
- // TrafficLogs contains the configurations related to API level log analytics.
- TrafficLogs *TrafficLogs `bson:"trafficLogs,omitempty" json:"trafficLogs,omitempty"`
}
Global contains configuration that affects the whole API (all endpoints).
@@ -3993,6 +3987,9 @@
//
// Tyk classic API definition: `event_handlers`
EventHandlers EventHandlers `bson:"eventHandlers,omitempty" json:"eventHandlers,omitempty"`
+
+ // ContextVariables contains the configuration related to Tyk context variables.
+ ContextVariables *ContextVariables `bson:"contextVariables,omitempty" json:"contextVariables,omitempty"`
}
Server contains the configuration that sets Tyk up to receive requests from
the client applications.
@@ -4219,19 +4216,6 @@
func (i *TrackEndpoint) Fill(meta apidef.TrackEndpointMeta)
Fill fills *TrackEndpoint receiver with data from apidef.TrackEndpointMeta.
-type TrafficLogs struct {
- // Enabled enables traffic log analytics for the API.
- // Tyk classic API definition: `do_not_track`.
- Enabled bool `bson:"enabled" json:"enabled"`
-}
- TrafficLogs holds configuration about API log analytics.
-
-func (t *TrafficLogs) ExtractTo(api *apidef.APIDefinition)
- ExtractTo extracts *TrafficLogs into *apidef.APIDefinition.
-
-func (t *TrafficLogs) Fill(api apidef.APIDefinition)
- Fill fills *TrafficLogs from apidef.APIDefinition.
-
type TransformBody struct {
// Enabled activates transform request/request body middleware.
Enabled bool `bson:"enabled" json:"enabled"`
@@ -4573,6 +4557,36 @@
func (x *XTykAPIGateway) Fill(api apidef.APIDefinition)
Fill fills *XTykAPIGateway from apidef.APIDefinition.
+# Package: ./benthos-client
+
+package benthosClient // import "github.com/TykTechnologies/tyk/benthos-client"
+
+
+TYPES
+
+type Client struct {
+ BaseURL string
+ Username string
+ Password string
+ HTTPClient *http.Client
+}
+
+func NewClient(baseURL, username, password string) *Client
+
+func (c *Client) CreateStream(streamID string, config []byte) (map[string]interface{}, error)
+
+func (c *Client) DeleteStream(streamID string) (map[string]interface{}, error)
+
+func (c *Client) GetReady() (map[string]interface{}, error)
+
+func (c *Client) GetStream(streamID string) (map[string]interface{}, error)
+
+func (c *Client) GetStreams() (map[string]interface{}, error)
+
+func (c *Client) PatchStream(streamID string, config []byte) (map[string]interface{}, error)
+
+func (c *Client) UpdateStream(streamID string, config []byte) (map[string]interface{}, error)
+
# Package: ./certs
package certs // import "github.com/TykTechnologies/tyk/certs"
@@ -5831,7 +5845,7 @@
// EnableFixedWindow enables fixed window rate limiting.
EnableFixedWindowRateLimiter bool `json:"enable_fixed_window_rate_limiter"`
- // Redis based rate limiter with sliding log. Provides 100% rate limiting accuracy, but require two additional Redis roundtrip for each request.
+ // Redis based rate limiter with fixed window. Provides 100% rate limiting accuracy, but require two additional Redis roundtrip for each request.
EnableRedisRollingLimiter bool `json:"enable_redis_rolling_limiter"`
// To enable, set to `true`. The sentinel-based rate limiter delivers a smoother performance curve as rate-limit calculations happen off-thread, but a stricter time-out based cool-down for clients. For example, when a throttling action is triggered, they are required to cool-down for the period of the rate limit.
@@ -7106,11 +7120,8 @@
const (
// QuotaKeyPrefix serves as a standard prefix for generating quota keys.
QuotaKeyPrefix = "quota-"
-
- // RateLimitKeyPrefix serves as a standard prefix for generating rate limiter keys.
- RateLimitKeyPrefix = rate.LimiterKeyPrefix
-
- // SentinelRateLimitKeyPostfix is appended to the rate limiting key to combine into a sentinel key.
+ // RateLimitKeyPrefix serves as a standard prefix for generating rate limit keys.
+ RateLimitKeyPrefix = "rate-limit-"
SentinelRateLimitKeyPostfix = ".BLOCKED"
)
const (
@@ -7996,6 +8007,8 @@
type Gateway struct {
DefaultProxyMux *proxyMux
+ StreamClient *benthosClient.Client
+
DRLManager *drl.DRL
Analytics RedisAnalyticsHandler
@@ -8109,6 +8122,8 @@
func (gw *Gateway) LoadSampleAPI(def string) (spec *APISpec)
+func (gw *Gateway) LoadStreamsFromDisk() error
+
func (gw *Gateway) MarshalJSON() ([]byte, error)
func (gw *Gateway) MonitorApplicationInstrumentation()
@@ -8140,6 +8155,12 @@
func (gw *Gateway) SetupNewRelic() (app newrelic.Application)
SetupNewRelic creates new newrelic.Application instance
+func (gw *Gateway) StartStreamServer()
+
+func (gw *Gateway) StreamHandler(w http.ResponseWriter, r *http.Request)
+
+func (gw *Gateway) StreamListHandler(w http.ResponseWriter, r *http.Request)
+
func (gw *Gateway) TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec, logger *logrus.Entry) *ReverseProxy
TykNewSingleHostReverseProxy returns a new ReverseProxy that rewrites URLs
to the scheme, host, and base path provided in target. If the target's path
@@ -9709,7 +9730,7 @@
func (l *SessionLimiter) Context() context.Context
-func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, api *APISpec, dryRun bool) sessionFailReason
+func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason
ForwardMessage will enforce rate limiting, returning a non-zero
sessionFailReason if session limits have been exceeded. Key values to manage
rate are Rate and Per, e.g. Rate of 10 messages Per 10 seconds
@@ -11349,6 +11370,23 @@
func (v *Vault) Get(key string) (string, error)
+# Package: ./streams
+
+package streams // import "github.com/TykTechnologies/tyk/streams"
+
+
+TYPES
+
+type Server struct {
+ // Has unexported fields.
+}
+
+func New() *Server
+
+func (s *Server) Start()
+
+func (s *Server) Stop()
+
# Package: ./tcp
package tcp // import "github.com/TykTechnologies/tyk/tcp"
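The benthosClient.Client listed in the API changes exposes methods that map naturally onto the Benthos streams-mode REST API (POST and DELETE on /streams/{id}, GET on /streams). The following is a guess at how such a client could be written, not the PR's implementation: the exported fields and the NewClient, CreateStream, GetStreams and DeleteStream signatures match the listing, but the 10-second timeout and the reply handling in decodeReply (a non-JSON success body becomes an empty map) are assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// Client has the same exported fields as benthosClient.Client in the API
// listing; everything below the struct is an illustrative guess.
type Client struct {
	BaseURL    string
	Username   string
	Password   string
	HTTPClient *http.Client
}

// NewClient matches the listed signature. The 10s timeout is an assumption.
func NewClient(baseURL, username, password string) *Client {
	return &Client{
		BaseURL:    baseURL,
		Username:   username,
		Password:   password,
		HTTPClient: &http.Client{Timeout: 10 * time.Second},
	}
}

// decodeReply turns a Benthos reply into the map the client returns.
// Assumption: success bodies are either a JSON object or can be ignored.
func decodeReply(statusCode int, raw []byte) (map[string]interface{}, error) {
	raw = bytes.TrimSpace(raw)
	if statusCode < 200 || statusCode > 299 {
		return nil, fmt.Errorf("benthos returned %d: %s", statusCode, raw)
	}
	out := map[string]interface{}{}
	if len(raw) > 0 && raw[0] == '{' {
		if err := json.Unmarshal(raw, &out); err != nil {
			return nil, err
		}
	}
	return out, nil
}

// do sends one request to the streams API, adding basic auth when a
// username is configured.
func (c *Client) do(method, path string, body []byte) (map[string]interface{}, error) {
	var rdr io.Reader
	if body != nil {
		rdr = bytes.NewReader(body)
	}
	req, err := http.NewRequest(method, c.BaseURL+path, rdr)
	if err != nil {
		return nil, err
	}
	if c.Username != "" {
		req.SetBasicAuth(c.Username, c.Password)
	}
	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("%s %s: %w", method, path, err)
	}
	defer resp.Body.Close()
	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return decodeReply(resp.StatusCode, raw)
}

func (c *Client) CreateStream(streamID string, config []byte) (map[string]interface{}, error) {
	return c.do(http.MethodPost, "/streams/"+streamID, config)
}

func (c *Client) GetStreams() (map[string]interface{}, error) {
	return c.do(http.MethodGet, "/streams", nil)
}

func (c *Client) DeleteStream(streamID string) (map[string]interface{}, error) {
	return c.do(http.MethodDelete, "/streams/"+streamID, nil)
}

func main() {
	// 4195 is Benthos's default HTTP port; the PoC may bind elsewhere.
	c := NewClient("http://localhost:4195", "", "")
	streams, err := c.GetStreams()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(streams)
}
```

Splitting decodeReply out of the transport code keeps the status and body handling testable without a running Benthos instance.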
PR Review 🔍
| ⏱️ Estimated effort to review [1-5] | 4, due to the integration of a new system (Benthos) within Tyk, the complexity of handling multiple stream protocols, and the security implications of the new endpoints. The PR involves significant changes across multiple files, including new API endpoints and background services, which increases the review complexity. |
| 🧪 Relevant tests | No |
| ⚡ Possible issues | Possible Bug: The |
| | Error Handling: In |
| 🔒 Security concerns | Sensitive information exposure: The configuration in |
Code feedback:
| relevant file | gateway/streams_api.go |
| suggestion | Handle unsupported HTTP methods in |
| relevant line | switch r.Method { |

| relevant file | gateway/streams_api.go |
| suggestion | Modify the error handling in |
| relevant line | return err, http.StatusInternalServerError |

| relevant file | streams/config.yaml |
| suggestion | Consider enabling basic authentication and ensure that the password is not stored in a configuration file as a hash. Use environment variables or a secure vault solution to handle sensitive credentials. This change is crucial to prevent unauthorized access and potential leakage of sensitive information. [important] |
| relevant line | enabled: false |
PR Code Suggestions ✨
| Category | Suggestion | Score |
| Security | Enhance the security of password storage. To improve security, consider encrypting the password stored in the benthos-client/benthos_client.go [22]. Suggestion importance [1-10]: 8. Why: Storing passwords in plain text within a struct can lead to security vulnerabilities. Encrypting the password or using a more secure authentication method is a valid and important suggestion. | 8 |
| Security | Add rate limiting to stream handling endpoints to prevent abuse. Implement rate limiting or other protective measures in the … Suggestion importance [1-10]: 8. Why: Adding rate limiting to protect endpoints from abuse is a crucial security measure, especially for public-facing APIs. This suggestion is highly relevant and would help in safeguarding the system against potential abuse. | 8 |
| Maintainability | Improve error logging for better traceability. Instead of printing the error directly in the benthos-client/benthos_client.go [55]. Suggestion importance [1-10]: 7. Why: The suggestion to improve error logging for better traceability is valid. Using structured logging instead of simple print statements can greatly enhance debugging and operational monitoring. | 7 |
:boom: CI tests failed :see_no_evil:
git-state
diff --git a/gateway/server.go b/gateway/server.go
index 8de5bac..3b3513c 100644
--- a/gateway/server.go
+++ b/gateway/server.go
@@ -32,7 +32,6 @@ import (
"github.com/TykTechnologies/tyk/streams"
"github.com/TykTechnologies/tyk/test"
- benthosClient "github.com/TykTechnologies/tyk/benthos-client"
logstashHook "github.com/bshuster-repo/logrus-logstash-hook"
"github.com/evalphobia/logrus_sentry"
graylogHook "github.com/gemnasium/logrus-graylog-hook"
@@ -42,6 +41,8 @@ import (
"github.com/sirupsen/logrus"
logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
+ benthosClient "github.com/TykTechnologies/tyk/benthos-client"
+
"github.com/TykTechnologies/tyk/internal/uuid"
"github.com/TykTechnologies/again"
Please look at the run or in the Checks tab.