
indexing a cassandra column family

macca129 opened this issue on Feb 27, 2014 · 14 comments

I have successfully installed the river and can connect to the Cassandra cluster, but I must be missing something fundamental because I never see an index being built.

I have 10,000 rows in my CF in a 3-node cluster and see activity in the ES logs:

[2014-02-27 16:01:03,516][INFO ][river.cassandra ] [Warren III Worthington] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 16:01:03,716][INFO ][river.cassandra ] [Warren III Worthington] [cassandra][maccasIndex] Starting thread with 738 keys

This repeats over and over again.

I am not using auth in the cluster, so I have omitted those properties.

Here is my river config (sanitised):

curl -XPUT 'localhost:9200/_river/maccasIndex/_meta' -d '{
    "type" : "cassandra",
    "cassandra" : {
        "cluster_name" : "MaccasCluster",
        "keyspace" : "MaccasKeyspace",
        "column_family" : "MaccasCF",
        "batch_size" : 1000,
        "hosts" : "dsc1n1:9160,dsc1n2:9160,dsc1n3:9160"
    },
    "index" : {
        "index" : "maccasIndex",
        "type" : "cassandra"
    }
}'

Cassandra info: [cqlsh 3.1.2 | Cassandra 1.2.6.1 | CQL spec 3.0.0 | Thrift protocol 19.36.0]

The CF definition:

CREATE TABLE "MaccasCF" ( key text PRIMARY KEY, "___class" text, "addressLine1" text, "addressLine2" text, ......


I am definitely missing something. Here is the output of http://localhost:9200/_status?pretty=1:

{ "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "indices" : { "_river" : { "index" : { "primary_size_in_bytes" : 3820, "size_in_bytes" : 3820 }, "translog" : { "operations" : 0 }, "docs" : { "num_docs" : 1, "max_doc" : 1, "deleted_docs" : 0 }, "merges" : { "current" : 0, "current_docs" : 0, "current_size_in_bytes" : 0, "total" : 0, "total_time_in_millis" : 0, "total_docs" : 0, "total_size_in_bytes" : 0 }, "refresh" : { "total" : 2, "total_time_in_millis" : 84 }, "flush" : { "total" : 1, "total_time_in_millis" : 20 }, "shards" : { "0" : [ { "routing" : { "state" : "STARTED", "primary" : true, "node" : "czJqPzFBSseiXoKX0YtWkA", "relocating_node" : null, "shard" : 0, "index" : "_river" }, "state" : "STARTED", "index" : { "size_in_bytes" : 3820 }, "translog" : { "id" : 1393509195769, "operations" : 0 }, "docs" : { "num_docs" : 1, "max_doc" : 1, "deleted_docs" : 0 }, "merges" : { "current" : 0, "current_docs" : 0, "current_size_in_bytes" : 0, "total" : 0, "total_time_in_millis" : 0, "total_docs" : 0, "total_size_in_bytes" : 0 }, "refresh" : { "total" : 2, "total_time_in_millis" : 84 }, "flush" : { "total" : 1, "total_time_in_millis" : 20 } } ] } } } }

The mapping at http://localhost:9200/_river/maccasIndex/_mapping?pretty=1:

{
  "_river" : {
    "mappings" : {
      "maccasIndex" : {
        "properties" : {
          "cassandra" : {
            "properties" : {
              "batch_size" : { "type" : "long" },
              "cluster_name" : { "type" : "string" },
              "column_family" : { "type" : "string" },
              "hosts" : { "type" : "string" },
              "keyspace" : { "type" : "string" }
            }
          },
          "index" : {
            "properties" : {
              "index" : { "type" : "string" },
              "type" : { "type" : "string" }
            }
          },
          "type" : { "type" : "string" }
        }
      }
    }
  }
}

If I run:

http://localhost:9200/_river/maccasIndex/_search?q=*:*&pretty=1

I get {"took":268,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}

but no sign of any indexed data

Can you tell me where I am going wrong?

macca129 avatar Feb 27 '14 16:02 macca129

Do you ever see this log.INFO statement: "Inserting {} keys in ES"? If not, then I think there is a bug here: https://github.com/eBay/cassandra-river/blob/master/src/main/java/org/elasticsearch/river/cassandra/CassandraRiver.java#L166

It's not calling saveToEs with the bulk data.
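
For readers following along, here is a minimal sketch of the batch-flush pattern being discussed, not the plugin's actual source: the suspicion is that documents are only handed to saveToEs when the bulk fills up inside the loop, so a partial final batch (or a bulk that never reaches batch_size) is silently dropped. The method names, the final flush, and the exact condition below are assumptions based on the code quoted in this thread.

import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

import static org.elasticsearch.client.Requests.indexRequest;

// Hypothetical sketch, not CassandraRiver.java itself.
public class BulkFlushSketch {

    static void indexRows(Client client, String indexName, String typeName,
                          Map<String, Map<String, Object>> rowColumnMap, int batchSize) {
        BulkRequestBuilder bulk = client.prepareBulk();

        for (String key : rowColumnMap.keySet()) {
            bulk.add(indexRequest(indexName).type(typeName).id(key)
                    .source(rowColumnMap.get(key)));

            // Flush a full batch and start a new one.
            if (bulk.numberOfActions() >= batchSize) {
                saveToEs(bulk);
                bulk = client.prepareBulk();
            }
        }

        // The step the thread suspects is missing: flush whatever is left over.
        if (bulk.numberOfActions() > 0) {
            saveToEs(bulk);
        }
    }

    static void saveToEs(BulkRequestBuilder bulk) {
        BulkResponse response = bulk.execute().actionGet();
        if (response.hasFailures()) {
            System.err.println("Bulk indexing had failures: " + response.buildFailureMessage());
        }
    }
}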

utsengar avatar Feb 27 '14 17:02 utsengar

No, I don't ever see that statement in the log. I just see "Starting thread with 1000 keys".

macca129 avatar Feb 27 '14 17:02 macca129

I thought my syntax might be incorrect

macca129 avatar Feb 27 '14 17:02 macca129

Can you add these two lines before the if check on line 165 in the class CassandraRiver:

logger.info("Current bulk size: {}", bulk.numberOfActions());
logger.info("Total batch size set: {}", this.batchSize);

Rebuild the plugin and run it again. I am not sure what is wrong there; I will need to debug it. It works on my local machine.
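
To make the placement concrete, here is a self-contained sketch of the check being instrumented. Only the two logger.info calls come from the suggestion above; the helper name and the >= condition are assumptions, not the river's exact code.

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.logging.ESLogger;

// Hypothetical helper showing where the two diagnostic lines sit relative to
// the batch-size check; the condition itself is assumed.
public class BatchCheckSketch {
    static boolean shouldFlush(ESLogger logger, BulkRequestBuilder bulk, int batchSize) {
        logger.info("Current bulk size: {}", bulk.numberOfActions());
        logger.info("Total batch size set: {}", batchSize);
        return bulk.numberOfActions() >= batchSize;
    }
}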

utsengar avatar Feb 27 '14 17:02 utsengar

Sure thing. What do I need to do to perform the rebuild? Linux (CentOS) with Maven installed.

macca129 avatar Feb 27 '14 17:02 macca129

You just need mvn clean package and then copy over the jar. It's in the README.md:

Build: mvn clean package

Install: ./bin/plugin -url file:elasticsearch-river-cassandra/target/releases/elasticsearch-river-cassandra-1.0.0-SNAPSHOT.zip -install river-cassandra

utsengar avatar Feb 27 '14 17:02 utsengar

OK, I have rebuilt, but I'm not seeing any additional statements in the logs.

macca129 avatar Feb 27 '14 17:02 macca129

Log output:

[2014-02-27 17:39:57,286][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,396][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 738 keys
[2014-02-27 17:39:57,575][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,699][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,791][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,955][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:58,072][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:58,212][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:58,387][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys

macca129 avatar Feb 27 '14 17:02 macca129

Is this what I am supposed to see? I would expect 10,000 JSON documents to be indexed, and I would like to search on all the other fields and get back a record ID, which is my primary key in the C* CF.
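
For context, this is roughly what the end state would look like once indexing works: a query against the data index on any column, with the Cassandra row key coming back as the ES document id. A hedged sketch using the ES 1.x Java API; the index, type, and field names are taken from this thread, and findByAddress is just an illustrative name.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

// Hypothetical sketch of the expected behaviour, not part of the river.
public class SearchSketch {
    static void findByAddress(Client client, String addressLine1) {
        SearchResponse response = client.prepareSearch("maccasIndex")
                .setTypes("cassandra")
                .setQuery(QueryBuilders.matchQuery("addressLine1", addressLine1))
                .execute().actionGet();

        for (SearchHit hit : response.getHits().getHits()) {
            // The document id is the Cassandra row key (the CF's PRIMARY KEY).
            System.out.println("Record ID: " + hit.getId());
        }
    }
}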

macca129 avatar Feb 27 '14 17:02 macca129

Yes, that is the expected behavior. There looks to be an issue with the data or the river where your bulk object is not being constructed.

Can you add the same print statements after this line: BulkRequestBuilder bulk = client.prepareBulk();

And also add this print statement inside the loop, right after:

for (String key : this.keys.rowColumnMap.keySet()) {
    logger.info("Current key: {}", key);
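
As a sketch of where those diagnostics land (the surrounding structure is assumed; rowColumnMap mirrors the code quoted in this thread, and the existing bulk.add call is elided):

import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;

// Hypothetical sketch of the instrumented section, not the river's exact code.
public class LoopDiagnosticsSketch {
    static void logKeys(ESLogger logger, Client client,
                        Map<String, Map<String, Object>> rowColumnMap) {
        BulkRequestBuilder bulk = client.prepareBulk();
        // Diagnostic right after the bulk is created: it should start at 0.
        logger.info("Current bulk size: {}", bulk.numberOfActions());

        for (String key : rowColumnMap.keySet()) {
            // Diagnostic inside the loop: confirms keys are actually iterated.
            logger.info("Current key: {}", key);
            // ... the existing bulk.add(...) call follows here ...
        }
    }
}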

utsengar avatar Feb 27 '14 17:02 utsengar

OK, this is what I see now:

[2014-02-27 17:59:45,301][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 6a81fdb0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 1f10f110-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 146688b0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,680][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,680][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,681][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,681][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 545d80e0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,779][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,779][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,780][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,780][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: a6b53720-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 610fc730-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,088][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: c2145b40-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,192][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 9704f310-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: ff236b30-7951-11e3-b704-000c29151863
[2014-02-27 17:59:46,435][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 6df99840-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 562b8a20-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,652][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 3f23d940-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,805][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,805][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,806][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,806][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: c8e6e0a0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 7324af30-7952-11e3-b704-000c29151863
[2014-02-27 17:59:47,013][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: e901fba0-7951-11e3-b704-000c29151863

macca129 avatar Feb 27 '14 18:02 macca129

FYI, all CF fields, including the PRIMARY KEY field, are of type "text".

macca129 avatar Feb 27 '14 18:02 macca129

So the problem is somewhere in this line: bulk.add(indexRequest(this.indexName).type(this.typeName).id(id).source(this.keys.rowColumnMap.get(key))); This line is not actually adding documents to the ES indexer.

I don't have much time right now to reproduce your scenario, since this works fine locally on my machine. Also, I wrote this river as a quick prototype to connect ES with Cassandra. If you plan to use it in production, I strongly recommend understanding how it works and improving it.

I would be more than happy to help you debug it.
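
One hedged way to narrow this down is to guard the quoted bulk.add(...) call while debugging. The null/empty check, the warning, and the post-add log line below are illustrative additions, not the river's code; if the warning fires, the rows read from Cassandra are coming back empty, which would explain a bulk that never grows.

import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.logging.ESLogger;

import static org.elasticsearch.client.Requests.indexRequest;

// Hypothetical debugging aid around the quoted bulk.add(...) line.
public class GuardedAddSketch {
    static void addRow(ESLogger logger, BulkRequestBuilder bulk,
                       String indexName, String typeName,
                       String key, Map<String, Object> columns) {
        if (columns == null || columns.isEmpty()) {
            logger.warn("No columns read for key {}, skipping", key);
            return;
        }
        bulk.add(indexRequest(indexName).type(typeName).id(key).source(columns));
        logger.info("Bulk size after add: {}", bulk.numberOfActions());
    }
}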

utsengar avatar Feb 27 '14 18:02 utsengar

Perhaps if I recreate your table structure and your create-index statement (from your environment), I can rule my schema out of the problem?

macca129 avatar Feb 27 '14 19:02 macca129