在 Amazon SageMaker PlatoBlockchain Data Intelligence 上进行 TensorFlow 1.x 加速训练的最佳实践。 垂直搜索。 哎。

在 Amazon SageMaker 上进行 TensorFlow 1.x 加速训练的最佳实践

如今,许多客户正在使用 TensorFlow 来训练深度学习模型,以提高他们在电子商务中的广告点击率和个性化推荐。 随着客户行为的变化,他们每天都会积累大量新数据。 模型迭代是数据科学家的日常工作之一,但他们面临在大型数据集上训练时间过长的问题。

亚马逊SageMaker 是一个完全托管的机器学习 (ML) 平台,可以帮助数据科学家专注于模型而不是基础设施,并原生支持自带算法和框架,例如 TensorFlow 和 PyTorch。 SageMaker 提供灵活的分布式培训选项,可根据您的特定工作流程进行调整。 由于许多数据科学家可能缺乏加速训练过程的经验,在这篇文章中,我们将向您展示快速深度学习模型训练的重要因素以及在 SageMaker 上进行 TensorFlow 1.x 加速训练的最佳实践。 我们还有一个示例代码 深度调频 SageMaker 上的分布式培训 GitHub回购.

在 SageMaker 上运行 TensorFlow 脚本时,应考虑许多因素以最大限度地提高 CPU/GPU 利用率,例如基础设施、加速器类型、分布式训练方法、数据加载方法、混合精度训练等。

我们讨论以下领域的最佳实践:

  • 加速单个实例的训练
  • 加速多个实例的训练
  • 数据管道
  • 自动混合精度训练

加速单个实例的训练

在单个实例上运行 TensorFlow 脚本时,您可以选择计算机优化的系列,例如 亚马逊弹性计算云 (Amazon EC2) C5 系列,或在单个实例中具有多个 GPU 的加速计算系列,例如 p3.8xlarge、p3.16xlarge、p3dn.24xlarge 和 p4d.24xlarge。

在本节中,我们将讨论在单个实例上使用多个 CPU 的策略,以及在单个实例上使用多个 GPU 进行分布式训练。

单个实例上的多个 CPU

在本节中,我们将讨论在 CPU 设备、塔式方法、TensorFlow MirroredStrategy 和 Horovod 上手动设置算子的并行度。

在 CPU 设备上手动设置算子的并行度

TensorFlow 在训练过程中自动选择合适数量的线程来并行化运算计算。 但是,您可以设置 intra_op 线程池和 inter_op TensorFlow 提供的并行设置,并使用 MKL-DNN 的环境变量来设置 OS 线程的绑定。 请参阅以下代码:

# Set parallelism of intra_op and inter_op
num_cpus = int(os.environ['SM_NUM_CPUS'])
config = tf.ConfigProto(allow_soft_placement=True, device_count={'CPU': num_cpus}, intra_op_parallelism_threads=num_cpus, inter_op_parallelism_threads=num_cpus)
run_config = tf.estimator.RunConfig().replace(session_config = config)

# Use Intel MKL-DNN Setting to accelerate training speed
os.environ["KMP_AFFINITY"]= "verbose,disabled"
os.environ['OMP_NUM_THREADS'] = str(num_cpus)
os.environ['KMP_SETTINGS'] = '1'

环境变量 KMP_AFFINITY MKL-DNN 的设置为 granularity=fine,compact,1,0 默认。 将TensorFlow的intra和inter都设置为当前实例的最大vCPU数后,CPU使用上限与训练实例的物理核数几乎相同。

如果你设置 os.environ["KMP_AFFINITY"]= "verbose,disabled",操作系统线程未绑定到硬件超线程,CPU使用率可能超过物理内核数。

关于 TensorFlow 内并行度、TensorFlow 间并行度和 MKL-DNN 线程数的设置,这三个参数的不同组合导致训练速度不同。 因此,您需要测试每种情况以找到最佳组合。 一个常见的情况是设置三个参数(intra_op_parallelism_threadsinter_op_parallelism_threads 对于 TensorFlow, os.environ['OMP_NUM_THREADS'] 对于 MKL-DNN) 为 vCPU(物理核心)数量或 vCPU 总数的一半。

