The client is not authorized to access this topic

Open aethir-paas opened this issue 1 year ago • 1 comments

Description

When I use SASL OAUTHBEARER authentication, is there an internal mechanism to automatically refresh the token? Currently, my service encounters the error: "The client is not authorized to access this topic."

Versions
Sarama: v1.43.2
Kafka: v3.5.1
Go: v1.19
Configuration
type accessTokenProvider struct {
	awsRegion   string
	awsProfiles string
	token       string
	mu          sync.RWMutex
}

func (m *accessTokenProvider) Token() (*sarama.AccessToken, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var err error
	if m.awsProfiles == "" {
		m.token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
		return &sarama.AccessToken{Token: m.token}, err
	} else {
		m.token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
		return &sarama.AccessToken{Token: m.token}, err
	}
}

func (m *accessTokenProvider) RefreshToken() {
	for {
		var token string
		var err error
		if m.awsProfiles == "" {
			token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
		} else {
			token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
		}
		if err != nil {
			fmt.Println("RefreshToken error :", err)
		}

		m.mu.Lock()
		m.token = token
		m.mu.Unlock()

		// Wait for a while, then refresh again before the token expires
		time.Sleep(10 * time.Minute)
	}
}

func InitConfig(awsRegion string, awsProfiles string, SASL, TLS bool) *sarama.Config {
	/*err := godotenv.Load()
	if err != nil {
		log.Fatal("Error loading .env file")
	}
	*/
	/*awsRegion, hasEnvRegion := os.LookupEnv(awsRegion)
	if !hasEnvRegion {
		log.Fatal("AWS_REGION environment variable not set")
	}*/

	configure := sarama.NewConfig()
	if SASL {
		configure.Net.SASL.Enable = true
		configure.Net.SASL.Mechanism = sarama.SASLTypeOAuth

		tokenProvider := &accessTokenProvider{
			awsRegion:   awsRegion,
			awsProfiles: awsProfiles,
		}
		configure.Net.SASL.TokenProvider = tokenProvider
		go func() {
			tokenProvider.RefreshToken()
		}()
	}

	if TLS {
		configure.Net.TLS.Enable = true
		configure.Net.TLS.Config = &tls.Config{}
	}
	configure.Consumer.Offsets.Initial = sarama.OffsetOldest
	return configure
}


Logs
The client is not authorized to access this topic

2024-08-26 20:00:00.528033078 +0000 UTC m=+43107.359917041 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

2024-08-26 20:00:01.528134796 +0000 UTC m=+43108.360018739 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

2024-08-26 20:00:02.867247119 +0000 UTC m=+43109.699131083 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

2024-08-26 20:00:05.02019884 +0000 UTC m=+43111.852082706 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

Additional Context

I reviewed the relevant documentation, and it seems that when the producer sends a message, it triggers authenticateViaSASLv1, which retrieves the latest token through the TokenProvider interface. However, the behavior is not what I expected: authentication fails periodically after the service has been running for some time. I’m not sure what internal mechanism could be used to refresh the token automatically.

aethir-paas, Aug 27 '24 04:08

The broker has a per-listener config, connections.max.reauth.ms, that forces clients to re-authenticate: https://kafka.apache.org/documentation/#brokerconfigs_connections.max.reauth.ms. By default it is off.

JunliWang, Sep 28 '24 00:09

@aethir-paas as per above, Sarama will only send the token for authentication when requested by the broker, which by default is only on initial connection / re-connection. Sarama calls your Token() method, which in your implementation appears to generate a new token every time anyway, so there is nothing to refresh. You might want to improve that by caching your issued credentials and only refreshing them when needed, rather than calling signer.GenerateAuthToken on every request. However, that authentication step only requires that the token be valid at the point of authentication.

The error message 'The client is not authorized to access this topic' relates to topic authorisation rather than authentication, and authorisation is handled by the server-side broker configuration. Normally this uses Kafka ACLs or, if you're using a Cloud provider, potentially some custom authorisation logic. I wouldn't expect your identity to change when the authentication token is refreshed, but if that is the behaviour then configuring the broker to force re-authentication periodically would account for that too if necessary.
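To make the distinction concrete, here is a small self-contained sketch that sorts a few Kafka protocol error codes into the two families. The numeric values are taken from the Kafka protocol's error table; the constant names and the `classify` helper are made up for this example. Sarama exposes the same codes as KError values such as sarama.ErrTopicAuthorizationFailed and sarama.ErrSASLAuthenticationFailed, so real code would compare against those rather than raw numbers.

```go
package main

import "fmt"

// A subset of Kafka protocol error codes. The names are local to this
// sketch; only the numeric values come from the protocol.
const (
	topicAuthorizationFailed   int16 = 29
	groupAuthorizationFailed   int16 = 30
	clusterAuthorizationFailed int16 = 31
	unsupportedSASLMechanism   int16 = 33
	illegalSASLState           int16 = 34
	saslAuthenticationFailed   int16 = 58
)

// classify reports whether a code signals an authorization problem
// (identity accepted, permission missing) or an authentication problem
// (identity could not be established).
func classify(code int16) string {
	switch code {
	case topicAuthorizationFailed, groupAuthorizationFailed, clusterAuthorizationFailed:
		return "authorization"
	case unsupportedSASLMechanism, illegalSASLState, saslAuthenticationFailed:
		return "authentication"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(topicAuthorizationFailed)) // prints "authorization"
	fmt.Println(classify(saslAuthenticationFailed)) // prints "authentication"
}
```

The error in the logs above falls in the first group, which is why the token provider on its own is an unlikely culprit.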

dnwe, Dec 27 '24 11:12

Closing this as it is assumed to be resolved. Please re-open if this is still a problem for you.

dnwe, Jan 14 '25 09:01