Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Amazon SageMaker Studio ve Apache Spark kullanarak Delta Lake'ten veri yükleyin ve verileri dönüştürün

Veri gölleri, kritik iş verilerini depolamak için endüstride norm haline geldi. Bir veri gölünün birincil mantığı, ham verilerden ön işlenmiş ve son işlenmiş verilere kadar her tür veriyi toplamaktır ve hem yapılandırılmış hem de yapılandırılmamış veri formatlarını içerebilir. Her tür veri için merkezi bir veri deposuna sahip olmak, modern büyük veri uygulamalarının ihtiyaç duyulan her tür veriyi yüklemesine, dönüştürmesine ve işlemesine olanak tanır. Avantajlar arasında, ilk yapılandırmaya veya dönüştürmeye gerek kalmadan verileri olduğu gibi depolamak yer alır. En önemlisi, veri gölleri, daha iyi karar vermeyi yönlendirmek için birçok farklı analitik ve makine öğrenimi (ML) sürecinden gelen verilere kontrollü erişime izin verir.

Birden çok satıcı, aşağıdakiler dahil olmak üzere veri gölü mimarileri oluşturmuştur: AWS Göl Oluşumu. Ayrıca açık kaynaklı çözümler, şirketlerin verilere kolayca erişmesine, bunları yüklemesine ve paylaşmasına olanak tanır. AWS Cloud'da veri depolama seçeneklerinden biri, Delta Gölü. Delta Lake kitaplığı, açık kaynakta okuma ve yazma sağlar Apache Parke dosya biçimindedir ve ACID işlemleri, ölçeklenebilir meta veri işleme ve birleşik akış ve toplu veri işleme gibi yetenekler sağlar. Delta Lake, verileri aşağıdaki gibi bir nesne katmanı depolamasının üzerinde depolamak için kullanabileceğiniz bir depolama katmanı API'si sunar. Amazon Basit Depolama Hizmeti (Amazon S3).

Veri, makine öğreniminin kalbinde yer alır—genel olarak bir veri gölünde depolanan yüksek kaliteli geçmiş verilere erişim olmadan geleneksel denetimli bir modelin eğitimi mümkün değildir. Amazon Adaçayı Yapıcı ML çözümleri oluşturmak için çok yönlü bir çalışma tezgahı sağlayan ve veri alımı, veri işleme, model eğitimi ve model barındırma için son derece özel araçlar sağlayan tam olarak yönetilen bir hizmettir. Apache Spark verileri yüklemek ve işlemek için kapsamlı bir API ile modern veri işlemenin bir gücüdür. SageMaker, yüksek düzeyde dağıtılmış bir şekilde çalışan ML iş akışlarını etkinleştirmek için Spark kullanarak petabayt ölçeğinde veri hazırlama yeteneğine sahiptir. Bu gönderi, Delta Lake tarafından sunulan özelliklerden nasıl yararlanabileceğinizi vurgulamaktadır. Amazon SageMaker Stüdyosu.

Çözüme genel bakış

Bu gönderide, Delta Lake biçiminde depolanan verileri kolayca yüklemek ve dönüştürmek için SageMaker Studio not defterlerinin nasıl kullanılacağını açıklıyoruz. Tablo verilerini CSV ve Parke biçiminde okuyan ve yazan Apache Spark komutlarını çalıştırmak için standart bir Jupyter not defteri kullanıyoruz. açık kaynak kitaplığı delta-kıvılcım bu verilere kendi yerel biçiminde doğrudan erişmenizi sağlar. Bu kitaplık, veri dönüştürmeleri uygulamak, şema değişiklikleri yapmak ve verilerin belirli bir sürümünü çekmek için zaman yolculuğu veya zaman damgası sorgularını kullanmak için birçok API işleminden yararlanmanıza olanak tanır.

Örnek not defterimizde ham verileri Spark DataFrame'e yüklüyoruz, bir Delta tablosu oluşturuyoruz, onu sorguluyoruz, denetim geçmişini görüntülüyoruz, şema gelişimini gösteriyoruz ve tablo verilerini güncellemek için çeşitli yöntemler gösteriyoruz. kullanıyoruz DataFrame API'si veri kümesi özniteliklerini almak ve dönüştürmek için PySpark kitaplığından. kullanıyoruz delta-spark Delta Lake formatında veri okumak ve yazmak ve altta yatan tablo yapısını işlemek için kütüphane. plan.

Bir Jupyter not defterinden Python kodu oluşturmak ve çalıştırmak için SageMaker'ın yerleşik IDE'si olan SageMaker Studio'yu kullanıyoruz. biz bir yarattık GitHub deposu içeren bu defter ve bu örneği kendi başınıza çalıştırmak için diğer kaynaklar. Not defteri, farklı veri depolarında verileri çoğaltmaya gerek kalmadan tablolara yerinde erişilmesine izin veren Delta Lake biçiminde depolanan verilerle tam olarak nasıl etkileşime girileceğini gösterir.