塔法

为了在 GPU 上复制模型,每个 GPU 都有自己的正向传递实例。 前向传播的实例称为 . 塔式方法几乎总是用于 GPU 设备。 为了与其他方法比较训练速度,这里我们也对我们的 CPU 设备使用了 tower 方法。

如果您不手动设置 CPU 设备,TensorFlow 不会使用 tower 方法来平均梯度,因此在这种情况下您不需要缩放批量大小。

  1. 手动设置 CPU 设备:
device_list = []
if manual_CPU_device_set:
		cpu_prefix=’/cpu:’
		for I in range(1, num_cpus):
			devices_list.append(cpu_prefix + str(i))

  1. 使用 replicate_model_fn 包裹 model_fn:
DeepFM = tf.estimator.Estimator(model_fn=tf.contrib.estimator.replicate_model_fn(model_fn, devices=device_list), model_dir=FLAGS.model_dir, params=model_params, config=config)

  1. 使用 TowerOptimizer 包裹 optimizer:
optimizer = tf.contrib.estimator.TowerOptimizer(optimizer)

  1. 包裹你的 model_fn:
with tf.variable_scope(‘deepfm_model’, reuse=tf.AUTO_REUSE)

  1. 将批量大小缩放到 (NUM_CPU – 1)。

让我们看看启用塔模式后 CPU 利用率的差异。 下图显示了 ml.c5.18xlarge 实例的 CPU 利用率,配置如下:

无塔 + LibSVM 数据 + 管道模式 + MKL-DNN 禁用绑定 + TensorFlow 内部/内部操作并行度设置为实例的最大 vCPU 数量

无塔

下图显示了 ml.c5.18xlarge 实例的 CPU 利用率,配置如下:

具有设置 CPU 设备的塔 + LibSVM 数据 + 管道模式 + MKL-DNN 禁用绑定 + TensorFlow 内部/内部操作并行度设置为实例的最大 vCPU 数量

使用塔式方法时CPU使用率更高,并且超过了物理核心数。

TensorFlow 镜像策略

TensorFlow MirroredStrategy 意味着在一台机器上跨多个副本进行同步训练。 此策略通常用于在具有多个 GPU 的一台机器上进行训练。 为了将训练速度与另一种方法进行比较,我们将 MirroredStrategy 用于我们的 CPU 设备。

在使用 TensorFlow MirroredStrategy 时,如果不设置 CPU 设备,TensorFlow 只是使用一个 CPU 作为单个 Worker,这很浪费资源。 我们建议手动设置 CPU 设备,因为它会对 /CPU:0, 所以 /CPU:0 device 在这里不用作副本。 请参阅以下代码:

device_list = []
if manual_CPU_device_set:
		cpu_prefix=’/cpu:’
		for I in range(1, num_cpus):
			devices_list.append(cpu_prefix + str(i))
mirrored_strategy = tf.distribute.MirroredStrategy(devices=devices_list)
	else:
mirrored_strategy = tf.distribute.MirroredStrategy()

# Set strategy to config:
config = tf.estimator.RunConfig(train_distribute=mirrored_strategy,
eval_distribute=mirrored_strategy,
session_config = config)

使用 MirroredStrategy 时需要缩放批量大小; 例如,将批量大小缩放为 GPU 设备数量的倍数。

对于设置 CPU 设备时的子策略,如果不设置 cross_device_ops 参数in tf.distribute.MirroredStrategy(), TensorFlow 使用 ReductionToOneDevice 默认为子策略。 但是,如果您设置 HierarchicalCopyAllReduce 作为子策略,TensorFlow 只是做 reduce 工作 /CPU:0. 当你使用TensorFlow dataset API和distribute strategy一起使用时,应该返回dataset对象而不是function中的features和labels input_fn.

通常,TensorFlow MirroredStrategy 在 CPU 训练上比 Tower 方法慢,所以我们不建议在多 CPU 单主机上使用 MirroredStrategy。

