
Apache MINA integration

Open · SergiyBojko opened this issue 1 year ago · 9 comments

Bug Description

I'm trying to expose a MinIO bucket through an Apache MINA SFTP server (2.14.0) using the example in the docs, but it fails with:

 Not a directory: s3://[email protected]/foo/
	at org.apache.sshd.common.util.io.IoUtils.ensureDirectory(IoUtils.java:483) 

using the following file system factory config:

FileSystem fileSystem = FileSystems.newFileSystem(URI.create("s3://play.min.io/"),
        env,
        Thread.currentThread()
              .getContextClassLoader());
Path bucketPath = fileSystem.getPath("/foo/"); // "foo", "foo/" and "/foo" don't work either
return new VirtualFileSystemFactory(bucketPath);

Data from the file can actually be read with:

Files.readAllLines(fileSystem.getPath("/foo/bar")) 

I managed to bypass this by implementing a custom FileSystemFactory, but after that MINA still fails because S3FileSystemProvider does not implement the newFileChannel method.
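
For reference, this is roughly what my workaround looks like. It's only a minimal sketch, assuming the FileSystemFactory interface from recent sshd 2.x releases (getUserHomeDir/createFileSystem taking a SessionContext); the class name and the home-directory handling are just for illustration:

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;

import org.apache.sshd.common.file.FileSystemFactory;
import org.apache.sshd.common.session.SessionContext;

// Sketch of the workaround: hand the S3 FileSystem to MINA directly instead of
// going through VirtualFileSystemFactory, whose directory check seems to be
// what fails above.
public class S3SftpFileSystemFactory implements FileSystemFactory {

    private final FileSystem s3FileSystem;

    public S3SftpFileSystemFactory(FileSystem s3FileSystem) {
        this.s3FileSystem = s3FileSystem;
    }

    @Override
    public Path getUserHomeDir(SessionContext session) throws IOException {
        // Treat the bucket prefix as the user's home directory.
        return s3FileSystem.getPath("/foo/");
    }

    @Override
    public FileSystem createFileSystem(SessionContext session) throws IOException {
        return s3FileSystem;
    }
}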

This really doesn't seem right; am I doing something wrong?

Proposed Solution

Known Workarounds

Useful Links

Task Relationships

This bug:

  • Is caused by:
  • Relates to:
    • #856
  • Depends on:

SergiyBojko · Oct 16 '24 12:10

Hi @SergiyBojko ,

Would it be possible for you to provide a docker-compose script (or a test with Testcontainers) with MinIO, and/or a JUnit test case that we could use to try and reproduce this?

carlspring · Oct 17 '24 13:10

Hi @carlspring, here is an example project: example.zip

You can run it with

./gradlew run

After the server has started, you should be able to run:

scp -P 2222 user@localhost:/bar abc.txt
password: p

This will try to copy the file https://play.min.io/foo/bar and will fail with the following error:

14:17:50.617 [sshd-SftpSubsystem-64455-thread-1] DEBUG o.a.sshd.sftp.server.SftpSubsystem -- doSendStatus[ServerSessionImpl[user@/[0:0:0:0:0:0:0:1]:53170]][id=4,cmd=3] exception
java.lang.UnsupportedOperationException: null
        at java.base/java.nio.file.spi.FileSystemProvider.newFileChannel(FileSystemProvider.java:535)
        at org.apache.sshd.common.file.root.RootedFileSystemProvider.newFileChannel(RootedFileSystemProvider.java:193)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:298)
        at org.apache.sshd.sftp.server.SftpFileSystemAccessor.openFile(SftpFileSystemAccessor.java:215)
        at org.apache.sshd.sftp.server.FileHandle.<init>(FileHandle.java:92)
        at org.apache.sshd.sftp.server.SftpSubsystem.doOpen(SftpSubsystem.java:967)
        at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doOpen(AbstractSftpSubsystemHelper.java:496)
        at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doProcess(AbstractSftpSubsystemHelper.java:353)
        at org.apache.sshd.sftp.server.SftpSubsystem.doProcess(SftpSubsystem.java:354)
        at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.process(AbstractSftpSubsystemHelper.java:344)
        at org.apache.sshd.sftp.server.SftpSubsystem.run(SftpSubsystem.java:330)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

SergiyBojko · Oct 19 '24 11:10

@SergiyBojko We haven't had a chance to look into this yet. However, could you try the code you have with the 1.0.6-SNAPSHOT version, which is published to Maven Central Snapshots? There was a bug where properties were not being passed from Files.newFileSystem down to the S3FileSystem, which could be related to the problem you are seeing.

steve-todorov · Nov 06 '24 21:11

I'm using the 1.0.6-SNAPSHOT and seeing the same problem when integrating with Apache MINA. My case seems to be the same as the original, except I'm connecting to DigitalOcean Spaces.

My dependencies:

  <dependencies>
    <dependency>
        <groupId>org.apache.sshd</groupId>
        <artifactId>sshd-core</artifactId>
        <version>2.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.sshd</groupId>
        <artifactId>sshd-sftp</artifactId>
        <version>2.10.0</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.20.18</version>
    </dependency>
    <dependency>
        <groupId>org.carlspring.cloud.aws</groupId>
        <artifactId>s3fs-nio</artifactId>
        <version>1.0.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.9</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.11</version>
    </dependency>
  </dependencies>

The stack trace, which I think is the same as the one above:

java.lang.UnsupportedOperationException: null
	at java.base/java.nio.file.spi.FileSystemProvider.newFileChannel(FileSystemProvider.java:533)
	at org.apache.sshd.common.file.root.RootedFileSystemProvider.newFileChannel(RootedFileSystemProvider.java:193)
	at java.base/java.nio.channels.FileChannel.open(FileChannel.java:309)
	at org.apache.sshd.sftp.server.SftpFileSystemAccessor.openFile(SftpFileSystemAccessor.java:215)
	at org.apache.sshd.sftp.server.FileHandle.<init>(FileHandle.java:85)
	at org.apache.sshd.sftp.server.SftpSubsystem.doOpen(SftpSubsystem.java:928)
	at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doOpen(AbstractSftpSubsystemHelper.java:496)
	at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doProcess(AbstractSftpSubsystemHelper.java:353)
	at org.apache.sshd.sftp.server.SftpSubsystem.doProcess(SftpSubsystem.java:327)
	at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.process(AbstractSftpSubsystemHelper.java:344)
	at org.apache.sshd.sftp.server.SftpSubsystem.run(SftpSubsystem.java:303)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

I'm not sure if it's related, but it seems like the MINA code expects newFileChannel to exist, while s3fs-nio implements newAsynchronousFileChannel instead. I'll keep looking at this, but I thought it was worth mentioning that this does seem to be an issue.
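
As a quick way to confirm this outside of MINA, opening a FileChannel directly on an S3 path hits the same code path. This is only a rough sketch; the URI and object key are placeholders, and it assumes the S3 file system has already been created elsewhere:

import java.net.URI;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class NewFileChannelCheck {

    public static void main(String[] args) throws Exception {
        // Placeholder URI/key; assumes the file system was created earlier
        // via FileSystems.newFileSystem(...) with the proper credentials.
        FileSystem fs = FileSystems.getFileSystem(URI.create("s3://play.min.io/"));
        Path path = fs.getPath("/foo/bar");

        // FileChannel.open() delegates to the provider's newFileChannel(),
        // which the FileSystemProvider base class leaves unimplemented unless
        // the provider overrides it - matching the stack trace above.
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            System.out.println("newFileChannel works, size = " + channel.size());
        } catch (UnsupportedOperationException e) {
            System.out.println("newFileChannel is not implemented by this provider");
        }
    }
}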

zchtodd · Dec 10 '24 13:12

I've made some progress on the Apache MINA integration. The main problem is that the SFTP subsystem expects newFileChannel to exist and, by extension, to function synchronously.

To get it working, I wrapped S3FileChannel in a new S3FileChannelSyncWrapper class.

package org.carlspring.cloud.storage.s3fs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class S3FileChannelSyncWrapper extends FileChannel {

    private static final Logger LOGGER = LoggerFactory.getLogger(S3FileChannelSyncWrapper.class);
    private final S3FileChannel asyncFileChannel;
    private long position = 0;

    public S3FileChannelSyncWrapper(S3FileChannel asyncFileChannel) {
        this.asyncFileChannel = asyncFileChannel;
    }

    @Override
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
        LOGGER.warn("unsupported: read(ByteBuffer[], int, int)");
        throw new UnsupportedOperationException("read(ByteBuffer[], int, int) is not supported.");
    }

    @Override
    public int read(ByteBuffer dst, long position) throws IOException {
        try {
            Future<Integer> future = asyncFileChannel.read(dst, position);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("Error while reading", e);
        }
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
        int bytesRead = read(dst, position);
        if (bytesRead > 0) {
            position += bytesRead;
        }
        return bytesRead;
    }

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
        throw new UnsupportedOperationException("write(ByteBuffer[], int, int) is not supported.");
    }

    @Override
    public int write(ByteBuffer src, long position) throws IOException {
        try {   
            Future<Integer> future = asyncFileChannel.write(src, position);
            int bytesWritten = future.get();
            LOGGER.info("Bytes written: {}", bytesWritten);
            return bytesWritten;
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("Error while writing", e);
        }   
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        int bytesWritten = write(src, position);
        if (bytesWritten > 0) {
            position += bytesWritten;
        }
        return bytesWritten;
    }

    @Override
    public long position() {
        return position;
    }

    @Override
    public FileChannel position(long newPosition) {
        this.position = newPosition;
        return this;
    }

    @Override
    public long size() throws IOException {
        return asyncFileChannel.size();
    }

    @Override
    public FileChannel truncate(long size) throws IOException {
        asyncFileChannel.truncate(size);
        return this;
    }

    protected void sync() throws IOException {
        asyncFileChannel.sync();
    }

    @Override
    public void force(boolean metaData) throws IOException {
        asyncFileChannel.force(metaData);
    }

    @Override
    public FileLock lock(long position, long size, boolean shared) throws IOException {
        try {
            Future<FileLock> future = asyncFileChannel.lock(position, size, shared);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("Error while locking", e);
        }
    }

    @Override
    public FileLock tryLock(long position, long size, boolean shared) throws IOException {
        return asyncFileChannel.tryLock(position, size, shared);
    }

    @Override
    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
        throw new UnsupportedOperationException("Memory mapping is not supported.");
    }

    @Override
    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
        LOGGER.warn("unsupported: transferFrom(ReadableByteChannel, long, long)");
        throw new UnsupportedOperationException("transferFrom is not supported.");
    }

    @Override
    public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
        LOGGER.warn("unsupported: transferTo(long, long, WritableByteChannel)");
        throw new UnsupportedOperationException("transferTo is not supported.");
    }

    @Override
    protected void implCloseChannel() throws IOException {
        asyncFileChannel.close();
    }
}

To put this class to use, add a newFileChannel method to the S3FileSystemProvider class:

    @Override
    public FileChannel newFileChannel(Path path,
                                      Set<? extends OpenOption> options,
                                      FileAttribute<?>... attrs)
                               throws IOException {
        S3Path s3Path = toS3Path(path);

        ExecutorService defaultExecutor = Executors.newFixedThreadPool(4);

        S3FileChannel s3FileChannel = new S3FileChannel(s3Path, options, defaultExecutor, true);
        return new S3FileChannelSyncWrapper(s3FileChannel);
    }
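
One caveat with the snippet above: it creates a new ExecutorService on every call and never shuts it down. A variation along these lines, sharing a single executor, might be preferable (it reuses the same S3FileChannel constructor as above, so treat that signature as an assumption taken from my sketch rather than a published API):

// Hypothetical variant of newFileChannel(): share one executor across channels
// instead of building a fresh 4-thread pool per call.
private static final ExecutorService CHANNEL_EXECUTOR = Executors.newCachedThreadPool();

@Override
public FileChannel newFileChannel(Path path,
                                  Set<? extends OpenOption> options,
                                  FileAttribute<?>... attrs)
        throws IOException {
    S3Path s3Path = toS3Path(path);
    S3FileChannel s3FileChannel = new S3FileChannel(s3Path, options, CHANNEL_EXECUTOR, true);
    return new S3FileChannelSyncWrapper(s3FileChannel);
}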

Posting this in case it's helpful to someone, but it is very experimental.

zchtodd · Dec 14 '24 13:12

This is an interesting finding!

It looks like this was introduced back in 2020 as part of #99. The newFileChannel method was removed back then and only the async one was left. To me it would make more sense to rename the existing S3FileChannel to S3AsyncFileChannel and add a synchronous S3FileChannel, but I guess the original S3FileChannel class name was preserved to avoid backwards-compatibility issues. I would need to dig into the code as well, but unfortunately my time is currently limited. I might be able to do this early next year.

Would you be interested in writing a test that reproduces your problem with Apache MINA, along with the solution, and submitting it as an editable PR?

steve-todorov · Dec 14 '24 18:12

Guys, we could really use some help with MINA. We haven't worked with it much and we have very limited time, but we are keen on keeping this project evolving.

It seems we're getting more and more issues reported with MINA. We're open to contributors joining and helping us in this regard.

If you can provide a fix, that would be great! If not, a test with Testcontainers or Docker would be awesome!
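
For example, something along these lines would already be a great starting point. This is just a rough sketch using a plain GenericContainer with the minio/minio image; the image tag, the credentials, and the steps left as comments are placeholders to fill in:

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;

// Rough sketch of a reproduction test: spin up MinIO with Testcontainers, then
// build the S3 file system against it and drive the MINA SFTP server.
class MinaSftpIntegrationTest {

    @Test
    void reproducesNewFileChannelFailure() {
        try (GenericContainer<?> minio = new GenericContainer<>("minio/minio:latest")
                .withEnv("MINIO_ROOT_USER", "minioadmin")
                .withEnv("MINIO_ROOT_PASSWORD", "minioadmin")
                .withCommand("server", "/data")
                .withExposedPorts(9000)) {

            minio.start();
            String endpoint = "http://" + minio.getHost() + ":" + minio.getMappedPort(9000);

            // 1. Create a bucket and a test object against `endpoint` (AWS SDK).
            // 2. Create the s3fs-nio FileSystem pointing at `endpoint` with the
            //    appropriate credential properties (see the s3fs-nio docs).
            // 3. Start an Apache MINA SSHD server with a VirtualFileSystemFactory
            //    rooted in the bucket and assert on the SFTP read that currently fails.
        }
    }
}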

carlspring · Dec 14 '24 18:12

I'm continuing to test the MINA integration. I'm having some trouble with reads returning stale data, but the problem disappears if I disable the cache. The obvious downside is that it becomes very slow, but it seems to work pretty reliably. I'm going to try to find out what's happening there.

It may take a while, but I'll try to open a PR. Hopefully I won't end up just making it work for my use case while breaking others.

zchtodd · Dec 15 '24 13:12

Either way, please report your findings even if you are not able to prepare a complete fix. Thank you! :)

carlspring · Dec 15 '24 13:12