flink
flink copied to clipboard
[FLINK-35039][rest] Use PUT method supported by YARN web proxy instead of POST
To repair the task submitted in the Yarn mode, you can use the Profiler on the webui
CI report:
- 5179e513e55a60de15d50b32497442d432cf051b Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:-
@flinkbot run azure
re-run the last Azure build
@yeezychao Can your solution also works in standalone environment?
@Myasuka Thx u for your reply. Maybe we can create a new Jira to discuss how to works in standalone env. Can you review this pr first?
@Myasuka PTAL
@Myasuka Thx u for your reply. Maybe we can create a new Jira to discuss how to works in standalone env. Can you review this pr first?
Flink already supports self-profiling in standalone cluster, and we should also support this after your fix merged to avoid regression. That's why I asked how it does with your patch.
@Myasuka Sorry, I still don't understand what you mean, what do I need to do. Because I tested the standalone cluster locally and could not use the async-profiler. Is there any relevant documentation for the self-profiling you mentioned?
@yeezychao You can refer to FLINK-34310 to launch a local standalone Flink cluster (not a YARN cluster) to see whether this could still work.
Hi @Myasuka ,I understand what you mean.I tested.
If use PUT method, Return 404 error.
If use POST method, Return 400 Bad Request.
{ "errors": [ "org.apache.flink.runtime.rest.handler.RestHandlerException: Bad request received. Request did not conform to expected format.\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:154)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:88)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:85)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:50)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\tat org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot map
nullinto type
int (set DeserializationConfig.DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES to 'false' to allow)\n at [Source: (String)\"{}\"; line: 1, column: 2] (through reference chain: org.apache.flink.runtime.rest.messages.cluster.ProfilingRequestBody[\"duration\"])\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1720)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.NumberDeserializers$PrimitiveOrWrapperDeserializer.getNullValue(NumberDeserializers.java:176)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer.getAbsentValue(JsonDeserializer.java:350)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer._findMissing(PropertyValueBuffer.java:203)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer.getParameters(PropertyValueBuffer.java:158)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromObjectWith(ValueInstantiator.java:288)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:202)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:519)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4730)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3677)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3645)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:152)\n\t... 45 more\n" ] }
@yeezychao from many tests, we can use this profiling feature in a standalone cluster on local Mac. If the Flink cluster cannot be profiled after merging your PR, I think this introduced such an obvious regression. I will also try to test your PR later.
Hi @yeezychao , sorry for the late response.
It's not enough to just change the request type on the front end, you also need to modify the type of the corresponding REST interface on the JobManager side (e.g., org.apache.flink.runtime.rest.messages.cluster.JobManagerProfilingHeaders# getHttpMethod
).
In addition, in Standalone Cluster, you can validate your changes in Postman by referring to the request shown below. (I think POST 400 because you missed the request parameters, which contain the necessary arguments: profiling type and duration)
Hi @yuchen-ecnu, I agreee with you,so I created issues YARN-11691 to the YARN community.