
[FLINK-35039][rest] Use PUT method supported by YARN web proxy instead of POST

Open yeezychao opened this issue 10 months ago • 11 comments

Fixes the Profiler in the web UI so that it can be used for jobs submitted in YARN mode.

yeezychao, Apr 19 '24 09:04

CI report:

  • 5179e513e55a60de15d50b32497442d432cf051b Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot, Apr 19 '24 09:04

@yeezychao Does your solution also work in a standalone environment?

Myasuka, Apr 19 '24 10:04

@Myasuka Thanks for your reply. Maybe we can create a new Jira issue to discuss how to make this work in a standalone environment. Could you review this PR first?

yeezychao, Apr 22 '24 09:04

@Myasuka PTAL

yeezychao, Apr 26 '24 06:04


Flink already supports self-profiling in a standalone cluster, and that must keep working after your fix is merged to avoid a regression. That's why I asked how your patch behaves there.

Myasuka, Apr 28 '24 03:04

@Myasuka Sorry, I still don't understand what you mean or what I need to do. I tested a standalone cluster locally and could not use the async-profiler. Is there any documentation for the self-profiling you mentioned?

yeezychao, Apr 28 '24 05:04

@yeezychao You can refer to FLINK-34310 to launch a local standalone Flink cluster (not a YARN cluster) to see whether this could still work.

Myasuka, Apr 28 '24 06:04

Hi @Myasuka, I understand what you mean now, and I tested it. With the PUT method the request returns a 404 error; with the POST method it returns 400 Bad Request with the following response:

{ "errors": [ "org.apache.flink.runtime.rest.handler.RestHandlerException: Bad request received. Request did not conform to expected format.\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:154)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:88)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:85)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:50)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\tat 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\tat org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot map null into type int (set DeserializationConfig.DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES to 'false' to allow)\n at [Source: (String)\"{}\"; line: 1, column: 2] (through reference chain: org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody[\"duration\"])\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1720)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.NumberDeserializers$PrimitiveOrWrapperDeserializer.getNullValue(NumberDeserializers.java:176)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer.getAbsentValue(JsonDeserializer.java:350)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer._findMissing(PropertyValueBuffer.java:203)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer.getParameters(PropertyValueBuffer.java:158)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromObjectWith(ValueInstantiator.java:288)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:202)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:519)\n\tat 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4730)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3677)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3645)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:152)\n\t... 45 more\n" ] }

yeezychao, Apr 28 '24 09:04
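The `Caused by` part of the 400 response points at the request body: it was `{}`, and the `duration` property of `ProfilingRequestBody` is deserialized into a primitive `int`, which Jackson will not fill from a missing value. The following is a minimal, dependency-free sketch of that rule, assuming the body has already been parsed into a `Map`; the class `ProfilingBodyCheck` and the status codes it returns are hypothetical illustrations, not Flink's handler code.

```java
import java.util.Map;

public class ProfilingBodyCheck {

    // Hypothetical check mirroring the failure above: "duration" backs a
    // primitive int, so a body without it (e.g. "{}") cannot be accepted.
    static int statusFor(Map<String, Object> body) {
        Object duration = body.get("duration");
        if (!(duration instanceof Integer)) {
            // Missing, null, or not an integer: reject before profiling starts,
            // analogous to the 400 Bad Request returned by the JobManager.
            return 400;
        }
        // Required field present: stands in for "the request is handed on".
        return 200;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(Map.of()));                                 // prints 400
        System.out.println(statusFor(Map.of("mode", "ITIMER", "duration", 30))); // prints 200
    }
}
```

In other words, the 400 says the POST route was found but the payload was incomplete, which is a different failure from the 404 returned for PUT.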

@yeezychao In many tests, we have been able to use this profiling feature in a standalone cluster on a local Mac. If the Flink cluster cannot be profiled after your PR is merged, that would be an obvious regression. I will also try to test your PR later.

Myasuka, Apr 29 '24 09:04

Hi @yeezychao, sorry for the late response. It's not enough to change only the request method on the frontend; you also need to change the HTTP method of the corresponding REST endpoint on the JobManager side (e.g., org.apache.flink.runtime.rest.messages.cluster.JobManagerProfilingHeaders#getHttpMethod).
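The 404 for PUT reported earlier fits this point: requests are matched on both HTTP method and path, so if the JobManager still registers the profiler endpoint only for POST, a PUT sent by the frontend never reaches a handler. The sketch below is a hypothetical, heavily simplified router; `MiniRouter` and the path `/jobmanager/profiler` are illustrative assumptions, not Flink's `RouterHandler`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MiniRouter {
    // Hypothetical simplification: path -> HTTP methods registered for it.
    private final Map<String, Set<String>> routes = new HashMap<>();

    void register(String method, String path) {
        routes.computeIfAbsent(path, p -> new HashSet<>()).add(method);
    }

    // 404 if no handler is registered for this (method, path) pair;
    // 200 stands in for "a handler was found and would run".
    int dispatch(String method, String path) {
        Set<String> methods = routes.get(path);
        return (methods != null && methods.contains(method)) ? 200 : 404;
    }

    public static void main(String[] args) {
        String path = "/jobmanager/profiler"; // assumed endpoint path

        // Only the frontend switched to PUT; the server still registers POST.
        MiniRouter serverUnchanged = new MiniRouter();
        serverUnchanged.register("POST", path);
        System.out.println(serverUnchanged.dispatch("PUT", path)); // prints 404

        // Both sides switched: the server registers PUT as well.
        MiniRouter serverUpdated = new MiniRouter();
        serverUpdated.register("PUT", path);
        System.out.println(serverUpdated.dispatch("PUT", path)); // prints 200
    }
}
```

In Flink the server-side half of the change would be the method returned by the endpoint's headers class, as noted above; the sketch only models the matching rule.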

In addition, you can validate your changes against a standalone cluster in Postman using the request shown below. (I think your POST returned 400 because the request body was missing the required arguments: the profiling type and the duration.)

[Screenshot: Postman profiling request against a standalone cluster]
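To reproduce this check outside Postman, here is a minimal Java sketch that builds (without sending) a profiling request with the JDK's `java.net.http` client. The endpoint path `/jobmanager/profiler`, the JSON field names `mode` and `duration`, the `ITIMER` mode value, and the address `http://localhost:8081` (Flink's default REST port) are assumptions for illustration; check them against the REST API documentation of your Flink version. The HTTP method is a parameter so the same request can be tried with POST (current behavior) and PUT (as proposed in this PR).

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ProfilerRequestSketch {

    // Builds, but does not send, a profiling request carrying both required
    // arguments. Path and field names are assumptions, not a confirmed API.
    static HttpRequest build(String restBase, String method, String mode, int durationSeconds) {
        String json = String.format("{\"mode\":\"%s\",\"duration\":%d}", mode, durationSeconds);
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobmanager/profiler"))
                .header("Content-Type", "application/json")
                .method(method, HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = build("http://localhost:8081", "POST", "ITIMER", 30);
        System.out.println(req.method() + " " + req.uri());
    }
}
```

To send it, pass the request to `HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString())`; a body that omits `duration` should reproduce the 400 shown earlier.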

yuchen-ecnu, Apr 30 '24 02:04

Hi @yuchen-ecnu, I agree with you, so I created YARN-11691 in the YARN community.

yeezychao, Apr 30 '24 05:04