Efficient training of deep recommenders on cloud.

Overview

HybridBackend


Introduction

HybridBackend is a training framework for deep recommenders that bridges the gap between evolving cloud infrastructure and complex training processes. See the documentation for more information.


Installation

Install the latest CPU version for TensorFlow 1.15:

pip install hybridbackend-cpu

Install the latest CPU version for TensorFlow 1.14:

pip install hybridbackend-cpu-legacy

Note:

You might need to upgrade pip before running the installations above:

pip install -U pip

Contributing

We appreciate all contributions to improve HybridBackend. Please follow the steps below to contribute:

1. Clone the repository and checkout a new branch.

git clone <git_repo_addr>
git pull -r
git checkout -b features/my_feature

2. Commit changes, check code style and test.

git commit
cibuild/run cibuild/format
cibuild/run cibuild/lint
cibuild/run make -j8
cibuild/run make test

3. Create pull request for code review.

Comments
  • Using shuffle or rebatch may cause an OOM problem


    1. Current behavior

    Using shuffle or rebatch may cause an OOM problem.

    1.1 Small-file test results

    total parquet file count: 15780

    total parquet file size: 126G

    total sample count: 6,000,000

    No. | Test Scenario | RAM Usage
    --- | --- | ---
    1 | no shuffle, no rebatch | stable at 50G
    2 | shuffle (buffer_size=2048), rebatch (batch_size=8192) | starts at 53G, then grows rapidly; exceeds the 94G RAM limit within 2 minutes
    3 | shuffle (buffer_size=8), rebatch (batch_size=8192) | grows slowly; exceeds the 94G RAM limit after 2 hours
    4 | no shuffle, rebatch (batch_size=8192) | grows slowly; exceeds the 94G RAM limit after 2 hours
    5 | shuffle (buffer_size=8), no rebatch | stable at 50G at first, but starts growing after 1 hour
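One plausible explanation for scenario 2, sketched as a back-of-envelope calculation (the per-sample size below is an assumption derived from the totals above, not a measured value): because `read_parquet` emits whole batches, `shuffle(buffer_size=2048)` buffers 2048 batches, not 2048 samples.

```python
# Back-of-envelope estimate of the shuffle buffer's resident memory for
# scenario 2. Assumption (not from the issue): decoded samples are roughly
# the same size as their on-disk parquet encoding.
total_bytes = 126 * 1024**3                    # 126G of parquet files
sample_count = 6_000_000                       # total sample count
bytes_per_sample = total_bytes / sample_count  # ~22.5 KB per sample

# read_parquet(BATCH_SIZE, ...) emits whole batches, so each element held
# by shuffle() is a batch of 8192 samples, not a single sample.
buffered_samples = 2048 * 8192
buffered_gb = buffered_samples * bytes_per_sample / 1024**3
print(f"{buffered_gb:.0f} GB")  # 352 GB, far above the 94G RAM limit
```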

    (RAM-usage screenshots for the five scenarios omitted)

    1.2 Large-file test results

    total parquet file count: 240

    total parquet file size: 126G

    individual parquet file size: 500MB

    total sample count: 6,000,000

    (memory-usage screenshots omitted)

    Observation: at the end of each training epoch there is a clear memory-reclamation phase, but reclamation is incomplete, so the peak memory used after each epoch keeps growing until OOM. However, if training finishes within one or two epochs, memory does not blow up.

    1.3 Comparison of different training methods

    | | New SDK | Old SDK |
    | --- | --- | --- |
    | OOM occurs | Yes | No |
    | Environment | T4, single machine, single GPU | T4, single machine, single GPU |
    | Training method | session.run() | tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec) |
    | Call chain | see screenshot 1 below | see screenshot 2 below |

    (call-chain screenshots 1 and 2 omitted)

    2. Expected behavior

    During training, TensorFlow should use a stable amount of RAM rather than consuming more and more.

    3. System information

    • OS Platform and Distribution: Ubuntu 18.04.5 LTS
    • TensorFlow version: 1.15.0
    • Python version: 3.6
    • CUDA/cuDNN version: 10.1
    • RAM: 94G
    • GPU model and memory: Tesla T4, 16G

    4. Code to reproduce

    BATCH_SIZE = 8192
    
    parquet_file_list = ['some_parquet_file1.snappy.parquet', 'some_parquet_file2.snappy.parquet', ...]
    filenames_ds = tf.data.Dataset.from_tensor_slices(parquet_file_list)
    hb_fields = []
    hb_fields.append(hb.data.DataFrame.Field('int_field', tf.int64, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field('float_field', tf.float32, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field('array_field', tf.float32, ragged_rank=1))   # ... and some other fields
    
    
    # 1. no shuffle, no rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    
    # 2. big shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.shuffle(2048)
    ds = ds.apply(hb.data.rebatch(BATCH_SIZE, fields=hb_fields))
    
    # 3. small shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.shuffle(8)
    ds = ds.apply(hb.data.rebatch(BATCH_SIZE, fields=hb_fields))
    
    # 4. no shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.apply(hb.data.rebatch(BATCH_SIZE, fields=hb_fields))
    
    # 5. small shuffle and no rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.shuffle(8)
    

    Willing to contribute

    Yes

    opened by liurcme 5
  • Question: Data Loading Performance with 150G Byte/s


    Hi, thanks for open-sourcing this project, it's great work! 🥂 🍻

    I saw the Data Loading doc here; the ParquetDataset is meant to solve IO performance issues on the cloud.

    According to the doc, the speed of reading and decoding with ParquetDataset is about 150 GB/s (3346.10MB / 21.67ms), which equals the maximum throughput of 12x 100Gbit/s NICs; that is nearly impossible on cloud storage (HDFS/OSS/S3).

    File Format | Size (MB) | Framework | #Threads | Elapsed (ms)
    ----------- | --------- | ------------- | -------- | ------------
    CSV | 11062.61 | Tensorflow | 1 | 8558.38
    Parquet | 3346.10 | Tensorflow IO | 1 | 103056.71
    Parquet | 3346.10 | HybridBackend | 1 | 397.88
    Parquet | 3346.10 | HybridBackend | 20 | 21.67
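The quoted figure can be reproduced from the table's last row with quick arithmetic; note it measures in-memory decode throughput on a single machine, not network I/O:

```python
# Recompute the implied throughput from the benchmark table's last row.
size_mb = 3346.10     # Parquet file size from the table
elapsed_ms = 21.67    # HybridBackend with 20 threads
throughput_gb_s = (size_mb / 1024) / (elapsed_ms / 1000)
print(f"{throughput_gb_s:.0f} GB/s")  # ~151 GB/s
```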

    Is it convenient to provide details of the test environment? Apart from the code of the Dataset module, will the HybridBackend engine code be released in the future?

    Thanks 🥂 🍻

    opened by neuzxy 4
  • What version of snappy should I install for building HB from source?


    Summary

    I am trying to build HB from source. When I run the make -j8 command from the work dir, I get the following error:

    (base) [email protected]:/HybridBackend# make -j8
    mkdir -p /root/projects/tmp/HybridBackend/arrow/build/
    ARROW_INSTALL=/root/projects/tmp/HybridBackend/arrow/dist \
    ARROW_BUILD=/root/projects/tmp/HybridBackend/arrow/build \
    ARROW_OSX_TARGET= \
    USE_CXX11_ABI=0 \
    WITH_ARROW_HDFS=ON \
    WITH_ARROW_S3=ON \
    SIMD_LEVEL=AVX2 \
    OS=Linux \
    bash arrow/build.sh
    -- Building using CMake version: 3.16.3
    -- Arrow version: 5.0.0 (full: '5.0.0')
    -- Arrow SO version: 500 (full: 500.0.0)
    -- clang-tidy not found
    -- clang-format not found
    -- Could NOT find ClangTools (missing: CLANG_FORMAT_BIN CLANG_TIDY_BIN)
    -- infer not found
    -- Found cpplint executable at /root/projects/tmp/HybridBackend/arrow/src/cpp/build-support/cpplint.py
    -- System processor: x86_64
    -- Arrow build warning level: PRODUCTION
    Using ld linker
    Configured for RELEASE build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})
    -- Build Type: RELEASE
    -- Using CONDA approach to find dependencies
    -- Using CONDA_PREFIX for ARROW_PACKAGE_PREFIX: /root/miniconda3
    -- Setting (unset) dependency *_ROOT variables: /root/miniconda3
    -- ARROW_ABSL_BUILD_VERSION: 0f3bb466b868b523cf1dc9b2aaaed65c77b28862
    -- ARROW_AWSSDK_BUILD_VERSION: 1.8.133
    -- ARROW_AWS_CHECKSUMS_BUILD_VERSION: v0.1.10
    -- ARROW_AWS_C_COMMON_BUILD_VERSION: v0.5.10
    -- ARROW_AWS_C_EVENT_STREAM_BUILD_VERSION: v0.1.5
    -- ARROW_BOOST_BUILD_VERSION: 1.75.0
    -- ARROW_BROTLI_BUILD_VERSION: v1.0.9
    -- ARROW_BZIP2_BUILD_VERSION: 1.0.8
    -- ARROW_CARES_BUILD_VERSION: 1.17.1
    -- ARROW_GBENCHMARK_BUILD_VERSION: v1.5.2
    -- ARROW_GFLAGS_BUILD_VERSION: v2.2.2
    -- ARROW_GLOG_BUILD_VERSION: v0.4.0
    -- ARROW_GRPC_BUILD_VERSION: v1.35.0
    -- ARROW_GTEST_BUILD_VERSION: 1.10.0
    -- ARROW_JEMALLOC_BUILD_VERSION: 5.2.1
    -- ARROW_LZ4_BUILD_VERSION: v1.9.3
    -- ARROW_MIMALLOC_BUILD_VERSION: v1.7.2
    -- ARROW_ORC_BUILD_VERSION: 1.6.6
    -- ARROW_PROTOBUF_BUILD_VERSION: v3.14.0
    -- ARROW_RAPIDJSON_BUILD_VERSION: 1a803826f1197b5e30703afe4b9c0e7dd48074f5
    -- ARROW_RE2_BUILD_VERSION: 2021-02-02
    -- ARROW_SNAPPY_BUILD_VERSION: 1.1.8
    -- ARROW_THRIFT_BUILD_VERSION: 0.13.0
    -- ARROW_THRIFT_BUILD_MD5_CHECKSUM: 38a27d391a2b03214b444cb13d5664f1
    -- ARROW_UTF8PROC_BUILD_VERSION: v2.6.1
    -- ARROW_XSIMD_BUILD_VERSION: e9234cd6e6f4428fc260073b2c34ffe86fda1f34
    -- ARROW_ZLIB_BUILD_VERSION: 1.2.11
    -- ARROW_ZSTD_BUILD_VERSION: v1.5.0
    -- Boost include dir: /usr/include
    -- Boost libraries: Boost::system;Boost::filesystem
    CMake Error at /usr/share/cmake-3.16/Modules/FindPackageHandleStandardArgs.cmake:146 (message):
      Could NOT find Snappy (missing: Snappy_LIB Snappy_INCLUDE_DIR)
    Call Stack (most recent call first):
      /usr/share/cmake-3.16/Modules/FindPackageHandleStandardArgs.cmake:393 (_FPHSA_FAILURE_MESSAGE)
      cmake_modules/FindSnappy.cmake:55 (find_package_handle_standard_args)
      cmake_modules/ThirdpartyToolchain.cmake:235 (find_package)
      cmake_modules/ThirdpartyToolchain.cmake:948 (resolve_dependency)
      CMakeLists.txt:515 (include)
    
    
    -- Configuring incomplete, errors occurred!
    See also "/root/projects/tmp/HybridBackend/arrow/build/CMakeFiles/CMakeOutput.log".
    See also "/root/projects/tmp/HybridBackend/arrow/build/CMakeFiles/CMakeError.log".
    make: *** [arrow/Makefile:8: /root/projects/tmp/HybridBackend/arrow/build/install_manifest.txt] Error 1
    
    

    I have installed libsnappy-dev and can find it at /usr/local/include/snappy.h and /usr/local/lib/libsnappy.a, but the error still exists. How should I install the correct snappy version for building HB?

    I also tried the docker image registry.cn-shanghai.aliyuncs.com/pai-dlc/hybridbackend:developer-tensorflow1.15-manylinux_2_27-py3.6-cu114; the same error exists.

    Installation environment

    • GPU model and memory:
    • OS Platform: "20.04.3 LTS (Focal Fossa)"
    • Docker version:
    • GCC/CUDA/cuDNN version: 11.4
    • Python/conda version: Python 3.8.10/ conda 4.10.3
    • TensorFlow/PyTorch version: 1.15.5+deeprec2201

    Willing to contribute

    Yes

    opened by fuhailin 3
  • to_sparse failed for Value with ragged_rank > 1 read from parquet file


    Current behavior

    When hb reads nested lists with ragged_rank > 1, the resulting Value cannot be transformed to a SparseTensor by hb.data.to_sparse.

    For example: dense_feature is one of the features read by hb.data.ParquetDataset, and to_sparse does not work for it.

    Moreover, if I swap the order of the two nested_row_splits, then to_sparse succeeds.


    So maybe the order of the nested_row_splits when reading a parquet file is incorrect?
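That suspicion can be checked without TensorFlow. A minimal sketch in plain Python, assuming the usual RaggedTensor convention that `nested_row_splits` are ordered outermost first and each level's last offset counts the rows of the next level (or the flat values at the innermost level): the demo's split order fails the consistency check, while the swapped order passes.

```python
def splits_are_consistent(nested_row_splits, num_values):
    """Check the RaggedTensor layout invariant: splits are ordered
    outermost first, each level starts at offset 0, and each level's
    last offset equals the row count of the next level (or the flat
    value count at the innermost level)."""
    for level, splits in enumerate(nested_row_splits):
        if level + 1 < len(nested_row_splits):
            inner_rows = len(nested_row_splits[level + 1]) - 1
        else:
            inner_rows = num_values
        if splits[0] != 0 or splits[-1] != inner_rows:
            return False
    return True

values = [1, 2, 3, 4, 5]
# The order from the issue's demo violates the invariant ...
bad = ([0, 1, 3, 4, 5], [0, 2, 4])
# ... while the swapped order satisfies it.
good = ([0, 2, 4], [0, 1, 3, 4, 5])
print(splits_are_consistent(bad, len(values)))   # False
print(splits_are_consistent(good, len(values)))  # True
```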

    Expected behavior

    The Value read from a parquet file should be transformable to a SparseTensor.

    System information

    • GPU model and memory: No
    • OS Platform: Ubuntu
    • Docker version: No
    • GCC/CUDA/cuDNN version: 7.4/No/No
    • Python/conda version:3.6.13/4.13.0
    • TensorFlow/PyTorch version:1.14.0

    Code to reproduce

    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    dataset = hb.data.ParquetDataset("test2.zstd.parquet", batch_size=1)
    dataset = dataset.apply(hb.data.to_sparse())
    iterator = dataset.make_one_shot_iterator()
    next_element = iterator.get_next()
    sess = tf.Session()
    vals = sess.run(next_element)
    
    # One more simple demo:
    import numpy as np
    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    val = hb.data.dataframe.DataFrame.Value(
        values=np.array([1, 2, 3, 4, 5]),
        nested_row_splits=(np.array([0, 1, 3, 4, 5]), np.array([0, 2, 4])))
    sess = tf.Session()
    sess.run(val.to_sparse())
    

    Willing to contribute

    Yes

    bug 
    opened by SamJia 2
  • rebatch API produces a `Check failed: limit <= dim0_size` error


    Current behavior

    After rebatch(), the data iterator's get_next() produces an error:

    F tensorflow/core/framework/tensor.cc:833] Check failed: limit <= dim0_size (8194 vs. 8193)
    

    Expected behavior

    no error

    System information

    • OS Platform and Distribution: Ubuntu 18.04.5 LTS
    • TensorFlow version: 1.15.0
    • Python version: 3.6
    • CUDA/cuDNN version: 10.1
    • RAM: 94G
    • GPU model and memory: Tesla T4, 16G

    Code to reproduce

    Step 1: Generate a parquet file by running following code

    import numpy as np
    import pandas as pd
    import random
    
    data_list = []
    for i in range(1, 10000):
        int_feature = random.randint(1, 100)
        # float_feature = random.random()
        array_feature = [random.randint(1, 10) for x in range(0, 4)]
        data_list.append([int_feature, array_feature])
    
    df = pd.DataFrame(data_list, columns=["int_feature", "array_feature"])
    df.to_parquet("parquet_sample_file.parquet")
    

    Step 2: Load generated parquet in step 1 by HybridBackend

    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    
    
    filenames_ds = tf.data.Dataset.from_tensor_slices(['file1.snappy.parquet', 'file2.snappy.parquet', ... 'fileN.snappy.parquet'])
    
    
    hb_fields = []
    hb_fields.append(hb.data.DataFrame.Field("feature1", tf.int64, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field("feature2", tf.float32, ragged_rank=1))
    hb_fields.append(hb.data.DataFrame.Field("feature3", tf.int64, ragged_rank=1))
    
    ds = filenames_ds.apply(hb.data.read_parquet(8192, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    iterator = ds.apply(hb.data.rebatch(8192, fields=hb_fields))
    
    it = iterator.make_one_shot_iterator()
    item = it.get_next()
    
    batch_size_dict = {}
    with tf.Session() as sess:
        print("======  start ======")
        total_batch_size = 0
        while True:
            try:
                batch = sess.run(item)
                batch_size = len(batch['mod_series'])
                batch_size_dict[batch_size] = batch_size_dict.get(batch_size, 0) + 1
            except tf.errors.OutOfRangeError:
                break
    
    

    Running the above code in a python3 shell, an error is thrown:

    F tensorflow/core/framework/tensor.cc:833] Check failed: limit <= dim0_size (8194 vs. 8193)
    

    Willing to contribute

    Yes

    bug 
    opened by liurcme 2
  • Bump tensorflow from 1.15.5 to 2.5.3 in /docs


    Bumps tensorflow from 1.15.5 to 2.5.3.

    Release notes

    Sourced from tensorflow's releases.

    TensorFlow 2.5.3

    Release 2.5.3

    Note: This is the last release in the 2.5 series.

    This release introduces several vulnerability fixes:

    • Fixes a floating point division by 0 when executing convolution operators (CVE-2022-21725)
    • Fixes a heap OOB read in shape inference for ReverseSequence (CVE-2022-21728)
    • Fixes a heap OOB access in Dequantize (CVE-2022-21726)
    • Fixes an integer overflow in shape inference for Dequantize (CVE-2022-21727)
    • Fixes a heap OOB access in FractionalAvgPoolGrad (CVE-2022-21730)
    • Fixes an overflow and divide by zero in UnravelIndex (CVE-2022-21729)
    • Fixes a type confusion in shape inference for ConcatV2 (CVE-2022-21731)
    • Fixes an OOM in ThreadPoolHandle (CVE-2022-21732)
    • Fixes an OOM due to integer overflow in StringNGrams (CVE-2022-21733)
    • Fixes more issues caused by incomplete validation in boosted trees code (CVE-2021-41208)
    • Fixes integer overflows in most sparse component-wise ops (CVE-2022-23567)
    • Fixes integer overflows in AddManySparseToTensorsMap (CVE-2022-23568)
    • Fixes a number of CHECK-failures in MapStage (CVE-2022-21734)
    • Fixes a division by zero in FractionalMaxPool (CVE-2022-21735)
    • Fixes a number of CHECK-fails when building invalid/overflowing tensor shapes (CVE-2022-23569)
    • Fixes an undefined behavior in SparseTensorSliceDataset (CVE-2022-21736)
    • Fixes an assertion failure based denial of service via faulty bin count operations (CVE-2022-21737)
    • Fixes a reference binding to null pointer in QuantizedMaxPool (CVE-2022-21739)
    • Fixes an integer overflow leading to crash in SparseCountSparseOutput (CVE-2022-21738)
    • Fixes a heap overflow in SparseCountSparseOutput (CVE-2022-21740)
    • Fixes an FPE in BiasAndClamp in TFLite (CVE-2022-23557)
    • Fixes an FPE in depthwise convolutions in TFLite (CVE-2022-21741)
    • Fixes an integer overflow in TFLite array creation (CVE-2022-23558)
    • Fixes an integer overflow in TFLite (CVE-2022-23559)
    • Fixes a dangerous OOB write in TFLite (CVE-2022-23561)
    • Fixes a vulnerability leading to read and write outside of bounds in TFLite (CVE-2022-23560)
    • Fixes a set of vulnerabilities caused by using insecure temporary files (CVE-2022-23563)
    • Fixes an integer overflow in Range resulting in undefined behavior and OOM (CVE-2022-23562)
    • Fixes a vulnerability where missing validation causes tf.sparse.split to crash when axis is a tuple (CVE-2021-41206)
    • Fixes a CHECK-fail when decoding resource handles from proto (CVE-2022-23564)
    • Fixes a CHECK-fail with repeated AttrDef (CVE-2022-23565)
    • Fixes a heap OOB write in Grappler (CVE-2022-23566)
    • Fixes a CHECK-fail when decoding invalid tensors from proto (CVE-2022-23571)
    • Fixes an uninitialized variable access in AssignOp (CVE-2022-23573)
    • Fixes an integer overflow in OpLevelCostEstimator::CalculateTensorSize (CVE-2022-23575)
    • Fixes an integer overflow in OpLevelCostEstimator::CalculateOutputSize (CVE-2022-23576)
    • Fixes a null dereference in GetInitOp (CVE-2022-23577)
    • Fixes a memory leak when a graph node is invalid (CVE-2022-23578)
    • Fixes an abort caused by allocating a vector that is too large (CVE-2022-23580)
    • Fixes multiple CHECK-failures during Grappler's IsSimplifiableReshape (CVE-2022-23581)
    • Fixes multiple CHECK-failures during Grappler's SafeToRemoveIdentity (CVE-2022-23579)
    • Fixes multiple CHECK-failures in TensorByteSize (CVE-2022-23582)
    • Fixes multiple CHECK-failures in binary ops due to type confusion (CVE-2022-23583)

    ... (truncated)

    Changelog

    Sourced from tensorflow's changelog.

    Release 2.5.3

    This release introduces several vulnerability fixes (the same list as in the release notes above; truncated).

    Commits
    • 959e9b2 Merge pull request #54213 from tensorflow/fix-sanity-on-r2.5
    • d05fcbc Fix sanity build
    • f2526a0 Merge pull request #54205 from tensorflow/disable-flaky-tests-on-r2.5
    • a5f94df Disable flaky test
    • 7babe52 Merge pull request #54201 from tensorflow/cherrypick-510ae18200d0a4fad797c0bf...
    • 0e5d378 Set Env Variable to override Setuptools new behavior
    • fdd4195 Merge pull request #54176 from tensorflow-jenkins/relnotes-2.5.3-6805
    • 4083165 Update RELEASE.md
    • a2bb7f1 Merge pull request #54185 from tensorflow/cherrypick-d437dec4d549fc30f9b85c75...
    • 5777ea3 Update third_party/icu/workspace.bzl
    • Additional commits viewable in compare view

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    • @dependabot use these labels will set the current labels as the default for future PRs for this repo and language
    • @dependabot use these reviewers will set the current reviewers as the default for future PRs for this repo and language
    • @dependabot use these assignees will set the current assignees as the default for future PRs for this repo and language
    • @dependabot use this milestone will set the current milestone as the default for future PRs for this repo and language

    You can disable automated security fix PRs for this repo from the Security Alerts page.

    dependencies 
    opened by dependabot[bot] 2
  • Question: When to release the code?


    Hi, I saw that the code is not public, according to the architecture doc here: https://hybridbackend.readthedocs.io/en/latest/architecture.html

    Do you have a plan to open source it? Or is it just focused on data io?

    I'd appreciate it if anyone could help me.

    Thanks :clinking_glasses: :beers:

    opened by gaocegege 2
  • [DATA] Supports `hb.data.Iterator`


    This patch implements #74 and introduces hb.data.Iterator, which optimizes cross-device transfers of inputs.

    Example:

    iterator = tf.data.make_one_shot_iterator(ds)
    iterator = hb.data.Iterator(iterator)
    features = iterator.get_next()
    hooks.append(hb.data.Iterator.Hook())
    
    enhancement 
    opened by 2sin18 1
  • Feature Request: Supports prefetching data to GPU


    User Story

    As a recommender system engineer, I want to read large batch of tabular data on GPU efficiently, so that training performance of large deep recommenders can be improved.

    Detailed requirements

    • It should be easy to use with TensorFlow Dataset API

    API Compatibility

    • Only new APIs should be introduced.

    Willing to contribute

    Yes

    enhancement 
    opened by 2sin18 0
  • hybridbackend 0.6.0a2 version raise ValueError when ParquetDataset wrapped by parallel_interleave ops


    Current behavior

    When hb.data.ParquetDataset is wrapped by the tf.data.experimental.parallel_interleave op, a ValueError is raised: `Field xxx (dtype=unkown, shape=()) is incomplete, please specify dtype and ragged_rank`

    Expected behavior

    hb.data.ParquetDataset wrapped by tf.data.experimental.parallel_interleave should work as it did in hybridbackend 0.6.0a1.

    System information

    • GPU model and memory:Tesla T4 16G
    • OS Platform: Ubuntu 18.04.5 LTS
    • Docker version: 20.10.14
    • GCC/CUDA/cuDNN version: gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)/CUDA Version: 11.4.2/cuDNN 8
    • Python/conda version:
    • TensorFlow/PyTorch version: 1.15.5+deeprec2201
    • HybridBackend version: '0.6.0a2'

    Code to reproduce

    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    from tensorflow.python.data.ops import dataset_ops
    
    
    def make_initializable_iterator(ds):
      r"""Wrapper of make_initializable_iterator.
      """
      if hasattr(dataset_ops, 'make_initializable_iterator'):
        return dataset_ops.make_initializable_iterator(ds)
      return ds.make_initializable_iterator()
    
    
    def parquet_map(record):
      label = record.pop('label_play')
      return record, label
    
    
    # Read from a parquet file.
    dataset = tf.data.Dataset.list_files([
        'part-00000-d07256ce-4685-4d6c-a9ab-b507ffef206e-c000.snappy.parquet'
    ],
                                         seed=1)
    dataset = dataset.apply(
        tf.data.experimental.parallel_interleave(
            lambda x: hb.data.ParquetDataset(
                x,
                # drop_remainder=True,
                batch_size=4,
                num_parallel_reads=1,
                fields=[
                    hb.data.DataFrame.Field('uid', tf.int64),
                    hb.data.DataFrame.Field('packagename', tf.int64, ragged_rank=0),
                    hb.data.DataFrame.Field('recent_play_3', tf.int64, ragged_rank=1),
                    hb.data.DataFrame.Field('label_play', tf.float64),
                ],
            ),
            cycle_length=1,
            block_length=1,
        ))
    ds = dataset.prefetch(4)
    
    iterator = make_initializable_iterator(ds)
    sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
    
    with tf.Session(config=sess_config) as sess:
      sess.run(iterator.initializer)
      for i in range(1):
        feature = sess.run(iterator.get_next())
        print(feature)
    

    You can download the toy dataset from here

    Willing to contribute

    Yes

    opened by fuhailin 0
  • support keras fit history in estimator's train_and_evaluate


    User Story

    I want to keep a record of the loss values and metric values during training, like the Keras History object: https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/History https://keras.io/guides/training_with_built_in_methods/

    Detailed requirements

    I need to decide whether or not to save models depending on their metrics (maybe the latest one).
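A minimal pure-Python sketch of what the requested record could look like (the names here are hypothetical, not an existing HybridBackend API); a hook around `hb.estimator.train_and_evaluate` would append to it after each evaluation:

```python
class TrainHistory:
    """Minimal Keras-History-like record; append() would be called from a
    session hook after each evaluation (names here are hypothetical)."""
    def __init__(self):
        self.history = {}

    def append(self, **metrics):
        # Record one evaluation's metrics, keyed by metric name.
        for name, value in metrics.items():
            self.history.setdefault(name, []).append(value)

    def latest(self, name):
        return self.history[name][-1]

history = TrainHistory()
history.append(loss=0.52, auc=0.71)
history.append(loss=0.48, auc=0.74)
# Decide whether to export the model based on the latest metric:
should_save = history.latest('auc') >= max(history.history['auc'])
print(should_save)  # True
```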

    API Compatibility

    hb.estimator.train_and_evaluate

    Willing to contribute

    Yes

    enhancement 
    opened by karterotte 0
  • support ARROW_NUM_THREADS in ParquetDataset


    User Story


    hb.data.ParquetDataset cannot use all of the pod's CPUs.

    Detailed requirements

    hb.data.ParquetDataset

    1. num_parallel_reads to set the number of file readers
    2. **[new]** num_arrow_threads to set the number of column-reader threads

    to accelerate model training
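A hedged sketch of how the proposed knob might be resolved; the env-var name mirrors the issue title, and `num_arrow_threads` is the requested, not-yet-existing parameter:

```python
import os

def get_arrow_num_threads(default=1):
    # Resolve the proposed ARROW_NUM_THREADS setting from the environment;
    # a dataset could pass this value as its column-reader thread count
    # (hypothetical wiring, not an existing HybridBackend API).
    value = os.environ.get('ARROW_NUM_THREADS')
    return int(value) if value else default

os.environ['ARROW_NUM_THREADS'] = '8'
print(get_arrow_num_threads())  # 8
```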

    API Compatibility

    hb.data.ParquetDataset

    Willing to contribute

    Yes

    opened by karterotte 0
  • Error when drop_remainder=True using rebatch API


    Current behavior

    Using the rebatch API with drop_remainder=True makes the program exit with a segmentation fault.

    Expected behavior

    No error

    System information

    • GPU model and memory:
    • OS Platform: ubuntu 18
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version: python 3.6
    • TensorFlow/PyTorch version: 1.5.0
    • HybridBackend version: 0.6.0a0

    Code to reproduce

    (1) First generate a random parquet file.

    import pandas as pd
    import random
    
    data_list = []
    for i in range(1, 100000):
        int_feature = random.randint(1, 1000)
        array_feature = [random.randint(1, 1000) for x in range(0, 50)]
        data_list.append([int_feature, array_feature, 0.8])
    
    df = pd.DataFrame(data_list, columns=["int_feature", "array_feature", "label"])
    df['label'] = pd.to_numeric(df["label"], downcast="float")
    df.to_parquet("parquet_sample_file.parquet")
    

    (2) Then read data

    import tensorflow as tf
    import tensorflow.keras as keras
    import hybridbackend.tensorflow as hb
    
    BATCH_SIZE = 1000
    
    
    def get_parquet_ds():
        filenames_ds = tf.data.Dataset.from_tensor_slices([
            'parquet_sample_file.parquet'
        ]*1)
        hb_fields = []
    
        def _map(elem):
            features = {
                "int_feature": tf.cast(tf.reshape(elem["int_feature"], [-1, 1]), dtype=tf.float32),
                "array_feature": tf.cast(tf.reshape(elem["array_feature"].values, [-1, 50]),
                                                  dtype=tf.float32)
            }
            labels = tf.reshape(elem["label"], [-1, 1])
            return features, labels
    
        hb_fields.append(hb.data.DataFrame.Field("int_feature", tf.int64, ragged_rank=0))
        hb_fields.append(hb.data.DataFrame.Field("array_feature", tf.int64, ragged_rank=1))
        hb_fields.append(hb.data.DataFrame.Field("label", tf.float32, ragged_rank=0))
        iterator = filenames_ds.apply(
            hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
        iterator = iterator.apply(hb.data.rebatch(BATCH_SIZE*2, fields=hb_fields, drop_remainder=True)).map(_map)
    
        return iterator
    
    
    def train():
        global_init_op = tf.compat.v1.global_variables_initializer()
    
        ds = get_parquet_ds()
        iterator = ds.make_one_shot_iterator()
        get_data_op = iterator.get_next()
    
        with tf.compat.v1.Session() as sess:
            a = sess.run([global_init_op])
            i = 1
            while True:
                try:
                    sample = sess.run([get_data_op])
    
                    f_category = sample[0][0]["int_feature"]
                    f_list = sample[0][0]["array_feature"]
                    labels_ = sample[0][1]
    
                    if i % 100 == 0:
                        print(f"step={i}")
                    i += 1
    
                except tf.errors.OutOfRangeError:
                    break
    
    
    if __name__ == '__main__':
        train()
    

    Willing to contribute

    Yes

    opened by liurcme 0
  • Error when running imported/restored model that uses feedable iterator


    I trained a model and saved its checkpoint files; now I need to restore the graph from the meta file and feed a new data iterator to keep training. I found an issue discussing this and wrote some code to demo my situation.

    Current behavior

    When I use ParquetDataset to feed the iterator, I can't restore the meta file, and I get the following error:

    Traceback (most recent call last):
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/importer.py", line 501, in _import_graph_def_internal
        graph._c_graph, serialized, options)  # pylint: disable=protected-access
    tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot add function '__inference_Dataset_flat_map__create_dataset_10' because a different function with the same name already exists.
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "test/io/restore_hb.py", line 223, in <module>
        resume_training(another_train_dataset, another_test_dataset)
      File "test/io/restore_hb.py", line 132, in resume_training
        saver = tf.train.import_meta_graph('checkpoints_hb/fufu.meta')
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1697, in import_meta_graph
        **kwargs)[0]
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1721, in _import_meta_graph_with_return_elements
        **kwargs))
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/meta_graph.py", line 809, in import_scoped_meta_graph_with_return_elements
        return_elements=return_elements)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/util/deprecation.py", line 507, in new_func
        return func(*args, **kwargs)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/importer.py", line 405, in import_graph_def
        producer_op_list=producer_op_list)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/importer.py", line 505, in _import_graph_def_internal
        raise ValueError(str(e))
    ValueError: Cannot add function '__inference_Dataset_flat_map__create_dataset_10' because a different function with the same name already exists.
    

    I guess this error is not a HybridBackend bug, because I also tried TFRecordDataset and got a similar error:

    Traceback (most recent call last):
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/importer.py", line 501, in _import_graph_def_internal
        graph._c_graph, serialized, options)  # pylint: disable=protected-access
    tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot add function '__inference_Dataset_map__parse_function_55' because a different function with the same name already exists.
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "test/io/restore_pb.py", line 225, in <module>
        restore_feed()
      File "test/io/restore_pb.py", line 220, in restore_feed
        resume_training(another_train_dataset, another_test_dataset)
      File "test/io/restore_pb.py", line 155, in resume_training
        saver = tf.train.import_meta_graph('checkpoints_pb/fufu.meta')
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1697, in import_meta_graph
        **kwargs)[0]
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1721, in _import_meta_graph_with_return_elements
        **kwargs))
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/meta_graph.py", line 809, in import_scoped_meta_graph_with_return_elements
        return_elements=return_elements)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/util/deprecation.py", line 507, in new_func
        return func(*args, **kwargs)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/importer.py", line 405, in import_graph_def
        producer_op_list=producer_op_list)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/importer.py", line 505, in _import_graph_def_internal
        raise ValueError(str(e))
    ValueError: Cannot add function '__inference_Dataset_map__parse_function_55' because a different function with the same name already exists.
    

    But the same process works for from_tensor_slices and CsvDataset. I'm curious and would like to know how to restore a model and feed it a new dataset iterator.

    Expected behavior

    When I use ParquetDataset in training, I can restore the checkpoint and feed a new ParquetDataset iterator.

    System information

    • GPU model and memory: 16G for Tesla T4
    • OS Platform: Ubuntu 18.04.5 LTS (Bionic Beaver)
    • Docker version: 20.10.14
    • GCC/CUDA/cuDNN version: gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04),
    • Python/conda version: Python 3.6.12
    • TensorFlow/PyTorch version: tensorflow 1.15.5+deeprec2201

    Code to reproduce

    Training and restoring with a ParquetDataset-fed iterator, which does not work:

    # Tensorflow 1.15
    # https://github.com/tensorflow/tensorflow/issues/11679#
    #
    import tensorflow as tf
    import numpy as np
    import pandas as pd
    import os
    import shutil
    from hybridbackend.tensorflow.data import DataFrame
    from hybridbackend.tensorflow.data import ParquetDataset
    from tensorflow.python.data.ops import dataset_ops
    
    new_dtypes = {"test1": np.float32, "test2": np.float32}
    
    train_df = pd.DataFrame(np.random.randint(0, 100, (5, 2)), columns=['test1', 'test2'])
    train_df = train_df.astype(new_dtypes)
    train_df.to_parquet('train.parquet')
    
    test_df = pd.DataFrame(np.random.randint(0, 100, (2, 2)), columns=['test1', 'test2'])
    test_df = test_df.astype(new_dtypes)
    test_df.to_parquet('test.parquet')
    
    
    def make_initializable_iterator(ds):
      if hasattr(dataset_ops, 'make_initializable_iterator'):
        return dataset_ops.make_initializable_iterator(ds)
      return ds.make_initializable_iterator()
    
    
    def make_one_shot_iterator(ds):
      if hasattr(dataset_ops, 'make_one_shot_iterator'):
        return dataset_ops.make_one_shot_iterator(ds)
      return ds.make_one_shot_iterator()
    
    
    def train(train_dataset, test_dataset):
      """
        Create graph with a Dataset and Iterator and save the model.
    
        There is some op that is applied to the data from the iterator.
        """
      iterator_handle = tf.placeholder(tf.string, shape=[])
      tf.add_to_collection('iterator_handle', iterator_handle)
    
      iterator = tf.data.Iterator.from_string_handle(iterator_handle, dataset_ops.get_legacy_output_types(train_dataset),
                                                     dataset_ops.get_legacy_output_shapes(train_dataset),
                                                     dataset_ops.get_legacy_output_classes(train_dataset))
      train_iter = make_initializable_iterator(train_dataset)
      test_iter = make_initializable_iterator(test_dataset)
      element = iterator.get_next()
    
      v = tf.get_variable(name='v', initializer=tf.zeros(shape=(1, 2)))
    
      # to use when saving summaries
      global_step = tf.Variable(0, name='global_step', trainable=False, dtype=tf.int32)
      increament_global_step = tf.assign(global_step, global_step + 1)
      global_step = global_step + 1
      tf.add_to_collection('increament_global_step', increament_global_step)
    
      some_op = tf.assign(v, v + tf.abs(element['test1']))
      tf.add_to_collection('some_op', tf.reduce_sum(some_op))
    
      tf.summary.scalar('v_sum', tf.reduce_sum(v))
      tf.summary.scalar('some_op', tf.reduce_mean(some_op))
      merged_summary = tf.summary.merge_all()
      tf.add_to_collection('merged_summary', merged_summary)
    
      writer = tf.summary.FileWriter('checkpoints_hb', graph=tf.get_default_graph())
      saver = tf.train.Saver()
    
      with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Run data iterator initialisation
        sess.run(train_iter.initializer)
        sess.run(test_iter.initializer)
    
        # "Training"
        print("Training")
        while True:
          try:
            [op, summary_values, g_step] = sess.run([some_op, merged_summary, increament_global_step],
                                                    feed_dict={iterator_handle: train_handle})
            writer.add_summary(summary_values, global_step=g_step)
            print(op)
          except tf.errors.OutOfRangeError:
            break
    
        # "Test evaluation"
        print("Testing")
        while True:
          try:
            print(sess.run(some_op, feed_dict={iterator_handle: test_handle}))
          except tf.errors.OutOfRangeError:
            break
    
        saver.save(sess, 'checkpoints_hb/fufu')
    
    def resume_training(train_dataset, test_dataset):
      """Restore the model from file and pass some new data through it
         for further training """
      with tf.Session() as sess:
        saver = tf.train.import_meta_graph('checkpoints_hb/fufu.meta')
        saver.restore(sess, 'checkpoints_hb/fufu')
        iterator_handle = tf.get_collection('iterator_handle')[0]
        some_op = tf.get_collection('some_op')[0]
        increament_global_step = tf.get_collection('increament_global_step')[0]
        merged_summary = tf.get_collection('merged_summary')[0]
    
        writer = tf.summary.FileWriter('checkpoints_hb', graph=tf.get_default_graph())
    
        # Make new iterators and handles
        train_iter = make_initializable_iterator(train_dataset)
        test_iter = make_initializable_iterator(test_dataset)
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Further training the model using new datasets (which may be different from original ones)
        print("Resume training ...")
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Run data iterator initialisation
        sess.run(train_iter.initializer)
        sess.run(test_iter.initializer)
    
        # "Training"
        print("Training")
        while True:
          try:
            [op, summary_values, g_step] = sess.run([some_op, merged_summary, increament_global_step],
                                                    feed_dict={iterator_handle: train_handle})
            writer.add_summary(summary_values, global_step=g_step)
            print(op)
          except tf.errors.OutOfRangeError:
            break
    
        # "Test evaluation"
        print("Testing")
        while True:
          try:
            print(sess.run(some_op, feed_dict={iterator_handle: test_handle}))
          except tf.errors.OutOfRangeError:
            break
    
        saver.save(sess, 'checkpoints_hb/fufu')
    
    
    def train_feed():
      # delete existing saved models and summary files
      if os.path.exists('checkpoints_hb'):
        shutil.rmtree('checkpoints_hb')
      # train_dataset = tf.data.Dataset.from_tensor_slices(
      #     tf.constant(np.random.randint(0, 100, (5, 2)), dtype=tf.float32))
      train_dataset = ParquetDataset('train.parquet',
                                     batch_size=1,
                                     fields=[DataFrame.Field('test1', tf.float32),
                                             DataFrame.Field('test2', tf.float32)])
      test_dataset = ParquetDataset('test.parquet',
                                    batch_size=1,
                                    fields=[DataFrame.Field('test1', tf.float32),
                                            DataFrame.Field('test2', tf.float32)])
      # test_dataset = tf.data.Dataset.from_tensor_slices(
      # tf.constant(np.random.randint(0, 100, (2, 2)), dtype=tf.float32))
    
      train(train_dataset, test_dataset)
    
    
    def restore_feed():
      # Load and fine-tune the saved model using new data
      another_train_dataset = ParquetDataset(
          'train.parquet',
          batch_size=1,
          fields=[DataFrame.Field('test1', tf.float32),
                  DataFrame.Field('test2', tf.float32)])
      another_test_dataset = ParquetDataset(
          'test.parquet', batch_size=1, fields=[DataFrame.Field('test1', tf.float32),
                                                DataFrame.Field('test2', tf.float32)])
    
      resume_training(another_train_dataset, another_test_dataset)
    
    
    if __name__ == '__main__':
      train_feed()
      restore_feed()
    

    It does not work for TFRecordDataset either.

    import tensorflow as tf
    import numpy as np
    import pandas as pd
    import os
    import shutil
    from tensorflow.python.data.ops import dataset_ops
    
    
    def make_one_shot_iterator(ds):
      if hasattr(dataset_ops, 'make_one_shot_iterator'):
        return dataset_ops.make_one_shot_iterator(ds)
      return ds.make_one_shot_iterator()
    
    
    def make_initializable_iterator(ds):
      if hasattr(dataset_ops, 'make_initializable_iterator'):
        return dataset_ops.make_initializable_iterator(ds)
      return ds.make_initializable_iterator()
    
    
    # Define features
    feature_description = {
        'test1': tf.io.FixedLenFeature([], dtype=tf.float32),
        'test2': tf.io.FixedLenFeature([], dtype=tf.float32)
    }
    
    
    def _parse_function(example_proto):
      return tf.io.parse_example(example_proto, feature_description)
    
    
    def write_pb(df, file):
      # Write TFrecord file
      with tf.io.TFRecordWriter(file) as writer:
        for index, row in df.iterrows():
          print(row['test1'], row['test2'])
          # Create the Example
          example = tf.train.Example(features=tf.train.Features(
              feature={
                  'test1': tf.train.Feature(float_list=tf.train.FloatList(value=[row['test1']])),
                  'test2': tf.train.Feature(float_list=tf.train.FloatList(value=[row['test2']]))
              }))
          writer.write(example.SerializeToString())
    
    
    new_dtypes = {"test1": np.float32, "test2": np.float32}
    
    train_df = pd.DataFrame(np.random.randint(0, 100, (5, 2)), columns=['test1', 'test2'])
    train_df = train_df.astype(new_dtypes)
    write_pb(train_df, 'train.tfrecord')
    
    test_df = pd.DataFrame(np.random.randint(0, 100, (2, 2)), columns=['test1', 'test2'])
    test_df = test_df.astype(new_dtypes)
    write_pb(test_df, 'test.tfrecord')
    
    
    def train(train_dataset, test_dataset):
      """
      Create graph with a Dataset and Iterator and save the model.
    
      There is some op that is applied to the data from the iterator.
      """
      iterator_handle = tf.placeholder(tf.string, shape=[])
      tf.add_to_collection('iterator_handle', iterator_handle)
    
      iterator = tf.data.Iterator.from_string_handle(iterator_handle, dataset_ops.get_legacy_output_types(train_dataset),
                                                     dataset_ops.get_legacy_output_shapes(train_dataset),
                                                     dataset_ops.get_legacy_output_classes(train_dataset))
      train_iter = make_initializable_iterator(train_dataset)
      test_iter = make_initializable_iterator(test_dataset)
      element = iterator.get_next()
    
      v = tf.get_variable(name='v', initializer=tf.zeros(shape=(1, 2)))
    
      # to use when saving summaries
      global_step = tf.Variable(0, name='global_step', trainable=False, dtype=tf.int32)
      increament_global_step = tf.assign(global_step, global_step + 1)
      global_step = global_step + 1
      tf.add_to_collection('increament_global_step', increament_global_step)
    
      some_op = tf.assign(v, v + tf.abs(element['test1']))
      tf.add_to_collection('some_op', tf.reduce_sum(some_op))
    
      tf.summary.scalar('v_sum', tf.reduce_sum(v))
      tf.summary.scalar('some_op', tf.reduce_mean(some_op))
      merged_summary = tf.summary.merge_all()
      tf.add_to_collection('merged_summary', merged_summary)
    
      writer = tf.summary.FileWriter('checkpoints_pb', graph=tf.get_default_graph())
      saver = tf.train.Saver()
    
      with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Run data iterator initialisation
        sess.run(train_iter.initializer)
        sess.run(test_iter.initializer)
    
        # "Training"
        print("Training")
        while True:
          try:
            [op, summary_values, g_step] = sess.run([some_op, merged_summary, increament_global_step],
                                                    feed_dict={iterator_handle: train_handle})
            writer.add_summary(summary_values, global_step=g_step)
            print(op)
          except tf.errors.OutOfRangeError:
            break
    
        # "Test evaluation"
        print("Testing")
        while True:
          try:
            print(sess.run(some_op, feed_dict={iterator_handle: test_handle}))
          except tf.errors.OutOfRangeError:
            break
    
        saver.save(sess, 'checkpoints_pb/fufu')
    
    
    def resume_training(train_dataset, test_dataset):
      """Restore the model from file and pass some new data through it
         for further training """
      with tf.Session() as sess:
        saver = tf.train.import_meta_graph('checkpoints_pb/fufu.meta')
        saver.restore(sess, 'checkpoints_pb/fufu')
        iterator_handle = tf.get_collection('iterator_handle')[0]
        some_op = tf.get_collection('some_op')[0]
        increament_global_step = tf.get_collection('increament_global_step')[0]
        merged_summary = tf.get_collection('merged_summary')[0]
    
        writer = tf.summary.FileWriter('checkpoints_pb', graph=tf.get_default_graph())
    
        # Make new iterators and handles
        train_iter = make_initializable_iterator(train_dataset)
        test_iter = make_initializable_iterator(test_dataset)
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Further training the model using new datasets (which may be different from original ones)
        print("Resume training ...")
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Run data iterator initialisation
        sess.run(train_iter.initializer)
        sess.run(test_iter.initializer)
    
        # "Training"
        print("Training")
        while True:
          try:
            [op, summary_values, g_step] = sess.run([some_op, merged_summary, increament_global_step],
                                                    feed_dict={iterator_handle: train_handle})
            writer.add_summary(summary_values, global_step=g_step)
            print(op)
          except tf.errors.OutOfRangeError:
            break
    
        # "Test evaluation"
        print("Testing")
        while True:
          try:
            print(sess.run(some_op, feed_dict={iterator_handle: test_handle}))
          except tf.errors.OutOfRangeError:
            break
    
        saver.save(sess, 'checkpoints_pb/fufu')
    
    
    def train_feed():
      # delete existing saved models and summary files
      if os.path.exists('checkpoints_pb'):
        shutil.rmtree('checkpoints_pb')
      # train_dataset = tf.data.Dataset.from_tensor_slices(
      #     tf.constant(np.random.randint(0, 100, (5, 2)), dtype=tf.float32))
      train_dataset = tf.data.TFRecordDataset(['train.tfrecord']).batch(1).map(_parse_function)
      test_dataset = tf.data.TFRecordDataset(['test.tfrecord']).batch(1).map(_parse_function)
    
      train(train_dataset, test_dataset)
    
    
    def restore_feed():
      # Load and fine-tune the saved model using new data
      another_train_dataset = tf.data.TFRecordDataset(['train.tfrecord']).batch(1).map(_parse_function)
      another_test_dataset = tf.data.TFRecordDataset(['test.tfrecord']).batch(1).map(_parse_function)
    
      resume_training(another_train_dataset, another_test_dataset)
    
    
    if __name__ == '__main__':
      train_feed()
      restore_feed()
    
    

    But it works for CsvDataset:

    import tensorflow as tf
    import numpy as np
    import pandas as pd
    import os
    import shutil
    from tensorflow.python.data.experimental.ops import readers
    from tensorflow.python.data.ops import dataset_ops
    
    new_dtypes = {"test1": np.float32, "test2": np.float32}
    
    train_df = pd.DataFrame(np.random.randint(0, 100, (5, 2)), columns=['test1', 'test2'])
    train_df = train_df.astype(new_dtypes)
    train_df.to_csv('train.csv', index=False)
    
    test_df = pd.DataFrame(np.random.randint(0, 100, (2, 2)), columns=['test1', 'test2'])
    test_df = test_df.astype(new_dtypes)
    test_df.to_csv('test.csv', index=False)
    
    
    def make_initializable_iterator(ds):
      if hasattr(dataset_ops, 'make_initializable_iterator'):
        return dataset_ops.make_initializable_iterator(ds)
      return ds.make_initializable_iterator()
    
    
    def make_one_shot_iterator(ds):
      if hasattr(dataset_ops, 'make_one_shot_iterator'):
        return dataset_ops.make_one_shot_iterator(ds)
      return ds.make_one_shot_iterator()
    
    
    def train(train_dataset, test_dataset):
      """
        Create graph with a Dataset and Iterator and save the model.
    
        There is some op that is applied to the data from the iterator.
        """
      iterator_handle = tf.placeholder(tf.string, shape=[])
      tf.add_to_collection('iterator_handle', iterator_handle)
    
      iterator = tf.data.Iterator.from_string_handle(iterator_handle, dataset_ops.get_legacy_output_types(train_dataset),
                                                     dataset_ops.get_legacy_output_shapes(train_dataset),
                                                     dataset_ops.get_legacy_output_classes(train_dataset))
      train_iter = make_initializable_iterator(train_dataset)
      test_iter = make_initializable_iterator(test_dataset)
      element = iterator.get_next()
    
      v = tf.get_variable(name='v', initializer=tf.zeros(shape=(1, 2)))
    
      # to use when saving summaries
      global_step = tf.Variable(0, name='global_step', trainable=False, dtype=tf.int32)
      increament_global_step = tf.assign(global_step, global_step + 1)
      global_step = global_step + 1
      tf.add_to_collection('increament_global_step', increament_global_step)
    
      some_op = tf.assign(v, v + tf.abs(element))
      tf.add_to_collection('some_op', tf.reduce_sum(some_op))
    
      tf.summary.scalar('v_sum', tf.reduce_sum(v))
      tf.summary.scalar('some_op', tf.reduce_mean(some_op))
      merged_summary = tf.summary.merge_all()
      tf.add_to_collection('merged_summary', merged_summary)
    
      writer = tf.summary.FileWriter('checkpoints_csv', graph=tf.get_default_graph())
      saver = tf.train.Saver()
    
      with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Run data iterator initialisation
        sess.run(train_iter.initializer)
        sess.run(test_iter.initializer)
    
        # "Training"
        print("Training")
        while True:
          try:
            [op, summary_values, g_step] = sess.run([some_op, merged_summary, increament_global_step],
                                                    feed_dict={iterator_handle: train_handle})
            writer.add_summary(summary_values, global_step=g_step)
            print(op)
          except tf.errors.OutOfRangeError:
            break
    
        # "Test evaluation"
        print("Testing")
        while True:
          try:
            print(sess.run(some_op, feed_dict={iterator_handle: test_handle}))
          except tf.errors.OutOfRangeError:
            break
    
        saver.save(sess, 'checkpoints_csv/fufu')
    
    
    def resume_training(train_dataset, test_dataset):
      """Restore the model from file and pass some new data through it
         for further training """
      with tf.Session() as sess:
        saver = tf.train.import_meta_graph('checkpoints_csv/fufu.meta')
        saver.restore(sess, 'checkpoints_csv/fufu')
        iterator_handle = tf.get_collection('iterator_handle')[0]
        some_op = tf.get_collection('some_op')[0]
        increament_global_step = tf.get_collection('increament_global_step')[0]
        merged_summary = tf.get_collection('merged_summary')[0]
    
        writer = tf.summary.FileWriter('checkpoints_csv', graph=tf.get_default_graph())
    
        # Make new iterators and handles
        train_iter = make_initializable_iterator(train_dataset)
        test_iter = make_initializable_iterator(test_dataset)
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Further training the model using new datasets (which may be different from original ones)
        print("Resume training ...")
    
        train_handle = sess.run(train_iter.string_handle())
        test_handle = sess.run(test_iter.string_handle())
    
        # Run data iterator initialisation
        sess.run(train_iter.initializer)
        sess.run(test_iter.initializer)
    
        # "Training"
        print("Training")
        while True:
          try:
            [op, summary_values, g_step] = sess.run([some_op, merged_summary, increament_global_step],
                                                    feed_dict={iterator_handle: train_handle})
            writer.add_summary(summary_values, global_step=g_step)
            print(op)
          except tf.errors.OutOfRangeError:
            break
    
        # "Test evaluation"
        print("Testing")
        while True:
          try:
            print(sess.run(some_op, feed_dict={iterator_handle: test_handle}))
          except tf.errors.OutOfRangeError:
            break
    
        saver.save(sess, 'checkpoints_csv/fufu')
    
    
    def train_feed():
      # delete existing saved models and summary files
      if os.path.exists('checkpoints_csv'):
        shutil.rmtree('checkpoints_csv')
      # train_dataset = tf.data.Dataset.from_tensor_slices(
      #     tf.constant(np.random.randint(0, 100, (5, 2)), dtype=tf.float32))
      train_dataset = readers.CsvDataset("train.csv", record_defaults=[tf.float32, tf.float32], header=True)
      test_dataset = readers.CsvDataset("test.csv", record_defaults=[tf.float32, tf.float32], header=True)
      # test_dataset = tf.data.Dataset.from_tensor_slices(
      # tf.constant(np.random.randint(0, 100, (2, 2)), dtype=tf.float32))
    
      train(train_dataset, test_dataset)
    
    
    def restore_feed():
      # Load and fine-tune the saved model using new data
      another_train_dataset = readers.CsvDataset("train.csv", record_defaults=[tf.float32, tf.float32], header=True)
      another_test_dataset = readers.CsvDataset("test.csv", record_defaults=[tf.float32, tf.float32], header=True)
    
      resume_training(another_train_dataset, another_test_dataset)
    
    
    if __name__ == '__main__':
      train_feed()
      restore_feed()
    

    Willing to contribute

    Yes

    opened by fuhailin 0
  • Filter_func in parquet reader

    Filter_func in parquet reader

    User Story

    Mapping, filtering, and then batching is a common pipeline for row-based storage formats such as TFRecord. With Parquet, however, transforming to a row-based dataset performs very badly, and filtering after batching makes batch sizes fluctuate drastically. We therefore propose adding a filter_func to the read_parquet interface so that users can obtain a clean batch directly.

    Detailed requirements

    Add filter_func to hybridbackend.tensorflow.data.read_parquet(batch_size, fields=None, partition_count=1, partition_index=0, drop_remainder=False, num_parallel_reads=None, num_sequential_reads=1, filter_func=None, map_func=None)
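
To illustrate the motivation above, here is a framework-agnostic Python sketch of why filtering *after* batching yields fluctuating batch sizes, while filtering before batching keeps them fixed. The data and predicate are invented for illustration:

```python
# Sketch: batch->filter vs filter->batch, using plain Python lists
# in place of a dataset pipeline.
import random

random.seed(0)
samples = [random.randint(0, 9) for _ in range(100)]
keep = lambda x: x >= 3  # hypothetical per-sample predicate

BATCH = 10

# batch -> filter: each batch shrinks by however many rows fail the
# predicate, so batch sizes vary from batch to batch.
batches_then_filter = [
    [x for x in samples[i:i + BATCH] if keep(x)]
    for i in range(0, len(samples), BATCH)
]

# filter -> batch: every full batch has exactly BATCH rows.
survivors = [x for x in samples if keep(x)]
filter_then_batch = [
    survivors[i:i + BATCH] for i in range(0, len(survivors), BATCH)
]

print("batch->filter sizes:", [len(b) for b in batches_then_filter])
print("filter->batch sizes:", [len(b) for b in filter_then_batch])
```

A filter_func applied during reading would give the second behavior without first materializing a row-based dataset.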

    API Compatibility

    At least TensorFlow 1.14 and 1.15

    opened by paopaoactioner 0
Releases(v0.5.4)
  • v0.5.4(Jul 25, 2022)

    Objectives:

    • Easy to use with existing AI workflows

    Features:

    • Support fixed length list in ParquetDataset
    • Support schema parsing in ParquetDataset
    • Provide validation tools for parquet files

    Bug Fixes:

    • Fixes indices calculation in rebatching
  • v0.6.0(Apr 16, 2022)

    Objectives:

    1. Communication-efficient training and evaluation at scale
    2. Easy to use with existing AI workflows

    Features:

    1. Data-Parallel Training and Evaluation
    • Bucketized Gradients Aggregation using AllReduce
    • Global Metric Operations
    • Out-Of-Range Coordination
    2. Hybrid-Parallel Embedding Learning
    • Bucketized Embedding Exchanging using AllToAllv
    • Fusion and Quantization of AllToAllv
    • Fusion of Partitioning and Stitching
    3. Usability
    • Support of MonitoredSession and Estimator
    • Declarative API for Model Definition
    4. Compatibility
    • Support of NVIDIA TensorFlow and DeepRec
    5. Interoperability
    • Inference Pipeline Needs No Change
    • Support of SavedModel
    • Support of Variable, XDL HashTable and PAI Embedding Variable

    Bug Fixes:

    [#46] Fixes rebatching in ParquetDataset.

  • v0.5.3(Jul 25, 2022)

  • v0.5.2(Dec 2, 2021)

    Objectives:

    • Memory-efficient loading of categorical data
    • Easy to use with existing AI workflows

    Features:

    1. Parquet Dataset
    • Reading batch of tensors from numeric fields in zero-copy way
    • Reading batch of sparse tensors from numeric list fields in zero-copy way
    • Support of string fields
    • Support of local filesystem, HDFS, S3 and OSS
    2. Data Pipeline Functions
    • Resizing batch of tensors and ragged tensors
    • Converting ragged tensors to sparse tensors
    3. Compatibility
    • Support of TensorFlow 1.15 and TensorFlow 1.14
    • GitHub actions for uploading wheels to PyPI

    Bug Fixes:

    • [#11][#12][#13] Supports manylinux_2_24 platform.

Linear-Regression Implementation of Univaraint Linear Regresion (Supervised Machine Learning) in c++. With a data set (training set) you can predict o

vincent laizer 1 Nov 3, 2021
Reactive Light Training Module used in fitness for developing agility and reaction speed.

Hello to you , Thanks for taking interest in this project. Use case of this project is to help people that want to improve their agility and reactio

null 4 Sep 16, 2022
A system to flag anomalous source code expressions by learning typical expressions from training data

A friendly request: Thanks for visiting control-flag GitHub repository! If you find control-flag useful, we would appreciate a note from you (to niran

Intel Labs 1.2k Sep 24, 2022
Fairring (FAIR + Herring) is a plug-in for PyTorch that provides a process group for distributed training that outperforms NCCL at large scales

Fairring (FAIR + Herring): a faster all-reduce TL;DR: Using a variation on Amazon’s "Herring" technique, which leverages reduction servers, we can per

Meta Research 44 Aug 2, 2022
Nvvl - A library that uses hardware acceleration to load sequences of video frames to facilitate machine learning training

NVVL is part of DALI! DALI (Nvidia Data Loading Library) incorporates NVVL functionality and offers much more than that, so it is recommended to switc

NVIDIA Corporation 661 Sep 3, 2022
A library for distributed ML training with PyTorch

moolib moolib - a communications library for distributed ML training moolib offers general purpose RPC with automatic transport selection (shared memo

Meta Research 338 Sep 16, 2022
Weekly competitive programming training for newbies (Codeforces problem set)

Codeforces Basic Problem Set Weekly competitive programming training for newbies based on the Codeforces problem set. Note that, this training problem

Nguyen Hoang Hai 4 Apr 22, 2022
HugeCTR is a GPU-accelerated recommender framework designed to distribute training across multiple GPUs and nodes and estimate Click-Through Rates (CTRs)

Merlin: HugeCTR HugeCTR is a GPU-accelerated recommender framework designed to distribute training across multiple GPUs and nodes and estimate Click-T

null 704 Sep 29, 2022