cascalog
Mappers writing to the same file, overwriting each other's results
I am using JCascalog to build a query that reads and processes data from an HBase table. On execution, two mappers are created for every type of query, and the second mapper overwrites the results of the first, leaving an incomplete result set.
Here's a sample piece of code to test with:
Fields keyFields = new Fields("rowKey");
String[] familyNames = {"cf"};
Fields[] valueFields = new Fields[] {new Fields( "userId", "sum" )};
Tap hBaseTap = new HBaseTap("test_data", new HBaseScheme(keyFields, familyNames, valueFields), SinkMode.REPLACE);
// Read the raw HBase cells, then decode them with ConvertBytesData
Subquery tapSubquery = new Subquery("?rowKey", "?userId", "?sum")
    .predicate(hBaseTap).out("?rowKeyB", "?userIdB", "?sumB")
    .predicate(new ConvertBytesData(), "?rowKeyB", "?userIdB", "?sumB").out("?rowKey", "?userId", "?sum");

// Print the row keys whose ?sum is greater than 10
Api.execute(
    new StdoutTap(),
    new Subquery("?rowKey")
        .predicate(tapSubquery, "?rowKey", "?userId", "?sum")
        .predicate(new GreaterThan(), "?sum", 10));
Logs from executing the above query:
13/04/01 15:50:06 INFO flow.Flow: [] starting
13/04/01 15:50:06 INFO flow.Flow: [] source: hbase://test_data
13/04/01 15:50:06 INFO flow.Flow: [] sink: StdoutTap["SequenceFile[[UNKNOWN]->['?rowKey']]"]["/tmp/temp745153607017456572211708411319110"]"]
13/04/01 15:50:06 INFO flow.Flow: [] parallel execution is enabled: false
13/04/01 15:50:06 INFO flow.Flow: [] starting jobs: 1
13/04/01 15:50:06 INFO flow.Flow: [] allocating threads: 1
13/04/01 15:50:06 INFO flow.FlowStep: [] starting step: (1/1) ...7017456572211708411319110
13/04/01 15:50:06 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
13/04/01 15:50:06 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-cdh4.2.0--1, built on 02/15/2013 18:33 GMT
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:host.name=localhost
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:java.version=1.6.0_37
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Sun Microsystems Inc.
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/jdk1.6.0_37/jre
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:java.class.path=.....[removed]
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/usr/lib/jvm/jdk1.6.0_37/jre/lib/amd64/server:/usr/lib/jvm/jdk1.6.0_37/jre/lib/amd64:/usr/lib/jvm/jdk1.6.0_37/jre/../lib/amd64:/usr/lib/jvm/jdk1.6.0_31/jre/lib/amd64/server:/usr/lib/jvm/jdk1.6.0_31/jre/lib/amd64:/usr/lib/jvm/jdk1.6.0_31/jre/../lib/amd64:/home/username/Apps/intellij/bin::/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:os.version=3.2.0-39-generic
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:user.name=username
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:user.home=/home/username
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Client environment:user.dir=/home/username/git/project
13/04/01 15:50:07 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=hconnection
13/04/01 15:50:07 INFO zookeeper.RecoverableZooKeeper: The identifier of this process is 30874@username-VirtualBox
13/04/01 15:50:07 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (Unable to locate a login configuration)
13/04/01 15:50:07 INFO zookeeper.ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating session
13/04/01 15:50:07 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x13dc7192f49002e, negotiated timeout = 40000
13/04/01 15:50:07 WARN conf.Configuration: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
13/04/01 15:50:07 INFO mapred.LocalJobRunner: OutputCommitter set in config null
13/04/01 15:50:07 INFO flow.FlowStep: [] submitted hadoop job: job_local1308159380_0001
13/04/01 15:50:07 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
13/04/01 15:50:07 INFO mapred.LocalJobRunner: Waiting for map tasks
13/04/01 15:50:07 INFO mapred.LocalJobRunner: Starting task: attempt_local1308159380_0001_m_000000_0
13/04/01 15:50:07 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
13/04/01 15:50:07 INFO util.ProcessTree: setsid exited with exit code 0
13/04/01 15:50:07 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@63cd0037
13/04/01 15:50:07 INFO hadoop.TupleSerialization: using default comparator: cascalog.hadoop.DefaultComparator
13/04/01 15:50:07 INFO mapred.MapTask: Processing split: cascading.tap.hadoop.io.MultiInputSplit@635b9f9a
13/04/01 15:50:08 WARN mapreduce.Counters: Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
13/04/01 15:50:08 INFO mapred.MapTask: numReduceTasks: 0
13/04/01 15:50:08 INFO hadoop.TupleSerialization: using default comparator: cascalog.hadoop.DefaultComparator
13/04/01 15:50:08 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.0.8
13/04/01 15:50:08 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/04/01 15:50:08 INFO hadoop.FlowMapper: sourcing from: hbase://test_data
13/04/01 15:50:08 INFO hadoop.FlowMapper: sinking to: StdoutTap["SequenceFile[[UNKNOWN]->['?rowKey']]"]["/tmp/temp745153607017456572211708411319110"]"]
13/04/01 15:50:13 INFO mapred.LocalJobRunner:
13/04/01 15:50:16 INFO mapred.LocalJobRunner:
13/04/01 15:50:19 INFO mapred.LocalJobRunner:
13/04/01 15:50:22 INFO mapred.LocalJobRunner:
13/04/01 15:50:25 INFO mapred.LocalJobRunner:
13/04/01 15:50:28 INFO mapred.LocalJobRunner:
13/04/01 15:50:30 INFO mapred.Task: Task:attempt_local1308159380_0001_m_000000_0 is done. And is in the process of commiting
13/04/01 15:50:30 INFO mapred.LocalJobRunner:
13/04/01 15:50:30 INFO mapred.Task: Task attempt_local1308159380_0001_m_000000_0 is allowed to commit now
13/04/01 15:50:30 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local1308159380_0001_m_000000_0' to file:/tmp/temp745153607017456572211708411319110
13/04/01 15:50:30 INFO mapred.LocalJobRunner:
13/04/01 15:50:30 INFO mapred.Task: Task 'attempt_local1308159380_0001_m_000000_0' done.
13/04/01 15:50:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1308159380_0001_m_000000_0
13/04/01 15:50:30 INFO mapred.LocalJobRunner: Starting task: attempt_local1308159380_0001_m_000001_0
13/04/01 15:50:30 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
13/04/01 15:50:30 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@91c0352
13/04/01 15:50:30 INFO hadoop.TupleSerialization: using default comparator: cascalog.hadoop.DefaultComparator
13/04/01 15:50:30 INFO mapred.MapTask: Processing split: cascading.tap.hadoop.io.MultiInputSplit@3179fd59
13/04/01 15:50:30 WARN mapreduce.Counters: Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
13/04/01 15:50:30 INFO mapred.MapTask: numReduceTasks: 0
13/04/01 15:50:30 INFO hadoop.TupleSerialization: using default comparator: cascalog.hadoop.DefaultComparator
13/04/01 15:50:30 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.0.8
13/04/01 15:50:30 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/04/01 15:50:30 INFO hadoop.FlowMapper: sourcing from: hbase://test_data
13/04/01 15:50:30 INFO hadoop.FlowMapper: sinking to: StdoutTap["SequenceFile[[UNKNOWN]->['?rowKey']]"]["/tmp/temp745153607017456572211708411319110"]"]
13/04/01 15:50:36 INFO mapred.LocalJobRunner:
13/04/01 15:50:39 INFO mapred.LocalJobRunner:
13/04/01 15:50:42 INFO mapred.LocalJobRunner:
13/04/01 15:50:45 INFO mapred.LocalJobRunner:
13/04/01 15:50:48 INFO mapred.LocalJobRunner:
13/04/01 15:50:51 INFO mapred.LocalJobRunner:
13/04/01 15:50:54 INFO mapred.LocalJobRunner:
13/04/01 15:50:57 INFO mapred.LocalJobRunner:
13/04/01 15:51:00 INFO mapred.LocalJobRunner:
13/04/01 15:51:03 INFO mapred.LocalJobRunner:
13/04/01 15:51:06 INFO mapred.LocalJobRunner:
13/04/01 15:51:09 INFO mapred.LocalJobRunner:
13/04/01 15:51:12 INFO mapred.LocalJobRunner:
13/04/01 15:51:15 INFO mapred.LocalJobRunner:
13/04/01 15:51:18 INFO mapred.LocalJobRunner:
13/04/01 15:51:21 INFO mapred.LocalJobRunner:
13/04/01 15:51:24 INFO mapred.LocalJobRunner:
13/04/01 15:51:27 INFO mapred.LocalJobRunner:
13/04/01 15:51:30 INFO mapred.LocalJobRunner:
13/04/01 15:51:33 INFO mapred.LocalJobRunner:
13/04/01 15:51:36 INFO mapred.LocalJobRunner:
13/04/01 15:51:39 INFO mapred.LocalJobRunner:
13/04/01 15:51:42 INFO mapred.LocalJobRunner:
13/04/01 15:51:45 INFO mapred.LocalJobRunner:
13/04/01 15:51:48 INFO mapred.LocalJobRunner:
13/04/01 15:51:51 INFO mapred.LocalJobRunner:
13/04/01 15:51:54 INFO mapred.LocalJobRunner:
13/04/01 15:51:57 INFO mapred.LocalJobRunner:
13/04/01 15:52:00 INFO mapred.LocalJobRunner:
13/04/01 15:52:03 INFO mapred.LocalJobRunner:
13/04/01 15:52:06 INFO mapred.LocalJobRunner:
13/04/01 15:52:09 INFO mapred.LocalJobRunner:
13/04/01 15:52:11 INFO mapred.Task: Task:attempt_local1308159380_0001_m_000001_0 is done. And is in the process of commiting
13/04/01 15:52:11 INFO mapred.LocalJobRunner:
13/04/01 15:52:11 INFO mapred.Task: Task attempt_local1308159380_0001_m_000001_0 is allowed to commit now
13/04/01 15:52:11 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local1308159380_0001_m_000001_0' to file:/tmp/temp745153607017456572211708411319110
13/04/01 15:52:11 INFO mapred.LocalJobRunner:
13/04/01 15:52:11 INFO mapred.Task: Task 'attempt_local1308159380_0001_m_000001_0' done.
13/04/01 15:52:11 INFO mapred.LocalJobRunner: Finishing task: attempt_local1308159380_0001_m_000001_0
13/04/01 15:52:11 INFO mapred.LocalJobRunner: Map task executor complete.
13/04/01 15:52:11 INFO mapred.FileInputFormat: Total input paths to process : 2
13/04/01 15:52:11 INFO hadoop.TupleSerialization: using default comparator: cascalog.hadoop.DefaultComparator
RESULTS
-----------------------
8c2805d2_387792338
8c29807b_118123081
.....
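The symptom described here, a later task's output replacing an earlier one's, can be illustrated in miniature with plain JDK file I/O. This is not Hadoop, Cascading or JCascalog code: it assumes each "task" is just a list of records, and contrasts writing every task to one shared file (truncated on each write) with giving each task its own part file, named in the `part-00000` style that Hadoop's file-based output commonly uses.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Miniature illustration only (not Hadoop code): two "tasks" writing output.
public class OverwriteDemo {

    // Every task writes to the same file. Files.write truncates an existing
    // file by default, so only the last task's records survive.
    public static List<String> sharedFile(Path dir, List<List<String>> taskOutputs) throws IOException {
        Path out = dir.resolve("output");
        for (List<String> records : taskOutputs) {
            Files.write(out, records, StandardCharsets.UTF_8);
        }
        return Files.readAllLines(out, StandardCharsets.UTF_8);
    }

    // Each task writes its own part file (part-00000, part-00001, ...),
    // and the reader concatenates them, so every task's records survive.
    public static List<String> perTaskFiles(Path dir, List<List<String>> taskOutputs) throws IOException {
        for (int i = 0; i < taskOutputs.size(); i++) {
            Path part = dir.resolve(String.format("part-%05d", i));
            Files.write(part, taskOutputs.get(i), StandardCharsets.UTF_8);
        }
        List<String> all = new ArrayList<>();
        for (int i = 0; i < taskOutputs.size(); i++) {
            Path part = dir.resolve(String.format("part-%05d", i));
            all.addAll(Files.readAllLines(part, StandardCharsets.UTF_8));
        }
        return all;
    }

    public static void main(String[] args) throws IOException {
        List<List<String>> tasks = Arrays.asList(
            Arrays.asList("a1", "a2"),
            Arrays.asList("b1"));
        System.out.println(sharedFile(Files.createTempDirectory("shared"), tasks));   // [b1]
        System.out.println(perTaskFiles(Files.createTempDirectory("parts"), tasks));  // [a1, a2, b1]
    }
}
```

In the log above, both task attempts (`_m_000000_0` and `_m_000001_0`) commit to the same /tmp output directory, and the reader then reports `Total input paths to process : 2`, so listing the files in that directory and counting the records in each may help show whether one task's output is really being replaced.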
Does the same query work if the source tap is a TextLine tap? Let's figure out whether it's the HBase tap or the query that's causing the problem.
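One way to tell whether rows are actually missing is to compute the expected answer independently of Cascalog. The helper below is hypothetical and not part of JCascalog: it assumes the `test_data` rows have been exported as tab-separated text (`rowKey`, `userId`, `sum`, with `sum` as an integer) and applies the same `?sum > 10` filter as the query. Its count can then be compared with the number of row keys the query prints, whether the source is the HBase tap or a TextLine tap. The sample rows in `main` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical checker: expected query output from a tab-separated export
// with columns rowKey \t userId \t sum (an assumed layout).
public class ExpectedRows {

    // Same filter as the query: keep ?rowKey where ?sum > 10.
    public static List<String> expectedRowKeys(List<String> tsvLines) {
        List<String> keys = new ArrayList<>();
        for (String line : tsvLines) {
            String[] cols = line.split("\t");
            if (cols.length < 3) {
                continue; // skip malformed lines
            }
            long sum = Long.parseLong(cols[2].trim());
            if (sum > 10) {
                keys.add(cols[0]);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not real data from test_data.
        List<String> sample = Arrays.asList(
            "row1\tuserA\t5",
            "row2\tuserB\t12",
            "row3\tuserC\t40");
        List<String> keys = expectedRowKeys(sample);
        System.out.println(keys.size() + " " + keys); // 2 [row2, row3]
    }
}
```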