
CTR: not working in 3.0.0-SNAPSHOT

Open cbonami opened this issue 1 year ago • 13 comments

Description:

Our journey began when we tried to run a Composed Task made of Boot 3-based applications. The CTR of the latest released version of SCDF is still based on Boot 2, yet it started reading/writing the BOOT3-prefixed tables, even though it is a Boot 2 application. That's why this didn't work. We also assumed it was not even supposed to work, given the combination of Boot 2 and Boot 3. A bit odd, since Boot 3 has been available for quite a long time.

So we hoped that the Boot 3 version of the CTR, available in the SCDF GitHub branch 'main3', would solve our issue. We also assumed that this branch corresponds to the springcloud/spring-cloud-dataflow-composed-task-runner:3.0.0-SNAPSHOT image on Docker Hub. We ran our composed task with SCDF version bitnami/spring-cloud-dataflow:2.11.5-debian-12-r3.

But then, even before sub-tasks are launched, we encounter this exception in the container running the composed task:

2024-11-14 10:57:39.320  INFO 1 --- [           main] o.s.c.d.c.ComposedTaskRunner             : Started ComposedTaskRunner in 27.109 seconds (process running for 28.389)
2024-11-14 10:57:39.326  INFO 1 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2024-11-14 10:57:39.538  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=migrate-employer]] launched with the following parameters: [{'ctr.id':'{value=15d0d628-c9ea-4c42-8b25-ac66711b573b, type=class java.lang.String, identifying=true}'}]
2024-11-14 10:57:39.620 ERROR 1 --- [           main] o.s.batch.core.job.AbstractJob           : Encountered fatal error executing job

org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
	at org.springframework.dao.support.DataAccessUtils.nullableSingleResult(DataAccessUtils.java:190) ~[spring-tx-6.1.14.jar:6.1.14]
	at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:890) ~[spring-jdbc-6.1.14.jar:6.1.14]
	at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:916) ~[spring-jdbc-6.1.14.jar:6.1.14]
	at org.springframework.batch.core.repository.dao.JdbcJobExecutionDao.synchronizeStatus(JdbcJobExecutionDao.java:394) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:194) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Unknown Source) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:355) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:379) ~[spring-tx-6.1.14.jar:6.1.14]
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.14.jar:6.1.14]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) ~[spring-aop-6.1.14.jar:6.1.14]
	at jdk.proxy2/jdk.proxy2.$Proxy108.update(Unknown Source) ~[na:na]
	at org.springframework.batch.core.job.AbstractJob.updateStatus(AbstractJob.java:445) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:312) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:157) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:148) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher.run(TaskExecutorJobLauncher.java:59) ~[spring-batch-core-5.1.2.jar:5.1.2]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:210) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:194) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:174) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:169) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:164) ~[spring-boot-autoconfigure-3.3.5.jar:3.3.5]
	at org.springframework.cloud.task.configuration.observation.ObservationApplicationRunner.run(ObservationApplicationRunner.java:59) ~[spring-cloud-task-core-3.1.2.jar:3.1.2]
	at org.springframework.boot.SpringApplication.lambda$callRunner$4(SpringApplication.java:786) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:83) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:88) ~[spring-core-6.1.14.jar:6.1.14]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:786) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:774) ~[spring-boot-3.3.5.jar:3.3.5]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source) ~[na:na]
	at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(Unknown Source) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[na:na]
	at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source) ~[na:na]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:774) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:342) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.5.jar:3.3.5]
	at org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskRunner.main(ComposedTaskRunner.java:31) ~[classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Unknown Source) ~[na:na]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:95) ~[workspace/:na]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[workspace/:na]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65) ~[workspace/:na]
	at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:31) ~[workspace/:na]

2024-11-14 10:57:39.630  INFO 1 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=migrate-employer]] failed unexpectedly and fatally with the following parameters: [{'ctr.id':'{value=15d0d628-c9ea-4c42-8b25-ac66711b573b, type=class java.lang.String, identifying=true}'}]

No clue why this happens.
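Reading the trace, the failing call is JdbcJobExecutionDao.synchronizeStatus, which expects its single-row lookup of the job execution to return exactly one row. One plausible reading, given the table-prefix mismatch described above, is that the execution row was written under one prefix while the lookup queries another. Here is a plain-Java sketch of the size check behind that error message (class and method names are ours, not Spring's actual code):

```java
import java.util.List;

public class PrefixMismatchDemo {

    // Mimics the single-result check that produced
    // "Incorrect result size: expected 1, actual 0" in the stack trace above.
    static <T> T requiredSingleResult(List<T> results) {
        if (results.size() != 1) {
            throw new IllegalStateException(
                "Incorrect result size: expected 1, actual " + results.size());
        }
        return results.get(0);
    }

    public static void main(String[] args) {
        // Simulates looking up the execution in the default-prefixed table
        // while the row actually lives under a different prefix: zero rows.
        List<String> rowsInDefaultPrefixedTable = List.of();
        try {
            requiredSingleResult(rowsInDefaultPrefixedTable);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```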

Is this (still) as expected? Do we have to wait for the final 3.0.0 version of the CTR?

Release versions:

  • SCDF: bitnami/spring-cloud-dataflow:2.11.5-debian-12-r3
  • Composed Task Runner: springcloud/spring-cloud-dataflow-composed-task-runner:3.0.0-SNAPSHOT

cbonami avatar Nov 14 '24 11:11 cbonami

Hello, SCDF 2.11.5 only supports the Boot 2 implementation of CTR. Looking at the stack trace above, it appears to come from the Boot 3 implementation of CTR.

If you were to create the following CTR definition, t1: timestamp && t2: timestamp, do you get the same results? If so, what properties are you setting when you launch your CTR?
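For anyone wanting to reproduce this minimal case, the definition can be created and launched from the SCDF shell along these lines (`ctr-test` is an arbitrary example name):

```
dataflow:>task create ctr-test --definition "t1: timestamp && t2: timestamp"
dataflow:>task launch ctr-test
```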

cppwfs avatar Nov 14 '24 12:11 cppwfs

I will try what you asked as soon as possible. But in the meantime, I can confirm: we deployed a Boot 3 CTR with SCDF 2.11.5, because we are so desperate to get our Boot 3 tasks/jobs running in SCDF. We really need them to be Boot 3 (and not Boot 2 anymore) because we want to use Boot 3's improved metrics support with Micrometer and its new distributed tracing support with Micrometer Tracing.

So we configured the SCDF Helm chart as follows:

spring-cloud-dataflow:
  ...
  server:
    composedTaskRunner:
      image:
        registry: docker.io
        repository: springcloud/spring-cloud-dataflow-composed-task-runner
        tag: 3.0.0-SNAPSHOT
        digest: ""
    image:
      registry: docker.io
      repository: bitnami/spring-cloud-dataflow
      tag: 2.11.5-debian-12-r3
      digest: ""

Using the default Boot 2 CTR in combination with Spring Boot 3 jobs simply didn't work, as that CTR writes to different tables. Juggling with table prefixes didn't lead anywhere.
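For context, these are the table-prefix properties we were juggling with; the same values appear verbatim in the launch arguments further down this thread:

```
spring.cloud.task.schemaTarget=boot3
spring.cloud.task.tablePrefix=BOOT3_TASK_
spring.batch.jdbc.table-prefix=BOOT3_BATCH_
```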

But from what I understood from your answer, running a Boot 3 CTR on SCDF 2.11.5 won't be possible anyway, right?

cbonami avatar Nov 14 '24 17:11 cbonami

CTR 2.11.5 can launch Boot 3 applications. So don't use CTR 3.0.0 with SCDF 2.11.5.

cppwfs avatar Nov 14 '24 19:11 cppwfs

We didn't get CTR 2.11.5 working with Boot 3 batch applications. The CTR itself started reading/writing the BOOT3-prefixed tables, even though it is a Boot 2 application. We received SQL exceptions about jobs not existing, etc. We will check this again and provide more details.

cbonami avatar Nov 18 '24 10:11 cbonami

Hey @cbonami,

in your task application are you defining a custom JobExplorer or JobRepository?

If so, try to autowire org.springframework.boot.autoconfigure.batch.BatchProperties into the bean definitions and create the beans like this (split into two @Bean methods; the // .... parts stand for your existing configuration):

    @Bean
    public JobRepository jobRepository(BatchProperties batchProperties) throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        // ....
        Optional.ofNullable(batchProperties)
            .map(BatchProperties::getJdbc)
            .map(BatchProperties.Jdbc::getTablePrefix)
            .ifPresent(factory::setTablePrefix);
        factory.afterPropertiesSet();
        return Objects.requireNonNull(factory.getObject());
    }

    @Bean
    public JobExplorer jobExplorer(BatchProperties batchProperties) throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        // ....
        Optional.ofNullable(batchProperties)
            .map(BatchProperties::getJdbc)
            .map(BatchProperties.Jdbc::getTablePrefix)
            .ifPresent(jobExplorerFactoryBean::setTablePrefix);
        jobExplorerFactoryBean.afterPropertiesSet();
        return Objects.requireNonNull(jobExplorerFactoryBean.getObject());
    }

klopfdreh avatar Nov 19 '24 12:11 klopfdreh

@klopfdreh , thanks for your reply. But no, we did not provide a custom JobExplorer or JobRepository.

Anyway, in the meantime we got it running via what we consider a hack.

We reverted everything to SCDF 2.11.5 and the standard Boot2-based CTR.

We deployed our Composed Task comprising 2 subtasks: a Boot 3 task and a Boot 3 batch app. Like in our initial experiments, the FIRST run of the CT worked just fine. However, when launching the CT for the second time, we received this:

Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={run.id=1}.  If you want to run this job again, change the parameters.

This is what put us on the wrong foot during our first experiments, when we tried the (Boot 2) CTR with the 2 subtasks that were Boot 2 before but that we had converted to Boot 3. At that time we received exactly the same error, something we had never encountered in the Boot 2 world. This made us conclude that the CTR also needed to be Boot 3, which apparently is not the case.

So now, we circumvented this problem by adding an argument (called 'unique') when starting the CT. The argument contains a random UUID.

Now we can run the CT multiple times without any problem; this is what we see in the logs:

Job: [FlowJob: [name=migrate-employer]] launched with the following parameters: [{run.id=1, unique=fd1ed4b1-c9a6-40a2-9f9f-ac77a2414da0}]

As you can see, run.id always remains 1 for some reason, but we make the argument set unique with our 'hack'. Of course, we don't think this is how it is intended to be done; run.id should auto-increment, just like it did (we suppose) when we were using the Boot 2 subtasks.
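For reference, a run-id incrementer conceptually just takes the previous execution's run.id and adds one. A plain-Java sketch of that idea follows (illustrative names, not Spring Batch's actual RunIdIncrementer code). One plausible explanation for run.id sticking at 1 is that the previous execution's parameters cannot be found when they were recorded under a different table prefix, so the incrementer always starts from scratch and the identical identifying parameters then trigger JobInstanceAlreadyCompleteException:

```java
import java.util.HashMap;
import java.util.Map;

public class RunIdSketch {

    // Conceptual equivalent of a run-id incrementer: previous run.id + 1,
    // or 1 when no previous parameters are visible (e.g. because the last
    // execution was recorded under differently-prefixed tables).
    static Map<String, Long> nextParameters(Map<String, Long> previous) {
        Map<String, Long> next = new HashMap<>(previous);
        next.merge("run.id", 1L, Long::sum);
        return next;
    }

    public static void main(String[] args) {
        System.out.println(nextParameters(Map.of()));             // run.id starts at 1
        System.out.println(nextParameters(Map.of("run.id", 1L))); // increments to 2
    }
}
```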

cbonami avatar Nov 21 '24 15:11 cbonami

To clarify: it's the CTR that is throwing the JobInstanceAlreadyCompleteException?

cppwfs avatar Nov 21 '24 16:11 cppwfs

Indeed.

cbonami avatar Nov 27 '24 10:11 cbonami

Can you share with us the deployer and app properties (sensitive information redacted) you are using? Currently we are unable to reproduce.

cppwfs avatar Dec 02 '24 14:12 cppwfs

Sorry for the late reply. Here you go:

**Schema Target:** boot3

**Arguments:**
--migrate-employer.ignore-prerequisites: OVERRULE,WORK_SCHEDULE,CASCADE
--migrate-employer.pulsar-broker-url: ******
--migrate-employer.dicco-ds-url: ******
--spring.datasource.driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver
--management.metrics.tags.application: ${spring.cloud.task.name:unknown}-${spring.cloud.task.executionid:unknown}
--spring.cloud.task.name: migrate-employer-migrate-employer
--spring.datasource.password: ******
--spring.cloud.deployer.bootVersion: 3
--management.metrics.tags.service: task-application
--spring.datasource.username: ******
--spring.datasource.url: ******
--spring.cloud.task.initialize-enabled: false
--migrate-employer.skip-work-schedule-check: false
--migrate-employer.pulsar-client-url: ******
--spring.cloud.task.schemaTarget: boot3
--spring.batch.jdbc.table-prefix: BOOT3_BATCH_
--migrate-employer.continue-on-fail: false
--migrate-employer.onboarding-service-grpc-host: clink-grpc.connect-evolution-inttst
--spring.cloud.task.tablePrefix: BOOT3_TASK_
--migrate-employer.exclude-phases:
--migrate-employer.include-phases: CACHE_EMPLOYER,SPLIT_ADJUSTMENTS,EMPLOYER,AGREEMENT,WORK_SCHEDULE,WORKING_TIMES
--spring.cloud.task.parent-execution-id: 3
--spring.cloud.task.parent-schema-target: boot3
--app.migrate-employer.spring.cloud.task.initialize-enabled: false
--app.migrate-employer.spring.batch.jdbc.table-prefix: BOOT3_BATCH_
--app.migrate-employer.spring.cloud.task.tablePrefix: BOOT3_TASK_
--app.migrate-employer.spring.cloud.task.schemaTarget: boot3
--app.migrate-employer.spring.cloud.deployer.bootVersion: 3
--spring.cloud.task.executionid: 5

**External Execution Id:**
migrate-employer-migrate-employer-kev1mo4zqm

**Job Execution Ids:**
[3](https://scdf-server.inttst.tst.ce.acerta.io/dashboard/index.html#/tasks-jobs/job-executions/3/schemaTarget/boot3)
...

**Resource URL:**
Docker Resource [docker:harbor.tools.acerta.io/acerta-releases/acerta-connect-evolution/migrate-employer-task:0.59.0]

**Application Properties:**
migrate-employer.ignore-prerequisites: OVERRULE,WORK_SCHEDULE,CASCADE
migrate-employer.pulsar-broker-url: ******
migrate-employer.dicco-ds-url: ******
spring.datasource.driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver
management.metrics.tags.application: ${spring.cloud.task.name:unknown}-${spring.cloud.task.executionid:unknown}
spring.cloud.task.name: migrate-employer-migrate-employer
spring.datasource.password: ******
spring.cloud.deployer.bootVersion: 3
management.metrics.tags.service: task-application
spring.datasource.username: ******
spring.datasource.url: ******
spring.cloud.task.initialize-enabled: false
migrate-employer.skip-work-schedule-check: false
migrate-employer.pulsar-client-url: ******
spring.cloud.task.schemaTarget: boot3
spring.batch.jdbc.table-prefix: BOOT3_BATCH_
migrate-employer.continue-on-fail: false
migrate-employer.onboarding-service-grpc-host: clink-grpc.connect-evolution-inttst
spring.cloud.task.tablePrefix: BOOT3_TASK_

**Platform Properties:**
app.migrate-employer.spring.cloud.task.initialize-enabled: false
deployer.migrate-employer.kubernetes.secretKeyRefs: [{"envVarName":"MIGRATE_EMPLOYER.DICCO-DS-USERNAME","secretName":"scdf-server-dicco-datasource-cred","dataKey":"username"},{"envVarName":"MIGRATE_EMPLOYER.DICCO-DS-PASSWORD","secretName":"scdf-server-dicco-datasource-cred","dataKey":"password"}]
app.migrate-employer.dicco-ds-url: ******
deployer.migrate-employer.kubernetes.deploymentLabels: loki:acerta-payroll-service-tst,app.kubernetes.io/name:migrate-employer-migrate-employer
app.migrate-employer.ignore-prerequisites: OVERRULE,WORK_SCHEDULE,CASCADE
deployer.*.kubernetes.environment-variables: DATAFLOW_COMPOSED_TASK_NAME=migrate-employer,MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_ENABLED=true,MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_HOST=prometheus-rsocket-proxy.connect-evolution-prometheus-rsocket-proxy-inttst
app.migrate-employer.onboarding-service-grpc-host: clink-grpc.connect-evolution-inttst
app.migrate-employer.spring.cloud.task.schemaTarget: boot3
app.migrate-employer.pulsar-client-url: ******
app.migrate-employer.spring.cloud.task.tablePrefix: BOOT3_TASK_
app.migrate-employer.skip-work-schedule-check: false
app.migrate-employer.pulsar-broker-url: ******
app.migrate-employer.spring.cloud.deployer.bootVersion: 3
deployer.migrate-employer.kubernetes.podAnnotations: instrumentation.opentelemetry.io/inject-sdk: open-telemetry/default-sdk-tst
app.migrate-employer.spring.batch.jdbc.table-prefix: BOOT3_BATCH_
app.migrate-employer.continue-on-fail: false

Maybe not noteworthy, but we configure the apps and launch the CTR using SCDF-server's API.

cbonami avatar Dec 10 '24 09:12 cbonami

Hello @cbonami, assuming **Arguments:** is for the CTR: you should not set --spring.cloud.task.schemaTarget: boot3 nor --spring.batch.jdbc.table-prefix: BOOT3_BATCH_ for the CTR, since it is a Boot 2 application.

CTR is a Boot 2.x application that sends requests to SCDF to launch both Boot 2 and Boot 3 apps.

cppwfs avatar Dec 10 '24 13:12 cppwfs

Agreed. But here's the thing: we don't add these arguments; they are added automatically somehow. We register the apps, create the CT task, and launch it, all via the SCDF server's API.

Now, could this be related to naming?

  • we have a CT called 'migrate-employer'; the DSL/definition is "init && migrate-employer". In the screenshots below you will see that there's in fact more: "init && migrate-employer && onboard-ce && notify". The last 2 tasks are Spring Boot 2 tasks. But, if I remember correctly, we also tested with only "init && migrate-employer".
  • we have an app called "init" that is registered as a "bootVersion: 3"
  • we have an app called "migrate-employer" (same name as the CT) that is registered as a "bootVersion: 3"

In 'Applications':

(screenshot)

In 'Tasks':

(screenshot)

Here's the code that we use to start the CT 'migrate-employer':

...
import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.cloud.dataflow.rest.client.dsl.DeploymentPropertiesBuilder;
import org.springframework.cloud.dataflow.rest.client.dsl.task.Task;
import org.springframework.cloud.dataflow.rest.resource.LaunchResponseResource;
...
    @Override
    public String startMigrateEmployer(String initiator, List<MigrateEmployerData> employers, @NotNull List<TaskPhase> includePhases, @NotNull List<TaskPhase> excludePhases) {
        final ComposedTask task = ComposedTask.MIGRATE_EMPLOYER;

        final Task scdfTask = Task.builder(dataFlowOperations).findByName(task.getName()).orElseThrow(() -> new IllegalArgumentException("Task [%s] not installed in SCDF Server".formatted(task.getName())));

        final DeploymentPropertiesBuilder deploymentPropertiesBuilder = new DeploymentPropertiesBuilder();
        final ScdfArgumentBuilder argumentBuilder = new ScdfArgumentBuilder();
        addGlobalProperties(deploymentPropertiesBuilder, task);
        addInitiator(deploymentPropertiesBuilder, initiator);
        addMigrateEmployerData(deploymentPropertiesBuilder, employers);

        DeterminePhasesForComposedTask.execute(task, includePhases, excludePhases)
                .forEach((key, value) -> {
                    addIncludedPhases(argumentBuilder, key, value.getFirst());
                    addExcludedPhases(argumentBuilder, key, value.getSecond());
                });
        addCustomAppProperties(deploymentPropertiesBuilder, task);
        addK8sLabels(deploymentPropertiesBuilder, task);
        addK8sAnnotations(deploymentPropertiesBuilder, task);

        Map<String, String> deploymentProperties = deploymentPropertiesBuilder.build();
        List<String> arguments = argumentBuilder.build();

        // If you omit or leave properties empty in subsequent runs, SCDF will reuse the property and its
        // filled-in value from the previous run. For the include/exclude phases we don't want that behavior,
        // because their values can be empty. To avoid this carry-over of properties from the last run, we use
        // arguments instead: arguments always start fresh, and SCDF won't look at arguments from previous runs.
        final LaunchResponseResource launchResponse = scdfTask.launch(deploymentProperties, arguments);
        final long executionId = launchResponse.getExecutionId();

        return ComposedTaskId.from(task, executionId).getId();
    }

    private void addGlobalProperties(final DeploymentPropertiesBuilder builder, final ComposedTask task) {
        final Map<String, String> environmentVariables = new HashMap<>();

        environmentVariables.put(COMPOSED_TASK_NAME_ENVIRONMENT_VARIABLE, task.getName());
        if (scdfProperties.monitor().enabled()) {
            environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_ENABLED", "true");
            final ScdfProperties.monitor monitor = scdfProperties.monitor();
            if (StringUtils.isNotBlank(monitor.prometheusRSocketHost())) {
                environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_HOST", monitor.prometheusRSocketHost());
            }
            if (Objects.nonNull(monitor.prometheusRSocketPort())) {
                environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_PORT", monitor.prometheusRSocketPort().toString());
            }
        }

        builder.put(
                "deployer.*.kubernetes.environment-variables",
                environmentVariables.entrySet().stream()
                        .map(entry -> entry.getKey() + "=" + entry.getValue())
                        .collect(Collectors.joining(","))
        );
    }

    private void addInitiator(final DeploymentPropertiesBuilder builder, String initiator) {
        builder.put("app.%s.%s.initiator".formatted(INIT.getName(), INIT.getName()), initiator);
    }

    private void addMigrateEmployerData(final DeploymentPropertiesBuilder builder, final List<MigrateEmployerData> employers) {
        builder.put(
                "app.%s.%s.migrate-employer-data".
                        formatted(
                                INIT.getName(),
                                INIT.getName()
                        ),
                employers.stream()
                        .map(jsonSerializer::serialize)
                        .collect(Collectors.joining(";"))
        );
    }

    private void addCustomAppProperties(DeploymentPropertiesBuilder builder, ComposedTask composedTask) {
        composedTask.getSubTasks().forEach(task -> {
            final Map<String, String> taskProperties = scdfProperties.tasks().getOrDefault(task.getName(), Map.of());

            final String secrets = taskProperties
                    .entrySet()
                    .stream()
                    .filter(e -> Objects.nonNull(e.getValue()) && e.getValue().contains("secretName"))
                    .map(e -> {
                        final ScdfSecret scdfSecret = jsonSerializer.parse(e.getValue(), ScdfSecret.class);
                        return jsonSerializer.serialize(
                                new ScdfSecret(
                                        "%s.%s".formatted(task.name(), e.getKey().toUpperCase()),
                                        scdfSecret.secretName(),
                                        scdfSecret.dataKey()
                                )
                        );
                    })
                    .collect(Collectors.joining(","));

            builder.put("deployer.%s.kubernetes.secretKeyRefs".formatted(task.getName()), "[%s]".formatted(secrets));

            taskProperties
                    .entrySet()
                    .stream()
                    .filter(entry -> Objects.nonNull(entry.getValue()) && !entry.getValue().contains("secretName"))
                    .forEach(entry -> builder.put("app.%s.%s.%s".formatted(task.getName(), task.getName(), entry.getKey()), entry.getValue()));
        });
    }


and

public class ScdfArgumentBuilder {

    private final Map<Task, List<Pair<String, String>>> argumentsByTask = new EnumMap<>(Task.class);

    ScdfArgumentBuilder() {
    }

    ScdfArgumentBuilder add(final Task task, final String key, final String value) {
        final List<Pair<String, String>> arguments = argumentsByTask.computeIfAbsent(task, t -> new ArrayList<>());
        arguments.add(Pair.of(key, value));
        return this;
    }

    List<String> build() {
        List<String> list = argumentsByTask.entrySet().stream()
                .map(taskMapEntry -> {
                    final Task task = taskMapEntry.getKey();
                    return IntStream.range(0, taskMapEntry.getValue().size())
                            .mapToObj(i -> {
                                final Pair<String, String> argument = taskMapEntry.getValue().get(i);
                                return "app.%s.%s=--%s=%s".formatted(task.getName(), i, argument.getKey(), argument.getValue());
                            })
                            .toList();
                })
                .flatMap(List::stream)
                .toList();
        List<String> list2 = new ArrayList<>(list);
        // hack: since Boot 3 subtasks are used, we need to add a unique identifier to the arguments
        list2.add("unique=" + UUID.randomUUID());
        return list2;
    }
}
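To make the output of build() concrete: each entry is formatted as app.&lt;taskName&gt;.&lt;index&gt;=--&lt;key&gt;=&lt;value&gt;. A standalone reproduction of just that format string (class and method names here are illustrative):

```java
public class ArgumentFormatDemo {

    // Same format string as in ScdfArgumentBuilder.build() above:
    // app.<taskName>.<index>=--<key>=<value>
    static String format(String taskName, int index, String key, String value) {
        return "app.%s.%s=--%s=%s".formatted(taskName, index, key, value);
    }

    public static void main(String[] args) {
        System.out.println(format("migrate-employer", 0,
                "migrate-employer.include-phases", "EMPLOYER,AGREEMENT"));
    }
}
```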

cbonami avatar Dec 10 '24 16:12 cbonami

Are you deleting the previous task-executions of your CTR before launching the new one?

cppwfs avatar Jan 02 '25 16:01 cppwfs