Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark

Datasøer er blevet normen i branchen til lagring af kritiske forretningsdata. Den primære begrundelse for en datasø er at lande alle typer data, fra rådata til forbehandlede og efterbehandlede data, og kan omfatte både strukturerede og ustrukturerede dataformater. At have et centraliseret datalager til alle typer data giver moderne big data-applikationer mulighed for at indlæse, transformere og behandle, uanset hvilken type data der er behov for. Fordelene omfatter lagring af data, som de er, uden først at skulle strukturere eller transformere dem. Vigtigst af alt tillader datasøer kontrolleret adgang til data fra mange forskellige typer analyse- og maskinlæringsprocesser (ML) for at vejlede bedre beslutningstagning.

Flere leverandører har skabt data sø-arkitekturer, herunder AWS søformation. Derudover giver open source-løsninger virksomheder mulighed for nemt at få adgang til, indlæse og dele data. En af mulighederne for at gemme data i AWS Cloud er Delta søen. Delta Lake-biblioteket muliggør læsning og skrivning i open source Apache parket filformat og giver funktioner som ACID-transaktioner, skalerbar metadatahåndtering og samlet streaming og batchdatabehandling. Delta Lake tilbyder en lagringslags-API, som du kan bruge til at gemme data oven på en objektlagslagring som Amazon Simple Storage Service (Amazon S3).

Data er kernen i ML – træning af en traditionel overvåget model er umulig uden adgang til historiske data af høj kvalitet, som almindeligvis er lagret i en datasø. Amazon SageMaker er en fuldt administreret tjeneste, der giver et alsidigt arbejdsbord til at bygge ML-løsninger og leverer meget skræddersyet værktøj til dataindtagelse, databehandling, modeltræning og modelhosting. Apache Spark er en arbejdshest af moderne databehandling med en omfattende API til indlæsning og manipulation af data. SageMaker har evnen til at forberede data i petabyte-skala ved hjælp af Spark for at aktivere ML-arbejdsgange, der kører på en meget distribueret måde. Dette indlæg fremhæver, hvordan du kan drage fordel af de muligheder, Delta Lake tilbyder ved hjælp af Amazon SageMaker Studio.

Løsningsoversigt

I dette indlæg beskriver vi, hvordan du bruger SageMaker Studio-notebooks til nemt at indlæse og transformere data, der er gemt i Delta Lake-formatet. Vi bruger en standard Jupyter-notesbog til at køre Apache Spark-kommandoer, der læser og skriver tabeldata i CSV- og Parquet-format. Open source-biblioteket delta-gnist giver dig direkte adgang til disse data i dets oprindelige format. Dette bibliotek giver dig mulighed for at drage fordel af de mange API-operationer til at anvende datatransformationer, foretage skemaændringer og bruge tidsrejse- eller tidsstempelforespørgsler til at trække en bestemt version af dataene.

I vores eksempelnotesbog indlæser vi rådata i en Spark DataFrame, opretter en Delta-tabel, forespørger på den, viser revisionshistorik, demonstrerer skemaudvikling og viser forskellige metoder til opdatering af tabeldataene. Vi bruger DataFrame API fra PySpark-biblioteket for at indtage og transformere datasættets attributter. Vi bruger delta-spark bibliotek til at læse og skrive data i Delta Lake-format og til at manipulere den underliggende tabelstruktur, kaldet skema.

Vi bruger SageMaker Studio, den indbyggede IDE fra SageMaker, til at skabe og køre Python-kode fra en Jupyter-notebook. Vi har skabt en GitHub repository der indeholder denne notesbog og andre ressourcer til at køre denne prøve på egen hånd. Notebooken demonstrerer nøjagtigt, hvordan man interagerer med data, der er gemt i Delta Lake-format, hvilket gør det muligt at få adgang til tabeller på stedet uden behov for at replikere data på tværs af forskellige datalagre.

