--cleanup TMP does not clean HDFS with EMR Runner
I'm noticing that my jobs frequently run out of disk space on EMR. I'm using an outdated AMI (2.4.11) but mrjob 5.10. I'm leveraging mrjob's great pooling features to save costs on parallel, multi-job workloads that use similar clusters. However, HDFS does not seem to get cleaned up after each job, even though I have my --cleanup param set to TMP.
The jobs running: [screenshot omitted]
And HDFS resource utilization: [screenshot omitted]
In many cases utilization hits 100% and the job fails (and the cluster terminates, since I set emr_action_on_failure=TERMINATE_CLUSTER).
I dug into the code and I think I see the problem. I'm happy to take a stab at contributing a fix but wanted input beforehand.
It looks like in 5.10, EMRJobRunner subclasses MRJobRunner, which calls _cleanup_hadoop_tmp for TMP, but that method is only defined as a pass. The useful bits of that method are in HadoopJobRunner, which doesn't appear to be referenced in EMRJobRunner's class hierarchy. I notice that in 6.0, HadoopInTheCloudRunner is introduced as an extra abstraction, but it doesn't seem to implement temp file cleanup either. It seems like it may be appropriate to copy some of the logic currently in HadoopJobRunner in there? (Rough sketch of the hierarchy below.)
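For reference, here's a rough, simplified sketch of the hierarchy as I understand it; the method bodies below are paraphrased placeholders, not actual mrjob source:

# Simplified illustration of the 5.10 runner hierarchy described above.
# Bodies are placeholders, not copied from mrjob.

class MRJobRunner(object):
    def _cleanup_hadoop_tmp(self):
        pass  # no-op in the base class


class HadoopJobRunner(MRJobRunner):
    def _cleanup_hadoop_tmp(self):
        ...  # the real HDFS temp-dir removal lives here


class EMRJobRunner(MRJobRunner):
    # doesn't inherit from HadoopJobRunner, so it gets the base class's
    # no-op and HDFS on the cluster is never cleaned up
    pass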
Is this intended behavior or an incomplete implementation of the spec? Thanks!
Huh! Yeah, from a use case standpoint, this is a bug, but there's not a way to reach HDFS through the EMR API, only via SSH.
The Hadoop runner would benefit from this capability as well (see #990, I guess, though it needs to be updated), so I think it's something I'll build eventually. It'd also pair well with automatically setting up SSH (see #1257).
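For illustration, a minimal sketch of the SSH route mentioned above: shell out to ssh and run hadoop fs -rm on the master node. The hostname, key file, and HDFS path here are placeholders, not anything mrjob provides.

import subprocess

def clear_hdfs_over_ssh(master_host, key_file, hdfs_path):
    # Remove hdfs_path by running `hadoop fs -rm` on the EMR master node
    # over SSH. All three arguments are placeholders to fill in.
    subprocess.check_call([
        'ssh', '-i', key_file, '-o', 'StrictHostKeyChecking=no',
        'hadoop@%s' % master_host,
        'hadoop fs -rm -r -f %s' % hdfs_path,
    ])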
@davidmarin thanks for the note!
SSH seems reasonable, but would something like this not work as a final step on a job?
def clear_hdfs(boto_client, cluster_id, name, hdfs_path):
    # Submit an extra EMR step that runs `hadoop fs -rm` on the cluster,
    # removing hdfs_path. Returns the ID of the submitted step.
    response = boto_client.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[{
            'Name': name,
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['hadoop', 'fs', '-rm', '-R', '-f', hdfs_path],
            },
        }],
    )
    return response['StepIds'][0]
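For context, a minimal usage sketch, assuming boto3 credentials and region come from the environment; the cluster ID and HDFS path below are placeholders:

import boto3

emr = boto3.client('emr')
step_id = clear_hdfs(emr, 'j-XXXXXXXXXXXXX', 'HDFS cleanup',
                     'hdfs:///tmp/mrjob')  # placeholder path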
Oh yeah, that'd totally work. Thanks!
If we want to condition it on the job being successful (it's useful to have intermediate data from failed jobs), that's just a short shell script: check for the _SUCCESS file in the output directory.
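For example (a sketch, not anything built into mrjob), the cleanup step's command could be a bash one-liner that only deletes the temp path when the output directory contains _SUCCESS; output_dir and hdfs_path are placeholders:

def clear_hdfs_if_succeeded(boto_client, cluster_id, name, output_dir, hdfs_path):
    # Remove hdfs_path only if the job wrote a _SUCCESS marker to output_dir,
    # so intermediate data from failed jobs is kept.
    cmd = ('hadoop fs -test -e %s/_SUCCESS && hadoop fs -rm -r -f %s || true'
           % (output_dir, hdfs_path))
    response = boto_client.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[{
            'Name': name,
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['bash', '-c', cmd],
            },
        }],
    )
    return response['StepIds'][0]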
@chetmancini how do you get the hdfs_path for EMR cluster?
@quidproquo this isn't so much an issue for me anymore since I don't use pooling now that Amazon has per-minute billing.
I ended up not needing to put this in mrjob and got it to work like so. Good luck!
# Methods on a small wrapper class that launches mrjob jobs.
from boto.emr.step import JarStep  # boto 2, which mrjob 5.x uses for EMR

def _build_hadoop_tmp_cleanup_step(self, path):
    # Build an EMR step that removes the job's HDFS temp data; CONTINUE so
    # a failed cleanup doesn't take down the pooled cluster.
    return JarStep(
        '%s: HDFS cleanup' % self.job_name,
        'command-runner.jar',
        action_on_failure='CONTINUE',
        step_args=['hdfs', 'dfs', '-rm', '-r', '-skipTrash', path])

def _cleanup_emr(self, runner):
    cluster_id = runner.get_cluster_id()
    # Derive the HDFS temp dir from the step output dir.
    path = runner._default_step_output_dir().replace('/step-output', '')
    emr_conn = runner.make_emr_conn()
    if runner.get_opts().get('pool_clusters'):
        emr_conn.add_jobflow_steps(
            cluster_id,
            [self._build_hadoop_tmp_cleanup_step(path)])

def run(self):
    mr_job = self.job(args=self.conf)
    with mr_job.make_runner() as runner:
        runner.run()
        if self.runner == 'emr':
            self._cleanup_emr(runner)