使用 Amazon Kinesis、AWS Glue 和 Amazon SageMaker PlatoBlockchain Data Intelligence 构建预测性维护解决方案。 垂直搜索。 哎。

使用 Amazon Kinesis、AWS Glue 和 Amazon SageMaker 构建预测性维护解决方案

组织越来越多地针对各种用例和问题构建和使用基于机器学习 (ML) 的解决方案,包括机器部件的预测性维护、基于客户偏好的产品推荐、信用分析、内容审核、欺诈检测等。 在其中许多场景中,当这些基于 ML 的解决方案能够近乎实时地处理数据事件并从中获得洞察力时,它们的有效性和优势将得到进一步增强。

尽管基于 ML 的近实时解决方案的商业价值和优势已经确立,但以最佳可靠性和性能大规模实施这些解决方案所需的架构是复杂的。 这篇文章描述了如何结合 亚马逊Kinesis, AWS胶水亚马逊SageMaker 构建用于预测性维护的近实时特征工程和推理解决方案。

用例概述

我们专注于预测性维护用例,其中部署在现场的传感器(例如工业设备或网络设备)需要在出现故障并导致停机之前更换或纠正。 停机对企业来说可能代价高昂,并可能导致糟糕的客户体验。 由机器学习模型提供支持的预测性维护还可以通过通知何时不应更换状况良好的机器部件来帮助增加基于计划的定期维护周期,从而避免不必要的成本。

在这篇文章中,我们专注于将机器学习应用于一个合成数据集,该数据集包含由于空气温度、过程温度、旋转速度、扭矩和工具磨损等特征而导致的机器故障。 使用的数据集来自 UCI 数据存储库.

机器故障由五种独立的故障模式组成:

  • 工具磨损故障 (TWF)
  • 散热故障 (HDF)
  • 电源故障 (PWF)
  • 过度应变失效 (OSF)
  • 随机故障 (RNF)

如果上述任何故障模式为真,则机器故障标签指示机器是否针对特定数据点发生故障。 如果至少有一种故障模式为真,则流程失败并且机器故障标签设置为 1。ML 模型的目标是正确识别机器故障,因此可以启动下游预测性维护操作。

解决方案概述

对于我们的预测性维护用例,我们假设设备传感器流式传输有关机器部件的各种测量和读数。 然后,我们的解决方案每次(微批量)获取一片流数据,并执行处理和特征工程以创建特征。 然后使用创建的特征从经过训练和部署的 ML 模型中近乎实时地生成推理。 生成的推理可以由下游应用程序进一步处理和使用,以采取适当的行动并启动维护活动。

下图展示了我们整体解决方案的架构。

该解决方案大致由以下部分组成,本文稍后将对此进行详细说明:

  • 流式数据源和摄取 - 我们用 Amazon Kinesis数据流 从现场传感器大规模收集流数据,并使其可用于进一步处理。
  • 近实时特征工程—— 我们使用 AWS Glue 流作业从 Kinesis 数据流中读取数据并执行数据处理和特征工程,然后将派生特征存储在 亚马逊简单存储服务 (亚马逊 S3)。 Amazon S3 提供了一种可靠且经济高效的选项来存储大量数据。
  • 模型训练和部署—— 我们使用来自 UCI 数据存储库的 AI4I 预测性维护数据集,使用 SageMaker 训练基于 XGBoost 算法的 ML 模型。 然后,我们将经过训练的模型部署到 SageMaker 异步推理端点。
  • 近实时机器学习推理 – 在 Amazon S3 中提供这些功能后,我们需要从已部署的模型中近乎实时地生成推理。 SageMaker 异步推理端点非常适合此要求,因为它们支持更大的有效负载大小(最多 1 GB),并且可以在几分钟内(最多 15 分钟)生成推理。 我们使用 S3 事件通知来运行 AWS Lambda 调用 SageMaker 异步推理端点的函数。 SageMaker 异步推理终端节点接受 S3 位置作为输入,从部署的模型生成推理,并将这些推理近乎实时地写回 Amazon S3。

