Fix `GroupBy` column names when grouping and outputting same column
When the same column is both grouped on and produced as an output, the piece of code that joins the column name with the aggregation name incorrectly treats the column name as a list of name parts and joins every character with the name separator, producing a column called (for example) `c_u_s_t_o_m_e_r___i_d`. We can avoid that by explicitly checking whether we have a single string (which is iterable, just like a list) rather than an actual list of column names, and wrapping it in a list before joining.
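For illustration, here is a minimal sketch of the failure mode and the fix, assuming a join-style helper; the `_make_name` function and `name_sep` value below are stand-ins for the `GroupBy` op's internals, not the actual NVTabular source:

```python
name_sep = "_"


def _make_name(*parts, sep="_"):
    # Illustrative helper: join name parts (e.g. column name + aggregation)
    # with the configured separator.
    return sep.join(parts)


column = "customer_id"

# Buggy path: unpacking a bare string treats every character as a name part.
print(_make_name(*column, sep=name_sep))        # c_u_s_t_o_m_e_r___i_d

# Fixed path: wrap a bare string in a list first, since strings are iterable
# just like lists of column names.
parts = [column] if isinstance(column, str) else list(column)
print(_make_name(*parts, "sum", sep=name_sep))  # customer_id_sum
```

Checking `isinstance(column, str)` up front keeps the join logic identical for the single-column and multi-column cases.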
Click to view CI Results
GitHub pull request #1636 of commit e76be221837f1e6a1040a1276a2397cf12cac00e, no merge conflicts.
Running as SYSTEM
Setting status of e76be221837f1e6a1040a1276a2397cf12cac00e to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4607/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
> git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
> git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
> git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
> git rev-parse e76be221837f1e6a1040a1276a2397cf12cac00e^{commit} # timeout=10
Checking out Revision e76be221837f1e6a1040a1276a2397cf12cac00e (detached)
> git config core.sparsecheckout # timeout=10
> git checkout -f e76be221837f1e6a1040a1276a2397cf12cac00e # timeout=10
Commit message: "Fix `GroupBy` column names when grouping and outputting same column"
> git rev-list --no-walk 9bfff5406c45b1f72d2ec0fba9841f3c383e0851 # timeout=10
First time build. Skipping changelog.
[nvtabular_tests] $ /bin/bash /tmp/jenkins17662894494447285510.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items
tests/unit/test_dask_nvt.py ............................................ [ 3%]
........................................................................ [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py ...................... [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py ...... [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]
=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33
/usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
DASK_VERSION = LooseVersion(dask.__version__)
../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings
/var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
other = LooseVersion(other)
nvtabular/loader/__init__.py:19
/var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/__init__.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader.
warnings.warn(
tests/unit/test_dask_nvt.py: 1 warning
tests/unit/test_s3.py: 2 warnings
tests/unit/test_tf4rec.py: 1 warning
tests/unit/test_tools.py: 5 warnings
tests/unit/test_triton_inference.py: 8 warnings
tests/unit/loader/test_dataloader_backend.py: 6 warnings
tests/unit/loader/test_tf_dataloader.py: 66 warnings
tests/unit/loader/test_torch_dataloader.py: 67 warnings
tests/unit/ops/test_categorify.py: 69 warnings
tests/unit/ops/test_drop_low_cardinality.py: 2 warnings
tests/unit/ops/test_fill.py: 8 warnings
tests/unit/ops/test_hash_bucket.py: 4 warnings
tests/unit/ops/test_join.py: 88 warnings
tests/unit/ops/test_lambda.py: 1 warning
tests/unit/ops/test_normalize.py: 9 warnings
tests/unit/ops/test_ops.py: 11 warnings
tests/unit/ops/test_ops_schema.py: 17 warnings
tests/unit/workflow/test_workflow.py: 27 warnings
tests/unit/workflow/test_workflow_chaining.py: 1 warning
tests/unit/workflow/test_workflow_node.py: 1 warning
tests/unit/workflow/test_workflow_schemas.py: 1 warning
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
tests/unit/test_dask_nvt.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files.
warnings.warn(
tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers
/usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters.
warnings.warn(
tests/unit/test_notebooks.py::test_optimize_criteo
/usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37465 instead
warnings.warn(
tests/unit/test_notebooks.py: 1 warning
tests/unit/test_tools.py: 17 warnings
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 54 warnings
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future
warnings.warn(
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 12 warnings
tests/unit/workflow/test_workflow.py: 9 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files.
warnings.warn(
tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet]
tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet]
tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True]
/usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
self._setitem_single_block(indexer, value, name)
tests/unit/workflow/test_cpu_workflow.py: 6 warnings
tests/unit/workflow/test_workflow.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files.
warnings.warn(
tests/unit/workflow/test_workflow.py: 48 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files.
warnings.warn(
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None]
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files.
warnings.warn(
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========== 1432 passed, 1 skipped, 620 warnings in 702.02s (0:11:42) ===========
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins10118772471946213032.sh
Click to view CI Results
GitHub pull request #1636 of commit f905762705166551ca6f66b7e076ee7fcfbbd443, no merge conflicts.
Running as SYSTEM
Setting status of f905762705166551ca6f66b7e076ee7fcfbbd443 to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4608/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
> git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
> git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
> git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
> git rev-parse f905762705166551ca6f66b7e076ee7fcfbbd443^{commit} # timeout=10
Checking out Revision f905762705166551ca6f66b7e076ee7fcfbbd443 (detached)
> git config core.sparsecheckout # timeout=10
> git checkout -f f905762705166551ca6f66b7e076ee7fcfbbd443 # timeout=10
Commit message: "Improve test compatibility with CPU by using `make_df`"
> git rev-list --no-walk e76be221837f1e6a1040a1276a2397cf12cac00e # timeout=10
[nvtabular_tests] $ /bin/bash /tmp/jenkins8349620494153300950.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items
tests/unit/test_dask_nvt.py ............................................ [ 3%]
........................................................................ [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py .....................F [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py ...... [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]
=================================== FAILURES ===================================
______ test_groupby_column_names_when_grouping_and_outputting_same_column ______
def test_groupby_column_names_when_grouping_and_outputting_same_column():
purchases = make_df(
data={
"customer_id": np.random.randint(0, 10, 1000),
"purchase_date": [
datetime.date(2022, np.random.randint(1, 13), np.random.randint(1, 29))
for i in range(1000)
],
"quantity": np.random.randint(1, 50, 1000),
}
)
E TypeError: make_df() got an unexpected keyword argument 'data'
tests/unit/ops/test_groupyby.py:262: TypeError
=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33
/usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
DASK_VERSION = LooseVersion(dask.__version__)
../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings
/var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
other = LooseVersion(other)
nvtabular/loader/__init__.py:19
/var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/__init__.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader.
warnings.warn(
tests/unit/test_dask_nvt.py: 1 warning
tests/unit/test_s3.py: 2 warnings
tests/unit/test_tf4rec.py: 1 warning
tests/unit/test_tools.py: 5 warnings
tests/unit/test_triton_inference.py: 8 warnings
tests/unit/loader/test_dataloader_backend.py: 6 warnings
tests/unit/loader/test_tf_dataloader.py: 66 warnings
tests/unit/loader/test_torch_dataloader.py: 67 warnings
tests/unit/ops/test_categorify.py: 69 warnings
tests/unit/ops/test_drop_low_cardinality.py: 2 warnings
tests/unit/ops/test_fill.py: 8 warnings
tests/unit/ops/test_hash_bucket.py: 4 warnings
tests/unit/ops/test_join.py: 88 warnings
tests/unit/ops/test_lambda.py: 1 warning
tests/unit/ops/test_normalize.py: 9 warnings
tests/unit/ops/test_ops.py: 11 warnings
tests/unit/ops/test_ops_schema.py: 17 warnings
tests/unit/workflow/test_workflow.py: 27 warnings
tests/unit/workflow/test_workflow_chaining.py: 1 warning
tests/unit/workflow/test_workflow_node.py: 1 warning
tests/unit/workflow/test_workflow_schemas.py: 1 warning
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
tests/unit/test_dask_nvt.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files.
warnings.warn(
tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers
/usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters.
warnings.warn(
tests/unit/test_notebooks.py: 1 warning
tests/unit/test_tools.py: 17 warnings
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 54 warnings
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future
warnings.warn(
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 12 warnings
tests/unit/workflow/test_workflow.py: 9 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files.
warnings.warn(
tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet]
tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet]
tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True]
/usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
self._setitem_single_block(indexer, value, name)
tests/unit/workflow/test_cpu_workflow.py: 6 warnings
tests/unit/workflow/test_workflow.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files.
warnings.warn(
tests/unit/workflow/test_workflow.py: 48 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files.
warnings.warn(
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None]
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files.
warnings.warn(
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED tests/unit/ops/test_groupyby.py::test_groupby_column_names_when_grouping_and_outputting_same_column
===== 1 failed, 1431 passed, 1 skipped, 619 warnings in 689.24s (0:11:29) ======
Build step 'Execute shell' marked build as failure
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins15104071637361123180.sh
Click to view CI Results
GitHub pull request #1636 of commit b7b332b1d018d7ce9adaa97478e297a449731c0c, no merge conflicts.
Running as SYSTEM
Setting status of b7b332b1d018d7ce9adaa97478e297a449731c0c to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4609/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
> git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
> git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
> git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
> git rev-parse b7b332b1d018d7ce9adaa97478e297a449731c0c^{commit} # timeout=10
Checking out Revision b7b332b1d018d7ce9adaa97478e297a449731c0c (detached)
> git config core.sparsecheckout # timeout=10
> git checkout -f b7b332b1d018d7ce9adaa97478e297a449731c0c # timeout=10
Commit message: "Improve test compatibility with CPU by using `make_df`"
> git rev-list --no-walk f905762705166551ca6f66b7e076ee7fcfbbd443 # timeout=10
First time build. Skipping changelog.
[nvtabular_tests] $ /bin/bash /tmp/jenkins438049645549617170.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items
tests/unit/test_dask_nvt.py ............................................ [ 3%]
........................................................................ [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py ...................... [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py ...... [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]
=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33
/usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
DASK_VERSION = LooseVersion(dask.__version__)
../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings
/var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
other = LooseVersion(other)
nvtabular/loader/__init__.py:19
/var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/__init__.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader.
warnings.warn(
tests/unit/test_dask_nvt.py: 1 warning
tests/unit/test_s3.py: 2 warnings
tests/unit/test_tf4rec.py: 1 warning
tests/unit/test_tools.py: 5 warnings
tests/unit/test_triton_inference.py: 8 warnings
tests/unit/loader/test_dataloader_backend.py: 6 warnings
tests/unit/loader/test_tf_dataloader.py: 66 warnings
tests/unit/loader/test_torch_dataloader.py: 67 warnings
tests/unit/ops/test_categorify.py: 69 warnings
tests/unit/ops/test_drop_low_cardinality.py: 2 warnings
tests/unit/ops/test_fill.py: 8 warnings
tests/unit/ops/test_hash_bucket.py: 4 warnings
tests/unit/ops/test_join.py: 88 warnings
tests/unit/ops/test_lambda.py: 1 warning
tests/unit/ops/test_normalize.py: 9 warnings
tests/unit/ops/test_ops.py: 11 warnings
tests/unit/ops/test_ops_schema.py: 17 warnings
tests/unit/workflow/test_workflow.py: 27 warnings
tests/unit/workflow/test_workflow_chaining.py: 1 warning
tests/unit/workflow/test_workflow_node.py: 1 warning
tests/unit/workflow/test_workflow_schemas.py: 1 warning
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
tests/unit/test_dask_nvt.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files.
warnings.warn(
tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers
/usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters.
warnings.warn(
tests/unit/test_notebooks.py: 1 warning
tests/unit/test_tools.py: 17 warnings
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 54 warnings
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future
warnings.warn(
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 12 warnings
tests/unit/workflow/test_workflow.py: 9 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files.
warnings.warn(
tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet]
tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet]
tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True]
/usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
self._setitem_single_block(indexer, value, name)
tests/unit/workflow/test_cpu_workflow.py: 6 warnings
tests/unit/workflow/test_workflow.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files.
warnings.warn(
tests/unit/workflow/test_workflow.py: 48 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files.
warnings.warn(
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None]
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files.
warnings.warn(
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========== 1432 passed, 1 skipped, 619 warnings in 702.65s (0:11:42) ===========
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.GitHub.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins6775373465073174909.sh
Click to view CI Results
GitHub pull request #1636 of commit 302f7c355a27bd485f293a4494785ea89d29949e, no merge conflicts.
Running as SYSTEM
Setting status of 302f7c355a27bd485f293a4494785ea89d29949e to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4611/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
> git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
> git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
> git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
> git rev-parse 302f7c355a27bd485f293a4494785ea89d29949e^{commit} # timeout=10
Checking out Revision 302f7c355a27bd485f293a4494785ea89d29949e (detached)
> git config core.sparsecheckout # timeout=10
> git checkout -f 302f7c355a27bd485f293a4494785ea89d29949e # timeout=10
Commit message: "Merge branch 'main' into fix/groupby-columns"
> git rev-list --no-walk 468842b0ec006fe55f83ae52c6d5163041ce7bea # timeout=10
[nvtabular_tests] $ /bin/bash /tmp/jenkins1525259498990655070.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1433 items
tests/unit/test_dask_nvt.py ............................F..F..F..F...... [ 3%]
..F...............................................................F.F... [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_s3.py .. [ 8%]
tests/unit/test_tf4rec.py . [ 9%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 21%]
........................................s.. [ 24%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 26%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py ...................... [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py FFFFFF [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]
=================================== FAILURES ===================================
________ test_dask_workflow_api_dlrm[True-None-True-device-150-csv-0.1] ________
client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr28')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 150, part_mem_fraction = 0.1, engine = 'csv'
cat_cache = 'device', on_host = True, shuffle = None, cpu = True
@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
client,
tmpdir,
datasets,
freq_threshold,
part_mem_fraction,
engine,
cat_cache,
on_host,
shuffle,
cpu,
):
set_dask_client(client=client)
paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
paths = sorted(paths)
if engine == "parquet":
df1 = cudf.read_parquet(paths[0])[mycols_pq]
df2 = cudf.read_parquet(paths[1])[mycols_pq]
elif engine == "csv":
df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
else:
df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
df0 = cudf.concat([df1, df2], axis=0)
df0 = df0.to_pandas() if cpu else df0
if engine == "parquet":
cat_names = ["name-cat", "name-string"]
else:
cat_names = ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
cats = cat_names >> ops.Categorify(
freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
)
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()
workflow = Workflow(cats + conts + label_name)
if engine in ("parquet", "csv"):
dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
else:
dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)
output_path = os.path.join(tmpdir, "processed")
transformed = workflow.fit_transform(dataset)
transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)
result = transformed.to_ddf().compute()
assert len(df0) == len(result)
assert result["x"].min() == 0.0
assert result["x"].isna().sum() == 0
assert result["y"].min() == 0.0
assert result["y"].isna().sum() == 0
# Check categories. Need to sort first to make sure we are comparing
# "apples to apples"
expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
{"name-string_x": "count", "name-string_y": "count"}
)
if freq_threshold:
dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)
# Read back from disk
if cpu:
df_disk = dd_read_parquet(output_path).compute()
tests/unit/test_dask_nvt.py:130:
/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
???
???
E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:13,228 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-b8bb1408c849bf08e86780790a9b9064', 1)
Function: subgraph_callable-8e0b4c17-558a-4191-8b2a-b0b8e850
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr28/processed/part_1.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
__________ test_dask_workflow_api_dlrm[True-None-True-None-0-csv-0.1] __________
client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr31')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 0, part_mem_fraction = 0.1, engine = 'csv', cat_cache = None
on_host = True, shuffle = None, cpu = True
@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
client,
tmpdir,
datasets,
freq_threshold,
part_mem_fraction,
engine,
cat_cache,
on_host,
shuffle,
cpu,
):
set_dask_client(client=client)
paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
paths = sorted(paths)
if engine == "parquet":
df1 = cudf.read_parquet(paths[0])[mycols_pq]
df2 = cudf.read_parquet(paths[1])[mycols_pq]
elif engine == "csv":
df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
else:
df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
df0 = cudf.concat([df1, df2], axis=0)
df0 = df0.to_pandas() if cpu else df0
if engine == "parquet":
cat_names = ["name-cat", "name-string"]
else:
cat_names = ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
cats = cat_names >> ops.Categorify(
freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
)
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()
workflow = Workflow(cats + conts + label_name)
if engine in ("parquet", "csv"):
dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
else:
dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)
output_path = os.path.join(tmpdir, "processed")
transformed = workflow.fit_transform(dataset)
transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)
result = transformed.to_ddf().compute()
assert len(df0) == len(result)
assert result["x"].min() == 0.0
assert result["x"].isna().sum() == 0
assert result["y"].min() == 0.0
assert result["y"].isna().sum() == 0
# Check categories. Need to sort first to make sure we are comparing
# "apples to apples"
expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
{"name-string_x": "count", "name-string_y": "count"}
)
if freq_threshold:
dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)
# Read back from disk
if cpu:
df_disk = dd_read_parquet(output_path).compute()
tests/unit/test_dask_nvt.py:130:
/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
???
???
E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:15,289 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-790d36cd0968420525c1f1861e517463', 1)
Function: subgraph_callable-803ab848-8b23-4854-b130-905f4695
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr31/processed/part_1.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
_________ test_dask_workflow_api_dlrm[True-None-True-None-150-csv-0.1] _________
client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr34')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 150, part_mem_fraction = 0.1, engine = 'csv', cat_cache = None
on_host = True, shuffle = None, cpu = True
@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
client,
tmpdir,
datasets,
freq_threshold,
part_mem_fraction,
engine,
cat_cache,
on_host,
shuffle,
cpu,
):
set_dask_client(client=client)
paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
paths = sorted(paths)
if engine == "parquet":
df1 = cudf.read_parquet(paths[0])[mycols_pq]
df2 = cudf.read_parquet(paths[1])[mycols_pq]
elif engine == "csv":
df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
else:
df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
df0 = cudf.concat([df1, df2], axis=0)
df0 = df0.to_pandas() if cpu else df0
if engine == "parquet":
cat_names = ["name-cat", "name-string"]
else:
cat_names = ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
cats = cat_names >> ops.Categorify(
freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
)
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()
workflow = Workflow(cats + conts + label_name)
if engine in ("parquet", "csv"):
dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
else:
dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)
output_path = os.path.join(tmpdir, "processed")
transformed = workflow.fit_transform(dataset)
transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)
result = transformed.to_ddf().compute()
assert len(df0) == len(result)
assert result["x"].min() == 0.0
assert result["x"].isna().sum() == 0
assert result["y"].min() == 0.0
assert result["y"].isna().sum() == 0
# Check categories. Need to sort first to make sure we are comparing
# "apples to apples"
expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
{"name-string_x": "count", "name-string_y": "count"}
)
if freq_threshold:
dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)
# Read back from disk
if cpu:
df_disk = dd_read_parquet(output_path).compute()
tests/unit/test_dask_nvt.py:130:
/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
???
???
E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:17,095 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-148dffefce082a24facb8b758f5e0617', 1)
Function: subgraph_callable-b0299027-51f2-43df-b7f8-7f329c6e
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr34/processed/part_1.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
________ test_dask_workflow_api_dlrm[True-None-False-device-0-csv-0.1] _________
client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr37')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 0, part_mem_fraction = 0.1, engine = 'csv'
cat_cache = 'device', on_host = False, shuffle = None, cpu = True
@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
client,
tmpdir,
datasets,
freq_threshold,
part_mem_fraction,
engine,
cat_cache,
on_host,
shuffle,
cpu,
):
set_dask_client(client=client)
paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
paths = sorted(paths)
if engine == "parquet":
df1 = cudf.read_parquet(paths[0])[mycols_pq]
df2 = cudf.read_parquet(paths[1])[mycols_pq]
elif engine == "csv":
df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
else:
df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
df0 = cudf.concat([df1, df2], axis=0)
df0 = df0.to_pandas() if cpu else df0
if engine == "parquet":
cat_names = ["name-cat", "name-string"]
else:
cat_names = ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
cats = cat_names >> ops.Categorify(
freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
)
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()
workflow = Workflow(cats + conts + label_name)
if engine in ("parquet", "csv"):
dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
else:
dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)
output_path = os.path.join(tmpdir, "processed")
transformed = workflow.fit_transform(dataset)
transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)
result = transformed.to_ddf().compute()
assert len(df0) == len(result)
assert result["x"].min() == 0.0
assert result["x"].isna().sum() == 0
assert result["y"].min() == 0.0
assert result["y"].isna().sum() == 0
# Check categories. Need to sort first to make sure we are comparing
# "apples to apples"
expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
{"name-string_x": "count", "name-string_y": "count"}
)
if freq_threshold:
dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)
# Read back from disk
if cpu:
df_disk = dd_read_parquet(output_path).compute()
tests/unit/test_dask_nvt.py:130:
/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
???
???
E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:19,274 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-b0fea75568422243c464d3466ab935eb', 1)
Function: subgraph_callable-9c051324-4bed-4f4a-99bf-bb976407
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr37/processed/part_1.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
________ test_dask_workflow_api_dlrm[True-None-False-None-150-csv-0.1] _________
client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr46')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
freq_threshold = 150, part_mem_fraction = 0.1, engine = 'csv', cat_cache = None
on_host = False, shuffle = None, cpu = True
@pytest.mark.parametrize("part_mem_fraction", [0.1])
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("freq_threshold", [0, 150])
@pytest.mark.parametrize("cat_cache", ["device", None])
@pytest.mark.parametrize("on_host", [True, False])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [True, False])
def test_dask_workflow_api_dlrm(
client,
tmpdir,
datasets,
freq_threshold,
part_mem_fraction,
engine,
cat_cache,
on_host,
shuffle,
cpu,
):
set_dask_client(client=client)
paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
paths = sorted(paths)
if engine == "parquet":
df1 = cudf.read_parquet(paths[0])[mycols_pq]
df2 = cudf.read_parquet(paths[1])[mycols_pq]
elif engine == "csv":
df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
else:
df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
df0 = cudf.concat([df1, df2], axis=0)
df0 = df0.to_pandas() if cpu else df0
if engine == "parquet":
cat_names = ["name-cat", "name-string"]
else:
cat_names = ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
cats = cat_names >> ops.Categorify(
freq_threshold=freq_threshold, out_path=str(tmpdir), cat_cache=cat_cache, on_host=on_host
)
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> ops.LogOp()
workflow = Workflow(cats + conts + label_name)
if engine in ("parquet", "csv"):
dataset = Dataset(paths, cpu=cpu, part_mem_fraction=part_mem_fraction)
else:
dataset = Dataset(paths, cpu=cpu, names=allcols_csv, part_mem_fraction=part_mem_fraction)
output_path = os.path.join(tmpdir, "processed")
transformed = workflow.fit_transform(dataset)
transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=1)
result = transformed.to_ddf().compute()
assert len(df0) == len(result)
assert result["x"].min() == 0.0
assert result["x"].isna().sum() == 0
assert result["y"].min() == 0.0
assert result["y"].isna().sum() == 0
# Check categories. Need to sort first to make sure we are comparing
# "apples to apples"
expect = df0.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
got = result.sort_values(["label", "x", "y", "id"]).reset_index(drop=True).reset_index()
dfm = expect.merge(got, on="index", how="inner")[["name-string_x", "name-string_y"]]
dfm_gb = dfm.groupby(["name-string_x", "name-string_y"]).agg(
{"name-string_x": "count", "name-string_y": "count"}
)
if freq_threshold:
dfm_gb = dfm_gb[dfm_gb["name-string_x"] >= freq_threshold]
assert_eq(dfm_gb["name-string_x"], dfm_gb["name-string_y"], check_names=False)
# Read back from disk
if cpu:
df_disk = dd_read_parquet(output_path).compute()
tests/unit/test_dask_nvt.py:130:
/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
???
???
E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:37:24,491 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-6d382dc8677530e6ffd60266e7f6c375', 1)
Function: subgraph_callable-2b11a943-a1ba-421b-903a-7916f0b0
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_workflow_api_dlrm_Tr46/processed/part_1.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
___________________ test_dask_preproc_cpu[True-None-parquet] ___________________
client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
engine = 'parquet', shuffle = None, cpu = True
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_preproc_cpu(client, tmpdir, datasets, engine, shuffle, cpu):
set_dask_client(client=client)
paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
if engine == "parquet":
df1 = cudf.read_parquet(paths[0])[mycols_pq]
df2 = cudf.read_parquet(paths[1])[mycols_pq]
elif engine == "csv":
df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
else:
df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
df0 = cudf.concat([df1, df2], axis=0)
if engine in ("parquet", "csv"):
dataset = Dataset(paths, part_size="1MB", cpu=cpu)
else:
dataset = Dataset(paths, names=allcols_csv, part_size="1MB", cpu=cpu)
# Simple transform (normalize)
cat_names = ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
conts = cont_names >> ops.FillMissing() >> ops.Normalize()
workflow = Workflow(conts + cat_names + label_name)
transformed = workflow.fit_transform(dataset)
# Write out dataset
output_path = os.path.join(tmpdir, "processed")
transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=4)
# Check the final result
df_disk = dd_read_parquet(output_path, engine="pyarrow").compute()
tests/unit/test_dask_nvt.py:277:
/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
???
???
E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
2022-08-01 15:38:05,422 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 15)
Function: subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_3.parquet', [3], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
2022-08-01 15:38:05,423 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 0)
Function: subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_0.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:05,423 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 13)
Function: subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_3.parquet', [1], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:05,424 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 10)
Function: subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_2.parquet', [2], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:05,424 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-bd285be180e6a88517d6dd8a6e00454b', 12)
Function: subgraph_callable-f5ca3b7e-6374-41b0-b651-a652864d
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non0/processed/part_3.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
________________ test_dask_preproc_cpu[True-None-csv-no-header] ________________
client = <Client: 'tcp://127.0.0.1:39277' processes=2 threads=16, memory=125.83 GiB>
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2')
datasets = {'cats': local('/tmp/pytest-of-jenkins/pytest-4/cats0'), 'csv': local('/tmp/pytest-of-jenkins/pytest-4/csv0'), 'csv-no... local('/tmp/pytest-of-jenkins/pytest-4/csv-no-header0'), 'parquet': local('/tmp/pytest-of-jenkins/pytest-4/parquet0')}
engine = 'csv-no-header', shuffle = None, cpu = True
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("shuffle", [Shuffle.PER_WORKER, None])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_preproc_cpu(client, tmpdir, datasets, engine, shuffle, cpu):
set_dask_client(client=client)
paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
if engine == "parquet":
df1 = cudf.read_parquet(paths[0])[mycols_pq]
df2 = cudf.read_parquet(paths[1])[mycols_pq]
elif engine == "csv":
df1 = cudf.read_csv(paths[0], header=0)[mycols_csv]
df2 = cudf.read_csv(paths[1], header=0)[mycols_csv]
else:
df1 = cudf.read_csv(paths[0], names=allcols_csv)[mycols_csv]
df2 = cudf.read_csv(paths[1], names=allcols_csv)[mycols_csv]
df0 = cudf.concat([df1, df2], axis=0)
if engine in ("parquet", "csv"):
dataset = Dataset(paths, part_size="1MB", cpu=cpu)
else:
dataset = Dataset(paths, names=allcols_csv, part_size="1MB", cpu=cpu)
# Simple transform (normalize)
cat_names = ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
conts = cont_names >> ops.FillMissing() >> ops.Normalize()
workflow = Workflow(conts + cat_names + label_name)
transformed = workflow.fit_transform(dataset)
# Write out dataset
output_path = os.path.join(tmpdir, "processed")
transformed.to_parquet(output_path=output_path, shuffle=shuffle, out_files_per_proc=4)
# Check the final result
df_disk = dd_read_parquet(output_path, engine="pyarrow").compute()
tests/unit/test_dask_nvt.py:277:
/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:3015: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2167: in gather
return self.sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:309: in sync
return sync(
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:376: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.8/dist-packages/distributed/utils.py:349: in f
result = yield future
/usr/local/lib/python3.8/dist-packages/tornado/gen.py:762: in run
value = future.result()
/usr/local/lib/python3.8/dist-packages/distributed/client.py:2030: in _gather
raise exception.with_traceback(traceback)
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:466: in read_partition
arrow_table = cls._read_table(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:1606: in _read_table
arrow_table = _read_table_from_path(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/arrow.py:277: in _read_table_from_path
return pq.ParquetFile(fil).read_row_groups(
/usr/local/lib/python3.8/dist-packages/pyarrow/parquet.py:230: in __init__
self.reader.open(
pyarrow/_parquet.pyx:972: in pyarrow._parquet.ParquetReader.open
???
???
E pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
pyarrow/error.pxi:99: ArrowInvalid
----------------------------- Captured stderr call -----------------------------
2022-08-01 15:38:06,951 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 15)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_3.parquet', [3], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,952 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 13)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_3.parquet', [1], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,954 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 1)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_0.parquet', [1], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,954 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 11)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_2.parquet', [3], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,960 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 17)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_4.parquet', [1], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,962 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 22)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_5.parquet', [2], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,962 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 20)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_5.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,963 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 19)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_4.parquet', [3], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,976 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 9)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_2.parquet', [1], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,978 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 26)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_6.parquet', [2], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
--------------------------- Captured stderr teardown ---------------------------
2022-08-01 15:38:06,985 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 24)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_6.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,991 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 5)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_1.parquet', [1], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,992 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 31)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_7.parquet', [3], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,993 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 28)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_7.parquet', [0], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,993 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 3)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_0.parquet', [3], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
2022-08-01 15:38:06,994 - distributed.worker - WARNING - Compute Failed
Key: ('read-parquet-110e9a4c3d90230d67ee55ef644df032', 7)
Function: subgraph_callable-91791866-8a14-4a57-b3af-ab60e0ec
args: ({'piece': ('/tmp/pytest-of-jenkins/pytest-4/test_dask_preproc_cpu_True_Non2/processed/part_1.parquet', [3], [])})
kwargs: {}
Exception: "ArrowInvalid('Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.')"
_____________________ test_cpu_workflow[True-True-parquet] _____________________
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_pa0')
df = name-cat name-string id label x y
0 Edith Victor 972 995 0.185652 -0.79195...b 974 913 -0.483125 -0.863262
4320 Kevin Ray 1000 929 -0.768262 0.630309
[4321 rows x 6 columns]
dataset = <merlin.io.dataset.Dataset object at 0x7fda5bdf4b80>, cpu = True
engine = 'parquet', dump = True
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
# Make sure we are in cpu formats
if cudf and isinstance(df, cudf.DataFrame):
df = df.to_pandas()
if cpu:
dataset.to_cpu()
cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
norms = ops.Normalize()
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
cats = cat_names >> ops.Categorify()
workflow = nvt.Workflow(conts + cats + label_name)
workflow.fit(dataset)
if dump:
workflow_dir = os.path.join(tmpdir, "workflow")
workflow.save(workflow_dir)
workflow = None
workflow = Workflow.load(workflow_dir)
def get_norms(tar: pd.Series):
df = tar.fillna(0)
df = df * (df >= 0).astype("int")
return df
assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)
# Check that categories match
if engine == "parquet":
cats_expected0 = df["name-cat"].unique()
cats0 = get_cats(workflow, "name-cat", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
cats_expected1 = df["name-string"].unique()
cats1 = get_cats(workflow, "name-string", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])
# Write to new "shuffled" and "processed" dataset
workflow.transform(dataset).to_parquet(
output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
)
dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)
tests/unit/workflow/test_cpu_workflow.py:76:
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???
???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_pa0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_pa0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
pyarrow/error.pxi:99: ArrowInvalid
_______________________ test_cpu_workflow[True-True-csv] _______________________
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs0')
df = name-string id label x y
0 Victor 972 995 0.185652 -0.791950
1 Ray ... Bob 974 913 -0.483125 -0.863262
2160 Ray 1000 929 -0.768262 0.630309
[4321 rows x 5 columns]
dataset = <merlin.io.dataset.Dataset object at 0x7fda5be6b310>, cpu = True
engine = 'csv', dump = True
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
# Make sure we are in cpu formats
if cudf and isinstance(df, cudf.DataFrame):
df = df.to_pandas()
if cpu:
dataset.to_cpu()
cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
norms = ops.Normalize()
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
cats = cat_names >> ops.Categorify()
workflow = nvt.Workflow(conts + cats + label_name)
workflow.fit(dataset)
if dump:
workflow_dir = os.path.join(tmpdir, "workflow")
workflow.save(workflow_dir)
workflow = None
workflow = Workflow.load(workflow_dir)
def get_norms(tar: pd.Series):
df = tar.fillna(0)
df = df * (df >= 0).astype("int")
return df
assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)
# Check that categories match
if engine == "parquet":
cats_expected0 = df["name-cat"].unique()
cats0 = get_cats(workflow, "name-cat", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
cats_expected1 = df["name-string"].unique()
cats1 = get_cats(workflow, "name-string", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])
# Write to new "shuffled" and "processed" dataset
workflow.transform(dataset).to_parquet(
output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
)
dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)
tests/unit/workflow/test_cpu_workflow.py:76:
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???
???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
pyarrow/error.pxi:99: ArrowInvalid
__________________ test_cpu_workflow[True-True-csv-no-header] __________________
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs1')
df = name-string id label x y
0 Victor 972 995 0.185652 -0.791950
1 Ray ... Bob 974 913 -0.483125 -0.863262
2160 Ray 1000 929 -0.768262 0.630309
[4321 rows x 5 columns]
dataset = <merlin.io.dataset.Dataset object at 0x7fda2475af70>, cpu = True
engine = 'csv-no-header', dump = True
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
# Make sure we are in cpu formats
if cudf and isinstance(df, cudf.DataFrame):
df = df.to_pandas()
if cpu:
dataset.to_cpu()
cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
norms = ops.Normalize()
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
cats = cat_names >> ops.Categorify()
workflow = nvt.Workflow(conts + cats + label_name)
workflow.fit(dataset)
if dump:
workflow_dir = os.path.join(tmpdir, "workflow")
workflow.save(workflow_dir)
workflow = None
workflow = Workflow.load(workflow_dir)
def get_norms(tar: pd.Series):
df = tar.fillna(0)
df = df * (df >= 0).astype("int")
return df
assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)
# Check that categories match
if engine == "parquet":
cats_expected0 = df["name-cat"].unique()
cats0 = get_cats(workflow, "name-cat", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
cats_expected1 = df["name-string"].unique()
cats1 = get_cats(workflow, "name-string", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])
# Write to new "shuffled" and "processed" dataset
workflow.transform(dataset).to_parquet(
output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
)
dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)
tests/unit/workflow/test_cpu_workflow.py:76:
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???
???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs1/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_True_cs1/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
pyarrow/error.pxi:99: ArrowInvalid
____________________ test_cpu_workflow[True-False-parquet] _____________________
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_p0')
df = name-cat name-string id label x y
0 Edith Victor 972 995 0.185652 -0.79195...b 974 913 -0.483125 -0.863262
4320 Kevin Ray 1000 929 -0.768262 0.630309
[4321 rows x 6 columns]
dataset = <merlin.io.dataset.Dataset object at 0x7fda5be59220>, cpu = True
engine = 'parquet', dump = False
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
# Make sure we are in cpu formats
if cudf and isinstance(df, cudf.DataFrame):
df = df.to_pandas()
if cpu:
dataset.to_cpu()
cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
norms = ops.Normalize()
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
cats = cat_names >> ops.Categorify()
workflow = nvt.Workflow(conts + cats + label_name)
workflow.fit(dataset)
if dump:
workflow_dir = os.path.join(tmpdir, "workflow")
workflow.save(workflow_dir)
workflow = None
workflow = Workflow.load(workflow_dir)
def get_norms(tar: pd.Series):
df = tar.fillna(0)
df = df * (df >= 0).astype("int")
return df
assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)
# Check that categories match
if engine == "parquet":
cats_expected0 = df["name-cat"].unique()
cats0 = get_cats(workflow, "name-cat", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
cats_expected1 = df["name-string"].unique()
cats1 = get_cats(workflow, "name-string", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])
# Write to new "shuffled" and "processed" dataset
workflow.transform(dataset).to_parquet(
output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
)
dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)
tests/unit/workflow/test_cpu_workflow.py:76:
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???
???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_p0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_p0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
pyarrow/error.pxi:99: ArrowInvalid
______________________ test_cpu_workflow[True-False-csv] _______________________
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c0')
df = name-string id label x y
0 Victor 972 995 0.185652 -0.791950
1 Ray ... Bob 974 913 -0.483125 -0.863262
2160 Ray 1000 929 -0.768262 0.630309
[4321 rows x 5 columns]
dataset = <merlin.io.dataset.Dataset object at 0x7fda48146d60>, cpu = True
engine = 'csv', dump = False
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
# Make sure we are in cpu formats
if cudf and isinstance(df, cudf.DataFrame):
df = df.to_pandas()
if cpu:
dataset.to_cpu()
cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
norms = ops.Normalize()
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
cats = cat_names >> ops.Categorify()
workflow = nvt.Workflow(conts + cats + label_name)
workflow.fit(dataset)
if dump:
workflow_dir = os.path.join(tmpdir, "workflow")
workflow.save(workflow_dir)
workflow = None
workflow = Workflow.load(workflow_dir)
def get_norms(tar: pd.Series):
df = tar.fillna(0)
df = df * (df >= 0).astype("int")
return df
assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)
# Check that categories match
if engine == "parquet":
cats_expected0 = df["name-cat"].unique()
cats0 = get_cats(workflow, "name-cat", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
cats_expected1 = df["name-string"].unique()
cats1 = get_cats(workflow, "name-string", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])
# Write to new "shuffled" and "processed" dataset
workflow.transform(dataset).to_parquet(
output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
)
dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)
tests/unit/workflow/test_cpu_workflow.py:76:
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???
???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c0/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c0/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
pyarrow/error.pxi:99: ArrowInvalid
_________________ test_cpu_workflow[True-False-csv-no-header] __________________
tmpdir = local('/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c1')
df = name-string id label x y
0 Victor 972 995 0.185652 -0.791950
1 Ray ... Bob 974 913 -0.483125 -0.863262
2160 Ray 1000 929 -0.768262 0.630309
[4321 rows x 5 columns]
dataset = <merlin.io.dataset.Dataset object at 0x7fda486431f0>, cpu = True
engine = 'csv-no-header', dump = False
@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
@pytest.mark.parametrize("dump", [True, False])
@pytest.mark.parametrize("cpu", [True])
def test_cpu_workflow(tmpdir, df, dataset, cpu, engine, dump):
# Make sure we are in cpu formats
if cudf and isinstance(df, cudf.DataFrame):
df = df.to_pandas()
if cpu:
dataset.to_cpu()
cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
cont_names = ["x", "y", "id"]
label_name = ["label"]
norms = ops.Normalize()
conts = cont_names >> ops.FillMissing() >> ops.Clip(min_value=0) >> norms
cats = cat_names >> ops.Categorify()
workflow = nvt.Workflow(conts + cats + label_name)
workflow.fit(dataset)
if dump:
workflow_dir = os.path.join(tmpdir, "workflow")
workflow.save(workflow_dir)
workflow = None
workflow = Workflow.load(workflow_dir)
def get_norms(tar: pd.Series):
df = tar.fillna(0)
df = df * (df >= 0).astype("int")
return df
assert math.isclose(get_norms(df.x).mean(), norms.means["x"], rel_tol=1e-4)
assert math.isclose(get_norms(df.y).mean(), norms.means["y"], rel_tol=1e-4)
assert math.isclose(get_norms(df.x).std(), norms.stds["x"], rel_tol=1e-3)
assert math.isclose(get_norms(df.y).std(), norms.stds["y"], rel_tol=1e-3)
# Check that categories match
if engine == "parquet":
cats_expected0 = df["name-cat"].unique()
cats0 = get_cats(workflow, "name-cat", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist())
assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None])
cats_expected1 = df["name-string"].unique()
cats1 = get_cats(workflow, "name-string", cpu=True)
# adding the None entry as a string because of move from gpu
assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist())
assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None])
# Write to new "shuffled" and "processed" dataset
workflow.transform(dataset).to_parquet(
output_path=tmpdir, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION
)
dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), cpu=cpu)
tests/unit/workflow/test_cpu_workflow.py:76:
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:303: in __init__
self.engine = ParquetDatasetEngine(
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:313: in __init__
self._path0,
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:338: in _path0
return next(self._dataset.get_fragments()).path
/usr/local/lib/python3.8/dist-packages/merlin/io/parquet.py:365: in _dataset
dataset = pa_ds.dataset(paths, filesystem=fs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???
???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c1/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-4/test_cpu_workflow_True_False_c1/part_0.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
pyarrow/error.pxi:99: ArrowInvalid
=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33
/usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
DASK_VERSION = LooseVersion(dask.__version__)
../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings
/var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
other = LooseVersion(other)
nvtabular/loader/__init__.py:19
/var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/__init__.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader.
warnings.warn(
tests/unit/test_dask_nvt.py: 1 warning
tests/unit/test_s3.py: 2 warnings
tests/unit/test_tf4rec.py: 1 warning
tests/unit/test_tools.py: 5 warnings
tests/unit/test_triton_inference.py: 8 warnings
tests/unit/loader/test_dataloader_backend.py: 6 warnings
tests/unit/loader/test_tf_dataloader.py: 66 warnings
tests/unit/loader/test_torch_dataloader.py: 67 warnings
tests/unit/ops/test_categorify.py: 69 warnings
tests/unit/ops/test_drop_low_cardinality.py: 2 warnings
tests/unit/ops/test_fill.py: 8 warnings
tests/unit/ops/test_hash_bucket.py: 4 warnings
tests/unit/ops/test_join.py: 88 warnings
tests/unit/ops/test_lambda.py: 1 warning
tests/unit/ops/test_normalize.py: 9 warnings
tests/unit/ops/test_ops.py: 11 warnings
tests/unit/ops/test_ops_schema.py: 17 warnings
tests/unit/workflow/test_workflow.py: 27 warnings
tests/unit/workflow/test_workflow_chaining.py: 1 warning
tests/unit/workflow/test_workflow_node.py: 1 warning
tests/unit/workflow/test_workflow_schemas.py: 1 warning
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
tests/unit/test_dask_nvt.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files.
warnings.warn(
tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers
/usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters.
warnings.warn(
tests/unit/test_notebooks.py: 1 warning
tests/unit/test_tools.py: 17 warnings
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 54 warnings
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future
warnings.warn(
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 12 warnings
tests/unit/workflow/test_workflow.py: 9 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files.
warnings.warn(
tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet]
tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet]
tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True]
/usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
self._setitem_single_block(indexer, value, name)
tests/unit/workflow/test_cpu_workflow.py: 6 warnings
tests/unit/workflow/test_workflow.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files.
warnings.warn(
tests/unit/workflow/test_workflow.py: 48 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files.
warnings.warn(
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None]
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files.
warnings.warn(
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-True-device-150-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-True-None-0-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-True-None-150-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-False-device-0-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-None-False-None-150-csv-0.1]
FAILED tests/unit/test_dask_nvt.py::test_dask_preproc_cpu[True-None-parquet]
FAILED tests/unit/test_dask_nvt.py::test_dask_preproc_cpu[True-None-csv-no-header]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-True-parquet]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-True-csv]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-True-csv-no-header]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-False-parquet]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-False-csv]
FAILED tests/unit/workflow/test_cpu_workflow.py::test_cpu_workflow[True-False-csv-no-header]
===== 13 failed, 1419 passed, 1 skipped, 619 warnings in 711.56s (0:11:51) =====
Build step 'Execute shell' marked build as failure
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.github.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins242368713350332231.sh
Click to view CI Results
GitHub pull request #1636 of commit a74290155fced269fd77fc726a919d1645bf8cc6, no merge conflicts.
Running as SYSTEM
Setting status of a74290155fced269fd77fc726a919d1645bf8cc6 to PENDING with url http://10.20.17.181:8080/job/nvtabular_tests/4634/ and message: 'Build started for merge commit.'
Using context: Jenkins Unit Test Run
Building on master in workspace /var/jenkins_home/workspace/nvtabular_tests
using credential nvidia-merlin-bot
Cloning the remote Git repository
Cloning repository https://github.com/NVIDIA-Merlin/NVTabular.git
> git init /var/jenkins_home/workspace/nvtabular_tests/nvtabular # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
> git --version # timeout=10
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
> git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
> git config remote.origin.url https://github.com/NVIDIA-Merlin/NVTabular.git # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/NVTabular.git
using GIT_ASKPASS to set credentials This is the bot credentials for our CI/CD
> git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/NVTabular.git +refs/pull/1636/*:refs/remotes/origin/pr/1636/* # timeout=10
> git rev-parse a74290155fced269fd77fc726a919d1645bf8cc6^{commit} # timeout=10
Checking out Revision a74290155fced269fd77fc726a919d1645bf8cc6 (detached)
> git config core.sparsecheckout # timeout=10
> git checkout -f a74290155fced269fd77fc726a919d1645bf8cc6 # timeout=10
Commit message: "Merge branch 'main' into fix/groupby-columns"
> git rev-list --no-walk 35f7c158c6023ef878644de0b65dbdfa3d28b609 # timeout=10
First time build. Skipping changelog.
[nvtabular_tests] $ /bin/bash /tmp/jenkins16867787764162096748.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/nvtabular_tests/nvtabular, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 1431 items / 1 skipped
tests/unit/test_dask_nvt.py ............................................ [ 3%]
........................................................................ [ 8%]
.... [ 8%]
tests/unit/test_notebooks.py ...... [ 8%]
tests/unit/test_tf4rec.py . [ 8%]
tests/unit/test_tools.py ...................... [ 10%]
tests/unit/test_triton_inference.py ................................ [ 12%]
tests/unit/framework_utils/test_tf_feature_columns.py . [ 12%]
tests/unit/framework_utils/test_tf_layers.py ........................... [ 14%]
................................................... [ 18%]
tests/unit/framework_utils/test_torch_layers.py . [ 18%]
tests/unit/loader/test_dataloader_backend.py ...... [ 18%]
tests/unit/loader/test_tf_dataloader.py ................................ [ 20%]
........................................s.. [ 23%]
tests/unit/loader/test_torch_dataloader.py ............................. [ 25%]
...................................................... [ 29%]
tests/unit/ops/test_categorify.py ...................................... [ 32%]
........................................................................ [ 37%]
........................................... [ 40%]
tests/unit/ops/test_column_similarity.py ........................ [ 42%]
tests/unit/ops/test_drop_low_cardinality.py .. [ 42%]
tests/unit/ops/test_fill.py ............................................ [ 45%]
........ [ 45%]
tests/unit/ops/test_groupyby.py ...................... [ 47%]
tests/unit/ops/test_hash_bucket.py ......................... [ 49%]
tests/unit/ops/test_join.py ............................................ [ 52%]
........................................................................ [ 57%]
.................................. [ 59%]
tests/unit/ops/test_lambda.py .......... [ 60%]
tests/unit/ops/test_normalize.py ....................................... [ 63%]
.. [ 63%]
tests/unit/ops/test_ops.py ............................................. [ 66%]
.................... [ 67%]
tests/unit/ops/test_ops_schema.py ...................................... [ 70%]
........................................................................ [ 75%]
........................................................................ [ 80%]
........................................................................ [ 85%]
....................................... [ 88%]
tests/unit/ops/test_reduce_dtype_size.py .. [ 88%]
tests/unit/ops/test_target_encode.py ..................... [ 89%]
tests/unit/workflow/test_cpu_workflow.py ...... [ 90%]
tests/unit/workflow/test_workflow.py ................................... [ 92%]
.......................................................... [ 96%]
tests/unit/workflow/test_workflow_chaining.py ... [ 96%]
tests/unit/workflow/test_workflow_node.py ........... [ 97%]
tests/unit/workflow/test_workflow_ops.py ... [ 97%]
tests/unit/workflow/test_workflow_schemas.py ........................... [ 99%]
... [100%]
=============================== warnings summary ===============================
../../../../../usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33
/usr/local/lib/python3.8/dist-packages/dask_cudf/core.py:33: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
DASK_VERSION = LooseVersion(dask.__version__)
../../../.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: 34 warnings
/var/jenkins_home/.local/lib/python3.8/site-packages/setuptools/_distutils/version.py:346: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
other = LooseVersion(other)
nvtabular/loader/__init__.py:19
/var/jenkins_home/workspace/nvtabular_tests/nvtabular/nvtabular/loader/__init__.py:19: DeprecationWarning: The nvtabular.loader module has moved to merlin.models.loader. Support for importing from nvtabular.loader is deprecated, and will be removed in a future version. Please update your imports to refer to merlin.models.loader.
warnings.warn(
tests/unit/test_dask_nvt.py::test_dask_workflow_api_dlrm[True-Shuffle.PER_WORKER-True-device-0-parquet-0.1]
/usr/local/lib/python3.8/dist-packages/tornado/ioloop.py:350: DeprecationWarning: make_current is deprecated; start the event loop first
self.make_current()
tests/unit/test_dask_nvt.py: 1 warning
tests/unit/test_tf4rec.py: 1 warning
tests/unit/test_tools.py: 5 warnings
tests/unit/test_triton_inference.py: 8 warnings
tests/unit/loader/test_dataloader_backend.py: 6 warnings
tests/unit/loader/test_tf_dataloader.py: 66 warnings
tests/unit/loader/test_torch_dataloader.py: 67 warnings
tests/unit/ops/test_categorify.py: 69 warnings
tests/unit/ops/test_drop_low_cardinality.py: 2 warnings
tests/unit/ops/test_fill.py: 8 warnings
tests/unit/ops/test_hash_bucket.py: 4 warnings
tests/unit/ops/test_join.py: 88 warnings
tests/unit/ops/test_lambda.py: 1 warning
tests/unit/ops/test_normalize.py: 9 warnings
tests/unit/ops/test_ops.py: 11 warnings
tests/unit/ops/test_ops_schema.py: 17 warnings
tests/unit/workflow/test_workflow.py: 27 warnings
tests/unit/workflow/test_workflow_chaining.py: 1 warning
tests/unit/workflow/test_workflow_node.py: 1 warning
tests/unit/workflow/test_workflow_schemas.py: 1 warning
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(
tests/unit/test_dask_nvt.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 8 files.
warnings.warn(
tests/unit/test_dask_nvt.py::test_merlin_core_execution_managers
/usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:431: UserWarning: Existing Dask-client object detected in the current context. New cuda cluster will not be deployed. Set force_new to True to ignore running clusters.
warnings.warn(
tests/unit/test_notebooks.py: 1 warning
tests/unit/test_tools.py: 17 warnings
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 54 warnings
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:2940: FutureWarning: Series.ceil and DataFrame.ceil are deprecated and will be removed in the future
warnings.warn(
tests/unit/loader/test_tf_dataloader.py: 2 warnings
tests/unit/loader/test_torch_dataloader.py: 12 warnings
tests/unit/workflow/test_workflow.py: 9 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 2 files.
warnings.warn(
tests/unit/ops/test_fill.py::test_fill_missing[True-True-parquet]
tests/unit/ops/test_fill.py::test_fill_missing[True-False-parquet]
tests/unit/ops/test_ops.py::test_filter[parquet-0.1-True]
/usr/local/lib/python3.8/dist-packages/pandas/core/indexing.py:1732: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
self._setitem_single_block(indexer, value, name)
tests/unit/workflow/test_cpu_workflow.py: 6 warnings
tests/unit/workflow/test_workflow.py: 12 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 1 files did not have enough partitions to create 10 files.
warnings.warn(
tests/unit/workflow/test_workflow.py: 48 warnings
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 20 files.
warnings.warn(
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_parquet_output[True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[True-True-None]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_WORKER]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-Shuffle.PER_PARTITION]
tests/unit/workflow/test_workflow.py::test_workflow_apply[False-True-None]
/usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:862: UserWarning: Only created 2 files did not have enough partitions to create 4 files.
warnings.warn(
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========== 1430 passed, 2 skipped, 618 warnings in 694.19s (0:11:34) ===========
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.github.com/repos/NVIDIA-Merlin/NVTabular/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[nvtabular_tests] $ /bin/bash /tmp/jenkins15785426595632365215.sh
After updating the branch to the latest main, it looks like this PR doesn't work on CPU with Pandas. 😅
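For anyone who wants to poke at the CPU path locally before the next CI run, a minimal sketch along these lines should exercise the `Groupby` op against a pandas-backed `Dataset`. The column names, values, and aggregations below are placeholders for illustration, not taken from the test suite; the real coverage lives in tests/unit/ops/test_groupyby.py.

```python
# Minimal sketch (made-up columns/values) to run the Groupby op on the CPU/pandas path.
import pandas as pd

import nvtabular as nvt
from merlin.io import Dataset

df = pd.DataFrame(
    {"customer_id": [1, 1, 2, 2, 2], "amount": [10.0, 20.0, 5.0, 7.0, 1.0]}
)

# Group on a single column (passed as a plain string) while also requesting an
# aggregation on that same column, so the output names must be joined with the
# aggregation names.
groupby = ["customer_id", "amount"] >> nvt.ops.Groupby(
    groupby_cols="customer_id",
    aggs={"customer_id": ["count"], "amount": ["sum"]},
)

workflow = nvt.Workflow(groupby)
out = workflow.fit_transform(Dataset(df, cpu=True)).to_ddf().compute()
print(out.columns)  # expected to include names like "customer_id_count" and "amount_sum"
```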