
Fix `GroupBy` column names when grouping and outputting same column

Open · karlhigley opened this issue 3 years ago • 6 comments

When the same column is both grouped on and output by `GroupBy`, the piece of code that joins the column name with the aggregation name incorrectly iterates over every character of the column name and joins the characters with the name separator, producing a column called (for example) `c_u_s_t_o_m_e_r___i_d`. We can avoid that by explicitly checking whether we have a string (which is iterable, like a list) rather than an actual list.
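A minimal sketch of the failure mode and the guard described above; the `agg_column_name` helper and `name_sep` argument are illustrative stand-ins, not the actual NVTabular internals:

```python
def agg_column_name(source_columns, agg, name_sep="_"):
    # If we were handed a bare string, joining it directly would iterate over
    # its characters ("customer_id" -> "c_u_s_t_o_m_e_r___i_d"), so wrap it
    # in a list before building the output column name.
    if isinstance(source_columns, str):
        source_columns = [source_columns]
    return name_sep.join(list(source_columns) + [agg])


print(agg_column_name("customer_id", "count"))  # customer_id_count
print(agg_column_name(["customer_id", "quantity"], "sum"))  # customer_id_quantity_sum
```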

karlhigley avatar Jul 27 '22 16:07 karlhigley

Click to view CI Results
GitHub pull request #1636 of commit e76be221837f1e6a1040a1276a2397cf12cac00e, no merge conflicts.
Running as SYSTEM
Setting status of e76be221837f1e6a1040a1276a2397cf12cac00e to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4607/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
 > git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
 > git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
 > git rev-parse e76be221837f1e6a1040a1276a2397cf12cac00e^{commit} # timeout=10
Checking out Revision e76be221837f1e6a1040a1276a2397cf12cac00e (detached)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f e76be221837f1e6a1040a1276a2397cf12cac00e # timeout=10
Commit message: "Fix `GroupBy` column names when grouping and outputting same column"
 > git rev-list --no-walk 9bfff5406c45b1f72d2ec0fba9841f3c383e0851 # timeout=10
First time build. Skipping changelog.
[nvtabular_tests] $ /bin/bash /tmp/jenkins17662894494447285510.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items

tests/unit/test_dask_nvt.py ............................................ [ 3%]
........................................................................ [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py ...................... [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py ...... [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]

=============================== warnings summary =============================== ../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33 /usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. DASK_VERSION = LooseVersion(dask.version)

../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings /var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. other = LooseVersion(other)

nvtabular/loader/init.py:19 /var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/init.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader. warnings.warn(

tests/unit/test_dask_nvt.py: 1 warning tests/unit/test_s3.py: 2 warnings tests/unit/test_tf4rec.py: 1 warning tests/unit/test_tools.py: 5 warnings tests/unit/test_triton_inference.py: 8 warnings tests/unit/loader/test_dataloader_backend.py: 6 warnings tests/unit/loader/test_tf_dataloader.py: 66 warnings tests/unit/loader/test_torch_dataloader.py: 67 warnings tests/unit/ops/test_categorify.py: 69 warnings tests/unit/ops/test_drop_low_cardinality.py: 2 warnings tests/unit/ops/test_fill.py: 8 warnings tests/unit/ops/test_hash_bucket.py: 4 warnings tests/unit/ops/test_join.py: 88 warnings tests/unit/ops/test_lambda.py: 1 warning tests/unit/ops/test_normalize.py: 9 warnings tests/unit/ops/test_ops.py: 11 warnings tests/unit/ops/test_ops_schema.py: 17 warnings tests/unit/workflow/test_workflow.py: 27 warnings tests/unit/workflow/test_workflow_chaining.py: 1 warning tests/unit/workflow/test_workflow_node.py: 1 warning tests/unit/workflow/test_workflow_schemas.py: 1 warning /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility. warnings.warn(

tests/unit/test_dask_nvt.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files. warnings.warn(

tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers /usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters. warnings.warn(

tests/unit/test_notebooks.py::test_optimize_criteo /usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 37465 instead warnings.warn(

tests/unit/test_notebooks.py: 1 warning tests/unit/test_tools.py: 17 warnings tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 54 warnings /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future warnings.warn(

tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 12 warnings tests/unit/workflow/test_workflow.py: 9 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files. warnings.warn(

tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet] tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet] tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True] /usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy self._setitem_single_block(indexer, value, name)

tests/unit/workflow/test_cpu_workflow.py: 6 warnings tests/unit/workflow/test_workflow.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files. warnings.warn(

tests/unit/workflow/test_workflow.py: 48 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files. warnings.warn(

tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_parquet_output[True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None] /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files. warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========== 1432 passed, 1 skipped, 620 warnings in 702.02s (0:11:42) ===========
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins10118772471946213032.sh

nvidia-merlin-bot avatar Jul 27 '22 16:07 nvidia-merlin-bot

Click to view CI Results
GitHub pull request #1636 of commit f905762705166551ca6f66b7e076ee7fcfbbd443, no merge conflicts.
Running as SYSTEM
Setting status of f905762705166551ca6f66b7e076ee7fcfbbd443 to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4608/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
 > git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
 > git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
 > git rev-parse f905762705166551ca6f66b7e076ee7fcfbbd443^{commit} # timeout=10
Checking out Revision f905762705166551ca6f66b7e076ee7fcfbbd443 (detached)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f f905762705166551ca6f66b7e076ee7fcfbbd443 # timeout=10
Commit message: "Improve test compatibility with CPU by using `make_df`"
 > git rev-list --no-walk e76be221837f1e6a1040a1276a2397cf12cac00e # timeout=10
[nvtabular_tests] $ /bin/bash /tmp/jenkins8349620494153300950.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items

tests/unit/test_dask_nvt.py ............................................ [ 3%]
........................................................................ [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py .....................F [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py ...... [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]

=================================== FAILURES ===================================
______ test_groupby_column_names_when_grouping_and_outputting_same_column ______

def test_groupby_column_names_when_grouping_and_outputting_same_column():
  purchases = make_df(
        data={
            "customer_id": np.random.randint(0, 10, 1000),
            "purchase_date": [
                datetime.date(2022, np.random.randint(1, 13), np.random.randint(1, 29))
                for i in range(1000)
            ],
            "quantity": np.random.randint(1, 50, 1000),
        }
    )

E TypeError: make_df() got an unexpected keyword argument 'data'

tests/unit/ops/test_groupyby.py:262: TypeError
=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33 /usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. DASK_VERSION = LooseVersion(dask.version)

../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings /var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. other = LooseVersion(other)

nvtabular/loader/init.py:19 /var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/init.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader. warnings.warn(

tests/unit/test_dask_nvt.py: 1 warning tests/unit/test_s3.py: 2 warnings tests/unit/test_tf4rec.py: 1 warning tests/unit/test_tools.py: 5 warnings tests/unit/test_triton_inference.py: 8 warnings tests/unit/loader/test_dataloader_backend.py: 6 warnings tests/unit/loader/test_tf_dataloader.py: 66 warnings tests/unit/loader/test_torch_dataloader.py: 67 warnings tests/unit/ops/test_categorify.py: 69 warnings tests/unit/ops/test_drop_low_cardinality.py: 2 warnings tests/unit/ops/test_fill.py: 8 warnings tests/unit/ops/test_hash_bucket.py: 4 warnings tests/unit/ops/test_join.py: 88 warnings tests/unit/ops/test_lambda.py: 1 warning tests/unit/ops/test_normalize.py: 9 warnings tests/unit/ops/test_ops.py: 11 warnings tests/unit/ops/test_ops_schema.py: 17 warnings tests/unit/workflow/test_workflow.py: 27 warnings tests/unit/workflow/test_workflow_chaining.py: 1 warning tests/unit/workflow/test_workflow_node.py: 1 warning tests/unit/workflow/test_workflow_schemas.py: 1 warning /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility. warnings.warn(

tests/unit/test_dask_nvt.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files. warnings.warn(

tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers /usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters. warnings.warn(

tests/unit/test_notebooks.py: 1 warning tests/unit/test_tools.py: 17 warnings tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 54 warnings /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future warnings.warn(

tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 12 warnings tests/unit/workflow/test_workflow.py: 9 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files. warnings.warn(

tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet] tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet] tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True] /usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy self._setitem_single_block(indexer, value, name)

tests/unit/workflow/test_cpu_workflow.py: 6 warnings tests/unit/workflow/test_workflow.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files. warnings.warn(

tests/unit/workflow/test_workflow.py: 48 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files. warnings.warn(

tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_parquet_output[True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None] /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files. warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED tests/unit/ops/test_groupyby.py::test_groupby_column_names_when_grouping_and_outputting_same_column
===== 1 failed, 1431 passed, 1 skipped, 619 warnings in 689.24s (0:11:29) ======
Build step 'Execute shell' marked build as failure
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins15104071637361123180.sh

nvidia-merlin-bot avatar Jul 27 '22 17:07 nvidia-merlin-bot
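For context on the failure in the run above: the traceback shows that `make_df` does not accept a pandas-style `data=` keyword, so the follow-up commit presumably passes the dictionary positionally instead. A minimal sketch of that construction, assuming the `merlin.core.dispatch` import path (which is not shown in the log):

```python
import datetime

import numpy as np
from merlin.core.dispatch import make_df  # assumed import path

# Pass the column dictionary positionally; make_df does not take `data=`.
purchases = make_df(
    {
        "customer_id": np.random.randint(0, 10, 1000),
        "purchase_date": [
            datetime.date(2022, np.random.randint(1, 13), np.random.randint(1, 29))
            for _ in range(1000)
        ],
        "quantity": np.random.randint(1, 50, 1000),
    }
)
```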

Click to view CI Results
GitHub pull request #1636 of commit b7b332b1d018d7ce9adaa97478e297a449731c0c, no merge conflicts.
Running as SYSTEM
Setting status of b7b332b1d018d7ce9adaa97478e297a449731c0c to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4609/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
 > git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
 > git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
 > git rev-parse b7b332b1d018d7ce9adaa97478e297a449731c0c^{commit} # timeout=10
Checking out Revision b7b332b1d018d7ce9adaa97478e297a449731c0c (detached)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f b7b332b1d018d7ce9adaa97478e297a449731c0c # timeout=10
Commit message: "Improve test compatibility with CPU by using `make_df`"
 > git rev-list --no-walk f905762705166551ca6f66b7e076ee7fcfbbd443 # timeout=10
First time build. Skipping changelog.
[nvtabular_tests] $ /bin/bash /tmp/jenkins438049645549617170.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items

tests/unit/test_dask_nvt.py ............................................ [ 3%]
........................................................................ [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py ...................... [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py ...... [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]

=============================== warnings summary =============================== ../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33 /usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. DASK_VERSION = LooseVersion(dask.version)

../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings /var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. other = LooseVersion(other)

nvtabular/loader/init.py:19 /var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/init.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader. warnings.warn(

tests/unit/test_dask_nvt.py: 1 warning tests/unit/test_s3.py: 2 warnings tests/unit/test_tf4rec.py: 1 warning tests/unit/test_tools.py: 5 warnings tests/unit/test_triton_inference.py: 8 warnings tests/unit/loader/test_dataloader_backend.py: 6 warnings tests/unit/loader/test_tf_dataloader.py: 66 warnings tests/unit/loader/test_torch_dataloader.py: 67 warnings tests/unit/ops/test_categorify.py: 69 warnings tests/unit/ops/test_drop_low_cardinality.py: 2 warnings tests/unit/ops/test_fill.py: 8 warnings tests/unit/ops/test_hash_bucket.py: 4 warnings tests/unit/ops/test_join.py: 88 warnings tests/unit/ops/test_lambda.py: 1 warning tests/unit/ops/test_normalize.py: 9 warnings tests/unit/ops/test_ops.py: 11 warnings tests/unit/ops/test_ops_schema.py: 17 warnings tests/unit/workflow/test_workflow.py: 27 warnings tests/unit/workflow/test_workflow_chaining.py: 1 warning tests/unit/workflow/test_workflow_node.py: 1 warning tests/unit/workflow/test_workflow_schemas.py: 1 warning /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility. warnings.warn(

tests/unit/test_dask_nvt.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files. warnings.warn(

tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers /usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters. warnings.warn(

tests/unit/test_notebooks.py: 1 warning tests/unit/test_tools.py: 17 warnings tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 54 warnings /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future warnings.warn(

tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 12 warnings tests/unit/workflow/test_workflow.py: 9 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files. warnings.warn(

tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet] tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet] tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True] /usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy self._setitem_single_block(indexer, value, name)

tests/unit/workflow/test_cpu_workflow.py: 6 warnings tests/unit/workflow/test_workflow.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files. warnings.warn(

tests/unit/workflow/test_workflow.py: 48 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files. warnings.warn(

tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_parquet_output[True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None] /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files. warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========== 1432 passed, 1 skipped, 619 warnings in 702.65s (0:11:42) ===========
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins6775373465073174909.sh

nvidia-merlin-bot avatar Jul 27 '22 17:07 nvidia-merlin-bot

Click to view CI Results
GitHub pull request #1636 of commit 302f7c355a27bd485f293a4494785ea89d29949e, no merge conflicts.
Running as SYSTEM
Setting status of 302f7c355a27bd485f293a4494785ea89d29949e to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4611/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
 > git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
 > git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
 > git rev-parse 302f7c355a27bd485f293a4494785ea89d29949e^{commit} # timeout=10
Checking out Revision 302f7c355a27bd485f293a4494785ea89d29949e (detached)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 302f7c355a27bd485f293a4494785ea89d29949e # timeout=10
Commit message: "Merge branch 'main' into fix/groupby-columns"
 > git rev-list --no-walk 468842b0ec006fe55f83ae52c6d5163041ce7bea # timeout=10
[nvtabular_tests] $ /bin/bash /tmp/jenkins1525259498990655070.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items

tests/unit/test_dask_nvt.py ............................F..F..F..F...... [ 3%]
..F...............................................................F.F... [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py ...................... [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py FFFFFF [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]

=================================== FAILURES ===================================
________ test_dask_workflow_api_dlrm[True-None-True-device-150-csv-0.1] ________

client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr28')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 150, part_mem_fraction = 0.1, engine = 'csv'
cat_cache = 'device', on_host = True, shuffle = None, cpu = True

@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
    client,
    tmpdir,
    datasets,
    freq_threshold,
    part_mem_fraction,
    engine,
    cat_cache,
    on_host,
    shuffle,
    cpu,
):
    set_dask_client(client=client)
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    paths = sorted(paths)
    if engine == "parquet":
        df1 = cudf.read_parquet(paths[0])[mycols_pq]
        df2 = cudf.read_parquet(paths[1])[mycols_pq]
    elif engine == "csv":
        df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
        df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
    else:
        df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
        df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
    df0 = cudf.concat([df1, df2], axis=0)
    df0 = df0.to_pandas() if cpu else df0

    if engine == "parquet":
        cat_names = ["name-cat", "name-string"]
    else:
        cat_names = ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    cats = cat_names >> ops.Categorify(
        freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
    )

    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()

    workflow = Workflow(cats + conts + label_name)

    if engine in ("parquet", "csv"):
        dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
    else:
        dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)

    output_path = os.path.join(tmpdir, "processed")

    transformed = workflow.fit_transform(dataset)
    transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)

    result = transformed.to_ddf().compute()
    assert len(df0) == len(result)
    assert result["x"].min() == 0.0
    assert result["x"].isna().sum() == 0
    assert result["y"].min() == 0.0
    assert result["y"].isna().sum() == 0

    # Check categories.  Need to sort first to make sure we are comparing
    # "apples to apples"
    expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
    dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
        {"name-string_x": "count", "name-string_y": "count"}
    )
    if freq_threshold:
        dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
    assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)

    # Read back from disk
    if cpu:
      df_disk = dd_read_parquet(output_path).compute()

tests/unit/test_dask_nvt.py:130:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute (result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in call return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in call return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in init self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open ???


??? E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:13,228 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-b8bb1408c849bf08e86780790a9b9064', 1)
Function: subgraph_callable-8e0b4c17-558a-4191-8b2a-b0b8e850
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr28/processed/part_1.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

__________ test_dask_workflow_api_dlrm[True-None-True-None-0-csv-0.1] __________

client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr31')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 0, part_mem_fraction = 0.1, engine = 'csv', cat_cache = None
on_host = True, shuffle = None, cpu = True

@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
    client,
    tmpdir,
    datasets,
    freq_threshold,
    part_mem_fraction,
    engine,
    cat_cache,
    on_host,
    shuffle,
    cpu,
):
    set_dask_client(client=client)
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    paths = sorted(paths)
    if engine == "parquet":
        df1 = cudf.read_parquet(paths[0])[mycols_pq]
        df2 = cudf.read_parquet(paths[1])[mycols_pq]
    elif engine == "csv":
        df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
        df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
    else:
        df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
        df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
    df0 = cudf.concat([df1, df2], axis=0)
    df0 = df0.to_pandas() if cpu else df0

    if engine == "parquet":
        cat_names = ["name-cat", "name-string"]
    else:
        cat_names = ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    cats = cat_names >> ops.Categorify(
        freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
    )

    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()

    workflow = Workflow(cats + conts + label_name)

    if engine in ("parquet", "csv"):
        dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
    else:
        dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)

    output_path = os.path.join(tmpdir, "processed")

    transformed = workflow.fit_transform(dataset)
    transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)

    result = transformed.to_ddf().compute()
    assert len(df0) == len(result)
    assert result["x"].min() == 0.0
    assert result["x"].isna().sum() == 0
    assert result["y"].min() == 0.0
    assert result["y"].isna().sum() == 0

    # Check categories.  Need to sort first to make sure we are comparing
    # "apples to apples"
    expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
    dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
        {"name-string_x": "count", "name-string_y": "count"}
    )
    if freq_threshold:
        dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
    assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)

    # Read back from disk
    if cpu:
      df_disk = dd_read_parquet(output_path).compute()

tests/unit/test_dask_nvt.py:130:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute (result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in call return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in call return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in init self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open ???


??? E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:15,289 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-790d36cd0968420525c1f1861e517463', 1)
Function: subgraph_callable-803ab848-8b23-4854-b130-905f4695
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr31/processed/part_1.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

_________ test_dask_workflow_api_dlrm[True-None-True-None-150-csv-0.1] _________

client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr34')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 150, part_mem_fraction = 0.1, engine = 'csv', cat_cache = None
on_host = True, shuffle = None, cpu = True

@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
    client,
    tmpdir,
    datasets,
    freq_threshold,
    part_mem_fraction,
    engine,
    cat_cache,
    on_host,
    shuffle,
    cpu,
):
    set_dask_client(client=client)
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    paths = sorted(paths)
    if engine == "parquet":
        df1 = cudf.read_parquet(paths[0])[mycols_pq]
        df2 = cudf.read_parquet(paths[1])[mycols_pq]
    elif engine == "csv":
        df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
        df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
    else:
        df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
        df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
    df0 = cudf.concat([df1, df2], axis=0)
    df0 = df0.to_pandas() if cpu else df0

    if engine == "parquet":
        cat_names = ["name-cat", "name-string"]
    else:
        cat_names = ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    cats = cat_names >> ops.Categorify(
        freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
    )

    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()

    workflow = Workflow(cats + conts + label_name)

    if engine in ("parquet", "csv"):
        dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
    else:
        dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)

    output_path = os.path.join(tmpdir, "processed")

    transformed = workflow.fit_transform(dataset)
    transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)

    result = transformed.to_ddf().compute()
    assert len(df0) == len(result)
    assert result["x"].min() == 0.0
    assert result["x"].isna().sum() == 0
    assert result["y"].min() == 0.0
    assert result["y"].isna().sum() == 0

    # Check categories.  Need to sort first to make sure we are comparing
    # "apples to apples"
    expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
    dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
        {"name-string_x": "count", "name-string_y": "count"}
    )
    if freq_threshold:
        dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
    assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)

    # Read back from disk
    if cpu:
      df_disk = dd_read_parquet(output_path).compute()

tests/unit/test_dask_nvt.py:130:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
    results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
    return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
    return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
    raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
    result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
    value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
    raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
    return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
    dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
    arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
    arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
    return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
    self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
    ???

???
E   pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:17,095 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-148dffefce082a24facb8b758f5e0617', 1)
Function:  subgraph_callable-b0299027-51f2-43df-b7f8-7f329c6e
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr34/processed/part_1.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

________ test_dask_workflow_api_dlrm[True-None-False-device-0-csv-0.1] _________

client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr37')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 0, part_mem_fraction = 0.1, engine = 'csv'
cat_cache = 'device', on_host = False, shuffle = None, cpu = True

@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
    client,
    tmpdir,
    datasets,
    freq_threshold,
    part_mem_fraction,
    engine,
    cat_cache,
    on_host,
    shuffle,
    cpu,
):
    set_dask_client(client=client)
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    paths = sorted(paths)
    if engine == "parquet":
        df1 = cudf.read_parquet(paths[0])[mycols_pq]
        df2 = cudf.read_parquet(paths[1])[mycols_pq]
    elif engine == "csv":
        df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
        df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
    else:
        df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
        df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
    df0 = cudf.concat([df1, df2], axis=0)
    df0 = df0.to_pandas() if cpu else df0

    if engine == "parquet":
        cat_names = ["name-cat", "name-string"]
    else:
        cat_names = ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    cats = cat_names >> ops.Categorify(
        freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
    )

    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()

    workflow = Workflow(cats + conts + label_name)

    if engine in ("parquet", "csv"):
        dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
    else:
        dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)

    output_path = os.path.join(tmpdir, "processed")

    transformed = workflow.fit_transform(dataset)
    transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)

    result = transformed.to_ddf().compute()
    assert len(df0) == len(result)
    assert result["x"].min() == 0.0
    assert result["x"].isna().sum() == 0
    assert result["y"].min() == 0.0
    assert result["y"].isna().sum() == 0

    # Check categories.  Need to sort first to make sure we are comparing
    # "apples to apples"
    expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
    dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
        {"name-string_x": "count", "name-string_y": "count"}
    )
    if freq_threshold:
        dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
    assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)

    # Read back from disk
    if cpu:
      df_disk = dd_read_parquet(output_path).compute()

tests/unit/test_dask_nvt.py:130:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
    results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
    return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
    return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
    raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
    result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
    value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
    raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
    return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
    dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
    arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
    arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
    return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
    self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
    ???

???
E   pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:19,274 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-b0fea75568422243c464d3466ab935eb', 1)
Function:  subgraph_callable-9c051324-4bed-4f4a-99bf-bb976407
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr37/processed/part_1.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

________ test_dask_workflow_api_dlrm[True-None-False-None-150-csv-0.1] _________

client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr46')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 150, part_mem_fraction = 0.1, engine = 'csv', cat_cache = None
on_host = False, shuffle = None, cpu = True

@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
    client,
    tmpdir,
    datasets,
    freq_threshold,
    part_mem_fraction,
    engine,
    cat_cache,
    on_host,
    shuffle,
    cpu,
):
    set_dask_client(client=client)
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    paths = sorted(paths)
    if engine == "parquet":
        df1 = cudf.read_parquet(paths[0])[mycols_pq]
        df2 = cudf.read_parquet(paths[1])[mycols_pq]
    elif engine == "csv":
        df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
        df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
    else:
        df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
        df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
    df0 = cudf.concat([df1, df2], axis=0)
    df0 = df0.to_pandas() if cpu else df0

    if engine == "parquet":
        cat_names = ["name-cat", "name-string"]
    else:
        cat_names = ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    cats = cat_names >> ops.Categorify(
        freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
    )

    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()

    workflow = Workflow(cats + conts + label_name)

    if engine in ("parquet", "csv"):
        dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
    else:
        dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)

    output_path = os.path.join(tmpdir, "processed")

    transformed = workflow.fit_transform(dataset)
    transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)

    result = transformed.to_ddf().compute()
    assert len(df0) == len(result)
    assert result["x"].min() == 0.0
    assert result["x"].isna().sum() == 0
    assert result["y"].min() == 0.0
    assert result["y"].isna().sum() == 0

    # Check categories.  Need to sort first to make sure we are comparing
    # "apples to apples"
    expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
    dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
    dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
        {"name-string_x": "count", "name-string_y": "count"}
    )
    if freq_threshold:
        dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
    assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)

    # Read back from disk
    if cpu:
      df_disk = dd_read_parquet(output_path).compute()

tests/unit/test_dask_nvt.py:130:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
    results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
    return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
    return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
    raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
    result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
    value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
    raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
    return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
    dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
    arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
    arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
    return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
    self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
    ???

???
E   pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:24,491 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-6d382dc8677530e6ffd60266e7f6c375', 1)
Function:  subgraph_callable-2b11a943-a1ba-421b-903a-7916f0b0
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr46/processed/part_1.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

___________________ test_dask_preproc_cpu[True-None-parquet] ___________________

client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
engine = 'parquet', shuffle = None, cpu = True

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_preproc_cpu(client, tmpdir, datasets, engine, shuffle, cpu):
    set_dask_client(client=client)
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    if engine == "parquet":
        df1 = cudf.read_parquet(paths[0])[mycols_pq]
        df2 = cudf.read_parquet(paths[1])[mycols_pq]
    elif engine == "csv":
        df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
        df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
    else:
        df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
        df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
    df0 = cudf.concat([df1, df2], axis=0)

    if engine in ("parquet", "csv"):
        dataset = Dataset(paths, part_size="1MB", cpu=cpu)
    else:
        dataset = Dataset(paths, names=allcols_csv, part_size="1MB", cpu=cpu)

    # Simple transform (normalize)
    cat_names = ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]
    conts = cont_names >> ops.FillMissing() >> ops.Normalize()
    workflow = Workflow(conts + cat_names + label_name)
    transformed = workflow.fit_transform(dataset)

    # Write out dataset
    output_path = os.path.join(tmpdir, "processed")
    transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=4)

    # Check the final result
  df_disk = dd_read_parquet(output_path, engine="pyarrow").compute()

tests/unit/test_dask_nvt.py:277:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
    results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
    return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
    return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
    raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
    result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
    value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
    raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
    return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
    dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
    arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
    arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
    return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
    self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
    ???

???
E   pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
  warnings.warn(
2022-08-01 15:38:05,422 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 15)
Function:  subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_3.parquet', [3], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
  warnings.warn(
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
  warnings.warn(
2022-08-01 15:38:05,423 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 0)
Function:  subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_0.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:05,423 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 13)
Function:  subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_3.parquet', [1], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:05,424 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 10)
Function:  subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_2.parquet', [2], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:05,424 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 12)
Function:  subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_3.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

________________ test_dask_preproc_cpu[True-None-csv-no-header] ________________

client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
engine = 'csv-no-header', shuffle = None, cpu = True

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_preproc_cpu(client, tmpdir, datasets, engine, shuffle, cpu):
    set_dask_client(client=client)
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    if engine == "parquet":
        df1 = cudf.read_parquet(paths[0])[mycols_pq]
        df2 = cudf.read_parquet(paths[1])[mycols_pq]
    elif engine == "csv":
        df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
        df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
    else:
        df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
        df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
    df0 = cudf.concat([df1, df2], axis=0)

    if engine in ("parquet", "csv"):
        dataset = Dataset(paths, part_size="1MB", cpu=cpu)
    else:
        dataset = Dataset(paths, names=allcols_csv, part_size="1MB", cpu=cpu)

    # Simple transform (normalize)
    cat_names = ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]
    conts = cont_names >> ops.FillMissing() >> ops.Normalize()
    workflow = Workflow(conts + cat_names + label_name)
    transformed = workflow.fit_transform(dataset)

    # Write out dataset
    output_path = os.path.join(tmpdir, "processed")
    transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=4)

    # Check the final result
  df_disk = dd_read_parquet(output_path, engine="pyarrow").compute()

tests/unit/test_dask_nvt.py:277:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
    results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
    return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
    return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
    raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
    result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
    value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
    raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
    return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
    dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
    arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
    arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
    return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
    self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
    ???

???
E   pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:38:06,951 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 15)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_3.parquet', [3], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,952 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 13)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_3.parquet', [1], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,954 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 1)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_0.parquet', [1], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,954 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 11)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_2.parquet', [3], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,960 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 17)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_4.parquet', [1], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,962 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 22)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_5.parquet', [2], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,962 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 20)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_5.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,963 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 19)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_4.parquet', [3], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,976 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 9)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_2.parquet', [1], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,978 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 26)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_6.parquet', [2], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

--------------------------- Captured stderr teardown ---------------------------
2022-08-01 15:38:06,985 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 24)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_6.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,991 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 5)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_1.parquet', [1], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,992 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 31)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_7.parquet', [3], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,993 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 28)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_7.parquet', [0], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,993 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 3)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_0.parquet', [3], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"

2022-08-01 15:38:06,994 - distributed.worker - WARNING - Compute Failed
Key:       ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 7)
Function:  subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args:      ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_1.parquet', [3], [])})
kwargs:    {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
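For reference on the test_cpu_workflow failures below: the same corrupt output surfaces through a different path there, since merlin.io.Dataset discovers the written part_*.parquet files via pyarrow and fails while reading their schemas. A minimal sketch for isolating which written part is bad, assuming a hypothetical output directory in place of the tmpdir paths from the log:

```python
# Minimal sketch (output_dir is hypothetical): probe each written part with
# pyarrow to see which file trips the "Parquet magic bytes not found" error.
import glob
import pyarrow.parquet as pq

output_dir = "/tmp/processed"  # stand-in for the test's tmpdir output
for path in sorted(glob.glob(output_dir + "/*.parquet")):
    try:
        pq.read_schema(path)  # raises ArrowInvalid on a truncated/corrupt footer
        print("ok ", path)
    except Exception as exc:
        print("bad", path, exc)
```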

_____________________ test_cpu_workflow[True-True-parquet] _____________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_pa0') df = name-cat name-string id label x y 0 Edith Victor 972 995 0.185652 -0.79195...b 974 913 -0.483125 -0.863262 4320 Kevin Ray 1000 929 -0.768262 0.630309

[4321 rows x 6 columns] dataset = <merlin.io.dataset.Dataset object at 0x7fda5bdf4b80>, cpu = True engine = 'parquet', dump = True

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
    # Make sure we are in cpu formats
    if cudf and isinstance(df, cudf.DataFrame):
        df = df.to_pandas()

    if cpu:
        dataset.to_cpu()

    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    norms = ops.Normalize()
    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
    cats = cat_names >> ops.Categorify()
    workflow = nvt.Workflow(conts + cats + label_name)

    workflow.fit(dataset)
    if dump:
        workflow_dir = os.path.join(tmpdir, "workflow")
        workflow.save(workflow_dir)
        workflow = None

        workflow = Workflow.load(workflow_dir)

    def get_norms(tar: pd.Series):
        df = tar.fillna(0)
        df = df * (df >= 0).astype("int")
        return df

    assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
    assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)

    # Check that categories match
    if engine == "parquet":
        cats_expected0 = df["name-cat"].unique()
        cats0 = get_cats(workflow, "name-cat", cpu=True)
        # adding the None entry as a string because of move from gpu
        assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
        assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
    cats_expected1 = df["name-string"].unique()
    cats1 = get_cats(workflow, "name-string", cpu=True)
    # adding the None entry as a string because of move from gpu
    assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
    assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])

    # Write to new "shuffled" and "processed" dataset
    workflow.transform(dataset).to_parquet(
        output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
    )
  dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)

tests/unit/workflow/test_cpu_workflow.py:76:


/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
    self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
    self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
    return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
    dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
    return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
    return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
    ???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
    ???

???
E   pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_pa0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_pa0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid

_______________________ test_cpu_workflow[True-True-csv] _______________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs0') df = name-string id label x y 0 Victor 972 995 0.185652 -0.791950 1 Ray ... Bob 974 913 -0.483125 -0.863262 2160 Ray 1000 929 -0.768262 0.630309

[4321 rows x 5 columns] dataset = <merlin.io.dataset.Dataset object at 0x7fda5be6b310>, cpu = True engine = 'csv', dump = True

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
    # Make sure we are in cpu formats
    if cudf and isinstance(df, cudf.DataFrame):
        df = df.to_pandas()

    if cpu:
        dataset.to_cpu()

    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    norms = ops.Normalize()
    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
    cats = cat_names >> ops.Categorify()
    workflow = nvt.Workflow(conts + cats + label_name)

    workflow.fit(dataset)
    if dump:
        workflow_dir = os.path.join(tmpdir, "workflow")
        workflow.save(workflow_dir)
        workflow = None

        workflow = Workflow.load(workflow_dir)

    def get_norms(tar: pd.Series):
        df = tar.fillna(0)
        df = df * (df >= 0).astype("int")
        return df

    assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
    assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)

    # Check that categories match
    if engine == "parquet":
        cats_expected0 = df["name-cat"].unique()
        cats0 = get_cats(workflow, "name-cat", cpu=True)
        # adding the None entry as a string because of move from gpu
        assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
        assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
    cats_expected1 = df["name-string"].unique()
    cats1 = get_cats(workflow, "name-string", cpu=True)
    # adding the None entry as a string because of move from gpu
    assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
    assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])

    # Write to new "shuffled" and "processed" dataset
    workflow.transform(dataset).to_parquet(
        output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
    )
  dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)

tests/unit/workflow/test_cpu_workflow.py:76:


/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
    self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
    self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
    return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
    dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
    return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
    return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
    ???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
    ???

???
E   pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid

__________________ test_cpu_workflow[True-True-csv-no-header] __________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs1') df = name-string id label x y 0 Victor 972 995 0.185652 -0.791950 1 Ray ... Bob 974 913 -0.483125 -0.863262 2160 Ray 1000 929 -0.768262 0.630309

[4321 rows x 5 columns] dataset = <merlin.io.dataset.Dataset object at 0x7fda2475af70>, cpu = True engine = 'csv-no-header', dump = True

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
    # Make sure we are in cpu formats
    if cudf and isinstance(df, cudf.DataFrame):
        df = df.to_pandas()

    if cpu:
        dataset.to_cpu()

    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    norms = ops.Normalize()
    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
    cats = cat_names >> ops.Categorify()
    workflow = nvt.Workflow(conts + cats + label_name)

    workflow.fit(dataset)
    if dump:
        workflow_dir = os.path.join(tmpdir, "workflow")
        workflow.save(workflow_dir)
        workflow = None

        workflow = Workflow.load(workflow_dir)

    def get_norms(tar: pd.Series):
        df = tar.fillna(0)
        df = df * (df >= 0).astype("int")
        return df

    assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
    assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)

    # Check that categories match
    if engine == "parquet":
        cats_expected0 = df["name-cat"].unique()
        cats0 = get_cats(workflow, "name-cat", cpu=True)
        # adding the None entry as a string because of move from gpu
        assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
        assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
    cats_expected1 = df["name-string"].unique()
    cats1 = get_cats(workflow, "name-string", cpu=True)
    # adding the None entry as a string because of move from gpu
    assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
    assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])

    # Write to new "shuffled" and "processed" dataset
    workflow.transform(dataset).to_parquet(
        output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
    )
  dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)

tests/unit/workflow/test_cpu_workflow.py:76:


/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
    self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
    self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
    return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
    dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
    return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
    return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
    ???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
    ???

???
E   pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs1/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs1/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid

____________________ test_cpu_workflow[True-False-parquet] _____________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_p0') df = name-cat name-string id label x y 0 Edith Victor 972 995 0.185652 -0.79195...b 974 913 -0.483125 -0.863262 4320 Kevin Ray 1000 929 -0.768262 0.630309

[4321 rows x 6 columns] dataset = <merlin.io.dataset.Dataset object at 0x7fda5be59220>, cpu = True engine = 'parquet', dump = False

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
    # Make sure we are in cpu formats
    if cudf and isinstance(df, cudf.DataFrame):
        df = df.to_pandas()

    if cpu:
        dataset.to_cpu()

    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    norms = ops.Normalize()
    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
    cats = cat_names >> ops.Categorify()
    workflow = nvt.Workflow(conts + cats + label_name)

    workflow.fit(dataset)
    if dump:
        workflow_dir = os.path.join(tmpdir, "workflow")
        workflow.save(workflow_dir)
        workflow = None

        workflow = Workflow.load(workflow_dir)

    def get_norms(tar: pd.Series):
        df = tar.fillna(0)
        df = df * (df >= 0).astype("int")
        return df

    assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
    assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)

    # Check that categories match
    if engine == "parquet":
        cats_expected0 = df["name-cat"].unique()
        cats0 = get_cats(workflow, "name-cat", cpu=True)
        # adding the None entry as a string because of move from gpu
        assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
        assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
    cats_expected1 = df["name-string"].unique()
    cats1 = get_cats(workflow, "name-string", cpu=True)
    # adding the None entry as a string because of move from gpu
    assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
    assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])

    # Write to new "shuffled" and "processed" dataset
    workflow.transform(dataset).to_parquet(
        output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
    )
  dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)

tests/unit/workflow/test_cpu_workflow.py:76:


/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
    self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
    self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
    return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
    dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
    return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
    return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
    ???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
    ???

???
E   pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_p0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_p0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid

______________________ test_cpu_workflow[True-False-csv] _______________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c0') df = name-string id label x y 0 Victor 972 995 0.185652 -0.791950 1 Ray ... Bob 974 913 -0.483125 -0.863262 2160 Ray 1000 929 -0.768262 0.630309

[4321 rows x 5 columns] dataset = <merlin.io.dataset.Dataset object at 0x7fda48146d60>, cpu = True engine = 'csv', dump = False

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
    # Make sure we are in cpu formats
    if cudf and isinstance(df, cudf.DataFrame):
        df = df.to_pandas()

    if cpu:
        dataset.to_cpu()

    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    norms = ops.Normalize()
    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
    cats = cat_names >> ops.Categorify()
    workflow = nvt.Workflow(conts + cats + label_name)

    workflow.fit(dataset)
    if dump:
        workflow_dir = os.path.join(tmpdir, "workflow")
        workflow.save(workflow_dir)
        workflow = None

        workflow = Workflow.load(workflow_dir)

    def get_norms(tar: pd.Series):
        df = tar.fillna(0)
        df = df * (df >= 0).astype("int")
        return df

    assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
    assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)

    # Check that categories match
    if engine == "parquet":
        cats_expected0 = df["name-cat"].unique()
        cats0 = get_cats(workflow, "name-cat", cpu=True)
        # adding the None entry as a string because of move from gpu
        assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
        assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
    cats_expected1 = df["name-string"].unique()
    cats1 = get_cats(workflow, "name-string", cpu=True)
    # adding the None entry as a string because of move from gpu
    assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
    assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])

    # Write to new "shuffled" and "processed" dataset
    workflow.transform(dataset).to_parquet(
        output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
    )
  dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)

tests/unit/workflow/test_cpu_workflow.py:76:


/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
    self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
    self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
    return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
    dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
    return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
    return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
    ???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
    ???

???
E   pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid
_________________ test_cpu_workflow[True-False-csv-no-header] __________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c1')
df = name-string id label x y 0 Victor 972 995 0.185652 -0.791950 1 Ray ... Bob 974 913 -0.483125 -0.863262 2160 Ray 1000 929 -0.768262 0.630309
[4321 rows x 5 columns]
dataset = <merlin.io.dataset.Dataset object at 0x7fda486431f0>, cpu = True
engine = 'csv-no-header', dump = False

@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
    # Make sure we are in cpu formats
    if cudf and isinstance(df, cudf.DataFrame):
        df = df.to_pandas()

    if cpu:
        dataset.to_cpu()

    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
    cont_names = ["x", "y", "id"]
    label_name = ["label"]

    norms = ops.Normalize()
    conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
    cats = cat_names >> ops.Categorify()
    workflow = nvt.Workflow(conts + cats + label_name)

    workflow.fit(dataset)
    if dump:
        workflow_dir = os.path.join(tmpdir, "workflow")
        workflow.save(workflow_dir)
        workflow = None

        workflow = Workflow.load(workflow_dir)

    def get_norms(tar: pd.Series):
        df = tar.fillna(0)
        df = df * (df >= 0).astype("int")
        return df

    assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
    assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
    assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)

    # Check that categories match
    if engine == "parquet":
        cats_expected0 = df["name-cat"].unique()
        cats0 = get_cats(workflow, "name-cat", cpu=True)
        # adding the None entry as a string because of move from gpu
        assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
        assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
    cats_expected1 = df["name-string"].unique()
    cats1 = get_cats(workflow, "name-string", cpu=True)
    # adding the None entry as a string because of move from gpu
    assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
    assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])

    # Write to new "shuffled" and "processed" dataset
    workflow.transform(dataset).to_parquet(
        output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
    )
  dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)

tests/unit/workflow/test_cpu_workflow.py:76:


/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
    self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
    self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
    return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
    dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
    return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
    return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
    ???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
    ???
    ???
E   pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c1/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c1/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid
=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33
  /usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. DASK_VERSION = LooseVersion(dask.__version__)

../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings /var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. other = LooseVersion(other)

nvtabular/loader/__init__.py:19
  /var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/__init__.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader. warnings.warn(

tests/unit/test_dask_nvt.py: 1 warning tests/unit/test_s3.py: 2 warnings tests/unit/test_tf4rec.py: 1 warning tests/unit/test_tools.py: 5 warnings tests/unit/test_triton_inference.py: 8 warnings tests/unit/loader/test_dataloader_backend.py: 6 warnings tests/unit/loader/test_tf_dataloader.py: 66 warnings tests/unit/loader/test_torch_dataloader.py: 67 warnings tests/unit/ops/test_categorify.py: 69 warnings tests/unit/ops/test_drop_low_cardinality.py: 2 warnings tests/unit/ops/test_fill.py: 8 warnings tests/unit/ops/test_hash_bucket.py: 4 warnings tests/unit/ops/test_join.py: 88 warnings tests/unit/ops/test_lambda.py: 1 warning tests/unit/ops/test_normalize.py: 9 warnings tests/unit/ops/test_ops.py: 11 warnings tests/unit/ops/test_ops_schema.py: 17 warnings tests/unit/workflow/test_workflow.py: 27 warnings tests/unit/workflow/test_workflow_chaining.py: 1 warning tests/unit/workflow/test_workflow_node.py: 1 warning tests/unit/workflow/test_workflow_schemas.py: 1 warning /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility. warnings.warn(

tests/unit/test_dask_nvt.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files. warnings.warn(

tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers /usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters. warnings.warn(

tests/unit/test_notebooks.py: 1 warning tests/unit/test_tools.py: 17 warnings tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 54 warnings /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future warnings.warn(

tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 12 warnings tests/unit/workflow/test_workflow.py: 9 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files. warnings.warn(

tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet] tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet] tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True] /usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy self._setitem_single_block(indexer, value, name)

tests/unit/workflow/test_cpu_workflow.py: 6 warnings tests/unit/workflow/test_workflow.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files. warnings.warn(

tests/unit/workflow/test_workflow.py: 48 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files. warnings.warn(

tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_parquet_output[True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None] /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files. warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-True-device-150-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-True-None-0-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-True-None-150-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-False-device-0-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-False-None-150-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_preproc_cpu[True-None-parquet]
FAILED tests/unit/test_dask_nvt.py::test_dask_preproc_cpu[True-None-csv-no-header]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-True-parquet]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-True-csv]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-True-csv-no-header]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-False-parquet]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-False-csv]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-False-csv-no-header]
===== 13 failed, 1419 passed, 1 skipped, 619 warnings in 711.56s (0:11:51) =====
Build step 'Execute shell' marked build as failure
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash cd /var/jenkins_home/ CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins242368713350332231.sh

nvidia-merlin-bot avatar Aug 01 '22 15:08 nvidia-merlin-bot
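
The `ArrowInvalid` errors in the run above report that the written `part_*.parquet` files have no Parquet magic bytes in the footer. As a quick way to see what that message is checking for, here is a minimal sketch (not part of the CI job; the glob path is a placeholder): a well-formed Parquet file begins and ends with the 4-byte marker `PAR1`.

```python
# Sketch: check whether files written by to_parquet() carry the Parquet magic
# bytes. A valid Parquet file starts and ends with b"PAR1"; the glob pattern
# below is a hypothetical stand-in for the workflow's output directory.
import glob


def has_parquet_magic(path: str) -> bool:
    with open(path, "rb") as f:
        head = f.read(4)   # first 4 bytes of the file
        f.seek(-4, 2)      # jump to the last 4 bytes (the footer)
        tail = f.read(4)
    return head == b"PAR1" and tail == b"PAR1"


for part in sorted(glob.glob("/tmp/nvt_output/*.parquet")):  # placeholder path
    print(part, "ok" if has_parquet_magic(part) else "missing PAR1 magic")
```

A file that fails this check was most likely truncated or written in a different format, which is consistent with the "Parquet magic bytes not found in footer" message pyarrow raises when it builds the dataset.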

Click to view CI Results
GitHub pull request #1636 of commit a74290155fced269fd77fc726a919d1645bf8cc6, no merge conflicts.
Running as SYSTEM
Setting status of a74290155fced269fd77fc726a919d1645bf8cc6 to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4634/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
 > git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
 > git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
 > git rev-parse a74290155fced269fd77fc726a919d1645bf8cc6^{commit} # timeout=10
Checking out Revision a74290155fced269fd77fc726a919d1645bf8cc6 (detached)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f a74290155fced269fd77fc726a919d1645bf8cc6 # timeout=10
Commit message: "Merge branch 'main' into fix/groupby-columns"
 > git rev-list --no-walk 35f7c158c6023ef878644de0b65dbdfa3d28b609 # timeout=10
First time build. Skipping changelog.
[nvtabular_tests] $ /bin/bash /tmp/jenkins16867787764162096748.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1431 items / 1 skipped

tests/unit/test_dask_nvt.py ............................................ [ 3%] ........................................................................ [ 8%] .... [ 8%] tests/unit/test_notebooks.py ...... [ 8%] tests/unit/test_tf4rec.py . [ 8%] tests/unit/test_tools.py ...................... [ 10%] tests/unit/test_triton_inference.py ................................ [ 12%] tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%] tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%] ................................................... [ 18%] tests/unit/framework_utils/test_torch_layers.py . [ 18%] tests/unit/loader/test_dataloader_backend.py ...... [ 18%] tests/unit/loader/test_tf_dataloader.py ................................ [ 20%] ........................................s.. [ 23%] tests/unit/loader/test_torch_dataloader.py ............................. [ 25%] ...................................................... [ 29%] tests/unit/ops/test_categorify.py ...................................... [ 32%] ........................................................................ [ 37%] ........................................... [ 40%] tests/unit/ops/test_column_similarity.py ........................ [ 42%] tests/unit/ops/test_drop_low_cardinality.py .. [ 42%] tests/unit/ops/test_fill.py ............................................ [ 45%] ........ [ 45%] tests/unit/ops/test_groupyby.py ...................... [ 47%] tests/unit/ops/test_hash_bucket.py ......................... [ 49%] tests/unit/ops/test_join.py ............................................ [ 52%] ........................................................................ [ 57%] .................................. [ 59%] tests/unit/ops/test_lambda.py .......... [ 60%] tests/unit/ops/test_normalize.py ....................................... [ 63%] .. [ 63%] tests/unit/ops/test_ops.py ............................................. [ 66%] .................... [ 67%] tests/unit/ops/test_ops_schema.py ...................................... [ 70%] ........................................................................ [ 75%] ........................................................................ [ 80%] ........................................................................ [ 85%] ....................................... [ 88%] tests/unit/ops/test_reduce_dtype_size.py .. [ 88%] tests/unit/ops/test_target_encode.py ..................... [ 89%] tests/unit/workflow/test_cpu_workflow.py ...... [ 90%] tests/unit/workflow/test_workflow.py ................................... [ 92%] .......................................................... [ 96%] tests/unit/workflow/test_workflow_chaining.py ... [ 96%] tests/unit/workflow/test_workflow_node.py ........... [ 97%] tests/unit/workflow/test_workflow_ops.py ... [ 97%] tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%] ... [100%]

=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33
  /usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. DASK_VERSION = LooseVersion(dask.__version__)

../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings /var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. other = LooseVersion(other)

nvtabular/loader/__init__.py:19
  /var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/__init__.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader. warnings.warn(

tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-Shuffle.PER_WORKER-True-device-0-parquet-0.1] /usr/local/lib/python3.8/dist-packages/tornado/ioloop.py:350: DeprecationWarning: make_current is deprecated; start the event loop first self.make_current()

tests/unit/test_dask_nvt.py: 1 warning tests/unit/test_tf4rec.py: 1 warning tests/unit/test_tools.py: 5 warnings tests/unit/test_triton_inference.py: 8 warnings tests/unit/loader/test_dataloader_backend.py: 6 warnings tests/unit/loader/test_tf_dataloader.py: 66 warnings tests/unit/loader/test_torch_dataloader.py: 67 warnings tests/unit/ops/test_categorify.py: 69 warnings tests/unit/ops/test_drop_low_cardinality.py: 2 warnings tests/unit/ops/test_fill.py: 8 warnings tests/unit/ops/test_hash_bucket.py: 4 warnings tests/unit/ops/test_join.py: 88 warnings tests/unit/ops/test_lambda.py: 1 warning tests/unit/ops/test_normalize.py: 9 warnings tests/unit/ops/test_ops.py: 11 warnings tests/unit/ops/test_ops_schema.py: 17 warnings tests/unit/workflow/test_workflow.py: 27 warnings tests/unit/workflow/test_workflow_chaining.py: 1 warning tests/unit/workflow/test_workflow_node.py: 1 warning tests/unit/workflow/test_workflow_schemas.py: 1 warning /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility. warnings.warn(

tests/unit/test_dask_nvt.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files. warnings.warn(

tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers /usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters. warnings.warn(

tests/unit/test_notebooks.py: 1 warning tests/unit/test_tools.py: 17 warnings tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 54 warnings /usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future warnings.warn(

tests/unit/loader/test_tf_dataloader.py: 2 warnings tests/unit/loader/test_torch_dataloader.py: 12 warnings tests/unit/workflow/test_workflow.py: 9 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files. warnings.warn(

tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet] tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet] tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True] /usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy self._setitem_single_block(indexer, value, name)

tests/unit/workflow/test_cpu_workflow.py: 6 warnings tests/unit/workflow/test_workflow.py: 12 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files. warnings.warn(

tests/unit/workflow/test_workflow.py: 48 warnings /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files. warnings.warn(

tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_parquet_output[True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION] tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None] /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files. warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========== 1430 passed, 2 skipped, 618 warnings in 694.19s (0:11:34) ===========
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash cd /var/jenkins_home/ CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins15785426595632365215.sh

nvidia-merlin-bot avatar Aug 15 '22 18:08 nvidia-merlin-bot

Having updated the branch to the latest main, it looks like this PR doesn't work on CPU with Pandas. 😅

karlhigley avatar Jan 27 '23 21:01 karlhigley
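
For anyone who wants to poke at the CPU/Pandas path locally, below is a minimal sketch that mirrors the pipeline used in the failing `test_cpu_workflow` runs above, built on a pandas-backed `merlin.io.Dataset`. The toy DataFrame and its values are placeholders, not the project's test fixtures.

```python
# Sketch of the CPU-only path: a pandas-backed Dataset run through a small
# NVTabular workflow, mirroring the ops used in test_cpu_workflow above.
import pandas as pd

import nvtabular as nvt
from nvtabular import ops
from merlin.io import Dataset

# Toy data standing in for the test fixtures (placeholder values).
df = pd.DataFrame(
    {
        "name-string": ["Victor", "Ray", "Bob", "Ray"],
        "id": [972, 974, 1000, 974],
        "x": [0.19, -0.48, -0.77, 0.63],
        "y": [-0.79, -0.86, 0.63, 0.10],
        "label": [995, 913, 929, 913],
    }
)

# Building the Dataset from a pandas DataFrame keeps the whole pipeline on CPU.
dataset = Dataset(df, cpu=True)

conts = ["x", "y", "id"] >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.Normalize()
cats = ["name-string"] >> ops.Categorify()
workflow = nvt.Workflow(conts + cats + ["label"])

workflow.fit(dataset)
out = workflow.transform(dataset).to_ddf().compute()
print(out.head())
```

Running something along these lines against the PR branch should surface any pandas-specific breakage without waiting for the full CI matrix.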