此解决方案的源代码位于 GitHub上. 该解决方案已经过测试,应该在 us-east-1 中运行。

我们使用 AWS CloudFormation 模板,部署使用 AWS无服务器应用程序模型 (AWS SAM) 和 SageMaker 笔记本来部署解决方案。

先决条件

要开始,作为先决条件,您必须具备 山姆命令行界面, Python的3画中画 安装。 您还必须拥有 AWS命令行界面 (AWS CLI) 配置正确。

部署解决方案

您可以使用 AWS 云外壳 运行这些步骤。 CloudShell 是一种基于浏览器的 shell,它使用您的控制台凭证进行预身份验证,并包括预安装的常见开发和操作工具(例如 AWS SAM、AWS CLI 和 Python)。 因此,不需要本地安装或配置。

  • 我们首先创建一个 S3 存储桶,在其中存储 AWS Glue 流作业的脚本。 在终端中运行以下命令以创建新存储桶:
aws s3api create-bucket --bucket sample-script-bucket-$RANDOM --region us-east-1

  • 记下创建的存储桶的名称。

ML-9132 解决方案拱门

  • 接下来,我们在本地克隆代码存储库,其中包含用于部署堆栈的 CloudFormation 模板。 在终端中运行以下命令:
git clone https://github.com/aws-samples/amazon-sagemaker-predictive-maintenance

  • 导航到 sam-template 目录:
cd amazon-sagemaker-predictive-maintenance/sam-template

ML-9132 git 克隆仓库

  • 运行以下命令以将 AWS Glue 作业脚本(从 glue_streaming/app.py)复制到您创建的 S3 存储桶:
aws s3 cp glue_streaming/app.py s3://sample-script-bucket-30232/glue_streaming/app.py

ML-9132 复制胶水脚本

  • 您现在可以通过 AWS SAM 通过 CloudFormation 模板继续构建和部署解决方案。 运行以下命令:

ML-9132 SAM 构建

sam deploy --guided

  • 为部署提供参数,例如堆栈名称、首选 AWS 区域 (us-east-1), GlueScriptsBucket.

确保提供您之前为 AWS Glue 脚本 S3 存储桶创建的相同 S3 存储桶(参数 GlueScriptsBucket 在以下屏幕截图中)。

ML-9132 SAM 部署参数

在您提供所需的参数后,AWS SAM 将启动堆栈部署。 以下屏幕截图显示了创建的资源。

已部署 ML-9132 SAM

成功部署堆栈后,您应该会看到以下消息。

ML-9132 SAM CF 部署

  • 在 AWS CloudFormation 控制台上,打开堆栈(对于这篇文章, nrt-streaming-inference) 在部署 CloudFormation 模板时提供。
  • 点击 资源 选项卡,记下 SageMaker 笔记本实例 ID。
  1. ML-9132 SM 笔记本创建
  • 在 SageMaker 控制台上,打开此实例。

ML-9132 image018

SageMaker 笔记本实例已预加载所需的笔记本。

导航到笔记本文件夹并打开并按照笔记本中的说明进行操作(Data_Pre-Processing.ipynbModelTraining-Evaluation-and-Deployment.ipynb) 来探索数据集、执行预处理和特征工程,以及训练模型并将其部署到 SageMaker 异步推理端点。

ML-9132 打开 SM 笔记本

流式数据源和摄取

Kinesis Data Streams 是一种无服务器、可扩展且持久的实时数据流服务,可用于实时收集和处理大量数据记录流。 Kinesis Data Streams 支持捕获、处理和存储来自各种来源的数据流,例如 IT 基础设施日志数据、应用程序日志、社交媒体、市场数据源、Web 点击流数据、IoT 设备和传感器等。 您可以根据吞吐量和扩展要求以按需模式或预置模式预置 Kinesis 数据流。 有关详细信息,请参阅 选择数据流容量模式.