Til dette eksempel bruger vi et offentligt tilgængeligt datasæt fra Udlånsklub der repræsenterer kundelånsdata. Vi downloadede accepted datafil (accepted_2007_to_2018Q4.csv.gz), og valgte en delmængde af de originale attributter. Dette datasæt er tilgængeligt under Creative Commons (CCO)-licens.

Forudsætninger

Du skal installere et par forudsætninger, før du bruger delta-spark funktionalitet. For at tilfredsstille påkrævede afhængigheder er vi nødt til at installere nogle biblioteker i vores Studio-miljø, som kører som en dockeriseret container og tilgås via en Jupyter Gateway-app:

  • OpenJDK for adgang til Java og tilhørende biblioteker
  • PySpark (Spark for Python) bibliotek
  • Delta Spark open source-bibliotek

Vi kan bruge begge dele conda or pip at installere disse biblioteker, som er offentligt tilgængelige i begge conda-forge, PyPI-servere eller Maven-lagre.

Denne notesbog er designet til at køre i SageMaker Studio. Når du har startet notesbogen i Studio, skal du sørge for at vælge Python 3 (Data Science) kernetype. Derudover foreslår vi at bruge en instanstype med mindst 16 GB RAM (som ml.g4dn.xlarge), som tillader PySpark-kommandoer at køre hurtigere. Brug følgende kommandoer til at installere de nødvendige afhængigheder, som udgør de første adskillige celler i notesbogen:

%conda install openjdk -q -y
%pip install pyspark==3.2.0
%pip install delta-spark==1.1.0
%pip install -U "sagemaker>2.72"

Når installationskommandoerne er fuldført, er vi klar til at køre kernelogikken i notesbogen.

Implementer løsningen

For at køre Apache Spark-kommandoer skal vi instansiere en SparkSession objekt. Efter at vi har inkluderet de nødvendige importkommandoer, konfigurerer vi SparkSession ved at indstille nogle yderligere konfigurationsparametre. Parameteren med nøgle spark.jars.packages videregiver navnene på yderligere biblioteker, som Spark bruger til at køre delta kommandoer. De følgende indledende kodelinjer samler en liste over pakker ved hjælp af traditionelle Maven-koordinater (groupId:artifactId:version), for at videregive disse ekstra pakker til SparkSession.

Derudover er parametrene med nøgle spark.sql.extensions , spark.sql.catalog.spark_catalog gør det muligt for Spark at håndtere Delta Lake-funktionaliteten korrekt. Den endelige konfigurationsparameter med nøgle fs.s3a.aws.credentials.provider tilføjer ContainerCredentialsProvider klasse, som giver Studio mulighed for at slå op AWS identitets- og adgangsstyring (IAM) rolletilladelser gjort tilgængelige via containermiljøet. Koden opretter en SparkSession objekt, der er korrekt initialiseret til SageMaker Studio-miljøet:

# Configure Spark to use additional library packages to satisfy dependencies # Build list of packages entries using Maven coordinates (groupId:artifactId:version)
pkg_list = []
pkg_list.append("io.delta:delta-core_2.12:1.1.0")
pkg_list.append("org.apache.hadoop:hadoop-aws:3.2.2") packages=(",".join(pkg_list))
print('packages: '+packages) # Instantiate Spark via builder
# Note: we use the `ContainerCredentialsProvider` to give us access to underlying IAM role permissions spark = (SparkSession .builder .appName("PySparkApp") .config("spark.jars.packages", packages) .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider") .getOrCreate()) sc = spark.sparkContext print('Spark version: '+str(sc.version))

I det næste afsnit uploader vi en fil, der indeholder en delmængde af Lending Club-forbrugslånsdatasættet til vores standard S3-spand. Det originale datasæt er meget stort (over 600 MB), så vi leverer en enkelt repræsentativ fil (2.6 MB) til brug for denne notesbog. PySpark bruger s3a protokol for at aktivere yderligere Hadoop-biblioteksfunktionalitet. Derfor ændrer vi hver native S3 URI fra s3 protokol at bruge s3a i cellerne i denne notesbog.

