[Improve][Connector-V2][Neo4j] Support Neo4j sink batch write and update docs
Purpose of this pull request
Check list
- [x] Code changed are covered with tests, or it does not need tests for reason:
- [x] If any new Jar binary package is added in your PR, please add a License Notice according to the New License Guide
- [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
- [x] If you are contributing the connector code, please check that the following files are updated:
- Update the change log in the connector document. For more details you can refer to connector-v2
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- [x] Update the release-note.
please add e2e testcase check this change https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e
update release-note.md
https://github.com/apache/seatunnel/blob/dev/release-note.md
> please add e2e testcase check this change https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e

According to your request, I have now provided the updated test code.
> update release-note.md
> https://github.com/apache/seatunnel/blob/dev/release-note.md
[Connector-v2] [Neo4j] Sink supports batch write. (#4835)
> please add e2e testcase check this change https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e
I have added additional tests, please review.
```hocon
env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin, only for testing and demonstrating the source plugin feature
  FakeSource {
    result_table_name = "fake"
    parallelism = 1
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
}

sink {
  Neo4j {
    uri = "neo4j://neo4j-host:7687"
    username = "neo4j"
    password = "Test@12343"
    database = "neo4j"
    batch_data_variable = "ttt"
    max_batch_size = 3
    write_mode = "Batch"
    max_transaction_retry_time = 3
    max_connection_timeout = 1
    query = "unwind $ttt as row create(n:BatchLabel) set n.name = row.name, n.age = row.age"
  }
}
```
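To illustrate how `max_batch_size` and `batch_data_variable` interact in the config above, here is a minimal sketch (the class `BatchBinder` and its method are hypothetical illustrations, not the connector's actual code) that partitions rows into batches and binds each batch under the configured parameter name, so a query like `unwind $ttt as row ...` can consume it:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchBinder {
    // Hypothetical sketch: split rows into batches of maxBatchSize and bind each
    // batch under the configured parameter name (batch_data_variable), producing
    // one Cypher parameter map per batch.
    public static List<Map<String, Object>> bind(
            List<Map<String, Object>> rows, int maxBatchSize, String batchDataVariable) {
        List<Map<String, Object>> parameterSets = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += maxBatchSize) {
            List<Map<String, Object>> batch =
                    new ArrayList<>(rows.subList(i, Math.min(i + maxBatchSize, rows.size())));
            Map<String, Object> params = new HashMap<>();
            params.put(batchDataVariable, batch);
            parameterSets.add(params);
        }
        return parameterSets;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            Map<String, Object> row = new HashMap<>();
            row.put("name", "user" + i);
            row.put("age", 20 + i);
            rows.add(row);
        }
        // With max_batch_size = 3, 7 rows are grouped as 3 + 3 + 1,
        // each group bound under the key "ttt".
        List<Map<String, Object>> sets = bind(rows, 3, "ttt");
        System.out.println(sets.size());
    }
}
```

Each parameter map would then be passed to one `session.run(query, params)` call, which is what lets a single Cypher statement write a whole batch of nodes.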
And this is the e2e test code:
```java
public void testBatchWrite(TestContainer container) throws IOException, InterruptedException {
    // clean test data before the test
    final Result checkExists = neo4jSession.run("MATCH (n:BatchLabel) RETURN n limit 1");
    if (checkExists.hasNext()) {
        neo4jSession.run("MATCH (n:BatchLabel) delete n");
    }
    // unwind $ttt as row create(n:BatchLabel) set n.name = row.name, n.age = row.age
    Container.ExecResult execResult =
            container.executeJob("/neo4j/fake_to_neo4j_batch_write.conf");
    // then
    Assertions.assertEquals(0, execResult.getExitCode());
    final Result result = neo4jSession.run("MATCH (n:BatchLabel) RETURN n");
    // nodes were written
    assertTrue(result.hasNext());
    // verify the attributes of each node
    result.stream()
            .forEach(
                    r -> {
                        String name = r.get("n").get("name").asString();
                        assertNotNull(name);
                        Object age = r.get("n").get("age").asObject();
                        assertNotNull(age);
                    });
}
```
@TyrantLucifer
The latest code has been updated to use batch_data_variable, which specifies the name under which a batch of rows is exposed to the Cypher query. If batch_data_variable = ttt, then in Cypher you write unwind $ttt to process the batch.
Now I have removed batch_data_variable from the sink config; it defaults to batch.
@TyrantLucifer
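To make the default concrete, here is a small sketch (the class `DefaultBatchVariable` and its helper are hypothetical illustrations, not the connector's code) showing how the Cypher query references the batch parameter name; with the option removed from the config, the name would be the assumed default "batch", so queries reference $batch instead of $ttt:

```java
public class DefaultBatchVariable {
    // Hypothetical sketch: build the batch-write Cypher query for a given
    // batch parameter name (batch_data_variable).
    static String buildQuery(String batchDataVariable) {
        return "unwind $" + batchDataVariable + " as row "
                + "create (n:BatchLabel) set n.name = row.name, n.age = row.age";
    }

    public static void main(String[] args) {
        // With the assumed default "batch", the query starts with "unwind $batch".
        System.out.println(buildQuery("batch"));
        // A custom name like "ttt" yields "unwind $ttt ..." as in the config above.
        System.out.println(buildQuery("ttt"));
    }
}
```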