对于我们的用例,我们假设各种传感器正在将温度、转速、扭矩和工具磨损等测量值发送到数据流。 Kinesis Data Streams 充当收集和摄取数据流的渠道。

我们使用 Amazon Kinesis 数据生成器 (KDG) 在本文后面生成数据并将其发送到 Kinesis 数据流,模拟传感器生成的数据。 来自数据流 sensor-data-stream 的数据是使用 AWS Glue 流式传输作业提取和处理的,我们将在下面讨论。

近实时特征工程

AWS Glue 流式传输作业 提供一种方便的方式来大规模处理流数据,而无需管理计算环境。 AWS Glue 允许您使用连续运行的作业对流数据执行提取、转换和加载 (ETL) 操作。 AWS Glue 流式传输 ETL 基于 Apache Spark 结构化流式引擎构建,可以从 Kinesis、Apache Kafka 和 适用于Apache Kafka的Amazon托管流 (亚马逊 MSK)。

流式 ETL 作业既可以使用 AWS Glue 内置转换,也可以使用 Apache Spark Structured Streaming 原生的转换。 您还可以使用 Spark ML 和 多层板 AWS Glue 作业中的库,以便使用现成的帮助程序库更轻松地处理功能。

如果流数据源的架构是预先确定的,您可以在 AWS Data Catalog 表中指定它。 如果无法事先确定架构定义,您可以在流式 ETL 作业中启用架构检测。 然后,该作业会自动根据传入数据确定架构。 此外,您可以使用 AWS Glue 架构注册表 允许集中发现、控制和发展数据流模式。 您可以进一步将架构注册表与数据目录集成,以便在创建或更新数据目录中的 AWS Glue 表或分区时有选择地使用存储在架构注册表中的架构。

对于这篇文章,我们创建了一个 AWS Glue 数据目录表 (sensor-stream) 以我们的 Kinesis 数据流作为源并为我们的传感器数据定义架构。

我们从 Data Catalog 表创建一个 AWS Glue 动态数据框,以从 Kinesis 读取流数据。 我们还指定以下选项:

  • 60 秒的窗口大小,以便 AWS Glue 作业在 60 秒的窗口中读取和处理数据
  • 起始位置 TRIM_HORIZON,以允许从 Kinesis 数据流中最旧的记录中读取

我们还使用 Spark MLlib 的 字符串索引器 特征转换器将字符串列类型编码为标签索引。 这种转换是使用 Spark ML Pipelines 实现的。 Spark ML 管道 为 ML 算法提供一组统一的高级 API,以便更轻松地将多种算法组合到单个管道或工作流中。

我们使用 foreachBatch API 调用一个名为 processBatch 的函数,该函数依次处理此数据帧引用的数据。 请参阅以下代码:

# Read from Kinesis Data Stream
sourceStreamData = glueContext.create_data_frame.from_catalog(database = "sensordb", table_name = "sensor-stream", transformation_ctx = "sourceStreamData", additional_options = {"startingPosition": "TRIM_HORIZON"})
type_indexer = StringIndexer(inputCol="type", outputCol="type_enc", stringOrderType="alphabetAsc")
pipeline = Pipeline(stages=[type_indexer])
glueContext.forEachBatch(frame = sourceStreamData, batch_function = processBatch, options = {"windowSize": "60 seconds", "checkpointLocation": checkpoint_location})

函数 processBatch 执行指定的转换并根据年、月、日和批次 ID 对 Amazon S3 中的数据进行分区。

我们还将 AWS Glue 分区重新分区为单个分区,以避免 Amazon S3 中的小文件过多。 拥有多个小文件会影响读取性能,因为它会放大与查找、打开和读取每个文件相关的开销。 我们最终将生成推理的特征写入 S3 存储桶中的前缀(特征)。 请参阅以下代码:

