Kumulativ stateful transformation i Apache Spark Streaming

Dette blogindlæg diskuterer stateful transformationer i Spark Streaming. Lær alt om kumulativ sporing og dygtighed til en Hadoop Spark-karriere.

Bidrag fra Prithviraj Bose



I min tidligere blog har jeg diskuteret statefulde transformationer ved hjælp af vinduesbegrebet Apache Spark Streaming. Du kan læse det her .



forskel mellem ansible og kok

I dette indlæg skal jeg diskutere kumulative stateful operationer i Apache Spark Streaming. Hvis du er nybegynder med Spark Streaming, anbefaler jeg dig stærkt at læse min tidligere blog for at forstå, hvordan windowsing fungerer.

Typer af stateful transformation i Spark Streaming (Fortsat ...)

> Kumulativ sporing

Vi havde brugt reducereByKeyAndWindow (...) API til at spore tilstanden af ​​nøgler, men vindue udgør begrænsninger for visse brugssager. Hvad hvis vi ønsker at akkumulere tilstanden af ​​tasterne igennem i stedet for at begrænse det til et tidsvindue? I så fald skal vi bruge updateStateByKey (…) ILD.



Denne API blev introduceret i Spark 1.3.0 og har været meget populær. Denne API har dog en vis ydeevneomkostning, dens ydeevne forringes, når staternes størrelse øges over tid. Jeg har skrevet en prøve for at vise brugen af ​​denne API. Du kan finde koden her .

Spark 1.6.0 introducerede en ny API mapWithState (…) som løser de omkostningsomkostninger, der udgør updateStateByKey (…) . I denne blog skal jeg diskutere denne særlige API ved hjælp af et eksempelprogram, som jeg har skrevet. Du kan finde koden her .

Før jeg dykker ned i en kodegennemgang, lad os spare et par ord på kontrolpunktet. For enhver stateful transformation er kontrolpunkt obligatorisk. Kontrolpunkt er en mekanisme til at gendanne nøglenes tilstand, hvis driverprogrammet mislykkes. Når driveren genstarter, gendannes tastenes tilstand fra kontrolpunktsfilerne. Checkpoint-placeringer er normalt HDFS eller Amazon S3 eller enhver pålidelig lagerplads. Mens man tester koden, kan man også gemme i det lokale filsystem.



I prøveprogrammet lytter vi til socket-tekststrøm på vært = localhost og port = 9999. Det tokeniserer den indgående strøm til (ord, antal forekomster) og sporer ordtællingen ved hjælp af 1.6.0 API mapWithState (…) . Derudover fjernes nøgler uden opdateringer vha StateSpec.timeout API. Vi kontrollerer i HDFS, og kontrolpunktfrekvensen er hvert 20. sekund.

Lad os først oprette en Spark Streaming-session,

Spark-streaming-session

Vi opretter en kontrolpunktDir i HDFS, og kald derefter objektmetoden getOrCreate (…) . Det getOrCreate API kontrollerer kontrolpunktDir for at se om der er nogen tidligere tilstande at gendanne, hvis det findes, genskaber det Spark Streaming-sessionen og opdaterer tilstandene for nøglerne fra de data, der er gemt i filerne, inden de går videre med nye data. Ellers opretter det en ny Spark Streaming-session.

Det getOrCreate tager kontrolpunktets katalognavn og en funktion (som vi har navngivet createFunc ) hvis underskrift skal være () => StreamingContext .

Lad os undersøge koden indeni createFunc .

Linje nr. 2: Vi opretter en streamingkontekst med jobnavn til “TestMapWithStateJob” og batchinterval = 5 sekunder.

Linje nr. 5: Indstil kontrolpunktbiblioteket.

Linje # 8: Indstil tilstandsspecifikationen ved hjælp af klassen org.apache.streaming.StateSpec objekt. Vi indstiller først den funktion, der vil spore tilstanden, derefter indstiller vi antallet af partitioner for de resulterende DStreams, der skal genereres under efterfølgende transformationer. Endelig indstiller vi timeout (til 30 sekunder), hvor hvis nogen opdatering til en nøgle ikke modtages på 30 sekunder, vil nøgletilstanden blive fjernet.

Linie 12 #: Opsæt stikkontakten, flad de indgående batchdata, opret et nøgleværdipar, ring mapWithState , indstil kontrolpunktsintervallet til 20'erne, og udskriv til sidst resultaterne.

Spark-rammen kalder th e createFunc for hver nøgle med den forrige værdi og den aktuelle tilstand. Vi beregner summen og opdaterer tilstanden med den kumulative sum, og til sidst returnerer vi summen for nøglen.

hvad er en forekomst af en klasse i java

Github Kilder -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Har du et spørgsmål til os? Nævn det i kommentarfeltet, og vi vender tilbage til dig.

Relaterede indlæg:

Kom godt i gang med Apache Spark & ​​Scala

Stateful Transformations with Windowing in Spark Streaming