alpakka
CsvFormatting.format() crashes
Versions used
Alpakka version: 3.0.2
Akka version: 2.12-2.4.20
Expected Behavior
Alpakka formats CSV or issues an error.
Actual Behavior
Alpakka silently crashes.
Relevant logs
2021-08-11 11:24:59.973 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:367] received command: export terminals
2021-08-11 11:25:00.048 DEBUG [Thread-18] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:407] Success - exporting keys ByteString(-17, -69, -65)
2021-08-11 11:25:00.054 DEBUG [Thread-18] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:409] Success - exported keys
2021-08-11 11:25:00.072 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:412] Found collection
2021-08-11 11:25:00.073 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:417] Mapped to values
2021-08-11 11:25:00.073 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:422] Mapped to String List
2021-08-11 11:25:02.224 DEBUG [SpringContextShutdownHook] c.p.r.s.ClientConnectionsService [ClientConnectionsService.java:85] removed client connection: com.portalify.radioaudit.auth.UserAuthentication@e8496900: Principal: admin; Credentials: [PROTECTED]; Authenticated: true; Details: [method, GET, request, https://10.0.112.139:8498/websocket, remote-address, 10.0.112.223]; Not granted any authorities StandardWebSocketSession[id=d4dbaf64-4026-c138-926f-51cde78a1d16, uri=wss://10.0.112.139:8498/websocket?token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTYyODc1NjY5NSwiaWF0IjoxNjI4NjcwMjk1fQ.sG3b1Ci3gPeOyp9aEk1jNpFVqAMs6JO3RQVfelnK3L_6jC4zqCw6jBgPp0hZ0Hv6UwSoa4hcJSrdVE-0LJ20kQ]
2021-08-11 11:25:02.224 DEBUG [SpringContextShutdownHook] c.p.r.s.ClientConnectionsService [ClientConnectionsService.java:86] total connections (remove): 0, for token: 0, tokens: 0
2021-08-11 11:25:02.226 INFO [LocateClientServerActorSystem-akka.actor.default-dispatcher-3] c.p.r.a.WebsocketSubscriptionActor [WebsocketSubscriptionActor.java:52] protocol=websocket event=unsubscribe remote_address=/10.0.112.223:62096 id=d4dbaf64-4026-c138-926f-51cde78a1d16
2021-08-11 11:25:02.226 INFO [LocateClientServerActorSystem-akka.actor.default-dispatcher-3] c.p.r.a.WebsocketSubscriptionActor [WebsocketSubscriptionActor.java:83] protocol=websocket event=broadcast
2021-08-11 11:25:02.240 WARN [https-jsse-nio-8498-exec-2] c.p.r.c.JSONExceptionHandler [JSONExceptionHandler.java:43] Unhandled exception java.lang.InterruptedException: null
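A note on the "exporting keys" entry in the log: the three signed bytes -17, -69, -65 are the UTF-8 byte order mark (EF BB BF). The plain-Java sketch below only checks that equivalence; the class and method names (BomCheck, isOnlyUtf8Bom) are made up for illustration and are not part of Alpakka. If the logged value is the whole result of the first stream, it may have contained only the BOM and no header row. That is an inference from the log, not a confirmed diagnosis.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BomCheck {
    // The UTF-8 byte order mark is the three bytes EF BB BF.
    static final byte[] UTF8_BOM = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};

    // Hypothetical helper: true if the buffer is exactly the UTF-8 BOM and nothing else.
    static boolean isOnlyUtf8Bom(byte[] bytes) {
        return Arrays.equals(bytes, UTF8_BOM);
    }

    public static void main(String[] args) {
        // Signed byte values as printed in the log: ByteString(-17, -69, -65).
        byte[] logged = {-17, -69, -65};
        System.out.println(isOnlyUtf8Bom(logged)); // prints "true"

        // Decoded as UTF-8, the buffer is the single character U+FEFF with no CSV text after it.
        String decoded = new String(logged, StandardCharsets.UTF_8);
        System.out.println(decoded.equals("\uFEFF")); // prints "true"
    }
}
```

A quick way to test this reading would be to log the byte length of the result next to its value, or to collect every element the stream emits instead of only the first.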
Reproducible Test Case
final Source<Document, NotUsed> source =
    MongoSource.create(collection.find().sort(ascending("issi")))
        .map(MongoDbUtils::fixId);
try {
    source
        .map(x -> {
            log.debug("Found source");
            return x;
        })
        .take(1)
        .map(x -> {
            log.debug("take 1");
            return x;
        })
        .map(Document::keySet)
        .map(x -> {
            log.debug("keySet");
            return x;
        })
        .via(CsvFormatting.format(
            ',',
            '"',
            '\\',
            "\r\n",
            CsvQuotingStyle.REQUIRED,
            StandardCharsets.UTF_8,
            Optional.of(ByteOrderMark.UTF_8)))
        .runWith(Sink.head(), materializer)
        .whenComplete((result, e) -> {
            if (e == null) {
                log.debug("Success - exporting keys {}", result);
                exporter.export(result.utf8String());
                log.debug("Success - exported keys");
                source
                    .map(x -> {
                        log.debug("Found collection");
                        return x;
                    })
                    .map(Document::values)
                    .map(x -> {
                        log.debug("Mapped to values");
                        return x;
                    })
                    .map(values -> values.stream().map(Object::toString).collect(Collectors.toList()))
                    .map(x -> {
                        log.debug("Mapped to String List");
                        return x;
                    })
                    .via(CsvFormatting.format())
                    .map(s -> {
                        log.debug("Formatted to CSV");
                        return s;
                    })
                    .runWith(Sink.head(), materializer)
                    .whenComplete((values, error) -> {
                        if (error == null) {
                            log.debug("Success - exporting values {}", values);
                            exporter.export(values.utf8String());
                            exporter.endData();
                            log.debug("Success - ended data");
                        } else {
                            log.error("Failed to export data", error);
                        }
                    });
            } else {
                log.error("Failed to export data", e);
            }
        });
} catch (final Throwable t) {
    log.error("Export error", t);
}
Did you identify what causes this error?
No, I did not. Apache CSVPrinter produced the same crash, so it might not be specific to Alpakka.
OK, I suspect this has nothing to do with CSV. It looks like a problem in the JSON handling or Mongo:
c.p.r.c.JSONExceptionHandler [JSONExceptionHandler.java:43]
Unhandled exception
java.lang.InterruptedException: null
Also lineScanner crashed.
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
If you can provide a test case that involves nothing but Alpakka CSV, there might be a chance to track it down.