霍罗沃德

霍罗沃德 是一个用于 TensorFlow、Keras、PyTorch 和 Apache MXNet 的分布式深度学习训练框架。 Horovod 的目标是使分布式深度学习变得快速且易于使用。

有一个参数 distribution 在 SageMaker Python SDK Estimator API 中,您可以使用它来说明 Horovod 分布式训练。 SageMaker 配置基础架构并使用 MPI 运行您的脚本。 请参阅以下代码:

hvd_processes_per_host = 4
distribution = {'mpi': { 
'enabled': True, 
'processes_per_host': hvd_processes_per_host,
'custom_mpi_options': '-verbose --NCCL_DEBUG=INFO -x OMPI_MCA_btl_vader_single_copy_mechanism=none' 
} 
}

在选择 ml.p3.8xlarge 等 GPU 实例时,您需要为每个 worker 固定每个 GPU:

config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

为了加快模型收敛速度,根据 Horovod 官方文档,按工作人员数量调整学习率。 但是,在实际项目中,您应该在一定程度上调整学习率,而不是按工作人员的数量,这会导致模型性能不佳。 例如,如果原始学习率为 0.001,我们将学习率缩放到 0.0015,即使工人数量为 XNUMX 或更多。

通常,只有主节点(Horovod rank 0)保存检查点和模型以及评估操作。 使用 Horovod 时不需要缩放批量大小。 SageMaker 提供 管道模式 从流式传输数据 亚马逊简单存储服务 (Amazon S3) 到训练实例中。 启用 Pipe 模式时,请注意同一主机上的不同工作人员需要使用不同的通道以避免错误。 这是因为第一个工作进程读取 FIFO/通道数据,同一个实例上的其他工作进程会因为无法从同一个 FIFO/通道读取数据而挂起,因此 Horovod 无法正常工作。 为避免此问题,请根据每个实例的工作人员数量设置通道。 至少要保证同一台主机上的不同worker消费不同的channel; 不同主机上的工作人员可以使用相同的通道。

在使用 Horovod 时,您可能会遇到以下错误:

“One or more tensors were submitted to be reduced, gathered or broadcasted by subset of ranks and are waiting for remainder of ranks for more than 60 seconds. This may indicate that different ranks are trying to submit different tensors or that only subset of ranks is submitting tensors, which will cause deadlock.”

造成这个问题的可能原因是某个rank(比如rank 0)比其他rank工作慢或者做的工作多,导致其他rank等待时间过长。 虽然 rank 0 有时需要比其他 rank 做更多的工作,但需要注意的是,rank 0 不应该长时间做太多的工作。 例如,对于验证集的模型评估和训练过程中保存检查点,如果这些操作不可避免地需要很长时间,这可能会导致错误,一种解决方法是让所有工作人员做与 rank 0 相同的工作(检查点保存、评估等)。

数据分片是使用分布式训练时要考虑的最重要的事情之一。 您可以使用 TensorFlow dataset.shard() 在你的脚本中。 SageMaker 还在 输入通道 通过设置 distribution=S3shardbykey 在数据集通道中。 请参阅以下代码:

dataset = PipeModeDataset(channel, record_format='TFRecord')

number_host = len(FLAGS.hosts)

if FLAGS.enable_data_multi_path : # If there are multi channels mapping with different S3 path
    if FLAGS.enable_s3_shard == False :
        if number_host > 1:
            index = hvd.rank() // FLAGS.worker_per_host
            dataset = dataset.shard(number_host, index)
else :
    if FLAGS.enable_s3_shard :
        dataset = dataset.shard(FLAGS.worker_per_host, hvd.local_rank())
    else :
        dataset = dataset.shard(hvd.size(), hvd.rank())

下图显示了使用 Horovod 时的结果(ml.c5.18xlarge,Horovod + LibSVM + 默认的内部运算和内部运算设置),您可以将其与塔式方法进行比较。

霍罗沃德

在单个实例上使用多个 GPU 进行分布式训练

