asgarde
Asgarde simplifies error handling in Apache Beam Java pipelines, with less code that is more concise and expressive.

Asgarde
This module allows simplifying error handling with Apache Beam Java.
Version compatibility between Beam and Asgarde
| Asgarde | Beam |
|---|---|
| 0.10.0 | 2.31.0 |
| 0.11.0 | 2.32.0 |
| 0.12.0 | 2.33.0 |
| 0.13.0 | 2.34.0 |
| 0.14.0 | 2.35.0 |
| 0.15.0 | 2.36.0 |
| 0.16.0 | 2.37.0 |
| 0.17.0 | 2.38.0 |
| 0.18.0 | 2.39.0 |
| 0.19.0 | 2.40.0 |
| 0.20.0 | 2.41.0 |
| 0.21.0 | 2.42.0 |
| 0.22.0 | 2.43.0 |
| 0.23.0 | 2.44.0 |
| 0.24.0 | 2.45.0 |
| 0.25.0 | 2.46.0 |
| 0.26.0 | 2.47.0 |
| 0.27.0 | 2.48.0 |
| 0.28.0 | 2.49.0 |
| 0.29.0 | 2.50.0 |
| 0.30.0 | 2.51.0 |
| 0.31.0 | 2.52.0 |
| 0.32.0 | 2.53.0 |
| 0.33.0 | 2.54.0 |
Installation of the project
The project is hosted on a Maven repository.
You can install it with any build tool compatible with Maven.
Example with Maven and Gradle:
Maven
<dependency>
    <groupId>fr.groupbees</groupId>
    <artifactId>asgarde</artifactId>
    <version>0.33.0</version>
</dependency>
Gradle
implementation group: 'fr.groupbees', name: 'asgarde', version: '0.33.0'
Error logic with Beam ParDo and DoFn
Beam recommends handling errors with dead letters.
This means catching errors in the flow and, using side outputs, sinking them to a file, a database or any other output.
Beam suggests handling side outputs with TupleTags in a DoFn class, for example:
// Failure object.
public class Failure implements Serializable {
private final String pipelineStep;
private final String inputElement;
private final Throwable exception;
public static <T> Failure from(final String pipelineStep,
final T element,
final Throwable exception) {
return new Failure(pipelineStep, element.toString(), exception);
}
}
// Word count DoFn class.
public class WordCountFn extends DoFn<String, Integer> {
private final TupleTag<Integer> outputTag = new TupleTag<Integer>() {};
private final TupleTag<Failure> failuresTag = new TupleTag<Failure>() {};
@ProcessElement
public void processElement(ProcessContext ctx) {
try {
// Could throw ArithmeticException.
final String word = ctx.element();
ctx.output(1 / word.length());
} catch (Throwable throwable) {
final Failure failure = Failure.from("step", ctx.element(), throwable);
ctx.output(failuresTag, failure);
}
}
public TupleTag<Integer> getOutputTag() {
return outputTag;
}
public TupleTag<Failure> getFailuresTag() {
return failuresTag;
}
}
// In Beam pipeline flow.
final PCollection<String> wordPCollection....
final WordCountFn wordCountFn = new WordCountFn();
final PCollectionTuple tuple = wordPCollection
.apply("ParDo", ParDo.of(wordCountFn).withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));
// Output PCollection via outputTag.
PCollection<Integer> outputCollection = tuple.get(wordCountFn.getOutputTag());
// Failures PCollection via failuresTag.
PCollection<Failure> failuresCollection = tuple.get(wordCountFn.getFailuresTag());
With this approach, every step yields both an output PCollection and a failures PCollection.
Error logic with Beam MapElements and FlatMapElements
Beam also allows handling errors with built-in components like MapElements and FlatMapElements (an experimental feature as of April 2020).
Behind the scenes, these classes use the same concept as explained above.
Example:
public class Failure implements Serializable {
private final String pipelineStep;
private final String inputElement;
private final Throwable exception;
public static <T> Failure from(final String pipelineStep,
final WithFailures.ExceptionElement<T> exceptionElement) {
final T inputElement = exceptionElement.element();
return new Failure(pipelineStep, inputElement.toString(), exceptionElement.exception());
}
}
// In Beam pipeline flow.
final PCollection<String> wordPCollection....
WithFailures.Result<PCollection<Integer>, Failure> result = wordPCollection
.apply("Map", MapElements
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length()) // Could throw ArithmeticException
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt))
);
PCollection<Integer> output = result.output();
PCollection<Failure> failures = result.failures();
The logic is the same for FlatMapElements :
final PCollection<String> wordPCollection....
WithFailures.Result<PCollection<String>, Failure> result = wordPCollection
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt))
);
PCollection<String> output = result.output();
PCollection<Failure> failures = result.failures();
Comparison between approaches
Usual Beam pipeline
In a usual Beam pipeline flow, steps are chained fluently:
final PCollection<Integer> outputPCollection = inputPCollection
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("Map ParDo", ParDo.of(new WordCountFn()));
Usual Beam pipeline with error handling
Here's the same flow with error handling in each step:
WithFailures.Result<PCollection<String>, Failure> result1 = input
.apply("Map", MapElements
.into(TypeDescriptors.strings())
.via((String word) -> word + "Test")
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)));
final PCollection<String> output1 = result1.output();
final PCollection<Failure> failure1 = result1.failures();
WithFailures.Result<PCollection<String>, Failure> result2 = output1
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
.exceptionsInto(TypeDescriptor.of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)));
final PCollection<String> output2 = result2.output();
final PCollection<Failure> failure2 = result2.failures();
final PCollectionTuple result3 = output2
.apply("Map ParDo", ParDo.of(wordCountFn).withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));
final PCollection<Integer> output3 = result3.get(wordCountFn.getOutputTag());
final PCollection<Failure> failure3 = result3.get(wordCountFn.getFailuresTag());
final PCollection<Failure> allFailures = PCollectionList
.of(failure1)
.and(failure2)
.and(failure3)
.apply(Flatten.pCollections());
Problems with this approach:
- We lose the native fluent style of apply chains, because we have to handle the output and the errors of each step.
- For MapElements and FlatMapElements, we always have to add exceptionsInto and exceptionsVia (can be centralized).
- For each custom DoFn, we have to duplicate the TupleTag logic and the try/catch block (can be centralized).
- The code is verbose.
- There is no centralized code to concatenate the errors: we have to concatenate all the failures ourselves (can be centralized).
Usual Beam pipeline with error handling using Asgarde
Here's the same flow with error handling, but using this library instead:
final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1 / word.length()))
.getResult();
Some explanations:
- The CollectionComposer class centralizes all the error logic, fluently composes the applies and concatenates all the failures occurring in the flow.
- For MapElements and FlatMapElements, behind the scenes, the apply method adds exceptionsInto and exceptionsVia based on the Failure object. We can also use exceptionsInto and exceptionsVia explicitly if needed, for example for custom logic based on the Failure object.
- The MapElementFn class is a custom DoFn class that internally wraps the logic shared by DoFns, like the try/catch block and tuple tags. These concepts are detailed in the next sections.
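The composer idea described above can be illustrated without Beam. The following is a minimal plain-Java sketch working on lists instead of PCollections; `ListComposer` and its methods are hypothetical names invented for this illustration and are not part of the Asgarde API. Each `apply` catches errors per element and carries the accumulated failures to the next step, so the chain stays fluent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Beam-free stand-in for the composer idea (not the Asgarde API).
final class ListComposer<T> {
    private final List<T> outputs;
    private final List<String> failures;

    private ListComposer(List<T> outputs, List<String> failures) {
        this.outputs = outputs;
        this.failures = failures;
    }

    static <T> ListComposer<T> of(List<T> inputs) {
        return new ListComposer<>(new ArrayList<>(inputs), new ArrayList<>());
    }

    // Applies one step to every element; errors are recorded instead of thrown.
    <R> ListComposer<R> apply(String step, Function<T, R> fn) {
        List<R> nextOutputs = new ArrayList<>();
        List<String> allFailures = new ArrayList<>(failures);
        for (T element : outputs) {
            try {
                nextOutputs.add(fn.apply(element));
            } catch (RuntimeException e) {
                allFailures.add(step + ": " + element + " -> " + e.getClass().getSimpleName());
            }
        }
        return new ListComposer<>(nextOutputs, allFailures);
    }

    List<T> outputs() {
        return outputs;
    }

    List<String> failures() {
        return failures;
    }
}

class ListComposerDemo {
    public static void main(String[] args) {
        ListComposer<Integer> result = ListComposer.of(List.of("ab", "", "abcd"))
            .apply("Map", word -> word + "Test")
            // Throws ArithmeticException for the 4-character word "Test".
            .apply("Divide", word -> 8 / (word.length() - 4));

        System.out.println(result.outputs());
        System.out.println(result.failures());
    }
}
```

The good outputs and the failures of all steps are both available at the end of the chain, which is the property the library provides on top of Beam.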
Purpose of the library
- Wrap all error handling logic in a composer class.
- Wrap exceptionsInto and exceptionsVia usage in the native Beam classes MapElements and FlatMapElements.
- Keep the fluent style natively proposed by Beam in apply methods while checking for failures, and offer a less verbose way of handling errors.
- Expose custom DoFn classes with centralized try/catch blocks (loan pattern) and tuple tags.
- Expose easier access to the @Setup, @StartBundle, @FinishBundle and @Teardown steps of DoFn classes.
- Expose a way to handle errors in filtering logic (currently not available with Beam's Filter.by).
Some resources on the loan pattern:
https://dzone.com/articles/functional-programming-patterns-with-java-8
https://blog.knoldus.com/scalaknol-understanding-loan-pattern/
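As a rough illustration of the loan pattern in this context, here is a small plain-Java sketch. The `Loan` class and `withErrorHandling` method are hypothetical names chosen for the example, not library code: the helper owns the try/catch block and "lends" the place where the business logic runs to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper illustrating the loan pattern (not part of Asgarde).
final class Loan {
    private Loan() {
    }

    // Owns the try/catch; the caller only supplies the business logic and a failure sink.
    static <I, O> Optional<O> withErrorHandling(I input,
                                                Function<I, O> logic,
                                                Consumer<Throwable> onFailure) {
        try {
            return Optional.ofNullable(logic.apply(input));
        } catch (RuntimeException e) {
            onFailure.accept(e);
            return Optional.empty();
        }
    }
}

class LoanDemo {
    public static void main(String[] args) {
        List<Throwable> errors = new ArrayList<>();

        Optional<Integer> ok = Loan.withErrorHandling("abcd", word -> 8 / word.length(), errors::add);
        // The empty word makes the division throw; the error lands in the sink.
        Optional<Integer> ko = Loan.withErrorHandling("", word -> 8 / word.length(), errors::add);

        System.out.println(ok.isPresent() + " " + ko.isPresent() + " " + errors.size());
    }
}
```

The business code contains no try/catch at all, which is the design choice the custom DoFn classes below rely on.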
Custom DoFn classes
MapElementFn
This class is the equivalent of a Beam MapElements.
It is created from the output TypeDescriptor and takes a SerializableFunction from the generic input type to the output type (the input and output types of the mapper).
This SerializableFunction is invoked lazily in the @ProcessElement method, as part of the DoFn lifecycle.
We can also give this class actions tied to the DoFn lifecycle:
- setupAction, executed in the @Setup method
- startBundleAction, executed in the @StartBundle method
- finishBundleAction, executed in the @FinishBundle method
- teardownAction, executed in the @Teardown method
Each action is represented by a SerializableAction:
@FunctionalInterface
public interface SerializableAction extends Serializable {
void execute();
}
The SerializableAction is like a java.lang.Runnable that has to implement Serializable.
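To see why the Serializable bound matters, the sketch below pushes an action through standard Java serialization, roughly what has to be possible when a DoFn and its fields are shipped to workers. The interface is repeated so the example compiles on its own; `SerializableActionDemo`, `incrementAction` and `roundTrip` are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

@FunctionalInterface
interface SerializableAction extends Serializable {
    void execute();
}

class SerializableActionDemo {
    static int counter = 0;

    // A lambda targeting SerializableAction is itself serializable.
    static SerializableAction incrementAction() {
        return () -> counter++;
    }

    // Writes the action to bytes and reads it back.
    static SerializableAction roundTrip(SerializableAction action) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(action);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableAction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        roundTrip(incrementAction()).execute();
        System.out.println(counter);

        // A lambda targeting a plain Runnable does not implement Serializable.
        Runnable plain = () -> counter++;
        System.out.println(plain instanceof Serializable);
    }
}
```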
When writing a DoFn, Beam can infer the output type from DoFn<Input, Output> and deduce the output type descriptor from it.
A default Coder can be added for this descriptor.
When we write a generic DoFn, Beam is unable to infer the output type and create the output descriptor,
that's why in our custom DoFn classes we have to give the output descriptor in the into method.
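The difference between the two cases comes from Java type erasure and can be reproduced with plain reflection. In the sketch below, `Fn`, `WordLengthFn` and `GenericFn` are hypothetical classes standing in for DoFn subclasses; Beam's real inference is more involved, so this only shows the underlying mechanism.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical base class standing in for a DoFn<Input, Output>.
abstract class Fn<I, O> {
    // Reads the second type argument declared on the direct superclass.
    Type outputType() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[1];
    }
}

// Non-generic subclass: the concrete output type is recorded in the class file.
class WordLengthFn extends Fn<String, Integer> {
}

// Generic subclass: at runtime the output type is only the type variable O.
class GenericFn<I, O> extends Fn<I, O> {
}

class ErasureDemo {
    public static void main(String[] args) {
        // Resolves to Integer.
        System.out.println(new WordLengthFn().outputType());
        // Only a type variable is available, even though Integer was written at the call site.
        System.out.println(new GenericFn<String, Integer>().outputType());
    }
}
```

Since nothing concrete can be recovered in the generic case, passing the descriptor explicitly is the simplest way to supply the missing information.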
Usage example:
final PCollection<Integer> outputMapElementFn = CollectionComposer.of(inputPCollection)
.apply("PaDo", MapElementFn
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length())
.withSetupAction(() -> LOGGER.info("Start word count action in the worker"))
.withTeardownAction(() -> LOGGER.info("End word count action in the worker")))
.getResult()
.output();
Behind the scenes, the CollectionComposer class adds a ParDo on this DoFn and handles errors with tuple tags.
MapProcessContextFn
This class works exactly like MapElementFn, but gives access to Beam's ProcessContext object.
It must be created from the input type of the DoFn, which gives more information about the input,
because the SerializableFunction takes the ProcessContext and not the input itself (so it doesn't carry the input type):
SerializableFunction<ProcessContext, Output>
Like MapElementFn, this class accepts the DoFn lifecycle actions:
- setupAction
- startBundleAction
- finishBundleAction
- teardownAction
It expects an output descriptor too.
Usage example:
final PCollection<Integer> resMapProcessElementFn = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptors.integers())
.via(ctx -> 1 / ctx.element().length())
.withSetupAction(() -> LOGGER.info("Setup word count action in the worker"))
.withStartBundleAction(() -> LOGGER.info("Start bundle word count action in the worker"))
.withFinishBundleAction(() -> LOGGER.info("Finish bundle word count action in the worker"))
.withTeardownAction(() -> LOGGER.info("End word count action in the worker")))
.getResult()
.output();
Sometimes we need access to Beam's ProcessContext to get technical fields or handle side inputs.
For MapElementFn and MapProcessContextFn, we can give side inputs to the CollectionComposer.
Here's an example using side inputs:
// Simulates a side input, from Beam pipeline.
final String wordDescription = "Word to describe Football teams";
final PCollectionView<String> sideInputs = pipeline
.apply("Create word description side input", Create.of(wordDescription))
.apply("Create as collection view", View.asSingleton());
final PCollection<WordStats> resMapProcessElementFn = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptor.of(WordStats.class))
.via(ctx -> toWordStats(sideInputs, ctx))
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")),
Collections.singleton(sideInputs))
.getResult()
.output();
private WordStats toWordStats(final PCollectionView<String> sideInputs, final DoFn<String, WordStats>.ProcessContext context) {
final String word = context.element();
final Integer dividedWordCount = 1 / word.length();
// Gets the current timestamp in context.
final Instant timestamp = context.timestamp();
// Gets the side input value from PCollectionView and context.
final String wordDescription = context.sideInput(sideInputs);
return new WordStats(word, dividedWordCount, timestamp, wordDescription);
}
private static class WordStats implements Serializable {
private final String word;
private final Integer dividedWordCount;
private final Instant timestamp;
private final String wordDescription;
public WordStats(String word, Integer dividedWordCount, Instant timestamp, String wordDescription) {
this.word = word;
this.dividedWordCount = dividedWordCount;
this.timestamp = timestamp;
this.wordDescription = wordDescription;
}
}
Behind the scenes, the CollectionComposer class adds the ParDo on this DoFn and handles errors with tuple tags.
FlatMapElementFn
Same principle as MapElementFn, but for the flatMap operator.
// teams is assumed to be a PCollection<Team>.
final PCollection<Player> players = CollectionComposer.of(teams)
    .apply("Step name", FlatMapElementFn
        .into(TypeDescriptor.of(Player.class))
        .via(team -> team.getPlayers())
        .withSetupAction(() -> System.out.println("Starting of mapping..."))
        .withStartBundleAction(() -> System.out.println("Starting bundle of mapping..."))
        .withFinishBundleAction(() -> System.out.println("Ending bundle of mapping..."))
        .withTeardownAction(() -> System.out.println("Ending of mapping...")))
    .getResult()
    .output();
FlatMapProcessContextFn
Same principle as MapProcessContextFn, but for the flatMap operator.
// teams is assumed to be a PCollection<Team>.
final PCollection<Player> players = CollectionComposer.of(teams)
    .apply("Step name", FlatMapProcessContextFn
        .from(Team.class)
        .into(TypeDescriptor.of(Player.class))
        .via(ctx -> ctx.element().getPlayers())
        .withSetupAction(() -> System.out.println("Starting of mapping..."))
        .withStartBundleAction(() -> System.out.println("Starting bundle of mapping..."))
        .withFinishBundleAction(() -> System.out.println("Ending bundle of mapping..."))
        .withTeardownAction(() -> System.out.println("Ending of mapping...")))
    .getResult()
    .output();
FilterFn
This class is like the Filter class exposed by Beam but with built-in error handling.
It's a generic DoFn implementation, just like MapElementFn and MapProcessContextFn.
Usage example:
final PCollection<String> resFilterFn = CollectionComposer.of(wordCollection)
.apply("FilterFn", FilterFn.by(word -> word.length() > 3))
.getResult()
.output();
It takes a predicate via:
final SerializableFunction<InputT, Boolean> predicate
Behind the scenes, the CollectionComposer takes the output type from the previous PCollection.
No need to pass any output descriptor in this case, because it's only a filtering operation on the same type.
Collection composer class
Centralizes error handling in one place.
Create an instance from a PCollection.
CollectionComposer.of(wordPCollection)
- It accepts native MapElements and FlatMapElements without adding exceptionsInto and exceptionsVia:
final PCollection<Integer> resMapElements = CollectionComposer.of(input)
.apply("ParDo", MapElements.into(TypeDescriptors.integers()).via((String word) -> 1 / word.length()))
.getResult()
.output();
final PCollection<String> resFlatMapElements = CollectionComposer.of(input)
.apply("FlatMapElements", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.getResult()
.output();
- It accepts native MapElements and FlatMapElements with exceptionsInto and exceptionsVia (if external exceptionsInto and exceptionsVia are given, they have to be based on the Failure object exposed by the library):
final PCollection<Integer> resMapElements2 = CollectionComposer.of(input)
.apply("MapElements", MapElements
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length())
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)))
.getResult()
.output();
final PCollection<String> resFlatMapElements2 = CollectionComposer.of(input)
.apply("FlatMapElements", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("step", exElt)))
.getResult()
.output();
The Failure object exposed by the library contains the input element of the step as a String (obtained via its toString method) and the exception that occurred.
For the input elements used in transforms, developers can override the toString method to control this string representation.
A possible approach is to serialize the object to a JSON string with a library like Jackson, for better readability.
Beam already uses Jackson as an internal dependency.
Example with a Team class used in a Beam Transform :
public static class Team implements Serializable {
...
@Override
public String toString() {
return JsonUtil.serialize(this);
}
}
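public static <T> String serialize(final T obj) {
    try {
        return OBJECT_MAPPER.writeValueAsString(obj);
    } catch (JsonProcessingException e) {
        // Use the class name here: concatenating obj would call toString, and so serialize, again.
        throw new IllegalStateException("The serialization of object fails : " + obj.getClass().getName(), e);
    }
}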
In this case, we use Jackson to serialize the current object to Json String, in the toString method of the Team object.
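The effect of overriding toString on what a failure records can be shown without Jackson. In this sketch the JSON is assembled by hand (no escaping of special characters), and `SimpleFailure` is a simplified, hypothetical stand-in for a failure holder with the three fields described in this document; the `Team` fields are also invented for the example.

```java
// Hypothetical element type; the JSON string is built by hand and does not escape special characters.
final class Team {
    private final String name;
    private final int score;

    Team(String name, int score) {
        this.name = name;
        this.score = score;
    }

    @Override
    public String toString() {
        return "{\"name\":\"" + name + "\",\"score\":" + score + "}";
    }
}

// Simplified stand-in for a failure holder (not the library class).
final class SimpleFailure {
    final String pipelineStep;
    final String inputElement;
    final Throwable exception;

    private SimpleFailure(String pipelineStep, String inputElement, Throwable exception) {
        this.pipelineStep = pipelineStep;
        this.inputElement = inputElement;
        this.exception = exception;
    }

    // The element is captured through toString, so its override decides what gets stored.
    static <T> SimpleFailure from(String pipelineStep, T element, Throwable exception) {
        return new SimpleFailure(pipelineStep, element.toString(), exception);
    }
}

class FailureToStringDemo {
    public static void main(String[] args) {
        SimpleFailure failure = SimpleFailure.from("Map", new Team("PSG", 3), new IllegalStateException("boom"));
        System.out.println(failure.inputElement); // prints {"name":"PSG","score":3}
    }
}
```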
The Failure class exposes factory methods to build from inputs:
public class Failure implements Serializable {
private final String pipelineStep;
private final String inputElement;
private final Throwable exception;
private Failure(String pipelineStep,
String inputElement,
Throwable exception) {
this.pipelineStep = pipelineStep;
this.inputElement = inputElement;
this.exception = exception;
}
public static <T> Failure from(final String pipelineStep,
final WithFailures.ExceptionElement<T> exceptionElement) {
final T inputElement = exceptionElement.element();
return new Failure(pipelineStep, inputElement.toString(), exceptionElement.exception());
}
public static <T> Failure from(final String pipelineStep,
final T element,
final Throwable exception) {
return new Failure(pipelineStep, element.toString(), exception);
}
}
- It accepts MapElementFn, MapProcessContextFn, FlatMapElementFn, FlatMapProcessContextFn, FilterFn and custom DoFns:
// Map element Fn.
final PCollection<Integer> resMapElementFn = CollectionComposer.of(input)
.apply("PaDo", MapElementFn
.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length())
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")))
.getResult()
.output();
// Map process context Fn without side inputs.
final PCollection<Integer> resMapProcessElementFn = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptors.integers())
.via(ctx -> 1 / ctx.element().length())
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")))
.getResult()
.output();
// Map process context Fn with side inputs.
final String wordDescription = "Word to describe Football teams";
final PCollectionView<String> sideInputs = input.getPipeline()
.apply("String side input", Create.of(wordDescription))
.apply("Create as collection view", View.asSingleton());
final PCollection<WordStats> resMapProcessElementFn2 = CollectionComposer.of(input)
.apply("PaDo", MapProcessContextFn
.from(String.class)
.into(TypeDescriptor.of(WordStats.class))
.via(ctx -> toWordStats(sideInputs, ctx))
.withSetupAction(() -> LOGGER.info("Start word count action in the worker")),
Collections.singleton(sideInputs))
.getResult()
.output();
// Filter Fn.
final PCollection<String> resFilterFn = CollectionComposer.of(input)
.apply("FilterFn", FilterFn.by(word -> word.length() > 3))
.getResult()
.output();
- Custom non-generic DoFn classes are also supported. They must extend BaseElementFn (it initializes the tuple tags logic), and the type descriptors are deduced from the non-generic types:
import fr.groupbees.asgarde.Failure;
final PCollection<WordStats> resCustomDoFn = CollectionComposer.of(input)
.apply("PaDo", new WordStatsFn(sideInputs), Collections.singleton(sideInputs))
.getResult()
.output();
// Custom DoFn class.
public class WordStatsFn extends BaseElementFn<String, WordStats> {
private PCollectionView<String> sideInputs;
public WordStatsFn(final PCollectionView<String> sideInputs) {
// Do not forget to call this!
super();
this.sideInputs = sideInputs;
}
@ProcessElement
public void processElement(ProcessContext ctx) {
try {
ctx.output(toWordStats(sideInputs, ctx));
} catch (Throwable throwable) {
final Failure failure = Failure.from("step", ctx.element(), throwable);
ctx.output(failuresTag, failure);
}
}
}
- It keeps the fluent style of the apply methods and internally concatenates all the failures that occur.
final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1 / word.length()))
.getResult();
final PCollection<Integer> output = resultComposer.output();
final PCollection<Failure> failures = resultComposer.failures();
Coder in CollectionComposer class
Coder for output PCollection
A coder can be set for the current output PCollection in the flow, example :
// Generate and instantiate an Avro specific object.
final AvroTest inputAvroObject = AvroTest.newBuilder()
.setId(34444)
.setName("test")
.build();
// Read an Avro file with a type corresponding to the previous specific object.
final PCollection<AvroTest> avroObjectsCollection = pipeline.apply("Reads Avro objects", AvroIO
.read(AvroTest.class)
.from("filePath"));
private GenericRecord avroObjectToGenericRecord(final AvroTest avroTest) {
GenericRecord record = new GenericData.Record(avroTest.getSchema());
record.put("id", avroTest.getId());
record.put("name", avroTest.getName());
return record;
}
// Flow with CollectionComposer.
final Result<PCollection<GenericRecord>, Failure> result2 = CollectionComposer.of(avroObjectsCollection)
.apply("Map", MapElements.into(of(GenericRecord.class)).via(this::avroObjectToGenericRecord))
.setCoder(AvroCoder.of(GenericRecord.class, inputAvroObject.getSchema()))
.getResult();
In this example, an Avro file is read and mapped to a typed and specific object AvroTest.
Then we simulate a transformation from this AvroTest instance to a GenericRecord object.
The GenericRecord doesn't contain any information about Serialization and in this case Beam can't infer
a default Coder.
We have to set a Coder for the output PCollection of GenericRecord :
.setCoder(AvroCoder.of(GenericRecord.class, inputAvroObject.getSchema()))
For coders, the CollectionComposer class works like a regular PCollection and
offers the same setCoder method for the successful output.
Asgarde with Kotlin
Apache Beam Java can be used with Kotlin, which makes the experience more enjoyable.
Asgarde also offers Kotlin extensions to use the CollectionComposer class with more expressive and concise code, in a
functional programming style.
:warning: Kotlin Asgarde is available from Asgarde 0.15.0 and Beam 2.36.0 :warning:
The current Kotlin version used with Asgarde is : 1.9.22
Let's take a previous example of Asgarde Java pipeline with error handling :
final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word)-> word + "Test"))
.apply("FlatMap", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
.apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1/word.length()))
.getResult();
In Asgarde Kotlin the same pipeline is :
import fr.groupbees.asgarde.*
val result: Result<PCollection<Int>, Failure> = CollectionComposer.of(words)
.map("Map") { word -> word + "Test" }
.flatMap("FlatMap") { Arrays.asList(*Arrays.copyOfRange(it.split(" ").toTypedArray(), 1, 5)) }
.mapFn("ParDo", { word -> 1 / word.length })
.result
The Kotlin code of Asgarde uses extensions.
To use these extensions, the following import must be added :
import fr.groupbees.asgarde.*
This feature is great because we can mix native Asgarde code with
functions dedicated to Kotlin, example :
val result: Result<PCollection<Int>, Failure> = CollectionComposer.of(words)
.apply("Map", MapElements.into(TypeDescriptors.strings()).via(SerializableFunction { word: String -> word + "Test" }))
.flatMap("FlatMap") { Arrays.asList(*Arrays.copyOfRange(it.split(" ").toTypedArray(), 1, 5)) }
.mapFn("ParDo", { word -> 1 / word.length })
.result
In this case, the first map function has been replaced by the native apply function with MapElements.
The type of the lambda expression passed to the via function needs to be specified in Kotlin, because via accepts either a ProcessFunction
or a SerializableFunction, which makes a bare lambda ambiguous (SerializableFunction is used in this example).
In the following sections, all Asgarde native components and their equivalents in Kotlin are proposed.
Extension for MapElements
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", MapElements.into(TypeDescriptor.of(OtherTeam.class)).via(team -> TestSettings.toOtherTeam(team)))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapElements.into(TypeDescriptor.of(OtherTeam.class)).via(TestSettings::toOtherTeam))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teams)
.map("Step name") { team -> TestSettings.toOtherTeam(team) }
.result
With Kotlin it
CollectionComposer.of(teams)
.map("Step name") { TestSettings.toOtherTeam(it) }
.result
The Kotlin version takes only a lambda as its last parameter, and
Kotlin recommends passing such a trailing lambda outside the parentheses.
Extension for FlatMapElements
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements.into(of(Player.class)).via(team -> team.getPlayers()))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements.into(of(Player.class)).via(Team::getPlayers))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teams)
.flatMap("Step name") { team -> team.players }
.result
With Kotlin it
CollectionComposer.of(teams)
.flatMap("Step name") { it.players }
.result
Extension for MapElements with failure
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", MapElements
.into(of(Team.class))
.via(team -> toTeamWithPsgError(team))
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapElements
.into(of(Team.class))
.via(this::toTeamWithPsgError)
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teamCollection)
.mapWithFailure(
"Step name",
{ team -> toTeamWithPsgError(team) },
{ exElt -> Failure.from("Step name", exElt) }
)
.result
With Kotlin it
CollectionComposer.of(teamCollection)
.mapWithFailure(
"Step name",
{ toTeamWithPsgError(it) },
{ Failure.from("Step name", it) }
)
.result
Extension for FlatMapElements with failure
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements
.into(of(Player.class))
.via(team -> simulateFlatMapErrorPsgTeam(team))
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapElements
.into(of(Player.class))
.via(this::simulateFlatMapErrorPsgTeam)
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Step name", exElt)))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teamCollection)
.flatMapWithFailure(
"Step name",
{ team -> simulateFlatMapErrorPsgTeam(team) },
{ exElt -> Failure.from("Step name", exElt) }
)
.result
With Kotlin it
CollectionComposer.of(teamCollection)
.flatMapWithFailure(
"Step name",
{ simulateFlatMapErrorPsgTeam(it) },
{ Failure.from("Step name", it) }
)
.result
Extension for MapElementFn
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", MapElementFn
.into(of(OtherTeam.class))
.via(team -> TestSettings.toOtherTeam(team))
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapElementFn
.into(of(OtherTeam.class))
.via(TestSettings::toOtherTeam)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teams)
.mapFn("Step name",
{ team -> TestSettings.toOtherTeam(team) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin it
CollectionComposer.of(teams)
.mapFn("Step name",
{ TestSettings.toOtherTeam(it) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Extension for FlatMapElementFn
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", FlatMapElementFn
.into(of(Player.class))
.via(team -> team.getPlayers())
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapElementFn
.into(of(Player.class))
.via(Team::getPlayers)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teams)
.flatMapFn("Step name",
{ team -> team.players },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin it
CollectionComposer.of(teams)
.flatMapFn("Step name",
{ it.players },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Extension for MapProcessContextFn
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", MapProcessContextFn
.from(Team.class)
.into(of(OtherTeam.class))
.via(ctx -> TestSettings.toOtherTeam(ctx))
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", MapProcessContextFn
.from(Team.class)
.into(of(OtherTeam.class))
.via(TestSettings::toOtherTeam)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teams)
.mapFnWithContext("Step name",
{ ctx: DoFn<Team, OtherTeam>.ProcessContext -> TestSettings.toOtherTeam(ctx) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin it
CollectionComposer.of(teams)
.mapFnWithContext<Team, OtherTeam>("Step name",
{ TestSettings.toOtherTeam(it) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Extension for FlatMapProcessContextFn
Asgarde Java :
CollectionComposer.of(teams)
.apply("Step name", FlatMapProcessContextFn
.from(Team.class)
.into(of(Player.class))
.via(ctx -> TestSettings.toPlayers(ctx))
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
With method reference
CollectionComposer.of(teams)
.apply("Step name", FlatMapProcessContextFn
.from(Team.class)
.into(of(Player.class))
.via(TestSettings::toPlayers)
.withSetupAction(() -> System.out.print("Test"))
.withStartBundleAction(() -> System.out.print("Test"))
.withFinishBundleAction(() -> System.out.print("Test"))
.withTeardownAction(() -> System.out.print("Test")))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teams)
.flatMapFnWithContext("Step name",
{ ctx: DoFn<Team, Player>.ProcessContext -> TestSettings.toPlayers(ctx) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
With Kotlin it
CollectionComposer.of(teams)
.flatMapFnWithContext<Team, Player>("Step name",
{ TestSettings.toPlayers(it) },
setupAction = { print("Test") },
startBundleAction = { print("Test") },
finishBundleAction = { print("Test") },
teardownAction = { print("Test") }
)
.result
Extension for FilterFn
Asgarde Java :
CollectionComposer.of(teamCollection)
.apply("Step name", FilterFn.by(team -> simulateFilterErrorPsgTeam(team)))
.getResult();
With method reference
CollectionComposer.of(teamCollection)
.apply("Step name", FilterFn.by(this::simulateFilterErrorPsgTeam))
.getResult();
Asgarde Kotlin :
CollectionComposer.of(teamCollection)
.filter("Step name") { team -> simulateFilterErrorPsgTeam(team) }
.result
With Kotlin it
CollectionComposer.of(teamCollection)
.filter("Step name") { simulateFilterErrorPsgTeam(it) }
.result
Possible evolutions in the future
- Maybe allow injecting a custom Failure object and error handling function to be used in all apply calls.