# Function that gets called to perform processing, feature engineering and writes to S3 for every micro batch of streaming data from Kinesis.
def processBatch(data_frame, batchId):
transformer = pipeline.fit(data_frame)
now = datetime.datetime.now()
year = now.year
month = now.month
day = now.day
hour = now.hour
minute = now.minute
if (data_frame.count() > 0):
data_frame = transformer.transform(data_frame)
data_frame = data_frame.drop("type")
data_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
data_frame.printSchema()
# Write output features to S3
s3prefix = "features" + "/year=" + "{:0>4}".format(str(year)) + "/month=" + "{:0>2}".format(str(month)) + "/day=" + "{:0>2}".format(str(day)) + "/hour=" + "{:0>2}".format(str(hour)) + "/min=" + "{:0>2}".format(str(minute)) + "/batchid=" + str(batchId)
s3path = "s3://" + out_bucket_name + "/" + s3prefix + "/"
print("-------write start time------------")
print(str(datetime.datetime.now()))
data_frame = data_frame.toDF().repartition(1)
data_frame.write.mode("overwrite").option("header",False).csv(s3path)
print("-------write end time------------")
print(str(datetime.datetime.now()))

模型训练和部署

SageMaker 是一项完全托管和集成的 ML 服务,使数据科学家和 ML 工程师能够快速轻松地构建、训练和部署 ML 模型。

在 Data_Pre-Processing.ipynb 笔记本中, 我们首先从 UCI 数据存储库导入 AI4I 预测性维护数据集并执行探索性数据分析 (EDA)。 我们还执行特征工程以使我们的特征对训练模型更有用。

例如,在数据集中,我们有一个名为 type 的特征,它将产品的质量类型表示为 L(低)、M(中)或 H(高)。 因为这是分类特征,我们需要在训练模型之前对其进行编码。 我们使用 Scikit-Learn 的 LabelEncoder 来实现这一点:

from sklearn.preprocessing import LabelEncoder
type_encoder = LabelEncoder()
type_encoder.fit(origdf['type'])
type_values = type_encoder.transform(origdf['type'])

在处理完特征并生成精选的训练和测试数据集后,我们就可以训练 ML 模型来根据系统读数预测机器是否发生故障。 我们使用 SageMaker 内置算法训练 XGBoost 模型。 XGBoost 可以为多种类型的 ML 问题(包括分类)提供良好的结果,即使在训练样本有限的情况下也是如此。

SageMaker 培训工作 提供在 SageMaker 上训练 ML 模型的强大而灵活的方法。 SageMaker 管理底层计算基础架构并提供 多种选择 可供选择,根据用例满足不同的模型训练要求。

xgb = sagemaker.estimator.Estimator(container,
role,
instance_count=1,
instance_type='ml.c4.4xlarge',
output_path=xgb_upload_location,
sagemaker_session=sagemaker_session)
xgb.set_hyperparameters(max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.8,
silent=0,
objective='binary:hinge',
num_round=100)

xgb.fit({'train': s3_train_channel, 'validation': s3_valid_channel})

当模型训练完成,根据业务需求对模型评估满意后,就可以开始模型部署了。 我们首先使用 AsyncInferenceConfig 对象选项并使用之前训练的模型创建端点配置:

endpoint_config_name = resource_name.format("EndpointConfig")
create_endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
"VariantName": "variant1",
"ModelName": model_name,
"InstanceType": "ml.m5.xlarge",
"InitialInstanceCount": 1,
}
],
AsyncInferenceConfig={
"OutputConfig": {
"S3OutputPath": f"s3://{bucket}/{prefix}/output",
#Specify Amazon SNS topics
"NotificationConfig": {
"SuccessTopic": "arn:aws:sns:<region>:<account-id>:<success-sns-topic>",
"ErrorTopic": "arn:aws:sns:<region>:<account-id>:<error-sns-topic>",
}},
"ClientConfig": {"MaxConcurrentInvocationsPerInstance": 4},
},)