在单个实例上使用多个 GPU 开始分布式训练是很正常的,因为数据科学家只需要管理一个实例并利用 GPU 之间的高速互连。 SageMaker 训练作业支持在单个实例上具有多个 GPU 的多种实例类型,例如 ml.p3.8xlarge、ml.p3.16xlarge、ml.p3dn.24xlarge 和 ml.p4d.24xlarge。 该方法与单个实例中的多个 CPU 相同,但在脚本中进行了一些更改。

塔法

这里的塔式方法与多CPU训练中的几乎相同。 您需要根据使用的 GPU 数量来缩放批量大小。

TensorFlow 镜像策略

默认子策略 MirroredStrategy is NcclAllReduce. 您需要根据使用的 GPU 数量来缩放批量大小。 请参阅以下代码:

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=mirrored_strategy,
				eval_distribute=mirrored_strategy)

加速多个实例的训练

横向扩展始终是提高训练速度的一种选择。 越来越多的数据科学家将其作为分布式训练的默认选项。 在本节中,我们将讨论多主机分布式训练的策略。

具有多个实例的多个 CPU

启用分布式训练时,有四种主要方法可以使用多个 CPU 和多个实例:

    • 参数服务器,无需手动设置算子在 CPU 设备上的并行度
    • 在 CPU 设备上手动设置算子并行度的参数服务器
    • 带塔的参数服务器(手动设置CPU设备,并设置 allow_soft_placement=True in tf.ConfigProto)
    • 霍罗沃德

在使用参数服务器时 tf.estimator API,checkpoint的路径必须是Amazon S3等可共享路径或者是本地路径 亚马逊弹性文件服务 (Amazon EFS) 映射到容器。 对于参数服务器 tf.keras,检查点路径可以设置为本地路径。 对于 Horovod,检查点路径可以设置为训练实例的本地路径。

当使用参数服务器和 tf.estimator 带有到 Amazon S3 的检查点路径的 API,如果模型很大,您可能会遇到主卡在将检查点保存到 S3 的错误。 您可以使用 SageMaker 内置容器 TensorFlow 1.15 或 TensorFlow 1.15.2 或使用 Amazon EFS 作为共享的检查点路径。

多台主机使用参数服务器时,每个参数服务器进程上的参数负载可能不均衡(尤其是嵌入表变量比较大的时候),这可能会导致错误。 您可以通过查看 Amazon S3 中每个 shard 的 checkpoint 的文件大小来确定参数服务器上的参数是否平衡,因为每个参数服务器对应一个 checkpoint 文件的 shard。 为避免此类问题,您可以使用 partitioner 功能尝试使每个参数服务器的参数均匀分布:

with tf.variable_scope('deepfm_model', reuse=tf.AUTO_REUSE, partitioner = tf.fixed_size_partitioner(num_shards=len(FLAGS.hosts))):

具有多个实例的单 GPU

SageMaker 训练作业支持只有一个 GPU 的实例,例如 ml.p3.xlarge、ml.g4dn 和 ml.g5 系列。 此方案中使用了两种主要方法:参数服务器和 Horovod。

SageMaker内置的parameter server分布式训练方式是为每个训练实例启动一个parameter server进程和一个worker进程(每个parameter server只负责部分模型参数),所以默认是多机单- GPU 训练。 SageMaker 内置的参数服务器分布式训练是一种异步梯度更新方法。 为了减少异步更新对训练收敛的影响,建议降低学习率。 如果要使用实例上的所有 GPU,则需要使用参数服务器和 Tower 方法的组合。

对于 Horovod,只需设置 processes_per_host=1 在 SageMaker Python Estimator API 的分布参数中。

具有多个实例的多个 GPU

对于参数服务器和 tower 方法,代码改动与单实例多 GPU 的 tower 方法基本相同,无需手动设置 GPU 设备。

对于 Horovod,将分布参数中的 processes_per_host 设置为每个训练实例的 GPU 数量。 如果使用管道模式,每个实例的工作人员数量需要与通道数量相匹配。

数据管道

