airflow-dbt
Add missing dbt commands
According to the dbt CLI reference, some commands are still missing.
This solves:
- close #59 because it implements DbtSourceOperator
- close #51 because it implements DbtBuildOperator
I think the DbtRunOperationOperator requires a bit more work. It is, as far as I know, the only dbt command that takes a positional argument (the macro you want to run), and it uses --args rather than --vars to pass extra arguments to the operation. It's the only command I could find with that argument, and I don't know why it doesn't use --vars like the other commands.
I got it working by doing something like this:
class DbtBaseOperator(BaseOperator):
    @apply_defaults
    def __init__(self,
                 macro=None,
                 args=None,
                 *positional_args,
                 **kwargs):
        super(DbtBaseOperator, self).__init__(*positional_args, **kwargs)
        # args is forwarded to dbt's --args flag, so it must be a dict
        if args is not None and not isinstance(args, dict):
            raise AirflowException('The argument "args" should be a dictionary')
        self.macro = macro
        self.args = args

    def create_hook(self):
        # Pass the macro name and its arguments on to the hook
        self.hook = DbtHook(
            macro=self.macro,
            args=self.args)
        return self.hook


class DbtRunOperationOperator(DbtBaseOperator):
    @apply_defaults
    def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
        super(DbtRunOperationOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

    def execute(self, context):
        # The macro name is the positional argument of `dbt run-operation`
        self.create_hook().execute('run-operation', self.macro)
And then, of course, the --args keyword needs to be added in the hook as well. I don't run dbt locally, so I don't have an example patch for that.
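For the hook side, a rough sketch of how the command line could be assembled is shown here. It is not a patch against DbtHook: build_run_operation_command and its dbt_bin parameter are hypothetical names invented for illustration, and the sketch assumes the dictionary can be serialized with json.dumps, since dbt reads the --args value as a YAML dictionary and JSON output is accepted as YAML.

```python
import json


def build_run_operation_command(macro, args=None, dbt_bin="dbt"):
    """Build the argv list for `dbt run-operation`.

    Hypothetical helper, not part of airflow-dbt. `macro` is the
    positional macro name; `args` is an optional dict passed via --args.
    """
    if not macro:
        raise ValueError("run-operation needs a macro name")
    if args is not None and not isinstance(args, dict):
        raise TypeError('The argument "args" should be a dictionary')

    # The macro name comes directly after the subcommand
    cmd = [dbt_bin, "run-operation", macro]
    if args is not None:
        # Serialize the dict as JSON; dbt parses the --args value as YAML
        cmd.extend(["--args", json.dumps(args)])
    return cmd
```

Calling build_run_operation_command("my_macro", {"key": "value"}) would yield a list that can be handed to subprocess without shell quoting, which avoids having to escape the serialized dictionary by hand.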