Bu örnek için, halka açık bir veri kümesi kullanıyoruz Ödünç Club müşteri kredileri verilerini temsil eder. indirdik accepted veri dosyası (accepted_2007_to_2018Q4.csv.gz) ve orijinal niteliklerin bir alt kümesini seçti. Bu veri seti altında mevcuttur Creative Commons (CCO) Lisansı.

Önkoşullar

Kullanmadan önce birkaç önkoşul yüklemeniz gerekir. delta-spark işlevsellik. Gerekli bağımlılıkları karşılamak için, Dockerized kapsayıcı olarak çalışan ve bir Jupyter Gateway uygulaması aracılığıyla erişilen Studio ortamımıza bazı kitaplıklar yüklememiz gerekiyor:

  • Java ve ilgili kitaplıklara erişim için OpenJDK
  • PySpark (Python için Spark) kitaplığı
  • Delta Spark açık kaynak kitaplığı

İkisini de kullanabiliriz conda or pip her ikisinde de herkese açık olan bu kitaplıkları kurmak için conda-forge, PyPI sunucuları veya Maven depoları.

Bu not defteri, SageMaker Studio içinde çalışacak şekilde tasarlanmıştır. Not defterini Studio içinde başlattıktan sonra, Python 3(Veri Bilimi) çekirdek türü. Ek olarak, PySpark komutlarının daha hızlı çalışmasına izin veren en az 16 GB RAM'e (ml.g4dn.xlarge gibi) sahip bir örnek türü kullanmanızı öneririz. Dizüstü bilgisayarın ilk birkaç hücresini oluşturan gerekli bağımlılıkları yüklemek için aşağıdaki komutları kullanın:

%conda install openjdk -q -y
%pip install pyspark==3.2.0
%pip install delta-spark==1.1.0
%pip install -U "sagemaker>2.72"

Kurulum komutları tamamlandıktan sonra not defterinde çekirdek mantığı çalıştırmaya hazırız.

Çözümü uygula

Apache Spark komutlarını çalıştırmak için bir örnek oluşturmamız gerekir. SparkSession nesne. Gerekli import komutlarını ekledikten sonra konfigüre ediyoruz. SparkSession bazı ek yapılandırma parametreleri ayarlayarak. anahtarlı parametre spark.jars.packages çalıştırmak için Spark tarafından kullanılan ek kitaplıkların adlarını iletir delta komutlar. Aşağıdaki ilk kod satırları, geleneksel Maven koordinatlarını kullanarak bir paket listesi oluşturur (groupId:artifactId:version), bu ek paketleri SparkSession.

Ek olarak, anahtarlı parametreler spark.sql.extensions ve spark.sql.catalog.spark_catalog Spark'ın Delta Lake işlevselliğini doğru şekilde işlemesini sağlayın. Anahtarlı son yapılandırma parametresi fs.s3a.aws.credentials.provider ekler ContainerCredentialsProvider Studio'nun AWS Kimlik ve Erişim Yönetimi Kapsayıcı ortamı aracılığıyla kullanıma sunulan (IAM) rol izinleri. kod oluşturur SparkSession SageMaker Studio ortamı için düzgün şekilde başlatılan nesne:

# Configure Spark to use additional library packages to satisfy dependencies # Build list of packages entries using Maven coordinates (groupId:artifactId:version)
pkg_list = []
pkg_list.append("io.delta:delta-core_2.12:1.1.0")
pkg_list.append("org.apache.hadoop:hadoop-aws:3.2.2") packages=(",".join(pkg_list))
print('packages: '+packages) # Instantiate Spark via builder
# Note: we use the `ContainerCredentialsProvider` to give us access to underlying IAM role permissions spark = (SparkSession .builder .appName("PySparkApp") .config("spark.jars.packages", packages) .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider") .getOrCreate()) sc = spark.sparkContext print('Spark version: '+str(sc.version))