除了我们讨论过的基础架构之外,还有一个重要的事情需要考虑:数据管道。 数据管道是指在数据输入神经网络之前如何加载数据和转换数据。 CPU用于准备数据,而GPU用于从CPU计算数据。 因为 GPU 是一种昂贵的资源,所以更多的 GPU 空闲时间是低效的; 训练工作中良好的数据管道可以提高 GPU 和 CPU 利用率。

当您尝试优化 TensorFlow 数据输入管道时,请考虑使用的 API 顺序 TensorFlow 数据集,训练数据大小(很多小文件或几个大文件),批量大小等。

让我们看一下训练期间GPU和CPU之间的交互。 下图比较了有和没有管道的交互。

管道

更好的管道可以减少 GPU 空闲时间。 考虑以下提示:

  • 在提取特征和标签时使用简单的函数逻辑
  • 将样本预取到内存
  • 减少不必要的磁盘 I/O 和网络 I/O
  • 在内存中缓存处理后的特征和标签
  • 减少 CPU 和 GPU 之间的复制次数
  • 让不同的工作人员处理训练数据集的不同部分
  • 减少调用 TensorFlow 数据集 API 的次数

TensorFlow 提供了与数据集格式相关的转换 API,TensorFlow 中转换 API 的顺序对训练速度影响很大。 需要测试调用 TensorFlow 数据集 API 的最佳顺序。 以下是一些基本原则:

  • 使用矢量化地图。 这意味着首先调用 TensorFlow 数据集批处理 API,然后是数据集映射 API。 map函数中提供的自定义解析函数,如 decode_tfrecord 在示例代码中,解析一小批数据。 相反,先映射后批处理是标量映射,自定义解析器函数只处理一个样本。
  • 使用 TensorFlow 数据集缓存 API 来缓存特征和标签。 将 TensorFlow 数据集缓存 API 放在 TensorFlow 数据集重复 API 之前,否则 RAM 利用率会逐个时期线性增加。 如果数据集与 RAM 一样大,请不要使用 TensorFlow 数据集缓存 API。 如果您需要使用 TensorFlow 数据集缓存 API 和 shuffle API,请考虑使用以下顺序:创建 TensorFlow 数据集对象 -> 缓存 API -> shuffle API -> 批处理 API -> 映射 API -> 重复 API -> 预取 API。
  • 使用 tfrecord 数据集格式多于 LibSVM 格式。
  • 文件模式或管道模式取决于您的数据集格式和文件数量。 这 tfrecorddataset API可以设置 num_parallel_reads 并行读取多个文件并设置 buffer_size 优化数据的读取,而 pipemodedataset API 没有这样的设置。 管道模式更适合单个文件较大,文件总数较少的情况。 我们建议使用 SageMaker 处理作业来完成预处理工作,例如根据标签将多个文件连接到一个更大的文件中,使用采样的方法使数据集更加平衡,以及对平衡的数据集进行打乱。

请参阅以下代码示例:

def decode_tfrecord(batch_examples):
        # The feature definition here should BE consistent with LibSVM TO TFRecord process.
        features = tf.parse_example(batch_examples,
                                           features={
                                               "label": tf.FixedLenFeature([], tf.float32),
                                               "ids": tf.FixedLenFeature(dtype=tf.int64, shape=[FLAGS.field_size]),
                                               "values": tf.FixedLenFeature(dtype=tf.float32, shape=[FLAGS.field_size]) 
                                           })
        
        batch_label = features["label"]
        batch_ids = features["ids"]
        batch_values = features["values"]
        
        return {"feat_ids": batch_ids, "feat_vals": batch_values}, batch_label


    def decode_libsvm(line):
        columns = tf.string_split([line], ' ')
        labels = tf.string_to_number(columns.values[0], out_type=tf.float32)
        splits = tf.string_split(columns.values[1:], ':')
        id_vals = tf.reshape(splits.values,splits.dense_shape)
        feat_ids, feat_vals = tf.split(id_vals,num_or_size_splits=2,axis=1)
        feat_ids = tf.string_to_number(feat_ids, out_type=tf.int32)
        feat_vals = tf.string_to_number(feat_vals, out_type=tf.float32)
        return {"feat_ids": feat_ids, "feat_vals": feat_vals}, labels

