luigi
Permission denied
I've been exploring Python + luigi as a replacement for long-running ETL processes on a Windows platform, and I'm running into a peculiar issue.
- Due to limitations outside my control, I need to download a large file from an SFTP server that does not support key-based (PPK) authentication and only allows password authentication. The server also requires the ssh-dss host key algorithm. The existing luigi.contrib packages for FTP and SSH don't make it easy to pass the ssh-dss host key option, so I've ruled them out for now (I'm looking for the path of least resistance).
- I've written a shell script that, when tested from the command line, does everything I need: it downloads my file (sshpass -e sftp -v "user@host:/sourcepath/file.zip" "/targetpath/file.zip"), unzips it, runs a database restore, etc.
- I have a Python module with a "luigi.contrib.external_program.ExternalProgramTask" subclass that seems correct (it runs, up to a point) and calls the shell script. The program_args are correct, and program_environment is set to return "os.environ.copy()".
The issue I'm having is that, since luigi.contrib.external_program uses subprocess.Popen, some sort of user context doesn't seem to be passed along. The shell script doesn't need to run under sudo; it executes fine when run from the command line (after chmod +x ./myshellscript.sh). When it's launched from Python, however, the "sshpass -e" step is where it fails. "sshpass -e" reads the required password from the $SSHPASS environment variable, so I suspect that when the script runs from Python it somehow ends up with the wrong context (environment).
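Whether "sshpass -e" can find its password depends only on the environment its own process receives. With subprocess, that is the env mapping passed in (or the parent's environment if none is given), so "os.environ.copy()" contains SSHPASS only if the Python process itself was started with that variable in its environment. A minimal sketch, using a throwaway Python child in place of sshpass; the helper child_sees and the placeholder password are hypothetical, for illustration only:

```python
import os
import subprocess
import sys


def child_sees(var, env):
    """Return the value of `var` as seen by a child process started with `env`."""
    # The child prints the variable named in its first argument, or '<unset>'.
    code = "import os, sys; print(os.environ.get(sys.argv[1], '<unset>'))"
    result = subprocess.run(
        [sys.executable, "-c", code, var],
        env=env, capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


env = os.environ.copy()
env.pop("SSHPASS", None)               # simulate a parent that never had SSHPASS
print(child_sees("SSHPASS", env))      # <unset>

env["SSHPASS"] = "example-password"    # placeholder value, not a real credential
print(child_sees("SSHPASS", env))      # example-password
```

In a shell, a variable assigned without export (SSHPASS=...) is visible to that shell but not to the programs it starts, including a luigi worker; export SSHPASS=... makes it part of the environment that "os.environ.copy()" picks up.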
My understanding so far is that this is what the "program_environment" override is for.
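A simplified sketch of that hook pattern, assuming (as described above) that the task base class calls self.program_environment() and passes the result to subprocess.Popen as env. The classes here are hypothetical stand-ins, not luigi's actual source. Python looks the hook up by its exact name, so an override only takes effect when the subclass defines a method with precisely that name:

```python
import os


class TaskBaseSketch:
    """Hypothetical stand-in for a task base class with an environment hook."""

    def program_environment(self):
        # Default hook: inherit the parent process's environment unchanged.
        return os.environ.copy()

    def child_environment(self):
        # The base class only ever calls the hook by this exact name.
        return self.program_environment()


class OverridesHook(TaskBaseSketch):
    def program_environment(self):
        # Same name as the hook, so this replaces the default.
        env = os.environ.copy()
        env["RESTORE_DB_DEMO_FLAG"] = "from-override"
        return env


class NearMissName(TaskBaseSketch):
    def program_env(self):
        # Different name: nothing in the base class ever calls this method.
        env = os.environ.copy()
        env["RESTORE_DB_DEMO_FLAG"] = "from-override"
        return env


print(OverridesHook().child_environment().get("RESTORE_DB_DEMO_FLAG"))  # from-override
print(NearMissName().child_environment().get("RESTORE_DB_DEMO_FLAG"))   # None, unless already set
```

If the base class default already returns "os.environ.copy()" (as this sketch assumes), an override that returns the same thing changes nothing; it is mainly useful as a place to add or inspect variables before the child starts.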
What am I doing wrong in the code below? While debugging, it looks like "def program_environment(self)" is never called.
```python
import os

import luigi
import luigi.contrib.external_program


class RestoreDB(luigi.contrib.external_program.ExternalProgramTask):
    log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
    data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')

    database = luigi.Parameter(default="-")
    param_a = luigi.Parameter(default="localhost")  # sftp_host
    param_b = luigi.Parameter(default="/")  # remote_file_path
    param_c = luigi.Parameter(default=data_path)  # local_file_target
    param_d = luigi.Parameter(default=data_path)  # unzip_to_file_target
    param_e = luigi.Parameter()  # restoredb_script_path

    def program_args(self):
        # Command line for the shell script that downloads, unzips and restores.
        _cmd = [
            "./restore_db.sh",
            "-a %s" % self.param_a,
            "-b %s" % self.param_b,
            "-c %s" % self.param_c,
            "-d %s" % self.param_d,
            "-e %s" % self.param_e,
        ]
        return _cmd

    def program_environent(self):
        env = os.environ.copy()
        # print(env["SSHPASS"])
        # breakpoint()
        return env

    def output(self):
        # One log file per database marks this task as complete.
        _log_file = os.path.join(self.log_path,
                                 'restoredb-%s.log' % self.database)
        _target = luigi.LocalTarget(_log_file)
        # breakpoint()
        return _target
```
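In program_args above, each option and its value are combined into one string ("-a %s" % self.param_a). Assuming, as the post says, that the list is handed to subprocess.Popen without a shell, every list element becomes exactly one argument for the child; nothing re-splits it on spaces. A minimal sketch using a throwaway Python child to show the argv it actually receives (the child_argv helper is hypothetical, for illustration only):

```python
import json
import subprocess
import sys


def child_argv(args):
    """Start a Python child with `args` and return the argv it received."""
    # With `python -c code a b`, sys.argv[1:] in the child is exactly [a, b].
    code = "import sys, json; print(json.dumps(sys.argv[1:]))"
    result = subprocess.run(
        [sys.executable, "-c", code, *args],
        capture_output=True, text=True, check=True,
    )
    return json.loads(result.stdout)


host = "localhost"
print(child_argv(["-a %s" % host]))  # ['-a localhost']  (one argument)
print(child_argv(["-a", host]))      # ['-a', 'localhost']  (two arguments)
```

So "./restore_db.sh" receives "-a localhost" as a single word, not "-a" followed by "localhost" as it would when typed in a shell. How the script copes with that depends on how it parses its options, which isn't shown; listing each flag and value as separate elements (e.g. "-a", self.param_a) mirrors the command-line invocation.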