cascading-cassandra
Final sink step will always be skipped
Hello,
I believe I have found a bug that will always prevent sinking to Cassandra - i.e. cascading will always skip the step.
getModifiedTime() in CassandraTap.java (https://github.com/ifesdjeen/cascading-cassandra/blob/master/src/main/java/com/ifesdjeen/cascading/cassandra/CassandraTap.java) will always return the current time. For sourcing from tables this will be 'correct', but for sinking it will cause the Cascading isSkipFlowStep() check in FlowStepJob.java (https://github.com/Cascading/cascading/blob/2.1/cascading-core/src/main/java/cascading/flow/planner/FlowStepJob.java) to always return true, due to the line

    return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() );

I believe modifying CassandraTap to return, say, a time of 0 in sink mode will always force areSourcesNewer() to return true, so isSkipFlowStep() returns false and the step is not skipped.
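To make the reasoning above concrete, here is a minimal, self-contained sketch of the skip decision. It is not Cascading's code: the class `SkipModel`, its parameters, and the assumption that "newer" means a strictly greater modification time are all illustrative.

```java
// Simplified, hypothetical model of the skip check quoted above.
// It does not use or reproduce Cascading's actual classes.
class SkipModel {
    // Assumption: sources count as "newer" when the newest source
    // was modified strictly after the sink's reported time.
    static boolean areSourcesNewer(long newestSourceModified, long sinkModified) {
        return newestSourceModified > sinkModified;
    }

    // Mirrors the shape of:
    //   allSourcesExist() && !areSourcesNewer( getSinkModified() )
    static boolean isSkipFlowStep(boolean allSourcesExist,
                                  long newestSourceModified,
                                  long sinkModified) {
        return allSourcesExist && !areSourcesNewer(newestSourceModified, sinkModified);
    }

    public static void main(String[] args) {
        long sourceModified = System.currentTimeMillis() - 1000L; // source written a second ago
        long sinkReportsNow = System.currentTimeMillis();         // sink tap reporting "now"
        long sinkReportsZero = 0L;                                 // sink tap reporting 0

        System.out.println("sink reports now  -> skip = "
                + isSkipFlowStep(true, sourceModified, sinkReportsNow));
        System.out.println("sink reports zero -> skip = "
                + isSkipFlowStep(true, sourceModified, sinkReportsZero));
    }
}
```

Under these assumptions, a sink that reports the current time is never older than its sources, so the step is skipped; a sink that reports 0 is always older, so the step runs.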
In addition, here are example lines from the output logs showing this in action (first is Cassandra sinking, second is HFS text file sinking):
13/09/06 15:37:35 INFO mapred.Task: Task 'attempt_local1747952158_0002_r_000000_0' done.
13/09/06 15:37:36 INFO flow.FlowStep: [spotifyStreams] sink oldest modified date: Fri Sep 06 15:37:36 BST 2013
13/09/06 15:37:36 INFO flow.FlowStep: [spotifyStreams] source modification date at: Fri Sep 06 15:37:35 BST 2013
13/09/06 15:37:36 INFO flow.FlowStep: [spotifyStreams] skipping step: (3/3) ...al_spotify_artist_streams
13/09/06 15:45:15 INFO flow.Flow: [spotifyStreams] sink: Hfs["TextDelimited[['artistId', 'date', 'spotifyRecord']]"]["testdata/outputentity"]"]
13/09/06 16:13:39 INFO flow.Flow: [spotifyStreams] sink: CassandraTap["CassandraCQL3Scheme[[UNKNOWN]->['date', 'artistId', 'spotifyRecord']]"]["TEMP_ID_127.0.0.1_9160_artistportal_spotify_artist_streams"]"]
hmm... it def doesn't always prevent sinking to c* ... i've sunk lots of data and never encountered this prob
i wonder under which circs it skips and which it doesn't ?
i'll take a look as soon as i can: probably monday now
:c
(in reply by email to Matthew Larsen's report of 6 Sep 2013, 16:53)
I appreciate it!
I am running it using pure Java (not Cascalog); as far as I can tell I have all my vars / configs set up properly - I have no idea why it's skipping this last step :(
I am running locally on Hadoop 1.2.1 with cascading 2.1.2
Going to try cloning the repo and adjusting that value, see if I can coax it to not skip.
Just tested with setting the Timestamp Long to 0, got it to pass through and start sinking .... but now I've run into serialisation problems! So that's my problem.
Also while on the subject, is it supposed to be UNKNOWN in the definition?
CassandraTap["CassandraCQL3Scheme[[UNKNOWN]->['date', 'artistId', 'spotifyRecord']]"]
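For readers following along, the timestamp change described in this comment might look roughly like the sketch below. This is a hypothetical stand-in, not the actual CassandraTap code or the eventual patch: the class `SketchTap` and its `sinkMode` flag are invented for illustration.

```java
// Hypothetical stand-in for a tap; it does not extend Cascading's Tap class
// and is not the real CassandraTap.
class SketchTap {
    private final boolean sinkMode; // assumed flag: true when the tap is used as a sink

    SketchTap(boolean sinkMode) {
        this.sinkMode = sinkMode;
    }

    // Source mode: report "now", matching the behaviour described in the report.
    // Sink mode: report 0 so that every source counts as newer than the sink.
    long getModifiedTime() {
        return sinkMode ? 0L : System.currentTimeMillis();
    }
}
```

The design choice here is simply to make the sink look as old as possible, so the planner never concludes that it is already up to date.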
great ! so maybe cascalog is doing something which mitigates the prob, hence i never see it
i'll look at a patch on monday
(in reply by email to Matthew Larsen's comment of 6 Sep 2013, 18:09)
Submitted a pull request :o)