使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

使用 Amazon Keyspaces 作为数据源训练机器学习模型

许多用于工业设备维护、贸易监控、车队管理和路线优化的应用程序都是使用开源 Cassandra API 和驱动程序构建的,以高速、低延迟处理数据。 自己管理 Cassandra 表可能既耗时又昂贵。 Amazon Keyspaces(用于 Apache Cassandra) 让您可以在 AWS 云中设置、保护和扩展 Cassandra 表,而无需管理额外的基础设施。

在这篇文章中,我们将向您介绍与使用 Amazon Keyspaces 高级训练机器学习 (ML) 模型相关的 AWS 服务,并提供将数据从 Amazon Keyspaces 提取到数据中的分步说明。 亚马逊SageMaker 并训练可用于特定客户细分用例的模型。

AWS 拥有多种服务来帮助企业在云中实施机器学习流程。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

AWS ML Stack 具有三层。 中间层是 SageMaker,它为开发人员、数据科学家和机器学习工程师提供了大规模构建、训练和部署机器学习模型的能力。 它消除了 ML 工作流程每个步骤的复杂性,以便您可以更轻松地部署 ML 用例。 这包括从预测性维护到计算机视觉以预测客户行为的任何内容。 客户利用 SageMaker 将数据科学家的工作效率提高了 10 倍。

Apache Cassandra 是具有非结构化或半结构化数据的读取密集型用例的流行选择。 例如,流行的食品配送业务估计配送时间,而零售客户可以经常使用 Apache Cassandra 数据库中的产品目录信息。 亚马逊密钥空间 是一种可扩展、高度可用且托管的无服务器 Apache Cassandra 兼容数据库服务。 您不需要配置、修补或管理服务器,也不需要安装、维护或操作软件。 表可以自动扩展和缩小,您只需为您使用的资源付费。 Amazon Keyspaces 允许您使用您现在使用的相同 Cassandra 应用程序代码和开发人员工具在 AWS 上运行 Cassandra 工作负载。

SageMaker 提供了一套 内置算法 帮助数据科学家和机器学习从业者快速开始训练和部署机器学习模型。 在这篇文章中,我们将向您展示零售客户如何使用 Keyspaces 数据库中的客户购买历史记录并针对不同的客户群进行营销活动。

K均值 是一种无监督学习算法。 它试图在数据中找到离散的分组,其中一组的成员彼此尽可能相似,并且与其他组的成员尽可能不同。 您定义希望算法用来确定相似性的属性。 SageMaker 使用的修改版本 网络规模的 k 均值聚类算法。 与原始版本的算法相比,SageMaker 使用的版本更加准确。 然而,与原始算法一样,它可以扩展到海量数据集并缩短训练时间。

解决方案概述

这些说明假设您将使用 SageMaker Studio 来运行代码。 相关代码已分享于 AWS 示例 GitHub。 按照实验室中的说明,您可以执行以下操作:

  • 安装必要的依赖项。
  • 连接到 Amazon Keyspaces、创建表并提取示例数据。
  • 使用 Amazon Keyspaces 中的数据构建分类 ML 模型。
  • 探索模型结果。
  • 清理新创建的资源。

完成后,您将把 SageMaker 与 Amazon Keyspaces 集成来训练 ML 模型,如下图所示。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

现在您可以按照 分步说明 在这篇文章中,使用 SageMaker 提取存储在 Amazon Keyspaces 中的原始数据以及由此检索的数据以进行 ML 处理。

先决条件

首先,导航到 SageMaker。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

接下来,如果这是您第一次使用 SageMaker,请选择 立即购买.

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

接下来,选择 设置 SageMaker 域.

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

接下来,创建一个新的用户配置文件,名称为 – sagemaker用户,然后选择 创建新角色 ,在 默认执行角色 子部分。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

接下来,在弹出的屏幕中,选择任意 亚马逊简单存储服务(Amazon S3) 存储桶,然后选择创建角色。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

该角色将在以下步骤中使用,以允许 SageMaker 使用该角色的临时凭证访问 Keyspaces 表。 这样就无需在笔记本中存储用户名和密码。

接下来,检索与该角色关联的角色 sagemaker用户 这是在上一步中从摘要部分创建的。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

然后,导航到 AWS控制台 并抬头看 AWS 身份和访问管理 (IAM)。 在 IAM 中,导航到角色。 在角色中,搜索上一步中标识的执行角色。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

