
HttpClient : Empty files created when reAuthenticate occurs for inputstream request

Open naddame opened this issue 6 years ago • 6 comments

On a put-object request, if reAuthenticate occurs, the input stream has already been consumed by the first attempt. When invokeRequest runs again after reAuthenticate, it writes an empty stream, which results in empty files.

The solution should be: when the request contains a stream that has already been consumed, an exception should be thrown (for example by an AlreadyConsumedInputStream) instead of silently sending an empty body.
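To make the failure mode concrete, here is a minimal JDK-only sketch, not openstack4j code. The class name ConsumedStreamDemo and the helper drain are made up for illustration. It reads the same InputStream twice, the way a first attempt and a retry would.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of openstack4j.
class ConsumedStreamDemo {

    /** Reads the stream to its end and returns how many bytes were read. */
    static int drain(InputStream in) throws IOException {
        byte[] buffer = new byte[8192];
        int total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        InputStream body =
            new ByteArrayInputStream("hello swift".getBytes(StandardCharsets.UTF_8));
        int firstAttempt = drain(body);  // stands in for the request rejected with 401
        int secondAttempt = drain(body); // stands in for the retry after reAuthenticate
        System.out.println(firstAttempt + " bytes, then " + secondAttempt + " bytes");
    }
}
```

The second call finds the stream already at its end, so it reads nothing and reports no error. That matches the empty object described above.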

naddame avatar Nov 30 '18 16:11 naddame

Any news on this? I think I am hitting this error.

RemiBou avatar Jun 04 '19 13:06 RemiBou

Hello, I don't know whether the latest release fixes the problem. To work around it, we use the MyAutoCloseInputStream below. Once the original InputStream has been consumed or closed, it is replaced by an AlreadyConsumedInputStream that throws an exception on any further read, so you then have to handle the StreamAlreadyConsumedException:

import static org.apache.commons.io.IOUtils.EOF;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.input.ProxyInputStream;

// StreamAlreadyConsumedException is a custom exception that is not shown in this
// thread; it has to extend IOException (or RuntimeException) for this to compile.
public class MyAutoCloseInputStream extends ProxyInputStream {

    public MyAutoCloseInputStream(final InputStream in) {
        super(in);
    }

    /**
     * Closes the underlying input stream and replaces the reference to it
     * with an {@link AlreadyConsumedInputStream} instance.
     * <p>
     * This method is automatically called by the read methods when the end
     * of input has been reached.
     * <p>
     * Note that it is safe to call this method any number of times. The original
     * underlying input stream is closed and discarded only once when this
     * method is first called.
     *
     * @throws IOException if the underlying input stream can not be closed
     */
    @Override
    public void close() throws IOException {
        in.close();
        in = new AlreadyConsumedInputStream();
    }

    /**
     * Automatically closes the stream if the end of stream was reached.
     *
     * @param n number of bytes read, or -1 if no more bytes are available
     * @throws IOException if the stream could not be closed
     * @since 2.0
     */
    @Override
    protected void afterRead(final int n) throws IOException {
        if (n == EOF) {
            close();
        }
    }

    /**
     * Ensures that the stream is closed before it gets garbage-collected.
     * As mentioned in {@link #close()}, this is a no-op if the stream has
     * already been closed.
     * @throws Throwable if an error occurs
     */
    @Override
    protected void finalize() throws Throwable {
        close();
        super.finalize();
    }

    /** Replacement stream: every read-like operation fails instead of returning EOF. */
    public static class AlreadyConsumedInputStream extends InputStream {

        @Override
        public int read() throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public int available() throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public long skip(final long n) throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public void reset() throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public void close() {
            // NOP
        }
    }

}
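
For readers without Commons IO on the classpath, the same fail-fast idea can be sketched with the JDK's FilterInputStream alone. This is a simplified stand-in, not the class above. The names ConsumedException, FailFastInputStream and FailFastDemo are hypothetical, and only the read methods are guarded.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for the StreamAlreadyConsumedException used in this thread. */
class ConsumedException extends IOException {
    ConsumedException(String message) {
        super(message);
    }
}

/** Fails fast on reads once the end of the stream was reached or close() was called. */
class FailFastInputStream extends FilterInputStream {
    private boolean consumed;

    FailFastInputStream(InputStream in) {
        super(in);
    }

    private void ensureFresh() throws IOException {
        if (consumed) {
            throw new ConsumedException("Stream already consumed");
        }
    }

    /** Marks the stream as consumed when a read reports end of stream. */
    private int track(int n) throws IOException {
        if (n == -1) {
            close();
        }
        return n;
    }

    @Override
    public int read() throws IOException {
        ensureFresh();
        return track(super.read());
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        ensureFresh();
        return track(super.read(b, off, len));
    }

    @Override
    public void close() throws IOException {
        consumed = true;
        super.close();
    }
}

class FailFastDemo {
    /** Returns true if reading again after a full read raises ConsumedException. */
    static boolean secondReadFails(byte[] payload) throws IOException {
        InputStream in = new FailFastInputStream(new ByteArrayInputStream(payload));
        byte[] buffer = new byte[1024];
        while (in.read(buffer) != -1) {
            // first attempt: consume the whole body
        }
        try {
            in.read(); // what a retry would do with the same stream
            return false;
        } catch (ConsumedException e) {
            return true;
        }
    }
}
```

Either way, the caller has to catch the exception and retry with a new stream, because the bytes already read cannot be replayed.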

naddame avatar Jun 04 '19 13:06 naddame

Wow, thanks :) I use the latest version of the Maven package, so I guess it's not fixed. I'll use your solution.

RemiBou avatar Jun 04 '19 13:06 RemiBou

I ended up doing this

    res = getObjectStorageService().put(container, blobPath,
        Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));

    if (res == null) { // we try to send it twice if a reconnect occurs
        res = getObjectStorageService().put(container, blobPath,
            Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));

        if (res == null) {
            throw new UnsupportedOperationException(
                "Object creation in swift failed for an unknown reason : " + blobPath);
        }
    }

I don't see any better error handling

RemiBou avatar Jun 04 '19 13:06 RemiBou

I'm not sure "res" can be "null" when a StreamAlreadyConsumedException occurs, unless you handle this exception inside the method getObjectStorageService().put(...).

In short, the reason object creation fails is that the openstack4j client dynamically tries to renew the token:

  1. The client calls Swift with an expired token.
  2. Swift consumes the input stream to /dev/null and returns HTTP status 401.
  3. Openstack4j renews the token and resends the already consumed stream.
  4. Swift writes an empty file.
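
The four steps can be reproduced with a toy simulation. Everything in this sketch is hypothetical: FakeSwift is an in-memory stand-in, not Swift or openstack4j, and the token strings are placeholders. It compares a retry that reuses the drained stream with one that asks a Supplier for a fresh stream on each attempt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

class ReauthSimulation {

    /** Toy server: always drains the body, but stores it only when the token is valid. */
    static class FakeSwift {
        byte[] stored;

        int put(String token, InputStream body) throws IOException {
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = body.read(buffer)) != -1) {
                received.write(buffer, 0, n);
            }
            if (!"valid".equals(token)) {
                return 401; // step 2: body was read and discarded
            }
            stored = received.toByteArray();
            return 201;
        }
    }

    /** Steps 1 to 4: the retry reuses the stream drained by the rejected attempt. */
    static int storedSizeWhenReusingStream(byte[] payload) throws IOException {
        FakeSwift swift = new FakeSwift();
        InputStream body = new ByteArrayInputStream(payload);
        if (swift.put("expired", body) == 401) {
            swift.put("valid", body); // step 3: same, already consumed stream
        }
        return swift.stored.length;   // step 4: an empty object
    }

    /** Alternative: every attempt gets a fresh stream from the supplier. */
    static int storedSizeWithFreshStream(Supplier<InputStream> bodies) throws IOException {
        FakeSwift swift = new FakeSwift();
        if (swift.put("expired", bodies.get()) == 401) {
            swift.put("valid", bodies.get());
        }
        return swift.stored.length;
    }
}
```

In this simulation the reused stream stores zero bytes and the supplier variant stores the full payload. A fresh stream per attempt is only possible when the payload can be reopened, for example a byte array or a file.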

Another way is to manually refresh the token of OSClient.OSClientV3 before it expires. Take the expiry time of the current token and check whether it will expire within a given time (1 minute, for example). If so, renew it before sending the InputStream. For more details, see this implementation: https://github.com/ProgrammeVitam/vitam/blob/master_2.6.x/sources/common/common-storage/src/main/java/fr/gouv/vitam/common/storage/swift/SwiftKeystoneFactoryV3.java

When we need the OSClient, we call the Supplier's get method. It first checks the expiry time and, if needed, renews the token before returning the client.
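
A rough sketch of such a supplier follows. It is not the linked Vitam implementation, and SketchToken and RefreshingTokenSupplier are hypothetical names. The authenticator argument stands in for whatever call obtains a new token, for example a wrapper around the openstack4j authentication.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical minimal token: only the expiry time matters for this sketch. */
final class SketchToken {
    final String id;
    final Instant expires;

    SketchToken(String id, Instant expires) {
        this.id = id;
        this.expires = expires;
    }
}

/** Returns the cached token, renewing it first when it expires within the margin. */
final class RefreshingTokenSupplier implements Supplier<SketchToken> {
    private final Supplier<SketchToken> authenticator; // assumed to perform the real login
    private final Duration margin;                     // e.g. Duration.ofMinutes(1)
    private final Clock clock;
    private SketchToken current;

    RefreshingTokenSupplier(Supplier<SketchToken> authenticator, Duration margin, Clock clock) {
        this.authenticator = authenticator;
        this.margin = margin;
        this.clock = clock;
    }

    @Override
    public synchronized SketchToken get() {
        Instant threshold = clock.instant().plus(margin);
        // Renew when there is no token yet or it does not outlive the safety margin.
        if (current == null || !current.expires.isAfter(threshold)) {
            current = authenticator.get();
        }
        return current;
    }
}
```

Calling get() immediately before each upload keeps the renewal out of the request itself, so the stream is only ever sent once. The margin should be longer than the longest expected upload.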

naddame avatar Jun 05 '19 08:06 naddame

I fixed it with your solution. My initial assumption was that the method would return null if an exception was raised, but I was wrong: a ConnectionException is raised instead.

So I decided to add a try/catch like this:

    LOGGER.debug("Begin upload " + payload.length + " on " + blobPath);
    String res = null;
    try {
        res = getObjectStorageService().put(container, blobPath,
            Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));
    } catch (ConnectionException e) {
        // we try to send it twice if a reconnect occurs
        LOGGER.error("Error while uploading file", e);
        LOGGER.info("Retry upload " + payload.length + " on " + blobPath);
        res = getObjectStorageService().put(container, blobPath,
            Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));

        if (res == null) {
            throw new UnsupportedOperationException(
                "Document creation in swift failed for an unknown reason : " + blobPath);
        }
    }

    LOGGER.debug("Done upload " + payload.length + " on " + blobPath);

And I handle the reconnection in the method getObjectStorageService:

    private ObjectStorageObjectService getObjectStorageService() {
        Date nearTime = Date.from(Instant.now().plus(1L, ChronoUnit.HOURS));

        ObjectStorageObjectService objectStorageObjectService = objectService.get();
        // we refresh the connection before it expires
        if (objectStorageObjectService != null && token.get() != null
            && token.get().getExpires().after(nearTime)) {
            LOGGER.debug("Reusing existing connection");
            return objectStorageObjectService;
        }
        synchronized (this) { // we make sure we reset the connection only once
            // we refresh the connection before it expires
            if (objectStorageObjectService != null && token.get() != null
                && token.get().getExpires().after(nearTime)) {
                return objectStorageObjectService;
            }
            LOGGER.info("Connection to Swift");
            var os = OSFactory.builderV3().endpoint(endpoint)
                .credentials(userName, password, Identifier.byName(domain)).authenticate();
            os.useRegion(region);
            os.objectStorage().containers().create(container);
            token.set(os.getToken());
            OSClient.OSClientV3 osClientV3 = OSFactory.clientFromToken(token.get());
            osClientV3.useRegion(region);

            objectStorageObjectService = osClientV3.objectStorage().objects();
            objectService.set(objectStorageObjectService);
            LOGGER.info("Connection to Swift done");
            return objectStorageObjectService;
        }
    }

token and objectService are static ThreadLocals, so I don't reuse a connection across threads (there is an issue about that).

RemiBou avatar Jun 13 '19 07:06 RemiBou