--cleanup TMP does not clean HDFS with EMR Runner

Open chetmancini opened this issue 7 years ago • 5 comments

My jobs frequently run out of disk space on EMR. I'm using an outdated AMI (2.4.11) but mrjob 0.5.10. I'm leveraging the great pooling features in mrjob to save costs on parallel, multi-job workloads that use similar clusters. It seems, however, that HDFS is not getting cleaned up after each job, even though I have my --cleanup param set to TMP.

The jobs running: [image]

And HDFS resource utilization: [image]

In many cases the utilization will hit 100% and the job will fail (and the cluster will terminate, since I use emr_action_on_failure=TERMINATE_CLUSTER).

I dug into the code and I think I see the problem. I'm happy to take a stab at contributing a fix but wanted input beforehand.

It looks like in 0.5.10 EMRJobRunner subclasses MRJobRunner, which calls _cleanup_hadoop_tmp for TMP, but that method is defined only as a pass there. The useful bits of that method are in HadoopJobRunner, which doesn't appear anywhere in EMRJobRunner's class hierarchy. I notice that 0.6.0 introduces HadoopInTheCloudRunner as an extra abstraction, but that doesn't seem to implement temp file cleanup either. Might it be appropriate to copy some of the logic currently in HadoopJobRunner in there?

Is this intended behavior or an incomplete implementation of the spec? Thanks!

chetmancini, Jun 30 '17 23:06
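The inheritance problem described above can be illustrated with a toy sketch. The class names here are simplified stand-ins invented for illustration, not mrjob's actual code: a base class defines the cleanup hook as a no-op, one subclass overrides it, and a sibling subclass silently inherits the no-op.

```python
class BaseRunner(object):
    """Hypothetical stand-in for the shared base runner."""

    def __init__(self):
        self.removed = []  # records what cleanup actually deleted

    def cleanup(self, mode):
        # TMP cleanup delegates to a hook that subclasses may override.
        if mode == 'TMP':
            self._cleanup_hadoop_tmp()

    def _cleanup_hadoop_tmp(self):
        pass  # no-op by default


class HadoopLikeRunner(BaseRunner):
    """Hypothetical stand-in for a runner that overrides the hook."""

    def _cleanup_hadoop_tmp(self):
        self.removed.append('hdfs tmp dir')


class EMRLikeRunner(BaseRunner):
    """Hypothetical stand-in for a sibling that never overrides the hook."""


hadoop_runner = HadoopLikeRunner()
hadoop_runner.cleanup('TMP')

emr_runner = EMRLikeRunner()
emr_runner.cleanup('TMP')  # runs without error, but removes nothing
```

Because the two subclasses are siblings, the override in one is never visible to the other, so the call succeeds quietly and the temp data stays in place.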

Huh! Yeah, from a use case standpoint, this is a bug, but there's not a way to reach HDFS through the EMR API, only via SSH.

The Hadoop runner would benefit from this capability as well (see #990, I guess, though it needs to be updated), so I think it's something I'll build eventually. It'd also pair well with automatically setting up SSH (see #1257).

coyotemarin, Jul 07 '17 21:07

@davidmarin thanks for the note!

SSH seems reasonable, but would something like this not work as a final step on a job?

def clear_hdfs(boto_client, cluster_id, name, hdfs_path):
    response = boto_client.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[{
            'Name': name,
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['hadoop', 'fs', '-rm', '-R', '-f', hdfs_path],
            },
        }],
    )
    return response['StepIds'][0]

chetmancini, Jul 07 '17 21:07

Oh yeah, that'd totally work. Thanks!

If we want to condition it on the job being successful (it's useful to have intermediate data from failed jobs), that's just a short shell script; just check for the _SUCCESS file in the output directory.

coyotemarin, Jul 07 '17 22:07
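For reference, the conditional cleanup suggested above might look roughly like the sketch below. It only builds the step definition; it does not talk to AWS. The function name and the example paths are made up for illustration, and running `bash -c` through command-runner.jar is an assumption about the cluster setup, so treat this as a starting point and not a tested recipe.

```python
import shlex


def build_conditional_cleanup_step(name, output_dir, tmp_dir):
    """Build an EMR step dict that deletes tmp_dir only if the job succeeded.

    Hypothetical helper: success is detected by the _SUCCESS marker file
    that Hadoop writes into the output directory.
    """
    success_marker = output_dir.rstrip('/') + '/_SUCCESS'
    # "if ...; then ...; fi" exits 0 when the marker is missing, so a
    # failed job keeps its intermediate data and the step still succeeds.
    script = (
        'if hadoop fs -test -e {marker}; '
        'then hadoop fs -rm -R -f {tmp}; fi'
    ).format(marker=shlex.quote(success_marker), tmp=shlex.quote(tmp_dir))
    return {
        'Name': name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # assumed to be available
            'Args': ['bash', '-c', script],
        },
    }


# Example with placeholder paths
step = build_conditional_cleanup_step(
    'HDFS cleanup',
    'hdfs:///user/hadoop/out/',
    'hdfs:///user/hadoop/tmp/job1',
)
```

The resulting dict has the same shape as the entries in `Steps` passed to `add_job_flow_steps` in the snippet earlier in the thread.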

@chetmancini how do you get the hdfs_path for EMR cluster?

quidproquo, Apr 24 '18 17:04

@quidproquo this isn't so much an issue for me anymore since I don't use pooling now that Amazon has per-minute billing.

I ended up not needing to put this in mrjob and got it to work like so. Good luck!

    from boto.emr.step import JarStep  # boto 2, as used by mrjob 0.5.x

    def _build_hadoop_tmp_cleanup_step(self, path):
        # Step that recursively deletes `path` from HDFS, bypassing the trash.
        return JarStep(
            '%s: HDFS cleanup' % self.job_name,
            'command-runner.jar',
            action_on_failure='CONTINUE',
            step_args=['hdfs', 'dfs', '-rm', '-r', '-skipTrash', path]
        )

    def _cleanup_emr(self, runner):
        cluster_id = runner.get_cluster_id()
        # The job's HDFS tmp dir is the parent of its step-output dir.
        path = runner._default_step_output_dir().replace('/step-output', '')
        emr_conn = runner.make_emr_conn()
        # Only pooled clusters outlive the job, so only they need cleanup.
        if runner.get_opts().get('pool_clusters'):
            emr_conn.add_jobflow_steps(
                cluster_id,
                [self._build_hadoop_tmp_cleanup_step(path)])

    def run(self):
        mr_job = self.job(args=self.conf)
        with mr_job.make_runner() as runner:
            runner.run()
            # Queue the cleanup step once the job itself has finished.
            if self.runner == 'emr':
                self._cleanup_emr(runner)

chetmancini, Apr 24 '18 17:04
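On the hdfs_path question: the snippet above derives it by stripping '/step-output' from the runner's step output directory. A slightly more defensive version of that string handling could look like the sketch below. The helper name and the example path are hypothetical, and the actual layout of the directory depends on the mrjob version, so check what `_default_step_output_dir()` returns on your setup before relying on it.

```python
def tmp_dir_from_step_output_dir(step_output_dir, suffix='/step-output'):
    """Return the parent tmp dir of a step-output dir (hypothetical helper).

    Unlike str.replace, this only strips the suffix at the end of the path
    and raises if the path does not have the expected shape, so an
    unexpected layout never results in deleting the wrong directory.
    """
    trimmed = step_output_dir.rstrip('/')
    if not trimmed.endswith(suffix):
        raise ValueError(
            'unexpected step output dir: %r' % (step_output_dir,))
    return trimmed[:-len(suffix)]


# Example with a made-up path
tmp_dir = tmp_dir_from_step_output_dir(
    'hdfs:///tmp/mrjob/example_job/step-output')
```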