Vi bruger Spark til at indlæse rådataene (med muligheder for både CSV- eller Parquet-filer) med følgende kode, som returnerer en Spark DataFrame med navnet loans_df:

loans_df = spark.read.csv(s3a_raw_csv, header=True)

Følgende skærmbillede viser de første 10 rækker fra den resulterende DataFrame.
Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Vi kan nu skrive denne DataFrame ud som en Delta Lake-tabel med en enkelt kodelinje ved at specificere .format("delta") og give S3 URI-placeringen, hvor vi ønsker at skrive tabeldataene:

loans_df.write.format("delta").mode("overwrite").save(s3a_delta_table_uri)

De næste par notebook-celler viser en mulighed for at forespørge Delta Lake-tabellen. Vi kan konstruere en standard SQL-forespørgsel, specificer delta format og tabelplacering, og send denne kommando ved hjælp af Spark SQL-syntaks:

sql_cmd = f'SELECT * FROM delta.`{s3a_delta_table_uri}` ORDER BY loan_amnt'
sql_results = spark.sql(sql_cmd)

Følgende skærmbillede viser resultaterne af vores SQL-forespørgsel som sorteret efter loan_amnt.
Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Interager med Delta Lake-tabeller

I dette afsnit viser vi frem DeltaTable klasse fra delta-spark bibliotek. DeltaTable er den primære klasse til programmatisk interaktion med Delta Lake-tabeller. Denne klasse indeholder flere statiske metoder til at finde information om en tabel. For eksempel isDeltaTable metode returnerer en boolsk værdi, der angiver, om tabellen er gemt i delta-format:

# Use static method to determine table type
print(DeltaTable.isDeltaTable(spark, s3a_delta_table_uri))

Du kan oprette DeltaTable instanser, der bruger stien til Delta-tabellen, som i vores tilfælde er S3 URI-placeringen. I den følgende kode henter vi hele historikken for tabelændringer:

deltaTable = DeltaTable.forPath(spark, s3a_delta_table_uri)
history_df = deltaTable.history()
history_df.head(3)

Outputtet angiver, at tabellen har seks ændringer gemt i historikken og viser de seneste tre versioner.

Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Skema udvikling

I dette afsnit demonstrerer vi, hvordan Delta Lake-skemaudviklingen fungerer. Som standard, delta-spark tvinger tabelskrivninger til at overholde det eksisterende skema ved at håndhæve begrænsninger. Men ved at angive visse muligheder kan vi sikkert ændre tabellens skema.

Lad os først læse data tilbage fra Delta-tabellen. Fordi disse data blev skrevet ud som delta format, skal vi specificere .format("delta") når vi læser dataene, så giver vi S3 URI'en, hvor Delta-tabellen er placeret. For det andet skriver vi DataFrame tilbage til en anden S3-placering, hvor vi demonstrerer skemaudvikling. Se følgende kode:

delta_df = (spark.read.format("delta").load(s3a_delta_table_uri))
delta_df.write.format("delta").mode("overwrite").save(s3a_delta_update_uri)

Nu bruger vi Spark DataFrame API til at tilføje to nye kolonner til vores datasæt. Kolonnenavnene er funding_type , excess_int_rate, og kolonneværdierne indstilles til konstanter ved hjælp af DataFrame withColumn metode. Se følgende kode:

funding_type_col = "funding_type"
excess_int_rate_col = "excess_int_rate" delta_update_df = (delta_df.withColumn(funding_type_col, lit("NA")) .withColumn(excess_int_rate_col, lit(0.0)))
delta_update_df.dtypes

Et hurtigt kig på datatyperne (dtypes) viser, at de yderligere kolonner er en del af DataFrame.
Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Nu aktiverer vi skemaændringen, og ændrer derved det underliggende skema for Delta-tabellen ved at indstille mergeSchema mulighed for at true i følgende Spark-skrivekommando:

(delta_update_df.write.format("delta") .mode("overwrite") .option("mergeSchema", "true") # option - evolve schema .save(s3a_delta_update_uri)
)

Lad os tjekke ændringshistorikken for vores nye tabel, som viser, at tabelskemaet er blevet ændret:

