alluxio
Flink integration with Alluxio fails with exception 'Recoverable writers on Hadoop are only supported for HDFS'
Alluxio Version: alluxio-2.9.3
Describe the bug
Flink integrates the Alluxio client. When Flink's StreamingFileSink is used to write data to an Alluxio path, the following exception is thrown: Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
To Reproduce: use code like the following:
StreamingFileSink<VehSignalInfoS> sink = StreamingFileSink
        .forBulkFormat(new Path("alluxio://ebj@public/datalake/ods/tbl1"), writerFactory)
        .withBucketAssigner(new SignalsCustomBucketAssigner())
        .withRollingPolicy(onCheckpointRollingPolicy)
        .withOutputFileConfig(OutputFileConfig.builder()
                .withPartPrefix("prefix_")
                .withPartSuffix(".orc.snappy") // use Snappy compression
                .build())
        .withBucketCheckInterval(Integer.valueOf(args[6]))
        .build();
Expected behavior: StreamingFileSink should work with the Alluxio file system (the alluxio:// scheme).
Urgency: Flink cannot write data to the Alluxio file system.
Are you planning to fix it: yes, commits are on the way.
Additional context: none
In Flink's org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java, the constructor looks like this:
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
    this.fs = checkNotNull(fs);
    // This writer is only supported on a subset of file systems
    if (!("hdfs".equalsIgnoreCase(fs.getScheme())
            || "viewfs".equalsIgnoreCase(fs.getScheme()))) {
        throw new UnsupportedOperationException(
                "Recoverable writers on Hadoop are only supported for HDFS");
    }
    // Part of functionality depends on specific versions. We check these schemes and versions
    // eagerly for better error messages.
    if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
        LOG.warn(
                "WARNING: You are running on hadoop version "
                        + VersionInfo.getVersion()
                        + "."
                        + " If your RollingPolicy does not roll on every checkpoint/savepoint, the StreamingFileSink will throw an exception upon recovery.");
    }
}
The check only accepts the hdfs and viewfs schemes; any other file system scheme, including alluxio, is rejected with an UnsupportedOperationException.
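To make the effect of this check concrete, here is a minimal standalone Java sketch that mirrors only the scheme comparison in the constructor above. It is not Flink code: the class `RecoverableSchemeCheck` and its methods are hypothetical, and the Hadoop version warning is left out. It assumes the Alluxio Hadoop client reports the scheme `alluxio`, as the path in the reproduction code suggests.

```java
import java.net.URI;

/** Hypothetical helper mirroring the scheme check in Flink's HadoopRecoverableWriter constructor. */
public final class RecoverableSchemeCheck {

    private RecoverableSchemeCheck() {}

    /** True only for the schemes the constructor accepts: "hdfs" and "viewfs", compared case-insensitively. */
    public static boolean isSupported(String scheme) {
        // equalsIgnoreCase(null) returns false, so a missing scheme is rejected too.
        return "hdfs".equalsIgnoreCase(scheme) || "viewfs".equalsIgnoreCase(scheme);
    }

    /** Extracts the scheme from a full URI string and applies the same check. */
    public static boolean isSupportedUri(String uri) {
        return isSupported(URI.create(uri).getScheme());
    }
}
```

For example, `isSupportedUri("hdfs://namenode:8020/datalake/ods/tbl1")` returns true, while `isSupportedUri("alluxio://ebj@public/datalake/ods/tbl1")` returns false, which is the case where the real constructor throws.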
It doesn't seem to be a bug in Alluxio, but in order to be compatible with Flink, Alluxio needs to make some internal adaptations.
Within the Alluxio namespace, Alluxio can enable this support as long as the underlying UFS (under file system) is compatible with the HDFS protocol.
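One way to read the last point: Alluxio knows which UFS each path is mounted on, so it could decide per path whether HDFS-style recoverable writes are possible. The sketch below is purely illustrative. The `MountTable` class, its `mount`, `resolveUfsUri` and `ufsIsHdfsCompatible` methods, and the sample mount points are hypothetical and are not Alluxio APIs. It resolves an Alluxio path (for example `/datalake/ods/tbl1`) by the longest matching mount point and, as an assumption mirroring Flink's check, treats only `hdfs` and `viewfs` UFS schemes as compatible.

```java
import java.net.URI;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of an Alluxio mount table; not the real Alluxio API. */
public final class MountTable {

    // Alluxio mount point (e.g. "/datalake") -> UFS root URI (e.g. "hdfs://namenode:8020/warehouse").
    private final Map<String, String> mounts = new TreeMap<>();

    public void mount(String alluxioPath, String ufsUri) {
        mounts.put(normalize(alluxioPath), ufsUri);
    }

    /** Resolves an Alluxio path to a UFS URI via the longest matching mount point; null if nothing matches. */
    public String resolveUfsUri(String alluxioPath) {
        String path = normalize(alluxioPath);
        String best = null;
        for (String mountPoint : mounts.keySet()) {
            boolean matches = mountPoint.equals("/")
                    || path.equals(mountPoint)
                    || path.startsWith(mountPoint + "/");
            if (matches && (best == null || mountPoint.length() > best.length())) {
                best = mountPoint;
            }
        }
        if (best == null) {
            return null;
        }
        // The remainder is either empty or starts with "/".
        String remainder = best.equals("/") ? path : path.substring(best.length());
        return stripTrailingSlash(mounts.get(best)) + remainder;
    }

    /** Assumed rule: a path qualifies only if its UFS scheme is hdfs or viewfs. */
    public boolean ufsIsHdfsCompatible(String alluxioPath) {
        String ufs = resolveUfsUri(alluxioPath);
        if (ufs == null) {
            return false;
        }
        String scheme = URI.create(ufs).getScheme();
        return "hdfs".equalsIgnoreCase(scheme) || "viewfs".equalsIgnoreCase(scheme);
    }

    private static String normalize(String p) {
        return (p.length() > 1 && p.endsWith("/")) ? p.substring(0, p.length() - 1) : p;
    }

    private static String stripTrailingSlash(String s) {
        return s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
    }
}
```

With `/` mounted on `s3://bucket/root` and `/datalake` mounted on `hdfs://namenode:8020/warehouse`, the path `/datalake/ods/tbl1` resolves to `hdfs://namenode:8020/warehouse/ods/tbl1` and qualifies, while `/logs/app` resolves to an S3 URI and does not. Longest-prefix matching is used so that a nested HDFS mount takes precedence over a non-HDFS root mount.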