openstack4j
HttpClient: empty files created when reAuthenticate occurs for an InputStream request
On a put-object request, if reAuthenticate occurs, the stream has already been consumed by the first attempt. After reAuthenticate, invokeRequest therefore writes an empty stream, which results in empty files.
The solution should be: when the request contains a stream, an AlreadyConsumedInputStream exception should be thrown instead of resending the stream after reAuthenticate.
Any news on this? I think I am hitting this error.
Hello, I don't know whether the latest release fixes the problem. To work around it we use the MyAutoCloseInputStream below. Once the original InputStream has been consumed or closed, it is replaced by an AlreadyConsumedInputStream that throws an exception on any further read; you then have to handle the StreamAlreadyConsumedException yourself:
```java
// Imports assumed from the class names used: ProxyInputStream and EOF come from Apache Commons IO.
import static org.apache.commons.io.IOUtils.EOF;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.input.ProxyInputStream;

// StreamAlreadyConsumedException is not shown in this post; it is assumed to be
// a custom subclass of IOException.
public class MyAutoCloseInputStream extends ProxyInputStream {

    public MyAutoCloseInputStream(final InputStream in) {
        super(in);
    }

    /**
     * Closes the underlying input stream and replaces the reference to it
     * with an {@link AlreadyConsumedInputStream} instance.
     * <p>
     * This method is automatically called by the read methods when the end
     * of input has been reached.
     * <p>
     * Note that it is safe to call this method any number of times. The original
     * underlying input stream is closed and discarded only once when this
     * method is first called.
     *
     * @throws IOException if the underlying input stream can not be closed
     */
    @Override
    public void close() throws IOException {
        in.close();
        in = new AlreadyConsumedInputStream();
    }

    /**
     * Automatically closes the stream if the end of stream was reached.
     *
     * @param n number of bytes read, or -1 if no more bytes are available
     * @throws IOException if the stream could not be closed
     * @since 2.0
     */
    @Override
    protected void afterRead(final int n) throws IOException {
        if (n == EOF) {
            close();
        }
    }

    /**
     * Ensures that the stream is closed before it gets garbage-collected.
     * As mentioned in {@link #close()}, this is a no-op if the stream has
     * already been closed.
     * @throws Throwable if an error occurs
     */
    @Override
    protected void finalize() throws Throwable {
        close();
        super.finalize();
    }

    /** Replacement stream: every read-related call fails, so a retry cannot silently send an empty body. */
    public class AlreadyConsumedInputStream extends InputStream {
        @Override
        public int read() throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public int available() throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public long skip(final long n) throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public void reset() throws IOException {
            throw new StreamAlreadyConsumedException("Already closed !");
        }

        @Override
        public void close() {
            // NOP
        }
    }
}
```
Wow, thanks :) I use the latest version of the Maven package, so I guess it's not fixed. I'll use your solution.
I ended up doing this:
    res = getObjectStorageService().put(container, blobPath,
            Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));
    if (res == null) { // we try to send it twice if a reconnect occurs
        res = getObjectStorageService().put(container, blobPath,
                Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));
        if (res == null) {
            throw new UnsupportedOperationException(
                    "Object creation in swift failed for an unknown reason : " + blobPath);
        }
    }
I don't see any better error handling
I'm not sure "res" can be null when a StreamAlreadyConsumedException occurs, unless you handle that exception inside getObjectStorageService().put(...).
In short, object creation fails because the openstack4j client tries to renew the token dynamically in the middle of the request:
- The client calls Swift with an expired token.
- Swift consumes the input stream (discarding it, as if to /dev/null) and returns HTTP status 401.
- openstack4j renews the token and resends the already consumed stream.
- Swift writes an empty file.
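The sequence above can be reproduced without any OpenStack dependency. The following is only a minimal sketch using plain `java.io`: the `send` method is a hypothetical stand-in for an HTTP client writing a request body, not openstack4j code. It shows that handing the same stream object to a second attempt sends zero bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class ConsumedStreamDemo {

    // Hypothetical stand-in for an HTTP client writing a request body:
    // drains the stream and returns the number of bytes it "sent".
    static long send(InputStream body) throws IOException {
        long sent = 0;
        byte[] buffer = new byte[8192];
        int n;
        while ((n = body.read(buffer)) != -1) {
            sent += n;
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        InputStream body = new ByteArrayInputStream(
                "hello swift".getBytes(StandardCharsets.UTF_8));

        // First attempt: in the real scenario this is the request rejected with 401.
        long first = send(body);
        // Retry after re-authentication, reusing the same stream object.
        long second = send(body);

        System.out.println("first attempt sent " + first + " bytes");
        System.out.println("retry sent " + second + " bytes");
    }
}
```

Because the retry sees an exhausted stream, the server receives a valid but empty body. Any fix therefore has to either fail loudly on the retry or rebuild the stream from its source for each attempt.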
Another way is to manually refresh the token of the OSClient.OSClientV3 before it expires. Take the expiry time of the current token and check whether it will expire within a given margin (1 minute, for example). If so, renew it before sending the InputStream. For more details, see this implementation: https://github.com/ProgrammeVitam/vitam/blob/master_2.6.x/sources/common/common-storage/src/main/java/fr/gouv/vitam/common/storage/swift/SwiftKeystoneFactoryV3.java
When we need the OSClient we call the Supplier's get method. It first checks the expiry time and, if needed, renews the token before returning the client.
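The proactive refresh described above could look roughly like the sketch below. It is not the linked Vitam implementation and does not use the openstack4j API: `Token`, `RefreshingTokenSupplier`, and the `authenticator` callback are hypothetical names introduced only for illustration, and the real code would wrap the actual authentication call and token type.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical sketch: hands out a token that stays valid for at least `margin`.
class RefreshingTokenSupplier implements Supplier<RefreshingTokenSupplier.Token> {

    // Hypothetical stand-in for the real token type; only id and expiry matter here.
    static final class Token {
        final String id;
        final Instant expires;

        Token(String id, Instant expires) {
            this.id = id;
            this.expires = expires;
        }
    }

    private final Supplier<Token> authenticator; // hypothetical: performs a full authentication
    private final Duration margin;               // renew when less than this validity is left
    private final Clock clock;                   // injectable so the logic can be tested
    private Token current;

    RefreshingTokenSupplier(Supplier<Token> authenticator, Duration margin, Clock clock) {
        this.authenticator = authenticator;
        this.margin = margin;
        this.clock = clock;
    }

    @Override
    public synchronized Token get() {
        Instant threshold = clock.instant().plus(margin);
        // Renew if there is no token yet or it expires within the margin.
        if (current == null || !current.expires.isAfter(threshold)) {
            current = authenticator.get();
        }
        return current;
    }

    public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        RefreshingTokenSupplier tokens = new RefreshingTokenSupplier(
                () -> new Token("demo-token", clock.instant().plus(Duration.ofHours(1))),
                Duration.ofMinutes(1),
                clock);
        Token before = tokens.get(); // first call authenticates
        Token again = tokens.get();  // still valid for about an hour, so it is reused
        System.out.println("reused: " + (before == again));
    }
}
```

Calling `get()` immediately before every upload keeps the token check next to the request, so the stream is only handed to the client when the token is not about to expire. A long upload can still outlive the margin, which is why pairing this with the fail-fast stream wrapper remains useful.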
I fixed it with your solution. My initial assumption was that the method would return null if an exception occurred, but I was wrong: a ConnectionException is thrown.
So I decided to add a try/catch like this:
    LOGGER.debug("Begin upload " + payload.length + " on " + blobPath);
    String res = null;
    try {
        res = getObjectStorageService().put(container, blobPath,
                Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));
    } catch (ConnectionException e) {
        // we try to send it twice if a reconnect occurs
        LOGGER.error("Error while uploading file", e);
        LOGGER.info("Retry upload " + payload.length + " on " + blobPath);
        res = getObjectStorageService().put(container, blobPath,
                Payloads.create(new MyAutoCloseInputStream(new ByteArrayInputStream(payload))));
        if (res == null) {
            throw new UnsupportedOperationException(
                    "Document creation in swift failed for an unknown reason : " + blobPath);
        }
    }
    LOGGER.debug("Done upload " + payload.length + " on " + blobPath);
And I handle the reconnection in the method getObjectStorageService:
    private ObjectStorageObjectService getObjectStorageService() {
        Date nearTime = Date.from(Instant.now().plus(1L, ChronoUnit.HOURS));
        ObjectStorageObjectService objectStorageObjectService = objectService.get();
        // we refresh the connection before it expires
        if (objectStorageObjectService != null && token.get() != null
                && token.get().getExpires().after(nearTime)) {
            LOGGER.debug("Reusing existing connection");
            return objectStorageObjectService;
        }
        synchronized (this) { // we make sure we reset the connection only once
            // we refresh the connection before it expires
            if (objectStorageObjectService != null && token.get() != null
                    && token.get().getExpires().after(nearTime)) {
                return objectStorageObjectService;
            }
            LOGGER.info("Connection to Swift");
            var os = OSFactory.builderV3().endpoint(endpoint)
                    .credentials(userName, password, Identifier.byName(domain)).authenticate();
            os.useRegion(region);
            os.objectStorage().containers().create(container);
            token.set(os.getToken());
            OSClient.OSClientV3 osClientV3 = OSFactory.clientFromToken(token.get());
            osClientV3.useRegion(region);
            objectStorageObjectService = osClientV3.objectStorage().objects();
            objectService.set(objectStorageObjectService);
            LOGGER.info("Connection to Swift done");
            return objectStorageObjectService;
        }
    }
token and objectService are static ThreadLocals, so I don't reuse a connection across threads (there is an issue about that).