deltaTableUpdate = DeltaTable.forPath(spark, s3a_delta_update_uri) # Let's retrieve history BEFORE schema modification
history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

Historielisten viser hver revision af metadataene.
Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Betingede tabelopdateringer

Du kan bruge DeltaTable update metode til at køre et prædikat og derefter anvende en transformation, når betingelsen evalueres til True. I vores tilfælde skriver vi værdien FullyFunded til funding_type kolonne, når som helst loan_amnt svarer til funded_amnt. Dette er en kraftfuld mekanisme til at skrive betingede opdateringer til dine tabeldata.

deltaTableUpdate.update(condition = col("loan_amnt") == col("funded_amnt"), set = { funding_type_col: lit("FullyFunded") } )

Følgende skærmbillede viser vores resultater.
Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

I den sidste ændring af tabeldata viser vi syntaksen for at videregive en funktion til opdateringsmetoden, som i vores tilfælde beregner overrenten ved at trække 10.0 % fra lånets int_rate attribut. Endnu en SQL-kommando trækker poster, der opfylder vores betingelse, ved at bruge WHERE-sætningen til at lokalisere poster med int_rate mere end 10.0 %:

# Function that calculates rate overage (amount over 10.0)
def excess_int_rate(rate): return (rate-10.0) deltaTableUpdate.update(condition = col("int_rate") > 10.0, set = { excess_int_rate_col: excess_int_rate(col("int_rate")) } )

Den nye excess_int_rate kolonnen indeholder nu korrekt int_rate minus 10.0 %.
Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Vores sidste notesbogscelle henter historien fra Delta-tabellen igen, denne gang viser ændringerne efter skemaændringen er blevet udført:

# Finally, let's retrieve table history AFTER the schema modifications history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

Følgende skærmbillede viser vores resultater.
Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Konklusion

Du kan bruge SageMaker Studio-notebooks til at interagere direkte med data, der er gemt i open source Delta Lake-formatet. I dette indlæg leverede vi eksempelkode, der læser og skriver disse data ved hjælp af open source delta-spark bibliotek, som giver dig mulighed for at oprette, opdatere og administrere datasættet som en Delta bord. Vi demonstrerede også styrken ved at kombinere disse kritiske teknologier for at udvinde værdi fra allerede eksisterende datasøer og viste, hvordan man bruger mulighederne i Delta Lake på SageMaker.

Vores notesbogseksempel giver en ende-til-ende-opskrift til installation af forudsætninger, instansiering af Spark-datastrukturer, læsning og skrivning af DataFrames i Delta Lake-format og brug af funktionaliteter som skemaudvikling. Du kan integrere disse teknologier for at forstørre deres kraft til at give transformative forretningsresultater.


Om forfatterne

Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Paul Hargis har fokuseret sin indsats på Machine Learning hos flere virksomheder, herunder AWS, Amazon og Hortonworks. Han nyder at bygge teknologiske løsninger og også lære folk, hvordan de får mest muligt ud af det. Før sin rolle hos AWS var han ledende arkitekt for Amazon Exports and Expansions, der hjalp amazon.com med at forbedre oplevelsen for internationale shoppere. Paul kan lide at hjælpe kunder med at udvide deres maskinlæringsinitiativer for at løse problemer i den virkelige verden.

Indlæs og transformer data fra Delta Lake ved hjælp af Amazon SageMaker Studio og Apache Spark PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Vedant Jain er en Sr. AI/ML Specialist Solutions Architect, der hjælper kunder med at få værdi ud af Machine Learning-økosystemet hos AWS. Inden han kom til AWS, har Vedant haft ML/Data Science Specialty-stillinger hos forskellige virksomheder såsom Databricks, Hortonworks (nu Cloudera) og JP Morgan Chase. Uden for sit arbejde brænder Vedant for at lave musik, bruge Science til at leve et meningsfuldt liv og udforske lækkert vegetarisk køkken fra hele verden.

Tidsstempel:

Mere fra AWS maskinindlæring