[fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Open Denovo1998 opened this issue 3 years ago • 13 comments

Fixes #17354

Motivation

Fixes the failure to create a consumer with a schema after an AUTO_CONSUME consumer has subscribed to an empty topic. In that case the broker returns the error IncompatibleSchemaException("Topic does not have schema to check").

Modifications

In PersistentTopic::addSchemaIfIdleOrCheckCompatible, the topic is treated as busy whenever there is an active consumer, even if that consumer subscribed with the AUTO_CONSUME schema. Creating another consumer with a concrete schema on the same topic then fails.

  • When numActiveConsumers != 0, check whether the schema of the currently existing consumers is the AUTO_CONSUME schema.
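
The failing path can be pictured with a toy model. The sketch below is not Pulsar code: `ToyTopic`, `Kind`, and `subscribe` are hypothetical names, producers and ledger size are left out, and the compatibility check is reduced to "fails when the topic has no schema". It only illustrates why counting AUTO_CONSUME consumers as active makes the next schema consumer fail, and what ignoring them would change.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the idle-or-check decision; hypothetical names, not Pulsar code. */
final class IdleCheckSketch {
    enum Kind { AUTO_CONSUME, AVRO }

    static final class ToyTopic {
        boolean hasSchema = false;                 // stands in for a schema registry entry
        final List<Kind> consumers = new ArrayList<>();

        /**
         * @param ignoreAutoConsume false mimics the behavior described in this PR,
         *                          true mimics the proposed change (an assumption)
         */
        String subscribe(Kind kind, boolean ignoreAutoConsume) {
            if (kind == Kind.AUTO_CONSUME) {
                // In this model AUTO_CONSUME has no schema to register or check.
                consumers.add(kind);
                return "subscribed";
            }
            long active = consumers.stream()
                    .filter(c -> !ignoreAutoConsume || c != Kind.AUTO_CONSUME)
                    .count();
            if (hasSchema || active != 0) {
                if (!hasSchema) {
                    // Nothing to compare against: the error from the report.
                    return "IncompatibleSchemaException: Topic does not have schema to check";
                }
                consumers.add(kind);
                return "compatible";
            }
            hasSchema = true;                      // idle topic: register the schema
            consumers.add(kind);
            return "schema added";
        }
    }

    public static void main(String[] args) {
        ToyTopic before = new ToyTopic();
        before.subscribe(Kind.AUTO_CONSUME, false);
        System.out.println(before.subscribe(Kind.AVRO, false));

        ToyTopic after = new ToyTopic();
        after.subscribe(Kind.AUTO_CONSUME, true);
        System.out.println(after.subscribe(Kind.AVRO, true));
    }
}
```

In this model the first topic rejects the AVRO consumer, while the second one registers the schema because the AUTO_CONSUME consumer is not counted.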

Verifying this change

  • [x] Make sure that the change passes the CI checks.

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • [ ] doc-required (Your PR needs to update docs and you will update later)

  • [x] doc-not-needed (Please explain why)

  • [ ] doc (Your PR contains doc changes)

  • [ ] doc-complete (Docs have been already added)

Denovo1998 avatar Sep 03 '22 16:09 Denovo1998

Hi @Denovo1998, we'd better add a test for this.

Technoboy- avatar Sep 06 '22 07:09 Technoboy-

Already done @Technoboy-

Denovo1998 avatar Sep 06 '22 12:09 Denovo1998

/pulsarbot rerun-failure-checks

Denovo1998 avatar Oct 02 '22 10:10 Denovo1998

@congbobo184 @mattisonchao PTAL

Denovo1998 avatar Oct 18 '22 01:10 Denovo1998

@codelipenghui @gaoran10 @liangyepianzhou PTAL

Denovo1998 avatar Nov 01 '22 06:11 Denovo1998

What if the producer writes the data but hasn't uploaded the schema yet? https://github.com/apache/pulsar/pull/9853#issuecomment-794646589 #2669

Denovo1998 avatar Nov 01 '22 08:11 Denovo1998

When a consumer with Schema.AUTO_CONSUME subscribes to the topic, the schema in CommandSubscribe is null, so schema.getType() == SchemaType.AUTO_CONSUME will always be false.

liangyepianzhou avatar Nov 01 '22 08:11 liangyepianzhou

The problem is not caused by the AUTO_CONSUME check itself. It is that there are some active consumers using AUTO_CONSUME, so a new consumer connecting with a different schema fails in addSchemaIfIdleOrCheckCompatible with the exception IncompatibleSchemaException("Topic does not have schema to check"). IMO, the check is confusing. If hasSchema is false, we should call addSchema without any other conditions. https://github.com/apache/pulsar/blob/fe1963988fc6883f52826069a781b91aba0405bf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3037

    if (hasSchema || (!producers.isEmpty()) || (numActiveConsumers != 0) || (ledger.getTotalSize() != 0)) {
        return checkSchemaCompatibleForConsumer(schema);
    } else {
        return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
    }

Why don't we change it to

    if (hasSchema) {
        return checkSchemaCompatibleForConsumer(schema);
    } else {
        return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
    }

And store the schema for byte[]:

    if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
        // don't set schema for Schema.BYTES
        si = null;
    }

change it to

    if (si != null && SchemaType.NONE == si.getType()) {
        // don't set schema for Schema.BYTES
        si = null;
    }

liangyepianzhou avatar Nov 01 '22 12:11 liangyepianzhou

Codecov Report

Merging #17449 (ba186d7) into master (69fb3c2) will increase coverage by 36.98%. The diff coverage is 86.66%.

@@              Coverage Diff              @@
##             master   #17449       +/-   ##
=============================================
+ Coverage     25.24%   62.22%   +36.98%     
- Complexity      205     3497     +3292     
=============================================
  Files          1668     1844      +176     
  Lines        126201   135563     +9362     
  Branches      13776    14923     +1147     
=============================================
+ Hits          31858    84355    +52497     
+ Misses        89386    43468    -45918     
- Partials       4957     7740     +2783     
Flag        Coverage Δ
inttests    24.53% <48.88%> (?)
systests    25.31% <66.66%> (+0.06%)
unittests   59.48% <86.66%> (?)

Impacted Files                                          Coverage Δ
...va/org/apache/pulsar/client/impl/ConsumerImpl.java   70.22% <66.66%> (+41.94%)
...va/org/apache/pulsar/common/protocol/Commands.java   80.00% <71.42%> (+33.13%)
...ava/org/apache/pulsar/broker/service/Consumer.java   76.01% <83.33%> (+31.70%)
...sar/broker/service/persistent/PersistentTopic.java   64.06% <85.71%> (+21.29%)
...va/org/apache/pulsar/broker/service/ServerCnx.java   55.01% <100.00%> (+24.78%)
...ache/pulsar/broker/service/SubscriptionOption.java   86.36% <100.00%> (+0.64%)
...oker/service/nonpersistent/NonPersistentTopic.java   62.96% <100.00%> (+51.13%)
...e/pulsar/client/impl/schema/AutoConsumeSchema.java   72.95% <100.00%> (+11.26%)
...lsar/functions/runtime/process/ProcessRuntime.java   47.44% <0.00%> (-2.05%)
...in/java/org/apache/pulsar/common/api/AuthData.java   71.42% <0.00%> (ø)
... and 1302 more

codecov-commenter avatar Nov 01 '22 12:11 codecov-commenter

https://github.com/apache/pulsar/blob/adae4ae2b06304eae9ec001357e0bd6ee6ffb053/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3030-L3046 I think this method only needs to check whether the topic has a schema: if it doesn't have a schema, add the schema; if it has a schema, check compatibility. So the code would look like this:

                if (hasSchema) {
                    return checkSchemaCompatibleForConsumer(schema);
                } else {
                    return addSchema(schema).thenCompose(schemaVersion ->
                            CompletableFuture.completedFuture(null));
                }

Active producers or active consumers shouldn't affect this method. If there are no active producers or consumers, the schema can still be added successfully, so users can still use it incorrectly. The check blocks only some cases, not all of them, which probably doesn't make sense. Also, the current code cannot be guaranteed to be correct under concurrency. So we either keep the current logic or think of a better solution and keep it simple. @codelipenghui @liangyepianzhou

congbobo184 avatar Nov 01 '22 13:11 congbobo184

The problem is not caused by the check of the AUTO_CONSUME. It is that there are some activeConsumer with AUTO_CONSUME.

Oh, got it.

IMO, The check is confusing. If there is no hasSchema, then we should addSchema without other conditions.

No. If we have active producers or consumers, the new schema might break the existing producers and consumers, because in this case they are using the byte[] schema. The main reason is that the client side does not upload the byte[] schema.

I think the current fix will break the existing behavior (if there are active consumers with the byte[] schema, the new schema should not be added to the schema registry).

The client side does not upload AUTO_CONSUME either, so it will be a challenge for us to know whether the active consumers are using the AUTO_CONSUME schema or the byte[] schema.
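
To make the ambiguity concrete, here is a minimal sketch of what the broker can observe on subscribe, assuming (as described in this thread) that the client strips NONE and BYTES schemas and that an AUTO_CONSUME consumer on an empty topic sends no schema either. `WireSchemaSketch`, `Type`, and `seenByBroker` are hypothetical names, not Pulsar APIs.

```java
import java.util.Optional;

/** Toy model of the schema carried by a subscribe request; not Pulsar code. */
final class WireSchemaSketch {
    enum Type { NONE, BYTES, AUTO_CONSUME, AVRO }

    /** Schema type the broker would see for a consumer created with the given schema. */
    static Optional<Type> seenByBroker(Type clientSchema) {
        switch (clientSchema) {
            case NONE:
            case BYTES:
                // Stripped on the client side before the request is sent.
                return Optional.empty();
            case AUTO_CONSUME:
                // Assumption from this thread: nothing is sent for an empty topic.
                return Optional.empty();
            default:
                return Optional.of(clientSchema);
        }
    }

    public static void main(String[] args) {
        for (Type t : Type.values()) {
            System.out.println(t + " -> " + seenByBroker(t));
        }
    }
}
```

Under these assumptions a BYTES consumer and an AUTO_CONSUME consumer are indistinguishable to the broker, which is why a broker-only fix cannot tell them apart.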

codelipenghui avatar Nov 01 '22 13:11 codelipenghui

No, if we have active producers or consumers. The new schema might break the existing producer and consumers because they are using the byte[] schema in this case. The main reason is the client side will not upload the byte[] schema.

@codelipenghui Yes, I have taken this into consideration as well and gave a suggestion: if a topic has a byte[] schema, we do not want it to have any other schema. That is to say, the byte[] schema is a valid schema and it is exclusive, so we need to store it and mark the topic as hasSchema.

And store the schema for byte[]:

    if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
        // don't set schema for Schema.BYTES
        si = null;
    }

change it to

    if (si != null && SchemaType.NONE == si.getType()) {
        // don't set schema for Schema.BYTES
        si = null;
    }

liangyepianzhou avatar Nov 01 '22 14:11 liangyepianzhou

@liangyepianzhou

I think the code snippet is from the client side, right? Here is a related discussion: https://lists.apache.org/thread/3js51tq2p3c3oldfrhprn4kcohx7h1wv

codelipenghui avatar Nov 01 '22 14:11 codelipenghui

I think we should not check schema compatibility for the AUTO_CONSUME and AUTO_PUBLISH schemas.

https://github.com/apache/pulsar/blob/fe1963988fc6883f52826069a781b91aba0405bf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1135

I think we should change to

    if (schema != null && schema.getType() != SchemaType.AUTO_PUBLISH && schema.getType() != SchemaType.AUTO_CONSUME) {
        return topic.addSchemaIfIdleOrCheckCompatible(schema)
                .thenCompose(v -> topic.subscribe(option));
    } else {
        return topic.subscribe(option);
    }

What do you think, @congbobo184 @liangyepianzhou?

This doesn't fix the bug that this PR is trying to fix. For example, if the new consumer's schema is AVRO but two AUTO_CONSUME consumers have already subscribed to the topic, schema compatibility will still be checked, because numActiveConsumers != 0 is true. So the check should not be triggered when the only active consumers are AUTO_CONSUME consumers. Of course we should add your code, too. So I made some changes. @codelipenghui @liangyepianzhou @congbobo184 PTAL

Denovo1998 avatar Nov 11 '22 15:11 Denovo1998

The pr had no activity for 30 days, mark with Stale label.

github-actions[bot] avatar Dec 12 '22 02:12 github-actions[bot]

I think a better way to resolve this bug is to record the schemaType in the Consumer class:

https://github.com/apache/pulsar/blob/a95bb630d76c3234ef4660d722518e642341ad3b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L71-L81

Then, when we calculate the number of active consumers, we can ignore the consumers without a specific schema:

https://github.com/apache/pulsar/blob/a95bb630d76c3234ef4660d722518e642341ad3b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1162-L1173

I can provide another PR to implement it.

labuladong avatar Dec 20 '22 03:12 labuladong

@labuladong When did I say I needed your help?

Denovo1998 avatar Dec 20 '22 04:12 Denovo1998

I resolved this bug last night by recording SchemaData in Consumer.

Denovo1998 avatar Dec 20 '22 04:12 Denovo1998

@congbobo184 PTAL.

Denovo1998 avatar Dec 20 '22 05:12 Denovo1998

@congbobo184 The commits in my PR are a bit messy. Should I open a new PR and close this one?

Denovo1998 avatar Dec 20 '22 08:12 Denovo1998

@congbobo184 The Commit of my PR is a bit dirty. Should I open a PR and close this?

No need to push a new PR; please merge apache/master to resolve the conflict.

congbobo184 avatar Dec 20 '22 08:12 congbobo184

@congbobo184 The conflict has been resolved. What else should I do?

Denovo1998 avatar Dec 20 '22 08:12 Denovo1998

@codelipenghui @congbobo184

  1. https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817
     Change here to:

         SchemaInfo si = schema.getSchemaInfo();
         if (si != null && SchemaType.NONE == si.getType()) {
             // don't set schema for Schema.BYTES
             si = null;
         }

     But here it will change BYTES to NONE: https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L641-L651
  2. Then, in handleSubscribe(): https://github.com/Denovo1998/pulsar/blob/33c838af4d4a70e40e6782d37579c67e3f338237/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1133-L1138

         if (schema != null && schema.getType() != SchemaType.NONE) {
             return topic.addSchemaIfIdleOrCheckCompatible(schema)
                     .thenCompose(v -> topic.subscribe(option, schema));
         } else {
             return topic.subscribe(option, schema);
         }

     Is this OK?
  3. And: https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1023 https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1209-L1222 Do the other two, handleProducer() and handleGetOrCreateSchema(), need to be changed?

Denovo1998 avatar Dec 20 '22 16:12 Denovo1998

We only need to upload AUTO_CONSUME, and we should not change the behavior of the BYTES schema (don't upload the NONE and BYTES schemas; in the broker, schema = null represents the BYTES schema). How about this? @Denovo1998 @codelipenghui
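
One way to read this proposal is as a three-way classification on the broker side. The sketch below is only an illustration under that reading: `ConsumerSchemaClassifier`, `Type`, `Role`, and `classify` are hypothetical names, and `null` models "no schema was sent", which the comment above equates with the BYTES schema.

```java
/** Toy classification of consumers once AUTO_CONSUME is sent to the broker; not Pulsar code. */
final class ConsumerSchemaClassifier {
    enum Type { AUTO_CONSUME, AVRO, JSON }

    enum Role {
        BYTES_LIKE,             // no schema on the wire: keeps today's byte[] behavior
        IGNORED_FOR_IDLE_CHECK, // AUTO_CONSUME: should not make the topic look busy
        SPECIFIC                // a concrete schema: add it or check compatibility
    }

    /** @param type schema type received by the broker, or null if none was sent */
    static Role classify(Type type) {
        if (type == null) {
            return Role.BYTES_LIKE;
        }
        if (type == Type.AUTO_CONSUME) {
            return Role.IGNORED_FOR_IDLE_CHECK;
        }
        return Role.SPECIFIC;
    }

    public static void main(String[] args) {
        System.out.println(classify(null));
        System.out.println(classify(Type.AUTO_CONSUME));
        System.out.println(classify(Type.AVRO));
    }
}
```

The point of the split is that byte[] consumers keep blocking new schemas as before, while AUTO_CONSUME consumers become distinguishable and can be skipped.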

congbobo184 avatar Dec 20 '22 16:12 congbobo184

Hi @Denovo1998! I'm glad that you have this idea too, so I'll close my PR.

For your discussion above, my solution is to count only the consumers that specify a schema:

int numActiveConsumersWithSchema = subscriptions.values().stream()
  .mapToInt(subscription -> (int) subscription.getConsumers().stream()
        // keep only the consumers that have a specific schema
        .filter((c) -> c.getSchemaType() != null && c.getSchemaType().getValue() > 0).count())
  .sum();

If so, the original schema won't be changed, and this code ignores the consumers with a null, AUTO_CONSUME, NONE or BYTES schema, because these schema types all have non-positive enum values:

https://github.com/apache/pulsar/blob/90f67587e31cc0cfb27773f85f18d607c9c5c324/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java#L140-L175
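
A self-contained version of the same counting idea is sketched below. `ToySchemaType` is a hypothetical stand-in for SchemaType, and its numeric values are assumptions chosen only to reproduce the positive/non-positive split described above; the linked source is the authority on the real values. Each inner list stands for the consumers of one subscription.

```java
import java.util.Arrays;
import java.util.List;

/** Counts consumers that declared a concrete schema; toy types, not Pulsar code. */
final class ActiveConsumerCount {
    enum ToySchemaType {
        NONE(0), AVRO(4), BYTES(-1), AUTO_CONSUME(-3);   // values are assumptions

        private final int value;

        ToySchemaType(int value) {
            this.value = value;
        }

        int getValue() {
            return value;
        }
    }

    /** Each inner list holds the schema types of one subscription's consumers (null = none recorded). */
    static int countWithSpecificSchema(List<List<ToySchemaType>> subscriptions) {
        return subscriptions.stream()
                .mapToInt(consumers -> (int) consumers.stream()
                        // keep only consumers with a positive (concrete) schema type
                        .filter(t -> t != null && t.getValue() > 0)
                        .count())
                .sum();
    }

    public static void main(String[] args) {
        List<List<ToySchemaType>> subscriptions = Arrays.asList(
                Arrays.asList(ToySchemaType.AUTO_CONSUME, null, ToySchemaType.BYTES),
                Arrays.asList(ToySchemaType.AVRO, ToySchemaType.NONE));
        System.out.println(countWithSpecificSchema(subscriptions));
    }
}
```

Only the AVRO consumer is counted here, so a topic whose consumers are all AUTO_CONSUME, BYTES, NONE or schemaless would still be treated as idle.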

labuladong avatar Dec 22 '22 13:12 labuladong

But when an AUTO_CONSUME consumer subscribes to an empty topic first, the schemaInfo is null. There seems to be no way to fix this bug. https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L270-L280 https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817 @congbobo184 @codelipenghui

Denovo1998 avatar Dec 23 '22 14:12 Denovo1998

We need to send the AUTO_CONSUME type to the broker: https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L773-L788

https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817

Change this so that the AUTO_CONSUME type can be sent to the broker.

congbobo184 avatar Dec 24 '22 09:12 congbobo184

@congbobo184 There is nothing wrong with my test consumer, but will this affect the behavior of Producer?

Denovo1998 avatar Dec 24 '22 15:12 Denovo1998

https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-common/src/main/proto/PulsarApi.proto#L47

We should add the AUTO_CONSUME schema type to this proto; otherwise, AUTO_CONSUME can't be uploaded to the broker.

congbobo184 avatar Dec 25 '22 08:12 congbobo184

@congbobo184 Is that all right? Is there anything else I need to do?

Denovo1998 avatar Dec 27 '22 13:12 Denovo1998