接下来,选择上一步中标识的角色并选择添加权限。 在出现的下拉列表中,选择创建内联策略。 SageMaker 允许您提供细粒度的访问权限,限制用户/应用程序可以根据业务需求执行哪些操作。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

然后,选择 JSON 选项卡并从 Github 的“注释”部分复制策略 。 此策略允许 SageMaker 笔记本连接到 Keyspace 并检索数据以进行进一步处理。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

然后,再次选择“添加权限”,然后从下拉列表中选择“附加策略”。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

查找 AmazonKeyspacesFullAccess 策略,选中匹配结果旁边的复选框,然后选择附加策略。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

验证权限策略部分是否包括 AmazonS3FullAccess, AmazonSageMakerFullAccess, AmazonKeyspacesFullAccess,以及新添加的内联策略。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

接下来,使用 AWS 控制台导航到 SageMaker Studio 并选择 SageMaker Studio。 到达那里后,选择“启动应用程序”并选择“Studio”。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

笔记本演练

从 SageMaker Notebook 连接到 Keyspaces 的首选方法是使用 AWS 签名版本 4 流程 (SigV4) 基于 临时凭证 用于身份验证。 在这种情况下,我们不需要生成或存储 Keyspaces 凭据,并且可以使用凭据通过 SigV4 插件进行身份验证。 临时安全凭证由访问密钥 ID 和秘密访问密钥组成。 但是,它们还包含一个安全令牌,用于指示凭据何时过期。 在本文中,我们将创建 IAM 角色并生成临时安全凭证。

首先,我们安装驱动程序(cassandra-sigv4)。 此驱动程序使您能够使用 AWS 签名版本 4 流程 (SigV4) 将身份验证信息添加到 API 请求中。 使用该插件,您可以为用户和应用程序提供短期凭证,以使用 IAM 用户和角色访问 Amazon Keyspaces(适用于 Apache Cassandra)。 接下来,您将导入所需的证书以及其他包依赖项。 最后,您将允许笔记本承担与 Keyspaces 对话的角色。

# Install missing packages and import dependencies
# Installing Cassandra SigV4
%pip install cassandra-sigv4 # Get Security certificate
!curl https://certs.secureserver.net/repository/sf-class2-root.crt -O # Import
from sagemaker import get_execution_role
from cassandra.cluster import Cluster
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
from cassandra_sigv4.auth import SigV4AuthProvider
import boto3 import pandas as pd
from pandas import DataFrame import csv
from cassandra import ConsistencyLevel
from datetime import datetime
import time
from datetime import timedelta import pandas as pd
import datetime as dt
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler # Getting credentials from the role
client = boto3.client("sts") # Get notebook Role
role = get_execution_role()
role_info = {"RoleArn": role, "RoleSessionName": "session1"}
print(role_info) credentials = client.assume_role(**role_info)

接下来,连接到 Amazon Keyspaces 并将系统数据从 Keyspaces 读取到 Pandas DataFrame 中以验证连接。

# Connect to Cassandra Database from SageMaker Notebook # using temporary credentials from the Role.
session = boto3.session.Session() ###
### You can also pass specific credentials to the session
###
#session = boto3.session.Session(
# aws_access_key_id=credentials["Credentials"]["AccessKeyId"],
# aws_secret_access_key=credentials["Credentials"]["SecretAccessKey"],
# aws_session_token=credentials["Credentials"]["SessionToken"],
#) region_name = session.region_name # Set Context
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations("sf-class2-root.crt")
ssl_context.verify_mode = CERT_REQUIRED auth_provider = SigV4AuthProvider(session)
keyspaces_host = "cassandra." + region_name + ".amazonaws.com" cluster = Cluster([keyspaces_host], ssl_context=ssl_context, auth_provider=auth_provider, port=9142)
session = cluster.connect() # Read data from Keyspaces system table. # Keyspaces is serverless DB so you don't have to create Keyspaces DB ahead of time.
r = session.execute("select * from system_schema.keyspaces") # Read Keyspaces row into Panda DataFrame
df = DataFrame(r)
print(df)

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

接下来,准备用于在原始数据集上进行训练的数据。 在与本文相关的 python 笔记本中,使用从以下位置下载的零售数据集 此处,并对其进行处理。 给定数据集后,我们的业务目标是使用称为 RFM 的特定指标对客户进行聚类。 RFM 模型基于三个定量因素:

  • 新近度:客户最近进行购买的时间。
  • 频率:客户购买的频率。
  • 货币价值:客户在购买上花费了多少钱。