Bir sonraki bölümde, Lending Club tüketici kredileri veri setinin bir alt kümesini içeren bir dosyayı varsayılan S3 kovamıza yüklüyoruz. Orijinal veri kümesi çok büyüktür (600 MB'nin üzerinde), bu nedenle bu dizüstü bilgisayar tarafından kullanılmak üzere tek bir temsili dosya (2.6 MB) sağlıyoruz. PySpark şunları kullanır: s3a ek Hadoop kitaplığı işlevselliğini etkinleştirmek için protokol. Bu nedenle, her yerel S3 URI'sini s3 kullanılacak protokol s3a bu defter boyunca hücrelerde.

Spark DataFrame adlı bir Spark DataFrame döndüren aşağıdaki kodla ham verileri (hem CSV hem de Parquet dosyaları için seçeneklerle birlikte) okumak için Spark kullanıyoruz. loans_df:

loans_df = spark.read.csv(s3a_raw_csv, header=True)

Aşağıdaki ekran görüntüsü, elde edilen DataFrame'den ilk 10 satırı gösterir.
Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Artık bu DataFrame'i belirterek tek satır kodlu bir Delta Lake tablosu olarak yazabiliriz. .format("delta") ve tablo verilerini yazmak istediğimiz S3 URI konumunu sağlamak:

loans_df.write.format("delta").mode("overwrite").save(s3a_delta_table_uri)

Sonraki birkaç not defteri hücresi, Delta Lake tablosunu sorgulamak için bir seçenek gösterir. Standart bir SQL sorgusu oluşturabiliriz, belirtebiliriz delta biçimini ve tablo konumunu seçin ve Spark SQL sözdizimini kullanarak bu komutu gönderin:

sql_cmd = f'SELECT * FROM delta.`{s3a_delta_table_uri}` ORDER BY loan_amnt'
sql_results = spark.sql(sql_cmd)

Aşağıdaki ekran görüntüsü, SQL sorgumuzun sonuçlarını şu şekilde sıralanmıştır: loan_amnt.
Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Delta Lake tablolarıyla etkileşime gir

Bu bölümde sergiliyoruz DeltaTable sınıfı itibaren delta-spark kütüphane. DeltaTable Delta Lake tablolarıyla programlı etkileşim için birincil sınıftır. Bu sınıf, bir tablo hakkındaki bilgileri keşfetmek için birkaç statik yöntem içerir. Örneğin, isDeltaTable yöntem, tablonun delta biçiminde depolanıp depolanmadığını gösteren bir Boole değeri döndürür:

# Use static method to determine table type
print(DeltaTable.isDeltaTable(spark, s3a_delta_table_uri))

Oluşturabilirsiniz DeltaTable bizim durumumuzda S3 URI konumu olan Delta tablosunun yolunu kullanan örnekler. Aşağıdaki kodda, tablo değişikliklerinin tam geçmişini alıyoruz:

deltaTable = DeltaTable.forPath(spark, s3a_delta_table_uri)
history_df = deltaTable.history()
history_df.head(3)

Çıktı, tabloda kayıtlı altı değişiklik olduğunu ve en son üç sürümü gösterir.

Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

şema evrimi

Bu bölümde, Delta Lake şema evriminin nasıl çalıştığını gösteriyoruz. Varsayılan olarak, delta-spark tablo yazarlarını, kısıtlamaları uygulayarak mevcut şemaya uymaya zorlar. Ancak, belirli seçenekleri belirleyerek tablonun şemasını güvenle değiştirebiliriz.

İlk olarak, Delta tablosundaki verileri tekrar okuyalım. Çünkü bu veriler şu şekilde yazılmıştır: delta biçimini belirtmemiz gerekir. .format("delta") verileri okurken, Delta tablosunun bulunduğu S3 URI'sini sağlarız. İkinci olarak, DataFrame'i şema evrimini gösterdiğimiz farklı bir S3 konumuna geri yazıyoruz. Aşağıdaki koda bakın:

delta_df = (spark.read.format("delta").load(s3a_delta_table_uri))
delta_df.write.format("delta").mode("overwrite").save(s3a_delta_update_uri)

Şimdi veri kümemize iki yeni sütun eklemek için Spark DataFrame API'sini kullanıyoruz. Sütun adları funding_type ve excess_int_rate, ve sütun değerleri DataFrame kullanılarak sabitlere ayarlanır withColumn yöntem. Aşağıdaki koda bakın:

funding_type_col = "funding_type"
excess_int_rate_col = "excess_int_rate" delta_update_df = (delta_df.withColumn(funding_type_col, lit("NA")) .withColumn(excess_int_rate_col, lit(0.0)))
delta_update_df.dtypes

Veri türlerine hızlı bir bakış (dtypes) ek sütunların DataFrame'in parçası olduğunu gösterir.
Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Şimdi şema değişikliğini etkinleştiriyoruz, böylece Delta tablosunun altında yatan şemayı değiştiriyoruz. mergeSchema için seçenek true aşağıdaki Spark yazma komutunda:

(delta_update_df.write.format("delta") .mode("overwrite") .option("mergeSchema", "true") # option - evolve schema .save(s3a_delta_update_uri)
)

Tablo şemasının değiştirildiğini gösteren yeni tablomuzun değişiklik geçmişini kontrol edelim:

deltaTableUpdate = DeltaTable.forPath(spark, s3a_delta_update_uri) # Let's retrieve history BEFORE schema modification
history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

Geçmiş listesi, meta verilere yapılan her revizyonu gösterir.
Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Koşullu tablo güncellemeleri

Sen kullanabilirsiniz DeltaTable update bir yüklemi çalıştırma ve ardından koşul şu şekilde değerlendirildiğinde bir dönüşüm uygulama yöntemi True. Bizim durumumuzda değeri yazıyoruz FullyFunded için funding_type sütun ne zaman olursa olsun loan_amnt eşittir funded_amnt. Bu, tablo verilerinize koşullu güncellemeler yazmak için güçlü bir mekanizmadır.

deltaTableUpdate.update(condition = col("loan_amnt") == col("funded_amnt"), set = { funding_type_col: lit("FullyFunded") } )

Aşağıdaki ekran görüntüsü sonuçlarımızı göstermektedir.
Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Tablo verilerindeki son değişiklikte, güncelleme yöntemine bir işlevi iletmek için sözdizimini gösteriyoruz, bu bizim durumumuzda kredinin %10.0'ını çıkararak fazla faiz oranını hesaplıyor. int_rate bağlanmak. Bir SQL komutu, kayıtları bulmak için WHERE yan tümcesini kullanarak koşulumuzu karşılayan kayıtları çeker. int_rate %10.0'dan büyük:

# Function that calculates rate overage (amount over 10.0)
def excess_int_rate(rate): return (rate-10.0) deltaTableUpdate.update(condition = col("int_rate") > 10.0, set = { excess_int_rate_col: excess_int_rate(col("int_rate")) } )

Yeni excess_int_rate sütun şimdi doğru bir şekilde şunları içeriyor: int_rate eksi %10.0.
Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Son not defteri hücremiz, geçmişi tekrar Delta tablosundan alır, bu sefer şema değişikliği yapıldıktan sonraki değişiklikleri gösterir:

# Finally, let's retrieve table history AFTER the schema modifications history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

Aşağıdaki ekran görüntüsü sonuçlarımızı göstermektedir.
Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.

Sonuç

Açık kaynaklı Delta Lake biçiminde depolanan verilerle doğrudan etkileşim kurmak için SageMaker Studio not defterlerini kullanabilirsiniz. Bu gönderide, açık kaynak kullanarak bu verileri okuyan ve yazan örnek kod sağladık. delta-spark olarak veri kümesini oluşturmanıza, güncellemenize ve yönetmenize olanak sağlayan kitaplık Delta tablosu. Ayrıca, önceden var olan veri göllerinden değer elde etmek için bu kritik teknolojileri birleştirmenin gücünü gösterdik ve Delta Lake'in yeteneklerinin SageMaker'da nasıl kullanılacağını gösterdik.

Not defteri örneğimiz, önkoşulları yüklemek, Spark veri yapılarını başlatmak, Delta Lake biçiminde DataFrame'leri okumak ve yazmak ve şema evrimi gibi işlevleri kullanmak için uçtan uca bir tarif sunar. Dönüştürücü iş sonuçları sağlamak üzere güçlerini artırmak için bu teknolojileri entegre edebilirsiniz.


Yazarlar Hakkında

Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.paul hargis çabalarını AWS, Amazon ve Hortonworks dahil olmak üzere çeşitli şirketlerde Makine Öğrenimi üzerine yoğunlaştırdı. Teknoloji çözümleri oluşturmaktan ve ayrıca insanlara bundan en iyi şekilde nasıl yararlanacaklarını öğretmekten hoşlanıyor. AWS'deki görevinden önce, Amazon.com'un uluslararası alışveriş yapanlar için deneyimi iyileştirmesine yardımcı olan Amazon İhracat ve Genişletmeleri için baş mimardı. Paul, müşterilerin gerçek dünyadaki sorunları çözmek için makine öğrenimi girişimlerini genişletmelerine yardımcı olmayı sever.

Amazon SageMaker Studio ve Apache Spark PlatoBlockchain Data Intelligence kullanarak Delta Lake'ten veri yükleyin ve dönüştürün. Dikey Arama. Ai.Vedan Jain Müşterilerin AWS'deki Makine Öğrenimi ekosisteminden değer elde etmesine yardımcı olan bir Kıdemli AI/ML Uzman Çözüm Mimarıdır. AWS'ye katılmadan önce Vedant, Databricks, Hortonworks (şimdi Cloudera) ve JP Morgan Chase gibi çeşitli şirketlerde Makine Öğrenimi/Veri Bilimi Uzmanlığı pozisyonlarında bulundu. İşinin dışında, Vedant müzik yapmak, anlamlı bir yaşam sürmek için Bilimi kullanmak ve dünyanın dört bir yanından lezzetli vejetaryen mutfağı keşfetmek konusunda tutkulu.

Zaman Damgası:

Den fazla AWS Makine Öğrenimi