cassandra-river
cassandra-river copied to clipboard
indexing a cassandra column family
I have successfully installed the river and can successfully connect to the cassandra cluster, however I am missing something fundamental since I cannot see a index building.
I have 10,000 rows in my CF in a 3 node cluster and see activity in the ES logs [2014-02-27 16:01:03,516][INFO ][river.cassandra ] [Warren III Worthington] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 16:01:03,716][INFO ][river.cassandra ] [Warren III Worthington] [cassandra][maccasIndex] Starting thread with 738 keys
this repeats over and over again.
I am not using Auth in the cluster so have omitted the properties
sanitised
curl -XPUT 'localhost:9200/_river/maccasIndex/_meta' -d '{ "type" : "cassandra", "cassandra" : { "cluster_name" : "MaccasCluster", "keyspace" : "MaccasKeyspace", "column_family" : "MaccasCF", "batch_size" : 1000, "hosts" : "dsc1n1:9160,dsc1n2:9160,dsc1n3:9160"
},
"index" : {
"index" : "maccasIndex",
"type" : "cassandra"
}
}'
Cassandra info: [cqlsh 3.1.2 | Cassandra 1.2.6.1 | CQL spec 3.0.0 | Thrift protocol 19.36.0]
the CF definition:
CREATE TABLE "MaccasCF" ( key text PRIMARY KEY, "___class" text, "addressLine1" text, "addressLine2" text, ......
I am definitely missing something
http://localhost:9200/_status?pretty=1
{ "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "indices" : { "_river" : { "index" : { "primary_size_in_bytes" : 3820, "size_in_bytes" : 3820 }, "translog" : { "operations" : 0 }, "docs" : { "num_docs" : 1, "max_doc" : 1, "deleted_docs" : 0 }, "merges" : { "current" : 0, "current_docs" : 0, "current_size_in_bytes" : 0, "total" : 0, "total_time_in_millis" : 0, "total_docs" : 0, "total_size_in_bytes" : 0 }, "refresh" : { "total" : 2, "total_time_in_millis" : 84 }, "flush" : { "total" : 1, "total_time_in_millis" : 20 }, "shards" : { "0" : [ { "routing" : { "state" : "STARTED", "primary" : true, "node" : "czJqPzFBSseiXoKX0YtWkA", "relocating_node" : null, "shard" : 0, "index" : "_river" }, "state" : "STARTED", "index" : { "size_in_bytes" : 3820 }, "translog" : { "id" : 1393509195769, "operations" : 0 }, "docs" : { "num_docs" : 1, "max_doc" : 1, "deleted_docs" : 0 }, "merges" : { "current" : 0, "current_docs" : 0, "current_size_in_bytes" : 0, "total" : 0, "total_time_in_millis" : 0, "total_docs" : 0, "total_size_in_bytes" : 0 }, "refresh" : { "total" : 2, "total_time_in_millis" : 84 }, "flush" : { "total" : 1, "total_time_in_millis" : 20 } } ] } } } }
http://localhost:9200/_river/maccasIndex/_mapping?pretty=1 { "_river" : { "mappings" : { "maccasIndex" : { "properties" : { "cassandra" : { "properties" : { "batch_size" : { "type" : "long" }, "cluster_name" : { "type" : "string" }, "column_family" : { "type" : "string" }, "hosts" : { "type" : "string" }, "keyspace" : { "type" : "string" } } }, "index" : { "properties" : { "index" : { "type" : "string" }, "type" : { "type" : "string" } } }, "type" : { "type" : "string" } } } } }
}
if I run:
http://localhost:9200/_river/maccasIndex/search?q=:_pretty=1
I get {"took":268,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}
but no sign of any indexed data
Can you tell me where I am going wrong?
Do you ever see this log.INFO statement Inserting {} keys in ES
? If not then I think there is a bug here: https://github.com/eBay/cassandra-river/blob/master/src/main/java/org/elasticsearch/river/cassandra/CassandraRiver.java#L166
It's not calling saveToEs with the bulk data.
No I don't ever see that statement in the log I just see the "starting thread with 1000 keys"
I thought my syntax might be incorrect
Can you add these two lines before the if
check on line 165 in the class CassandraRiver:
logger.info("Current bulk size: {}", bulk.numberOfActions());
logger.info("Total batch size set: {}", this.batchSize);
Rebuild the plugin and run it again. I am not very sure what is wrong there, will need to debug it. It works on my local machine.
sure thing what do I need to do to perform the rebuild? LINUX centos with maven installed
You just need mvn clean package
and then copy over the jar. Its in the README.md:
Build: mvn clean package
Install: ./bin/plugin -url file:elasticsearch-river-cassandra/target/releases/elasticsearch-river-cassandra-1.0.0-SNAPSHOT.zip -install river-cassandra
ok I have rebuilt but I'm not seeing any additional statements in the logs
logs output....
[2014-02-27 17:39:57,286][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:39:57,396][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 738 keys [2014-02-27 17:39:57,575][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:39:57,699][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:39:57,791][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:39:57,955][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:39:58,072][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:39:58,212][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:39:58,387][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
Am I supposed to see: I would expect to see 10,000 json documents indexed, and I would like to search on all the other fields and return a record ID which is my primary key in the C* db CF
yes, that is the expected behavior.
There looks an issue with the data or the river where your bulk
object is not being constructed.
Can you add the same print statements after this line: BulkRequestBuilder bulk = client.prepareBulk();
And also add this print statement after: for(String key : this.keys.rowColumnMap.keySet()){
logger.info("Current key: {}", key);
ok this is what I see now
[2014-02-27 17:59:45,301][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 6a81fdb0-7952-11e3-b704-000c29151863 [2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 1f10f110-7952-11e3-b704-000c29151863 [2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 146688b0-7952-11e3-b704-000c29151863 [2014-02-27 17:59:45,680][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:45,680][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:45,681][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:45,681][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 545d80e0-7952-11e3-b704-000c29151863 [2014-02-27 17:59:45,779][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:45,779][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:45,780][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:45,780][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: a6b53720-7952-11e3-b704-000c29151863 [2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 610fc730-7952-11e3-b704-000c29151863 [2014-02-27 17:59:46,088][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: c2145b40-7952-11e3-b704-000c29151863 [2014-02-27 17:59:46,192][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 9704f310-7952-11e3-b704-000c29151863 [2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: ff236b30-7951-11e3-b704-000c29151863 [2014-02-27 17:59:46,435][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 6df99840-7952-11e3-b704-000c29151863 [2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 562b8a20-7952-11e3-b704-000c29151863 [2014-02-27 17:59:46,652][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 3f23d940-7952-11e3-b704-000c29151863 [2014-02-27 17:59:46,805][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,805][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,806][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,806][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: c8e6e0a0-7952-11e3-b704-000c29151863 [2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 7324af30-7952-11e3-b704-000c29151863 [2014-02-27 17:59:47,013][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys [2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0 [2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000 [2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: e901fba0-7951-11e3-b704-000c29151863
FYI all CF fields including the PRIMARY KEY field are of type "text"
So the problem is some where in this line: bulk.add(indexRequest(this.indexName).type(this.typeName).id(id).source(this.keys.rowColumnMap.get(key)));
is the problem. This line is not really adding documents to the ES indexer.
I don't have much time right now to reproduce your scenario right now since this works fine on my machine locally. Also I wrote this river for a quick prototype to connect ES with cassandra. If you plan to use it in production, I strongly recommend to understand how it works and improve it.
I would be more than happy to help you debug it.
perhaps if I recreate your table structure and your create index statement (from your environment) I can remove my schema from the problem?