RFM 分析对这三个类别中的每一个类别中的客户进行数字排名,通常按 1 到 5 的等级(数字越高,结果越好)。 “最佳”客户将在每个类别中获得最高分。 我们将使用 pandas 的基于分位数的离散化函数 (qcut)。 它将有助于基于或基于样本分位数将值离散化为相同大小的桶。

# Prepare Data
r = session.execute("select * from " + keyspaces_schema + ".online_retail") df = DataFrame(r)
df.head(100) df.count()
df["description"].nunique()
df["totalprice"] = df["quantity"] * df["price"]
df.groupby("invoice").agg({"totalprice": "sum"}).head() df.groupby("description").agg({"price": "max"}).sort_values("price", ascending=False).head()
df.sort_values("price", ascending=False).head()
df["country"].value_counts().head()
df.groupby("country").agg({"totalprice": "sum"}).sort_values("totalprice", ascending=False).head() returned = df[df["invoice"].str.contains("C", na=False)]
returned.sort_values("quantity", ascending=True).head() df.isnull().sum()
df.dropna(inplace=True)
df.isnull().sum()
df.dropna(inplace=True)
df.isnull().sum()
df.describe([0.05, 0.01, 0.25, 0.50, 0.75, 0.80, 0.90, 0.95, 0.99]).T
df.drop(df.loc[df["customer_id"] == ""].index, inplace=True) # Recency Metric
import datetime as dt today_date = dt.date(2011, 12, 9)
df["customer_id"] = df["customer_id"].astype(int) # create get the most recent invoice for each customer
temp_df = df.groupby("customer_id").agg({"invoice_date": "max"})
temp_df["invoice_date"] = temp_df["invoice_date"].astype(str)
temp_df["invoice_date"] = pd.to_datetime(temp_df["invoice_date"]).dt.date
temp_df["Recency"] = (today_date - temp_df["invoice_date"]).dt.days
recency_df = temp_df.drop(columns=["invoice_date"])
recency_df.head() # Frequency Metric
temp_df = df.groupby(["customer_id", "invoice"]).agg({"invoice": "count"})
freq_df = temp_df.groupby("customer_id").agg({"invoice": "count"})
freq_df.rename(columns={"invoice": "Frequency"}, inplace=True) # Monetary Metric
monetary_df = df.groupby("customer_id").agg({"totalprice": "sum"})
monetary_df.rename(columns={"totalprice": "Monetary"}, inplace=True)
rfm = pd.concat([recency_df, freq_df, monetary_df], axis=1) df = rfm
df["RecencyScore"] = pd.qcut(df["Recency"], 5, labels=[5, 4, 3, 2, 1])
df["FrequencyScore"] = pd.qcut(df["Frequency"].rank(method="first"), 5, labels=[1, 2, 3, 4, 5])
df["Monetary"] = df["Monetary"].astype(int)
df["MonetaryScore"] = pd.qcut(df["Monetary"], 5, labels=[1, 2, 3, 4, 5])
df["RFM_SCORE"] = ( df["RecencyScore"].astype(str) + df["FrequencyScore"].astype(str) + df["MonetaryScore"].astype(str)
)
seg_map = { r"[1-2][1-2]": "Hibernating", r"[1-2][3-4]": "At Risk", r"[1-2]5": "Can't Loose", r"3[1-2]": "About to Sleep", r"33": "Need Attention", r"[3-4][4-5]": "Loyal Customers", r"41": "Promising", r"51": "New Customers", r"[4-5][2-3]": "Potential Loyalists", r"5[4-5]": "Champions",
} df["Segment"] = df["RecencyScore"].astype(str) + rfm["FrequencyScore"].astype(str)
df["Segment"] = df["Segment"].replace(seg_map, regex=True)
df.head()
rfm = df.loc[:, "Recency":"Monetary"]
df.groupby("customer_id").agg({"Segment": "sum"}).head()

在本例中,我们使用 CQL 从 Keyspace 表中读取记录。 在某些机器学习用例中,您可能需要多次从同一个 Keyspaces 表中读取相同的数据。 在这种情况下,我们建议您将数据保存到 Amazon S3 存储桶中,以避免产生额外的费用 成本正在从 Amazon Keyspaces 读取数据。 根据您的场景,您还可以使用 亚马逊电子病历摄取 将非常大的 Amazon S3 文件导入到 SageMaker 中。