if FLAGS.pipe_mode == 0:
        dataset = tf.data.TFRecordDataset(filenames)
    else :
        # Enter Pipe mode
        dataset = PipeModeDataset(channel, record_format='TFRecord')
        
    if FLAGS.enable_s3_shard == False:
        host_rank = FLAGS.hosts.index(FLAGS.current_host)
        number_host = len(FLAGS.hosts)
        dataset = dataset.shard(number_host, host_rank)
    
    dataset = dataset.batch(batch_size, drop_remainder=True) # Batch size to use
    dataset = dataset.map(decode_tfrecord,
                          num_parallel_calls=tf.data.experimental.AUTOTUNE) 

    if num_epochs > 1:
        dataset = dataset.repeat(num_epochs)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

对于 CPU 实例的训练,设置并行度 intra op, inter op, MKL-DNN 的环境变量是一个很好的起点。

自动混合精度训练

我们讨论的最后一件事是自动混合精度训练,它可以加快速度并提高模型性能。 在撰写本文时,Nvidia V100 GPU(P3 实例)和 A100(P4dn 实例)支持 Tensor 核心。 使用这些类型的实例时,您可以在 TensorFlow 中启用混合精度训练。 从 1.14 版本开始,TensorFlow 已经支持自动混合精度训练。 您可以使用以下语句来包装您的原始优化器:

tf.train.experimental.enable_mixed_precision_graph_rewrite(optimizer)

如果模型小,GPU利用率低,那么自动混合精度训练就没有优势了。 如果模型很大,自动混合精度训练可以加快训练速度。

结论

当您在 SageMaker 中开始深度学习模型训练时,请考虑以下提示以实现更快的训练速度:

  • 先尝试多 CPU 单实例方法或单 GPU 单实例方法。 如果 CPU/GPU 利用率非常高(例如超过 90%),请转到下一步。
  • 在单个主机中尝试更多 CPU 或在单个主机中尝试更多 GPU。 如果利用率接近 CPU 或 GPU 的最大利用率,请转到下一步。
  • 使用多个主机尝试多个 CPU 或多个 GPU。
  • 使用参数服务器或 Horovod 时需要修改代码。 TensorFlow 基于会话的 API 的代码修改不一样, tf.estimator API,以及 tf.keras API。 参数服务器或 Horovod 可能会在不同的训练案例和任务中显示不同的训练速度,因此如果您有时间和预算来确定最佳方法,请尝试这两种方法。

请记住以下建议:

  • 在扩展之前检查利用率,优化您的数据管道,并使 CPU 和 GPU 在时间线中重叠。
  • 首先扩大规模,然后扩大规模。
  • 如果在所有方法之后都无法提高 GPU 利用率,请尝试 CPU。 在很多情况下(尤其是点击率排名模型),CPU 实例训练的总训练时间比 GPU 实例训练更短且更具成本效益。

我们还有一个代码示例 GitHub回购,我们在 SageMaker 上展示了两个 DeepFM 分布式训练样本。 一个是 CPU 实例上的 TensorFlow 参数服务器,另一个是 GPU 实例上的 Horovod。


作者简介

在 Amazon SageMaker PlatoBlockchain Data Intelligence 上进行 TensorFlow 1.x 加速训练的最佳实践。 垂直搜索。 哎。 梁玉辉 是一名高级机器学习解决方案架构师。 专注于机器学习的推广与应用,深度参与了众多客户的机器学习项目。 他在深度学习分布式训练、推荐系统和计算广告方面拥有丰富的经验。

在 Amazon SageMaker PlatoBlockchain Data Intelligence 上进行 TensorFlow 1.x 加速训练的最佳实践。 垂直搜索。 哎。王世帅 是一名高级机器学习解决方案架构师。 他与 AWS 客户合作,帮助他们大规模采用机器学习。 他喜欢看电影和环游世界。

时间戳记:

更多来自 AWS机器学习