然后,我们使用我们创建的端点配置创建一个 SageMaker 异步推理端点。 配置完成后,我们可以开始调用端点以异步生成推理。

endpoint_name = resource_name.format("Endpoint")
create_endpoint_response = sm_client.create_endpoint(
EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)

近实时推理

SageMaker 异步推理 端点提供了对传入推理请求进行排队并以近乎实时的方式异步处理它们的能力。 这对于具有较大负载大小(高达 1 GB)的推理请求、可能需要更长的处理时间(长达 15 分钟)以及具有近实时延迟要求的应用程序来说是理想的选择。 异步推理还使您能够通过在没有要处理的请求时将实例计数自动缩放为零来节省成本,因此您只需在终端节点处理请求时付费。

您可以创建 SageMaker 异步推理端点,类似于创建实时推理端点的方式,并另外指定 AsyncInferenceConfig 对象,同时使用 CreateEndpointConfig API 中的 EndpointConfig 字段创建端点配置。 下图显示了推理工作流程以及异步推理端点如何生成推理。

ML-9132 SageMaker 异步架构

要调用异步推理终端节点,请求有效负载应存储在 Amazon S3 中,并且需要在 InvokeEndpointAsync 请求中提供对此有效负载的引用。 调用时,SageMaker 将请求排队等待处理,并返回标识符和输出位置作为响应。 处理后,SageMaker 将结果放置在 Amazon S3 位置。 您可以选择接收成功或错误通知 亚马逊简单通知服务 (亚马逊 SNS)。

测试端到端解决方案

要测试解决方案,请完成以下步骤:

  • 在 AWS CloudFormation 控制台上,打开您之前创建的堆栈 (nrt-streaming-inference).
  • 点击 输出 选项卡,复制 S3 存储桶的名称 (EventsBucket).

这是我们的 AWS Glue 流作业在从 Kinesis 数据流读取和处理后向其中写入功能的 S3 存储桶。

ML-9132 S3 事件桶

接下来,我们为此 S3 存储桶设置事件通知。

  • 在 Amazon S3 控制台上,导航到存储桶 EventsBucket.
  • 点击 查看房源 标签,在 事件通知 部分中,选择 创建事件通知.

ML-9132 S3 事件存储桶属性

ML-9132 S3 事件桶通知

  • 针对 事件名称,输入 invoke-endpoint-lambda.
  • 针对 字首,输入 features/.
  • 针对 后缀,输入 .csv.
  • 针对 活动类型, 选择 所有对象创建事件.

ML-9132 S3 事件桶通知配置
ML-9132 S3 事件桶通知配置

  • 针对 目的地, 选择 拉姆达函数.
  • 针对 拉姆达函数, 并选择函数 invoke-endpoint-asynch.
  • 保存更改.

ML-9132 S3 事件桶通知配置 lambda

  • 在 AWS Glue 控制台上,打开作业 GlueStreaming-Kinesis-S3.
  • 运行工作.

ML-9132 运行胶水作业

接下来,我们使用 Kinesis Data Generator (KDG) 来模拟传感器将数据发送到我们的 Kinesis 数据流。 如果这是您第一次使用 KDG,请参阅 概述 用于初始设置。 KDG 提供了一个 CloudFormation 模板来创建用户并分配足够的权限以使用 KDG 将事件发送到 Kinesis。 跑过 CloudFormation模板 在您用于构建本文中的解决方案的 AWS 账户中。 设置 KDG 后,登录并访问 KDG 以将测试事件发送到我们的 Kinesis 数据流。

  • 使用您在其中创建 Kinesis 数据流的区域 (us-east-1)。
  • 在下拉菜单中,选择数据流 sensor-data-stream.
  • 每秒记录数 部分,选择 常数 并输入 100。
  • 取消选择 压缩记录.
  • 针对 记录模板,使用以下模板:
{
"air_temperature": {{random.number({"min":295,"max":305, "precision":0.01})}},
"process_temperature": {{random.number({"min":305,"max":315, "precision":0.01})}},
"rotational_speed": {{random.number({"min":1150,"max":2900})}},
"torque": {{random.number({"min":3,"max":80, "precision":0.01})}},
"tool_wear": {{random.number({"min":0,"max":250})}},
"type": "{{random.arrayElement(["L","M","H"])}}"
}

  • 点击 发送数据 开始向 Kinesis 数据流发送数据。