## Optional Code to save Python DataFrame to S3
from io import StringIO # python3 (or BytesIO for python2) smclient = boto3.Session().client('sagemaker')
sess = sagemaker.Session()
bucket = sess.default_bucket() # Set a default S3 bucket
print(bucket) csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, ‘out/saved_online_retail.csv').put(Body=csv_buffer.getvalue())

接下来,我们使用 KMeans 算法训练 ML 模型并确保创建集群。 在此特定场景中,您将看到打印创建的集群,显示原始数据集中的客户已根据数据集中的各种属性分组在一起。 该集群信息可用于有针对性的营销活动。

# Training sc = MinMaxScaler((0, 1))
df = sc.fit_transform(rfm) # Clustering
kmeans = KMeans(n_clusters=6).fit(df) # Result
segment = kmeans.labels_ # Visualize the clusters
import matplotlib.pyplot as plt final_df = pd.DataFrame({"customer_id": rfm.index, "Segment": segment})
bucket_data = final_df.groupby("Segment").agg({"customer_id": "count"}).head()
index_data = final_df.groupby("Segment").agg({"Segment": "max"}).head()
index_data["Segment"] = index_data["Segment"].astype(int)
dataFrame = pd.DataFrame(data=bucket_data["customer_id"], index=index_data["Segment"])
dataFrame.rename(columns={"customer_id": "Total Customers"}).plot.bar( rot=70, title="RFM clustering"
)
# dataFrame.plot.bar(rot=70, title="RFM clustering");
plt.show(block=True);

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。

(可选)接下来,我们将 ML 模型识别的客户群保存回 Amazon Keyspaces 表中,以进行有针对性的营销。 批处理作业可以读取这些数据并向特定细分市场的客户运行有针对性的活动。

# Create ml_clustering_results table to store results createTable = """CREATE TABLE IF NOT EXISTS %s.ml_clustering_results ( run_id text, segment int, total_customers int, run_date date, PRIMARY KEY (run_id, segment)); """
cr = session.execute(createTable % keyspaces_schema)
time.sleep(20)
print("Table 'ml_clustering_results' created") insert_ml = ( "INSERT INTO " + keyspaces_schema + '.ml_clustering_results' + '("run_id","segment","total_customers","run_date") ' + 'VALUES (?,?,?,?); '
) prepared = session.prepare(insert_ml)
prepared.consistency_level = ConsistencyLevel.LOCAL_QUORUM run_id = "101"
dt = datetime.now() for ind in dataFrame.index: print(ind, dataFrame['customer_id'][ind]) r = session.execute( prepared, ( run_id, ind, dataFrame['customer_id'][ind], dt, ), )

最后,我们 清理资源 在本教程期间创建以避免产生额外费用。

# Delete blog keyspace and tables
deleteKeyspace = "DROP KEYSPACE IF EXISTS blog"
dr = session.execute(deleteKeyspace) time.sleep(5)
print("Dropping %s keyspace. It may take a few seconds to a minute to complete deletion keyspace and table." % keyspaces_schema )

完成键空间和表的删​​除可能需要几秒钟到一分钟的时间。 当您删除键空间时,该键空间及其所有表都将被删除,并且您将停止从中产生费用。

结论

本文向您展示了如何将客户数据从 Amazon Keyspaces 提取到 SageMaker 中,并训练一个允许您对客户进行细分的集群模型。 您可以利用这些信息进行有针对性的营销,从而大大提高您的业务 KPI。 要了解有关 Amazon Keyspaces 的更多信息,请查看以下资源:


作者简介

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。瓦季姆·利亚科维奇 是旧金山湾区 AWS 的高级解决方案架构师,帮助客户迁移到 AWS。 他正在与从大型企业到小型初创公司的各种组织合作,支持他们的创新。 他还帮助客户在 AWS 上构建可扩展、安全且经济高效的解决方案。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。帕特·帕特尔(Parth Patel) 是旧金山湾区 AWS 的解决方案架构师。 Parth 指导客户加速云之旅并帮助他们成功采用 AWS 云。 他专注于机器学习和应用程序现代化。

使用 Amazon Keyspaces 作为数据源 PlatoBlockchain Data Intelligence 训练机器学习模型。 垂直搜索。 哎。拉姆帕坦吉 是旧金山湾区 AWS 的解决方案架构师。 他帮助农业、保险、银行、零售、医疗保健和生命科学、酒店业和高科技垂直行业的客户在 AWS 云上成功运营业务。 他专注于数据库、分析和机器学习。

时间戳记:

更多来自 AWS机器学习