[feature] support for dbt clone or dbt build? DbtCloneOperator
Hi there, our CI/CD runs "write-audit-publish". Basically we do:

- a `dbt build` command to run and test models in staging
- a `dbt clone` command to clone the models created in step 1 to a production environment

Can we implement a `DbtCloneOperator`? If so, what's the recommended place? Any suggestions on how to implement it?
Also, given the flexibility of Cosmos, say we want to change our process to, for all models, following the order of the DAG:

- `dbt run` the model in staging
- `dbt test` the model in staging
- if the tests pass, `dbt clone` the model from staging to production

Ideally, in the DAG, each model would have its own task group with four tasks: run, test, snapshot, clone. This is impossible with the dbt CLI alone because `clone` is a separate command (whereas `dbt build` covers run, test, and snapshot). What's the recommended path for implementing something like this? I know it's difficult because, in our current implementation, the environments are configured using different targets with different dbt profiles.
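Roughly what I have in mind, as a sketch only (a clone operator doesn't exist in Cosmos; the model names and shared kwargs are placeholders):

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from cosmos.operators.kubernetes import DbtRunKubernetesOperator, DbtTestKubernetesOperator

# Shared kwargs; the Kubernetes operators also need image, profile details, etc.
common = {"project_dir": "/path/to/dbt/project"}  # placeholder

with DAG("per_model_wap", start_date=days_ago(1), schedule_interval=None) as dag:
    for model in ["model_a", "model_b"]:  # placeholders; one task group per model
        with TaskGroup(group_id=model):
            run = DbtRunKubernetesOperator(task_id="run", select=[model], **common)
            test = DbtTestKubernetesOperator(task_id="test", select=[model], **common)
            # clone = DbtCloneKubernetesOperator(task_id="clone", select=[model], **common)
            run >> test  # >> clone, once a clone operator exists
```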
Hey @SPTKL! Great to see you back with another interesting challenge! Hope you've been doing well.
To implement a `DbtCloneOperator` in the Cosmos repository to support the `dbt clone` command, you can follow the pattern established by the existing operators in the `cosmos/operators/kubernetes.py` file. Note that `dbt clone` is only available in dbt Core 1.6 and later, so this implementation assumes your dbt version supports it (or that you have a custom command or script that performs the cloning operation, executable via the dbt CLI).
Here's a step-by-step guide to creating the `DbtCloneOperator`:
1. Define the `DbtCloneMixin`: this mixin will contain the specific logic for building the `dbt clone` command. Adapt this part based on your dbt version and the exact flags your cloning step needs.
2. Implement the `DbtCloneKubernetesOperator`: this operator will use the `DbtCloneMixin` and `DbtKubernetesBaseOperator` to execute the clone command in a Kubernetes Pod.
Here's an example implementation based on the provided context:
```python
from __future__ import annotations

from typing import Sequence

from airflow.utils.context import Context

from cosmos.operators.kubernetes import DbtKubernetesBaseOperator


class DbtCloneMixin:
    """Mixin for the dbt clone command."""

    template_fields: Sequence[str] = ("clone_flags",)

    def build_clone_cmd(
        self, context: Context, cmd_flags: list[str] | None = None
    ) -> tuple[list[str], dict]:
        """Build the dbt clone command."""
        cmd = [self.dbt_executable_path, "clone"]  # dbt_executable_path comes from the base operator
        env_vars = self.build_env_vars(context)
        if cmd_flags:
            cmd.extend(cmd_flags)
        return cmd, env_vars


class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator):
    """Executes a dbt clone command in a Kubernetes Pod."""

    template_fields: Sequence[str] = (
        DbtKubernetesBaseOperator.template_fields + DbtCloneMixin.template_fields  # type: ignore[operator]
    )

    def execute(self, context: Context) -> None:
        # NOTE: adjust this to however DbtKubernetesBaseOperator expects the
        # command vs. flags to be split; here the fully built command is handed over.
        cmd, env_vars = self.build_clone_cmd(context=context, cmd_flags=self.cmd_flags)
        self.build_and_run_cmd(context=context, cmd_flags=cmd)
```
To modify your process to sequentially run, test, and clone each model, you can define an Airflow DAG that uses the `DbtRunKubernetesOperator`, `DbtTestKubernetesOperator`, and the newly created `DbtCloneKubernetesOperator`. Ensure you configure each operator with the appropriate `profile_config` to target the correct dbt profile for the staging and production environments.
Here's a simplified example of how you might set up the DAG:
```python
from airflow import DAG
from airflow.utils.dates import days_ago

from cosmos.config import ProfileConfig
from cosmos.operators.kubernetes import (
    DbtCloneKubernetesOperator,  # the custom operator defined above
    DbtRunKubernetesOperator,
    DbtTestKubernetesOperator,
)

with DAG("dbt_workflow", start_date=days_ago(1), schedule_interval=None) as dag:
    run_model = DbtRunKubernetesOperator(
        task_id="run_model",
        profile_config=ProfileConfig(profile_name="staging", target_name="staging"),
        # Additional parameters as needed
    )

    test_model = DbtTestKubernetesOperator(
        task_id="test_model",
        profile_config=ProfileConfig(profile_name="staging", target_name="staging"),
        # Additional parameters as needed
    )

    clone_model = DbtCloneKubernetesOperator(
        task_id="clone_model",
        profile_config=ProfileConfig(profile_name="production", target_name="production"),
        # Additional parameters as needed
    )

    run_model >> test_model >> clone_model
```
This example assumes that you have a `ProfileConfig` object or similar configuration management in place to handle dbt profiles and targets. Adjust the implementation details based on your actual command for cloning and your environment setup.
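For example, a complete `ProfileConfig` per environment might look like this (a sketch assuming Snowflake and Cosmos's profile mappings; the connection IDs, profile name, and schemas are placeholders):

```python
from cosmos.config import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

# Placeholder connection IDs and schemas; adjust to your environments.
staging_profile = ProfileConfig(
    profile_name="my_dbt_project",
    target_name="staging",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_staging",
        profile_args={"schema": "staging"},
    ),
)

production_profile = ProfileConfig(
    profile_name="my_dbt_project",
    target_name="prod",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_prod",
        profile_args={"schema": "analytics"},
    ),
)
```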
For more details, you can refer to the existing operators in the `cosmos/operators/kubernetes.py` file.
About Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.
If you want to continue the conversation, start your reply with @dosu-bot.
Hi @SPTKL, this is a very useful feature. I'm still learning about the `dbt clone` command.
Your feature request has similarities with #903.
One of the challenges is that we may need to introduce Cosmos intermediate storage so that independent tasks, potentially running in different worker nodes/containers/VMs, can share the build artifacts needed by `dbt clone`. This is being discussed in a separate ticket: #894.
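As a rough illustration of the problem (hypothetical helpers; S3, the bucket name, and the key layout are assumptions, not anything Cosmos does today), each task would have to publish/fetch the dbt `target/` artifacts around its own dbt invocation:

```python
from pathlib import Path

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = "my-cosmos-artifacts"  # placeholder


def upload_artifacts(project_dir: str, run_id: str) -> None:
    """Publish the dbt artifacts (manifest.json etc.) that `dbt clone --state` needs."""
    hook = S3Hook(aws_conn_id="aws_default")
    for path in Path(project_dir, "target").glob("*.json"):
        hook.load_file(
            filename=str(path),
            key=f"dbt-artifacts/{run_id}/{path.name}",
            bucket_name=BUCKET,
            replace=True,
        )


def download_artifacts(project_dir: str, run_id: str) -> None:
    """Fetch previously published artifacts into a local state directory."""
    hook = S3Hook(aws_conn_id="aws_default")
    state_dir = Path(project_dir, "state")
    state_dir.mkdir(exist_ok=True)
    for key in hook.list_keys(bucket_name=BUCKET, prefix=f"dbt-artifacts/{run_id}/"):
        hook.download_file(
            key=key,
            bucket_name=BUCKET,
            local_path=str(state_dir),
            preserve_file_name=True,  # requires a recent Amazon provider
        )
```

A downstream task could then point `dbt clone --state` at the downloaded directory; the ticket above is about Cosmos handling this hand-off transparently.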
Would you be interested in contributing to Cosmos in this area?
Yes, I'm interested!