ML-9132 Kineses 数据生成器

AWS Glue 流式传输作业根据提供的窗口大小从 Kinesis 数据流中读取并提取一小批数据(表示传感器读数)。 然后,流作业在此微批次上处理和执行特征工程,然后将其分区并写入 S3 存储桶中的前缀特征。

随着 AWS Glue 流作业创建的新功能被写入 S3 存储桶,一个 Lambda 函数 (invoke-endpoint-asynch) 被触发,它通过发送调用请求来调用 SageMaker 异步推理端点,以从我们部署的 ML 模型中获取推理。 异步推理端点将异步调用请求排队。 处理完成后,SageMaker 将推理结果存储在 Amazon S3 位置 (S3OutputPath) 在异步推理端点配置期间指定。

对于我们的用例,推理结果根据传感器读数指示机器部件是否可能发生故障。

ML-9132 模型推断

SageMaker 还通过 Amazon SNS 发送成功或错误通知。 例如,如果您设置了一个 电子邮件订阅 对于成功和错误 SNS 主题(在异步 SageMaker 推理端点配置中指定),每次处理推理请求时都可以发送一封电子邮件。 以下屏幕截图显示了来自 SNS 成功主题的示例电子邮件。

ML-9132 SNS 电子邮件订阅

对于现实世界的应用程序,您可以将 SNS 通知与其他服务集成,例如 Amazon Simple Queue服务 (Amazon SQS) 和 Lambda,用于根据您的要求对生成的推理进行额外的后处理或与其他下游应用程序集成。 例如,对于我们的预测性维护用例,您可以根据 SNS 通知调用 Lambda 函数,以从 Amazon S3 读取生成的推理,进一步处理它(例如聚合或过滤),并启动工作流,例如发送工作订单设备维修技术人员。

清理

完成堆栈测试后,删除资源(尤其是 Kinesis 数据流、Glue 流作业和 SNS 主题)以避免意外费用。

运行以下代码以删除您的堆栈:

sam delete nrt-streaming-inference

还可以按照 ModelTraining-Evaluation-and-Deployment 笔记本中的清理部分删除 SageMaker 端点等资源。

结论

在这篇博文中,我们使用了一个预测性维护用例来演示如何使用 Kinesis、AWS Glue 和 SageMaker 等各种服务来构建近乎实时的推理管道。 我们鼓励您尝试此解决方案,并让我们知道您的想法。

如果您有任何问题,请在评论中分享。


关于作者

使用 Amazon Kinesis、AWS Glue 和 Amazon SageMaker PlatoBlockchain Data Intelligence 构建预测性维护解决方案。 垂直搜索。 哎。 拉胡尔·沙玛 是 AWS 数据实验室的解决方案架构师,帮助 AWS 客户设计和构建 AI/ML 解决方案。 在加入 AWS 之前,Rahul 在金融和保险领域工作了数年,帮助客户构建数据和分析平台。

使用 Amazon Kinesis、AWS Glue 和 Amazon SageMaker PlatoBlockchain Data Intelligence 构建预测性维护解决方案。 垂直搜索。 哎。帕特·赖利 是 AWS 数据实验室的架构师,他帮助客户设计和构建数据工作负载以支持他们的业务。 在加入 AWS 之前,Pat 曾在 AWS 合作伙伴处担任顾问,负责跨多个行业构建 AWS 数据工作负载。

时间戳记:

更多来自 AWS机器学习