
error: Variables not initialized: communicator/1/HbNcclCommHandleOp

Open shijieliu opened this issue 1 year ago • 0 comments

Current behavior

2022-10-19 12:39:39.948019: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2022-10-19 12:39:39.948020: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
INFO:tensorflow:Parsing ../data//train.csv
INFO:tensorflow:Parsing ../data//train.csv
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
2022-10-19 12:39:43.135528: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
2022-10-19 12:39:43.136209: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x53d08e0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.136227: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2022-10-19 12:39:43.137613: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2022-10-19 12:39:43.147796: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
2022-10-19 12:39:43.148600: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x4190950 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.148638: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2022-10-19 12:39:43.150144: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2022-10-19 12:39:43.263791: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x6a91880
2022-10-19 12:39:43.263977: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.264217: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x6a644c0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.264241: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
2022-10-19 12:39:43.264400: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.264809: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
pciBusID: 0000:08:00.0
2022-10-19 12:39:43.264837: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2022-10-19 12:39:43.267560: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
2022-10-19 12:39:43.267587: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
2022-10-19 12:39:43.272210: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x51f3c70
2022-10-19 12:39:43.272373: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.272643: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x51bf3f0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2022-10-19 12:39:43.272666: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
2022-10-19 12:39:43.272784: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.272986: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
pciBusID: 0000:07:00.0
2022-10-19 12:39:43.273007: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2022-10-19 12:39:43.275711: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
2022-10-19 12:39:43.275737: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
2022-10-19 12:39:43.288994: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
2022-10-19 12:39:43.289162: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
2022-10-19 12:39:43.289498: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
2022-10-19 12:39:43.290069: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
2022-10-19 12:39:43.290150: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
2022-10-19 12:39:43.290231: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.290471: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.290643: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
2022-10-19 12:39:43.292542: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
2022-10-19 12:39:43.292558: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
2022-10-19 12:39:43.292564: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
2022-10-19 12:39:43.292640: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.292843: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.293058: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:worker/replica:0/task:0/device:GPU:0 with 9793 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:08:00.0, compute capability: 7.5)
2022-10-19 12:39:43.294188: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> 127.0.0.1:20001}
2022-10-19 12:39:43.294199: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> localhost:20002}
2022-10-19 12:39:43.294986: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20002
2022-10-19 12:39:43.297217: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
2022-10-19 12:39:43.297453: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
2022-10-19 12:39:43.297814: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
2022-10-19 12:39:43.298428: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
2022-10-19 12:39:43.298520: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
2022-10-19 12:39:43.298607: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.298845: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.299025: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
2022-10-19 12:39:43.301007: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
2022-10-19 12:39:43.301024: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
2022-10-19 12:39:43.301031: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
2022-10-19 12:39:43.301114: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.301327: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-19 12:39:43.301552: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:chief/replica:0/task:0/device:GPU:0 with 9729 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:07:00.0, compute capability: 7.5)
2022-10-19 12:39:43.302687: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> localhost:20001}
2022-10-19 12:39:43.302705: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> 127.0.0.1:20002}
2022-10-19 12:39:43.303434: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20001
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:run without loading checkpoint
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:run without loading checkpoint
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
Using TensorFlow version 1.15.5
Checking dataset...
Numbers of training dataset is 8000000
The training steps is 100
Traceback (most recent call last):
  File "benchmark_hb.py", line 405, in <module>
    main()
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
    return fn(*args, **kwargs)
  File "benchmark_hb.py", line 339, in main
    config=sess_config) as sess:
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
    sess = fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
    stop_grace_period_secs=stop_grace_period_secs)
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
    session_creator, hooks, should_recover=True, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
    self._sess = _RecoverableSession(self._coordinated_creator)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
    _WrappedSession.__init__(self, self._create_session())
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
    return self._sess_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
    self.tf_sess = self._session_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
    init_fn=self._scaffold.init_fn)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
    (_maybe_name(init_op), init_fn, self._local_init_op, msg))
RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
op: "NoOp"
input: "^group_deps_1/NoOp"
input: "^group_deps_1/NoOp_1"
device: "/job:chief/task:0/device:GPU:0"
, error: Variables not initialized: communicator/0/HbNcclCommHandleOp
Using TensorFlow version 1.15.5
Checking dataset...
Numbers of training dataset is 8000000
The training steps is 100
Traceback (most recent call last):
  File "benchmark_hb.py", line 405, in <module>
    main()
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
    return fn(*args, **kwargs)
  File "benchmark_hb.py", line 339, in main
    config=sess_config) as sess:
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
    sess = fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
    stop_grace_period_secs=stop_grace_period_secs)
  File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
    session_creator, hooks, should_recover=True, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
    self._sess = _RecoverableSession(self._coordinated_creator)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
    _WrappedSession.__init__(self, self._create_session())
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
    return self._sess_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
    self.tf_sess = self._session_creator.create_session()
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
    init_fn=self._scaffold.init_fn)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
    (_maybe_name(init_op), init_fn, self._local_init_op, msg))
RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
op: "NoOp"
input: "^group_deps_1/NoOp"
input: "^group_deps_1/NoOp_1"
device: "/job:worker/task:0/device:GPU:0"
, error: Variables not initialized: communicator/1/HbNcclCommHandleOp
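For triage, the uninitialized variable names can be pulled out of the two `RuntimeError` messages above and grouped by their top-level scope. The following is only a minimal sketch: the helper names `uninitialized_variables` and `group_by_scope` are hypothetical, and it assumes that multiple names, when present, are comma-separated after `Variables not initialized:`.

```python
import re
from collections import defaultdict

# Matches the tail of the RuntimeError message shown in the log above.
# Assumption: multiple variable names, if any, are comma-separated.
_UNINIT_RE = re.compile(r"Variables not initialized:\s*(.+)")


def uninitialized_variables(message):
    """Return the names listed after 'Variables not initialized:'."""
    names = []
    for match in _UNINIT_RE.finditer(message):
        names.extend(n.strip() for n in match.group(1).split(",") if n.strip())
    return names


def group_by_scope(names):
    """Group names such as 'communicator/0/HbNcclCommHandleOp' by top-level scope."""
    groups = defaultdict(list)
    for name in names:
        groups[name.split("/", 1)[0]].append(name)
    return dict(groups)


# The two error lines copied from the chief and worker tracebacks.
log_text = (
    ", error: Variables not initialized: communicator/0/HbNcclCommHandleOp\n"
    ", error: Variables not initialized: communicator/1/HbNcclCommHandleOp\n"
)
names = uninitialized_variables(log_text)
print(group_by_scope(names))
```

In this log both names fall under the `communicator` scope, one per process (chief and worker), which suggests the problem lies with the NCCL communicator handles rather than with the model's own variables.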

Expected behavior

The script runs to completion without errors.

System information

  • GPU model and memory: 2080Ti
  • OS Platform:
  • Docker version:
  • GCC/CUDA/cuDNN version: cuda 11.4
  • Python/conda version:
  • TensorFlow/PyTorch version: DeepRec, commit: 6bca2cc4e6acaca3766e0425b53bdd
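Several fields above are blank. As a rough sketch, the OS and Python entries could be collected with the standard library alone. The function name `collect_system_info` and the dictionary keys are hypothetical, and the Docker and GCC versions would still have to be gathered separately.

```python
import platform


def collect_system_info():
    """Gather the OS and Python details that the issue template asks for."""
    return {
        "os_platform": platform.platform(),
        "python_version": platform.python_version(),
        "python_implementation": platform.python_implementation(),
    }


# Print one "key: value" line per field, ready to paste into the report.
for key, value in collect_system_info().items():
    print("{}: {}".format(key, value))
```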

Code to reproduce

  1. Download the training dataset (in CSV format) from https://storage.googleapis.com/dataset-uploader/criteo-kaggle/large_version/train.csv
  2. The training script
# Copyright (c) 2022 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from tensorflow.python.framework import dtypes
import numpy as np
from ast import arg
import time
import argparse
import tensorflow as tf
import os
import sys
import math
import collections
from tensorflow.python.client import timeline
import json

from tensorflow.python.framework import sparse_tensor
from tensorflow.python.feature_column import feature_column_v2 as fc
from tensorflow.python.ops import partitioned_variables
from tensorflow.python.framework import ops
os.environ["TF_GPU_THREAD_MODE"] = "global"
import hybridbackend.tensorflow as hb

# Set to INFO for tracking training, default is WARN. ERROR for least messages
tf.logging.set_verbosity(tf.logging.INFO)
print("Using TensorFlow version %s" % (tf.__version__))

# Definition of some constants
CONTINUOUS_COLUMNS = ['I' + str(i) for i in range(1, 14)]  # 1-13 inclusive
CATEGORICAL_COLUMNS = ['C' + str(i) for i in range(1, 27)]  # 1-26 inclusive
LABEL_COLUMN = ['clicked']
TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
HASH_BUCKET_SIZES = {
    'C1': 2500,
    'C2': 2000,
    'C3': 300000,
    'C4': 250000,
    'C5': 1000,
    'C6': 100,
    'C7': 20000,
    'C8': 4000,
    'C9': 20,
    'C10': 100000,
    'C11': 10000,
    'C12': 250000,
    'C13': 40000,
    'C14': 100,
    'C15': 100,
    'C16': 200000,
    'C17': 50,
    'C18': 10000,
    'C19': 4000,
    'C20': 20,
    'C21': 250000,
    'C22': 100,
    'C23': 100,
    'C24': 250000,
    'C25': 400,
    'C26': 100000
}

EMBEDDING_DIMENSIONS = {
    'C1': 64,
    'C2': 64,
    'C3': 128,
    'C4': 128,
    'C5': 64,
    'C6': 64,
    'C7': 64,
    'C8': 64,
    'C9': 64,
    'C10': 128,
    'C11': 64,
    'C12': 128,
    'C13': 64,
    'C14': 64,
    'C15': 64,
    'C16': 128,
    'C17': 64,
    'C18': 64,
    'C19': 64,
    'C20': 64,
    'C21': 128,
    'C22': 64,
    'C23': 64,
    'C24': 128,
    'C25': 64,
    'C26': 128
}


def transform_numeric(feature):
    r'''Transform numeric features.
    '''
    # Notes: Statistics of Kaggle's Criteo Dataset has been calculated in advance to save time.
    mins_list = [
        0.0, -3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
    ]
    range_list = [
        1539.0, 22069.0, 65535.0, 561.0, 2655388.0, 233523.0, 26297.0, 5106.0,
        24376.0, 9.0, 181.0, 1807.0, 6879.0
    ]

    def make_minmaxscaler(min, range):
        def minmaxscaler(col):
            return (col - min) / range

        return minmaxscaler

    numeric_list = []

    for column_name in CONTINUOUS_COLUMNS:
        normalizer_fn = None
        i = CONTINUOUS_COLUMNS.index(column_name)
        normalizer_fn = make_minmaxscaler(mins_list[i], range_list[i])
        numeric = normalizer_fn(feature[column_name])
        numeric_list.append(tf.reshape(numeric, shape=[-1, 1]))
    return numeric_list


def transform_categorical(feature):
    r'''Transform categorical features.
    '''
    deep_features = []
    max_value = np.iinfo(dtypes.int64.as_numpy_dtype).max

    variables = []
    indices = []
    for column_name in CATEGORICAL_COLUMNS:
        ev_opt = tf.EmbeddingVariableOption(
            evict_option=None, filter_option=None)
        device_str = '/gpu'
        with tf.device(device_str), hb.scope(sharding=True):
            embedding_weights = tf.get_embedding_variable(
                f'{column_name}_weight',
                initializer=tf.random_normal_initializer(
                    mean=0.0, stddev=0.05
                ),
                embedding_dim=EMBEDDING_DIMENSIONS[column_name],
                ev_option=ev_opt
            )

        category = tf.strings.to_hash_bucket_fast(
            feature[column_name], max_value)
        sparse_tensor = fc._to_sparse_input_and_drop_ignore_values(category)
        sparse_tensor = tf.sparse.reshape(sparse_tensor, (-1, 1))
        
        deep_features.append(tf.nn.embedding_lookup_sparse(
            embedding_weights, sparse_tensor, None))
        
        variables.append(embedding_weights)
        indices.append(sparse_tensor)
    return deep_features


def stacked_dcn_v2(features, mlp_dims):
    r'''Stacked DCNv2.

    DCNv2: Improved Deep & Cross Network and Practical Lessons for Web-scale
    Learning to Rank Systems.

    See https://arxiv.org/abs/2008.13535 for more information.
    '''
    with tf.name_scope('cross'):
        cross_input = tf.concat(features, axis=-1)
        cross_input_shape = [-1, sum([f.shape[-1] for f in features])]
        cross_input = tf.reshape(cross_input, cross_input_shape)
        cross_input_sq = tf.layers.dense(
            cross_input, cross_input.shape[-1],
            activation=tf.nn.relu,
            kernel_initializer=tf.truncated_normal_initializer(),
            bias_initializer=tf.zeros_initializer())
        cross_output = cross_input * cross_input_sq + cross_input
        cross_output = tf.reshape(cross_output, [-1, cross_input.shape[1]])
        cross_output_dim = (len(features) * (len(features) + 1)) / 2

    with tf.name_scope('mlp'):
        prev_layer = cross_output
        prev_dim = cross_output_dim
        for i, d in enumerate(mlp_dims[:-1]):
            prev_layer = tf.layers.dense(
                prev_layer, d,
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(2.0 / (prev_dim + d))),
                bias_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(1.0 / d)),
                name=f'mlp_{i}')
            prev_dim = d
        return tf.layers.dense(
            prev_layer, mlp_dims[-1],
            activation=tf.nn.sigmoid,
            kernel_initializer=tf.random_normal_initializer(
                mean=0.0,
                stddev=math.sqrt(2.0 / (prev_dim + mlp_dims[-1]))),
            bias_initializer=tf.random_normal_initializer(
                mean=0.0,
                stddev=math.sqrt(1.0 / mlp_dims[-1])),
            name=f'mlp_{len(mlp_dims) - 1}')


# generate dataset pipeline
def build_model_input(filename, batch_size, num_epochs):
    def parse_csv(value):
        tf.logging.info('Parsing {}'.format(filename))
        cont_defaults = [[0.0] for i in range(1, 14)]
        cate_defaults = [[' '] for i in range(1, 27)]
        label_defaults = [[0]]
        column_headers = TRAIN_DATA_COLUMNS
        record_defaults = label_defaults + cont_defaults + cate_defaults
        columns = tf.io.decode_csv(value, record_defaults=record_defaults)
        all_columns = collections.OrderedDict(zip(column_headers, columns))
        labels = all_columns.pop(LABEL_COLUMN[0])
        features = all_columns
        return features, labels

    '''Work Queue Feature'''
    if args.workqueue:
        from tensorflow.python.ops.work_queue import WorkQueue
        work_queue = WorkQueue([filename])
        # For multiple files:
        # work_queue = WorkQueue([filename, filename1,filename2,filename3])
        files = work_queue.input_dataset()
    else:
        files = filename
    # Extract lines from input files using the Dataset API.
    dataset = tf.data.TextLineDataset(files)
    dataset = dataset.shuffle(buffer_size=20000,
                              seed=args.seed)  # fix seed for reproducing
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_csv, num_parallel_calls=28)
    dataset = dataset.prefetch(2)
    return dataset

@hb.function()
def main():
    
    # check dataset and count data set size
    print("Checking dataset...")
    train_file = args.data_location + '/train.csv'
    if (not os.path.exists(train_file)):
        print("Dataset does not exist in the given data_location.")
        sys.exit()
    # Count lines with a context manager so the file handle is closed.
    with open(train_file) as f:
        no_of_training_examples = sum(1 for _ in f)
    print("Numbers of training dataset is {}".format(no_of_training_examples))

    # set batch size, epochs & steps
    batch_size = args.batch_size

    if args.steps == 0:
        no_of_epochs = 1
        train_steps = math.ceil(
            (float(no_of_epochs) * no_of_training_examples) / batch_size)
    else:
        no_of_epochs = math.ceil(
            (float(batch_size) * args.steps) / no_of_training_examples)
        train_steps = args.steps
    print("The training steps is {}".format(train_steps))

    # set fixed random seed
    tf.set_random_seed(args.seed)

    # create data pipeline of the train dataset
    with tf.device('/cpu:0'):
        train_dataset = build_model_input(train_file, batch_size, no_of_epochs)

        iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                                train_dataset.output_shapes)
        next_element = iterator.get_next()

    train_init_op = iterator.make_initializer(train_dataset)

    # create feature column
    feature, labels = next_element[0], next_element[1]

    deep_features = transform_categorical(feature)
    wide_features = transform_numeric(feature)
    logits = stacked_dcn_v2(features=deep_features + wide_features,
                            mlp_dims=[1024, 1024, 512, 256, 1]
                            )
    loss = tf.reduce_mean(tf.keras.losses.binary_crossentropy(tf.reshape(labels, (-1, 1)), logits))

    step = tf.train.get_or_create_global_step()
    opt = tf.train.AdagradOptimizer(learning_rate=0.01)
    train_op = opt.minimize(loss, global_step=step)

    # Session config
    sess_config = tf.ConfigProto()

    # # Session hooks
    hooks = []

    # if args.smartstaged and not args.tf:
    #     '''Smart staged Feature'''
    #     next_element = tf.staged(next_element, num_threads=4, capacity=40)
    #     sess_config.graph_options.optimizer_options.do_smart_stage = True
    #     hooks.append(tf.make_prefetch_hook())
    # if args.op_fusion and not args.tf:
    #     '''Auto Graph Fusion'''
    #     sess_config.graph_options.optimizer_options.do_op_fusion = True
    # if args.micro_batch and not args.tf:
    #     '''Auto Micro Batch'''
    #     sess_config.graph_options.optimizer_options.micro_batch_num = args.micro_batch

    scaffold = tf.train.Scaffold(
        local_init_op=tf.group(
            tf.local_variables_initializer(), train_init_op),
    )

    stop_hook = tf.train.StopAtStepHook(last_step=train_steps)
    log_hook = tf.train.LoggingTensorHook(
        {
            'steps': step,
            'loss': loss,
        }, every_n_iter=1)
    hooks.append(stop_hook)
    hooks.append(log_hook)

    with tf.train.MonitoredTrainingSession(
            master='',
            hooks=hooks,
            scaffold=scaffold,
            config=sess_config) as sess:
        while not sess.should_stop():
            print(sess.run([feature]))
            sess.run([loss, train_op])
    print("Training completed.")


def boolean_string(string):
    low_string = string.lower()
    if low_string not in {'false', 'true'}:
        raise ValueError('Not a valid boolean string')
    return low_string == 'true'


# Get parse
def get_arg_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_location',
                        help='Full path of train data',
                        required=False,
                        default='./data')
    parser.add_argument('--steps',
                        help='set the number of steps on train dataset',
                        type=int,
                        default=0)
    parser.add_argument('--batch_size',
                        help='Batch size to train. Default is 512',
                        type=int,
                        default=512)
    parser.add_argument('--seed',
                        help='set the random seed for tensorflow',
                        type=int,
                        default=2021)
    parser.add_argument('--workqueue',
                        help='Whether to enable Work Queue. Default to False.',
                        type=boolean_string,
                        default=False)
    return parser


# Some DeepRec's features are enabled by ENV.
# This func is used to set ENV and enable these features.
# A triple quotes comment is used to introduce these features and play an emphasizing role.
def set_env_for_DeepRec():
    '''
    Set some ENV for these DeepRec's features enabled by ENV. 
    More Detail information is shown in https://deeprec.readthedocs.io/zh/latest/index.html.
    START_STATISTIC_STEP & STOP_STATISTIC_STEP: On CPU platform, DeepRec supports memory optimization
        in both stand-alone and distributed training. It is enabled by default, and the
        default start and stop steps of collection is 1000 and 1100. Reduce the initial 
        cold start time by the following settings.
    MALLOC_CONF: On CPU platform, DeepRec can use memory optimization with the jemalloc library.
        Please preload libjemalloc.so by `LD_PRELOAD=./libjemalloc.so.2 python ...`
    '''
    os.environ['START_STATISTIC_STEP'] = '100'
    os.environ['STOP_STATISTIC_STEP'] = '110'
    os.environ['MALLOC_CONF'] = \
        'background_thread:true,metadata_thp:auto,dirty_decay_ms:20000,muzzy_decay_ms:20000'


if __name__ == '__main__':
    parser = get_arg_parser()
    args = parser.parse_args()

    set_env_for_DeepRec()

    main()
  3. Training command:
python -m hybridbackend.run python benchmark_hb.py --data_location ../data/ --steps 100

Willing to contribute

Yes

shijieliu, Oct 19 '22 12:10