The client is not authorized to access this topic
Description
When I use SASL OAUTHBEARER authentication, is there an internal mechanism to automatically refresh the token? Currently, my service encounters the error: "The client is not authorized to access this topic."
Versions
| Sarama | Kafka | Go |
|---|---|---|
| v1.43.2 | v3.5.1 | v1.19 |
Configuration
type accessTokenProvider struct {
	awsRegion   string
	awsProfiles string
	token       string
	mu          sync.RWMutex
}

func (m *accessTokenProvider) Token() (*sarama.AccessToken, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var err error
	if m.awsProfiles == "" {
		m.token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
		return &sarama.AccessToken{Token: m.token}, err
	} else {
		m.token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
		return &sarama.AccessToken{Token: m.token}, err
	}
}

func (m *accessTokenProvider) RefreshToken() {
	for {
		var token string
		var err error
		if m.awsProfiles == "" {
			token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
		} else {
			token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
		}
		if err != nil {
			fmt.Println("RefreshToken error :", err)
		}
		m.mu.Lock()
		m.token = token
		m.mu.Unlock()
		// Sleep for a while so the token is refreshed before it expires
		time.Sleep(10 * time.Minute)
	}
}

func InitConfig(awsRegion string, awsProfiles string, SASL, TLS bool) *sarama.Config {
	/*err := godotenv.Load()
	if err != nil {
		log.Fatal("Error loading .env file")
	}
	*/
	/*awsRegion, hasEnvRegion := os.LookupEnv(awsRegion)
	if !hasEnvRegion {
		log.Fatal("AWS_REGION environment variable not set")
	}*/
	configure := sarama.NewConfig()
	if SASL {
		configure.Net.SASL.Enable = true
		configure.Net.SASL.Mechanism = sarama.SASLTypeOAuth
		tokenProvider := &accessTokenProvider{
			awsRegion:   awsRegion,
			awsProfiles: awsProfiles,
		}
		configure.Net.SASL.TokenProvider = tokenProvider
		go func() {
			tokenProvider.RefreshToken()
		}()
	}
	if TLS {
		configure.Net.TLS.Enable = true
		configure.Net.TLS.Config = &tls.Config{}
	}
	configure.Consumer.Offsets.Initial = sarama.OffsetOldest
	return configure
}
Logs
The client is not authorized to access this topic
2024-08-26 20:00:00.528033078 +0000 UTC m=+43107.359917041 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic
2024-08-26 20:00:01.528134796 +0000 UTC m=+43108.360018739 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic
2024-08-26 20:00:02.867247119 +0000 UTC m=+43109.699131083 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic
2024-08-26 20:00:05.02019884 +0000 UTC m=+43111.852082706 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic
Additional Context
I reviewed the relevant documentation, and it seems that when the producer sends a message, it triggers authenticateViaSASLv1, which retrieves the latest token through the TokenProvider interface. However, that is not the behavior I observe: authentication fails periodically after the service has been running for some time. I’m not sure what internal mechanism could be used to refresh the token automatically.
The broker has a per-listener config that forces clients to re-authenticate: https://kafka.apache.org/documentation/#brokerconfigs_connections.max.reauth.ms. It is off by default.
@aethir-paas as per above, Sarama only sends the token for authentication when the broker requests it, which by default happens only on initial connection or re-connection. At that point Sarama calls your Token() method, and your implementation appears to generate a new token on every call anyway, so there is nothing to refresh. You might want to improve that by caching the issued credentials and refreshing them only when needed, rather than calling signer.GenerateAuthToken on every request. Either way, the authentication step only requires the token to be valid at the point of authentication.
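As a rough illustration of the caching suggestion, here is a minimal sketch that regenerates a token only when the cached one is missing or close to expiry. The names `tokenSource`, `cachingProvider` and `newCachingProvider` are hypothetical and not part of Sarama or the AWS signer. The sketch assumes the second return value of signer.GenerateAuthToken is the expiry time in milliseconds since the Unix epoch. To keep it runnable with the standard library only, `Token()` returns a plain string here; a real provider would wrap it as `&sarama.AccessToken{Token: tok}`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tokenSource is a hypothetical stand-in for a closure around
// signer.GenerateAuthToken(ctx, region). It returns the token and its
// expiry, assumed to be in milliseconds since the Unix epoch.
type tokenSource func() (token string, expiryMs int64, err error)

// cachingProvider (hypothetical name) hands out a cached token until it
// is within margin of expiring, then asks the source for a new one.
type cachingProvider struct {
	mu     sync.Mutex
	source tokenSource
	margin time.Duration
	token  string
	expiry time.Time
}

func newCachingProvider(source tokenSource, margin time.Duration) *cachingProvider {
	return &cachingProvider{source: source, margin: margin}
}

// Token returns the cached token while it is still valid for at least
// margin, and regenerates it otherwise. A full mutex is used because the
// method may write to the cached fields.
func (p *cachingProvider) Token() (string, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.token != "" && time.Now().Add(p.margin).Before(p.expiry) {
		return p.token, nil
	}
	tok, expiryMs, err := p.source()
	if err != nil {
		// Keep the previous cache untouched and report the failure.
		return "", err
	}
	p.token, p.expiry = tok, time.UnixMilli(expiryMs)
	return p.token, nil
}

func main() {
	calls := 0
	// Fake source for demonstration; tokens last 15 minutes.
	fake := func() (string, int64, error) {
		calls++
		expiry := time.Now().Add(15 * time.Minute).UnixMilli()
		return fmt.Sprintf("token-%d", calls), expiry, nil
	}

	p := newCachingProvider(fake, time.Minute)
	for i := 0; i < 3; i++ {
		tok, err := p.Token()
		if err != nil {
			fmt.Println("token error:", err)
			return
		}
		fmt.Println(tok)
	}
	fmt.Println("source was called", calls, "time(s)")
}
```

With this shape no background refresh goroutine is needed, since the token is only regenerated when Sarama actually asks for one and the cached copy is about to expire.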
The error message 'The client is not authorized to access this topic' relates to topic authorisation rather than authentication, and authorisation is handled by the server-side broker configuration. Normally this uses Kafka ACLs or, if you're using a cloud provider, potentially some custom authorisation logic. I wouldn't expect your identity to change when the authentication token is refreshed, but if that is the behaviour, then configuring the broker to force periodic re-authentication would account for that too if necessary.
Closing this as assumed to be resolved. Please re-open if this is still a problem for you.