
[Improve][connector-V2-Neo4j]Supports neo4j sink batch write and update docs

Open FuYouJ opened this issue 2 years ago • 5 comments

Purpose of this pull request

Check list

  • [x] Code changes are covered by tests, or do not need tests for the following reason:
  • [x] If any new Jar binary package is added in your PR, please add a License Notice according to the New License Guide
  • [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
  • [x] If you are contributing the connector code, please check that the following files are updated:
    1. Update the change log in the connector document. For more details, refer to connector-v2
    2. Update plugin-mapping.properties and add new connector information in it
    3. Update the pom file of seatunnel-dist
  • [x] Update the release-note.

FuYouJ avatar May 26 '23 16:05 FuYouJ

issue

FuYouJ avatar May 30 '23 02:05 FuYouJ

Please add an e2e test case to check this change: https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e

hailin0 avatar Jun 02 '23 02:06 hailin0

update release-note.md

https://github.com/apache/seatunnel/blob/dev/release-note.md

hailin0 avatar Jun 02 '23 02:06 hailin0

Please add an e2e test case to check this change: https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e

As requested, I have added the updated test code.

FuYouJ avatar Jun 03 '23 12:06 FuYouJ

update release-note.md

https://github.com/apache/seatunnel/blob/dev/release-note.md

[Connector-v2] [Neo4j] Sink supports batch write. (#4835)

FuYouJ avatar Jun 03 '23 12:06 FuYouJ

Please add an e2e test case to check this change: https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e

I have added additional tests, please review.

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
  FakeSource {
    result_table_name = "fake"
    parallelism = 1
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
}

sink {
  Neo4j {
    uri = "neo4j://neo4j-host:7687"
    username = "neo4j"
    password = "Test@12343"
    database = "neo4j"
    batch_data_variable = "ttt"
    max_batch_size = 3
    write_mode = "Batch"

    max_transaction_retry_time = 3
    max_connection_timeout = 1

    query = "unwind $ttt as row  create(n:BatchLabel) set n.name = row.name,n.age = row.age"
  }
}

And this is the e2e test code:

    public void testBatchWrite(TestContainer container) throws IOException, InterruptedException {
        // clean test data before test
        final Result checkExists = neo4jSession.run("MATCH (n:BatchLabel) RETURN n limit 1");
        if (checkExists.hasNext()) {
            neo4jSession.run("MATCH (n:BatchLabel) delete n");
        }

        // unwind $ttt as row create(n:BatchLabel) set n.name = row.name,n.age = row.age
        Container.ExecResult execResult =
                container.executeJob("/neo4j/fake_to_neo4j_batch_write.conf");
        // then
        Assertions.assertEquals(0, execResult.getExitCode());
        final Result result = neo4jSession.run("MATCH (n:BatchLabel) RETURN n");
        // nodes
        assertTrue(result.hasNext());
        // verify the attributes of the node
        result.stream()
                .forEach(
                        r -> {
                            String name = r.get("n").get("name").asString();
                            assertNotNull(name);
                            Object age = r.get("n").get("age").asObject();
                            assertNotNull(age);
                        });
    }
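
To make the effect of max_batch_size = 3 in the config above easier to picture, here is a minimal sketch of size-based batching. It assumes the sink buffers rows and hands them over once the buffer reaches max_batch_size, then flushes the remainder when the job ends. The names BatchBufferSketch, BatchBuffer and batchSizes are hypothetical and are not taken from the connector code; the actual implementation in this PR may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class BatchBufferSketch {

    /** Hypothetical buffer: collects rows and hands them over in groups of at most maxBatchSize. */
    static final class BatchBuffer {
        private final int maxBatchSize;
        private final Consumer<List<Map<String, Object>>> flusher;
        private final List<Map<String, Object>> pending = new ArrayList<>();

        BatchBuffer(int maxBatchSize, Consumer<List<Map<String, Object>>> flusher) {
            if (maxBatchSize < 1) {
                throw new IllegalArgumentException("maxBatchSize must be >= 1");
            }
            this.maxBatchSize = maxBatchSize;
            this.flusher = flusher;
        }

        /** Buffers one row and flushes as soon as the buffer is full. */
        void write(Map<String, Object> row) {
            pending.add(row);
            if (pending.size() >= maxBatchSize) {
                flush();
            }
        }

        /** Hands a copy of the pending rows to the flusher and clears the buffer. */
        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            flusher.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }

    /** Writes rowCount fake rows (name, age) and records the size of every flushed batch. */
    static List<Integer> batchSizes(int rowCount, int maxBatchSize) {
        List<Integer> sizes = new ArrayList<>();
        // Assumption: a real sink would run the Cypher query here instead of recording sizes.
        BatchBuffer buffer = new BatchBuffer(maxBatchSize, batch -> sizes.add(batch.size()));
        for (int i = 0; i < rowCount; i++) {
            Map<String, Object> row = new HashMap<>();
            row.put("name", "user-" + i);
            row.put("age", 20 + i);
            buffer.write(row);
        }
        buffer.flush(); // flush the rows left over at the end of the job
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(7, 3)); // prints [3, 3, 1]
    }
}
```

With seven rows and a batch size of three, the sketch flushes two full batches and one final partial batch, so each flush would correspond to one execution of the unwind query rather than one execution per row.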

FuYouJ avatar Jun 09 '23 12:06 FuYouJ

@TyrantLucifer The latest code now uses batch_data_variable, which names the Cypher parameter that holds a batch of rows. If batch_data_variable = ttt, the Cypher query accesses the batch with unwind $ttt as row and then processes each row.
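
The relationship between batch_data_variable and the query can be sketched as follows: the batch is bound as a single query parameter whose key is the configured variable name and whose value is the list of row maps, and the query has to reference that same name with a leading $. This is only an illustration under those assumptions; BatchVariableSketch, batchParameters and queryReferences are hypothetical helpers, not connector APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

class BatchVariableSketch {

    /** Builds the parameter map: one entry, keyed by the batch variable, holding all rows. */
    static Map<String, Object> batchParameters(
            String batchVariable, List<Map<String, Object>> rows) {
        Map<String, Object> params = new HashMap<>();
        params.put(batchVariable, rows);
        return params;
    }

    /** Checks that the query references the parameter as $name (whole name only). */
    static boolean queryReferences(String query, String batchVariable) {
        Pattern reference = Pattern.compile("\\$" + Pattern.quote(batchVariable) + "\\b");
        return reference.matcher(query).find();
    }

    public static void main(String[] args) {
        String query =
                "unwind $ttt as row create(n:BatchLabel) set n.name = row.name,n.age = row.age";

        List<Map<String, Object>> rows = new ArrayList<>();
        Map<String, Object> row = new HashMap<>();
        row.put("name", "alice");
        row.put("age", 30);
        rows.add(row);

        // Assumption: a map like this would be passed to the driver together with the
        // query text, so that $ttt resolves to the list of rows inside Cypher.
        Map<String, Object> params = batchParameters("ttt", rows);

        System.out.println(params.keySet());
        System.out.println(queryReferences(query, "ttt"));
    }
}
```

If the variable name in the config and the name used after $ in the query do not match, the parameter would be unbound when the query runs, which is why a check like queryReferences could be useful at config validation time.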

FuYouJ avatar Jun 10 '23 12:06 FuYouJ

I have now removed batch_data_variable from the sink config; it now has a default value of batch. @TyrantLucifer

FuYouJ avatar Jun 10 '23 13:06 FuYouJ