
Flink integration with Alluxio fails with the exception 'Recoverable writers on Hadoop are only supported for HDFS'

Open xiaohu-liu opened this issue 1 year ago • 3 comments

Alluxio Version: alluxio-2.9.3

Describe the bug

Flink integrates the Alluxio client. When trying to use Flink's StreamingFileSink to write data to an Alluxio path, the following exception is thrown: Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS

To Reproduce

Code like the following:

StreamingFileSink<VehSignalInfoS> sink = StreamingFileSink
        .forBulkFormat(new Path("alluxio://ebj@public/datalake/ods/tbl1"), writerFactory)
        .withBucketAssigner(new SignalsCustomBucketAssigner())
        .withRollingPolicy(onCheckpointRollingPolicy)
        .withOutputFileConfig(OutputFileConfig.builder()
                .withPartPrefix("prefix_")
                .withPartSuffix(".orc.snappy") // use Snappy compression
                .build())
        .withBucketCheckInterval(Integer.valueOf(args[6]))
        .build();

Expected behavior Compatibility with the Alluxio filesystem protocol

Urgency Flink cannot write data to the Alluxio file system

Are you planning to fix it? Commits are on the way

Additional context None

xiaohu-liu avatar Nov 27 '23 02:11 xiaohu-liu

As we can see in org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java, the code in Flink looks like this:

public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
    this.fs = checkNotNull(fs);

    // This writer is only supported on a subset of file systems
    if (!("hdfs".equalsIgnoreCase(fs.getScheme())
            || "viewfs".equalsIgnoreCase(fs.getScheme()))) {
        throw new UnsupportedOperationException(
                "Recoverable writers on Hadoop are only supported for HDFS");
    }

    // Part of functionality depends on specific versions. We check these
    // schemes and versions eagerly for better error messages.
    if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
        LOG.warn(
                "WARNING: You are running on hadoop version "
                        + VersionInfo.getVersion()
                        + "."
                        + " If your RollingPolicy does not roll on every checkpoint/savepoint, the StreamingFileSink will throw an exception upon recovery.");
    }
}

The logic only accepts the hdfs and viewfs schemes; any other filesystem scheme is simply not supported.
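To make the gate concrete, here is a minimal, self-contained re-creation of the scheme check (the class and method names here are hypothetical, not actual Flink API): a path whose scheme is "alluxio" is rejected before any recoverable write can start.

```java
// Hypothetical stand-in for the check inside HadoopRecoverableWriter's
// constructor; only the "hdfs" and "viewfs" schemes pass.
public class SchemeCheck {

    public static boolean isSupported(String scheme) {
        return "hdfs".equalsIgnoreCase(scheme) || "viewfs".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        // "alluxio" (the scheme of alluxio://ebj@public/... paths) fails the
        // check, which is what triggers the UnsupportedOperationException.
        for (String scheme : new String[] {"hdfs", "viewfs", "alluxio"}) {
            System.out.println(scheme + " supported: " + isSupported(scheme));
        }
    }
}
```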

xiaohu-liu avatar Nov 27 '23 02:11 xiaohu-liu

It doesn't seem to be a bug in Alluxio, but in order to be compatible with Flink, Alluxio needs to make some internal adaptations.

xiaohu-liu avatar Nov 27 '23 02:11 xiaohu-liu

Within the Alluxio namespace, Alluxio could enable this support as long as the UFS (under file system) is compatible with the HDFS protocol.
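One conceivable shape for the adaptation discussed above (purely a sketch under the assumption that the scheme whitelist were made configurable; this is not actual Flink or Alluxio code) would be to let the set of accepted schemes be supplied rather than hard-coded:

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch: a configurable scheme whitelist instead of the
// hard-coded "hdfs"/"viewfs" pair, so an HDFS-compatible scheme such as
// "alluxio" could be admitted without changing the writer itself.
public class RecoverableWriterSchemes {

    private final Set<String> allowed;

    public RecoverableWriterSchemes(Set<String> allowed) {
        this.allowed = allowed;
    }

    public boolean accepts(String scheme) {
        // Compare case-insensitively, matching the equalsIgnoreCase
        // behavior of the original check.
        return allowed.contains(scheme.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        RecoverableWriterSchemes schemes =
                new RecoverableWriterSchemes(Set.of("hdfs", "viewfs", "alluxio"));
        System.out.println("alluxio accepted: " + schemes.accepts("alluxio"));
    }
}
```

Whether the whitelist should live in Flink configuration or be negotiated with the filesystem is exactly the open design question in this thread.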

xiaohu-liu avatar Nov 27 '23 02:11 xiaohu-liu