RDD χρησιμοποιώντας το Spark: Το δομικό στοιχείο του Apache Spark



Αυτό το ιστολόγιο στο RDD που χρησιμοποιεί το Spark θα σας παρέχει μια λεπτομερή και περιεκτική γνώση του RDD, το οποίο είναι η θεμελιώδης ενότητα του Spark & ​​Πόσο χρήσιμο είναι.

, Η ίδια η λέξη είναι αρκετή για να δημιουργήσει μια σπίθα στο μυαλό κάθε μηχανικού Hadoop. ΠΡΟΣ ΤΟ n στη μνήμη εργαλείο επεξεργασίας η οποία είναι γρήγορη στην υπολογιστική συστάδα. Σε σύγκριση με το MapReduce, η κοινή χρήση δεδομένων στη μνήμη κάνει RDD 10-100χ γρηγορότερα από την κοινή χρήση δικτύου και δίσκου και όλα αυτά είναι δυνατά εξαιτίας RDD (σύνολα ανθεκτικών κατανεμημένων δεδομένων). Τα βασικά σημεία που εστιάζουμε σήμερα σε αυτό το RDD χρησιμοποιώντας το άρθρο Spark είναι:

Χρειάζεστε RDD;

Γιατί χρειαζόμαστε RDD; -RDD χρησιμοποιώντας το Spark





Ο κόσμος εξελίσσεται με και Επιστημονικά δεδομένα λόγω της προόδου στο . Αλγόριθμοι βασισμένο στο Οπισθοδρόμηση , , και που τρέχει Διανέμονται Επαναληπτικός υπολογισμός ιόντος μόδα που περιλαμβάνει επαναχρησιμοποίηση και κοινή χρήση δεδομένων μεταξύ πολλαπλών υπολογιστικών μονάδων.

Το παραδοσιακό οι τεχνικές χρειάζονταν έναν Σταθερό Ενδιάμεσο και Κατανεμημένο χώρο αποθήκευσης HDFS που περιλαμβάνει επαναλαμβανόμενους υπολογισμούς με αντιγραφές δεδομένων και σειριοποίηση δεδομένων, οι οποίοι καθιστούσαν τη διαδικασία πολύ πιο αργή. Η εύρεση λύσης δεν ήταν ποτέ εύκολη.



Εδώ είναι που RDD (Ανθεκτικά κατανεμημένα σύνολα δεδομένων) έρχεται στη μεγάλη εικόνα.

RDD Είναι εύκολο στη χρήση και χωρίς κόπο να δημιουργηθούν καθώς τα δεδομένα εισάγονται από πηγές δεδομένων και απορρίπτονται σε RDD. Περαιτέρω, οι λειτουργίες εφαρμόζονται για την επεξεργασία τους. Είναι ένα κατανεμημένη συλλογή μνήμης με δικαιώματα ως Μόνο για ανάγνωση και το πιο σημαντικό, είναι Ανθεκτικό σε σφάλματα .



Εάν υπάρχει διαμέρισμα δεδομένων του το RDD είναι χαμένος , μπορεί να αναγεννηθεί εφαρμόζοντας το ίδιο μεταμόρφωση λειτουργία σε αυτό το χαμένο διαμέρισμα στο καταγωγή , αντί να επεξεργαζόμαστε όλα τα δεδομένα από το μηδέν. Αυτό το είδος προσέγγισης σε σενάρια σε πραγματικό χρόνο μπορεί να προκαλέσει θαύματα σε καταστάσεις απώλειας δεδομένων ή όταν ένα σύστημα είναι εκτός λειτουργίας.

Τι είναι τα RDD;

RDD ή ( Ανθεκτικό σύνολο κατανεμημένων δεδομένων ) είναι θεμελιώδες δομή δεδομένων στο Spark. Ο όρος Ελαστικός ορίζει την ικανότητα που δημιουργεί τα δεδομένα αυτόματα ή δεδομένα γυρίζω πίσω στο αρχική κατάσταση όταν συμβαίνει μια απροσδόκητη καταστροφή με πιθανότητα απώλειας δεδομένων.

Τα δεδομένα που γράφονται σε RDD είναι χωρισμένο και αποθηκεύτηκε σε πολλαπλοί εκτελέσιμοι κόμβοι . Εάν ένας κόμβος εκτέλεσης αποτυγχάνει στο χρόνο εκτέλεσης, τότε παίρνει αμέσως το αντίγραφο ασφαλείας από το επόμενος εκτελέσιμος κόμβος . Αυτός είναι ο λόγος για τον οποίο τα RDD θεωρούνται ως προηγμένος τύπος δομών δεδομένων σε σύγκριση με άλλες παραδοσιακές δομές δεδομένων. Τα RDD μπορούν να αποθηκεύουν δομημένα, μη δομημένα και ημι-δομημένα δεδομένα.

Ας προχωρήσουμε με το RDD μας χρησιμοποιώντας το Spark blog και να μάθουμε για τις μοναδικές δυνατότητες των RDD που του δίνουν πλεονέκτημα έναντι άλλων τύπων δομών δεδομένων.

πώς να αποτρέψετε το αδιέξοδο στην Java

Χαρακτηριστικά του RDD

  • Εις μνήμην (ΕΜΒΟΛΟ) Υπολογισμοί : Η έννοια του υπολογισμού In-Memory μεταφέρει την επεξεργασία δεδομένων σε ένα πιο γρήγορο και αποτελεσματικό στάδιο όπου το συνολικό εκτέλεση του συστήματος είναι αναβαθμίστηκε.
  • μεγάλο η αξιολόγησή του : Ο όρος Lazy αξιολόγηση λέει το μετασχηματισμοί εφαρμόζονται στα δεδομένα σε RDD, αλλά η έξοδος δεν δημιουργείται. Αντ 'αυτού, οι εφαρμοσμένοι μετασχηματισμοί είναι καταγράφηκε.
  • επιμονή : Οι προκύπτουσες RDD είναι πάντα επαναχρησιμοποιήσιμο.
  • Χονδροειδείς λειτουργίες : Ο χρήστης μπορεί να εφαρμόσει μετασχηματισμούς σε όλα τα στοιχεία σε σύνολα δεδομένων μέσω χάρτης, φίλτρο ή ομάδα από λειτουργίες.
  • Ανεκτικό σφάλμα : Εάν υπάρχει απώλεια δεδομένων, το σύστημα μπορεί γυρίστε πίσω σε αυτό αρχική κατάσταση χρησιμοποιώντας το καταγεγραμμένο μετασχηματισμοί .
  • Αμετάβλητο : Τα δεδομένα που ορίζονται, ανακτώνται ή δημιουργούνται δεν μπορούν να είναι άλλαξε μόλις συνδεθεί στο σύστημα. Σε περίπτωση που πρέπει να έχετε πρόσβαση και να τροποποιήσετε το υπάρχον RDD, πρέπει να δημιουργήσετε ένα νέο RDD εφαρμόζοντας ένα σύνολο Μεταμόρφωση λειτουργεί στο τρέχον ή προηγούμενο RDD.
  • Διαμέριση : Είναι το κρίσιμη μονάδα του παραλληλισμού στο Spark RDD. Από προεπιλογή, ο αριθμός των κατατμήσεων που δημιουργούνται βασίζεται στην πηγή δεδομένων σας. Μπορείτε ακόμη και να αποφασίσετε τον αριθμό των κατατμήσεων που θέλετε να χρησιμοποιήσετε προσαρμοσμένο διαμέρισμα λειτουργίες.

Δημιουργία RDD με χρήση Spark

RDDs μπορούν να δημιουργηθούν σε τρεις τρόποι:

  1. Ανάγνωση δεδομένων από παράλληλες συλλογές
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () αποτέλεσμαRDD.collect () ) .foreach (println)
  1. Εφαρμογή μεταμόρφωση σε προηγούμενες RDD
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Ανάγνωση δεδομένων από εξωτερική αποθήκευση ή διαδρομές αρχείων όπως HDFS ή HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Λειτουργίες που εκτελούνται σε RDD:

Υπάρχουν κυρίως δύο τύποι λειτουργιών που εκτελούνται σε RDD, δηλαδή:

  • Μεταμορφώσεις
  • Ενέργειες

Μεταμορφώσεις : ο λειτουργίες εφαρμόζουμε σε RDD σε φίλτρο, πρόσβαση και τροποποιώ τα δεδομένα στο γονικό RDD για τη δημιουργία α διαδοχική RDD λέγεται μεταμόρφωση . Το νέο RDD επιστρέφει ένα δείκτη στο προηγούμενο RDD διασφαλίζοντας την εξάρτηση μεταξύ τους.

Οι μετασχηματισμοί είναι Lazy Αξιολογήσεις, Με άλλα λόγια, οι λειτουργίες που εφαρμόζονται στο RDD που εργάζεστε θα καταγράφονται αλλά όχι εκτελέστηκε. Το σύστημα ρίχνει ένα αποτέλεσμα ή εξαίρεση μετά την ενεργοποίηση του Δράση .

Μπορούμε να χωρίσουμε τους μετασχηματισμούς σε δύο τύπους όπως παρακάτω:

  • Στενοί μετασχηματισμοί
  • Ευρείς μετασχηματισμοί

Στενοί μετασχηματισμοί Εφαρμόζουμε στενούς μετασχηματισμούς σε ένα ένα διαμέρισμα του γονικού RDD για τη δημιουργία ενός νέου RDD καθώς τα δεδομένα που απαιτούνται για την επεξεργασία του RDD είναι διαθέσιμα σε ένα μόνο διαμέρισμα του μητρική ASD . Τα παραδείγματα για στενούς μετασχηματισμούς είναι:

  • χάρτης()
  • φίλτρο()
  • flatMap ()
  • χώρισμα()
  • mapPartitions ()

Ευρείες μεταμορφώσεις: Εφαρμόζουμε την ευρεία μεταμόρφωση πολλαπλά διαμερίσματα για να δημιουργήσετε ένα νέο RDD. Τα δεδομένα που απαιτούνται για την επεξεργασία του RDD είναι διαθέσιμα στα πολλαπλά διαμερίσματα του μητρική ASD . Τα παραδείγματα για ευρείς μετασχηματισμούς είναι:

  • Μειώστε με ()
  • ένωση()

Ενέργειες : Οι ενέργειες δίνουν οδηγίες στο Apache Spark να εφαρμόσει υπολογισμός και περάστε το αποτέλεσμα ή μια εξαίρεση πίσω στο πρόγραμμα οδήγησης RDD. Λίγες από τις δράσεις περιλαμβάνουν:

  • συλλέγω()
  • μετρώ()
  • παίρνω()
  • πρώτα()

Ας εφαρμόσουμε πρακτικά τις λειτουργίες σε RDD:

IPL (Ινδική Premier League) είναι ένα τουρνουά κρίκετ με υψηλό επίπεδο. Λοιπόν, ας πάρουμε σήμερα τα χέρια μας στο σύνολο δεδομένων IPL και να εκτελέσουμε το RDD χρησιμοποιώντας το Spark.

  • Πρώτα, ας κατεβάσουμε δεδομένα αντιστοίχισης CSV του IPL. Μετά τη λήψη του, αρχίζει να φαίνεται ως αρχείο EXCEL με σειρές και στήλες.

Στο επόμενο βήμα, ενεργοποιούμε τη σπίθα και φορτώνουμε το αρχείο match.csv από τη θέση του, στην περίπτωσή μουcsvη τοποθεσία του αρχείου είναι '/User/edureka_566977/test/matches.csv'

Τώρα ας ξεκινήσουμε με το Μεταμόρφωση πρώτο μέρος:

  • χάρτης():

Χρησιμοποιούμε Μεταμόρφωση χάρτη για να εφαρμόσετε μια συγκεκριμένη λειτουργία μετασχηματισμού σε κάθε στοιχείο μιας RDD. Εδώ δημιουργούμε ένα RDD με το όνομα CKfile όπου αποθηκεύουμε το δικό μαςcsvαρχείο. Θα δημιουργήσουμε μια άλλη ΕΑΑ που ονομάζεται Κράτη αποθηκεύστε τις λεπτομέρειες της πόλης .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) States.collect (). foreach (println)

  • φίλτρο():

Φίλτρο μετασχηματισμού, το ίδιο το όνομα περιγράφει τη χρήση του. Χρησιμοποιούμε αυτήν τη λειτουργία μετασχηματισμού για να φιλτράρουμε τα επιλεκτικά δεδομένα από μια συλλογή δεδομένων που δίνεται. Εφαρμόζουμε λειτουργία φίλτρου εδώ για να λάβετε τα ρεκόρ των αγώνων IPL της χρονιάς 2017 και αποθηκεύστε το σε fil RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Εφαρμόζουμε το FlatMap είναι μια λειτουργία μετασχηματισμού σε καθένα από τα στοιχεία ενός RDD για τη δημιουργία ενός νέουRDD. Είναι παρόμοιο με το μετασχηματισμό του χάρτη. εδώ εφαρμόζουμεFlatmapπρος το φτύσει τους αγώνες της πόλης Χαϊντεραμπάντ και αποθηκεύστε τα δεδομέναfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). συλλογή ()

  • χώρισμα():

Κάθε δεδομένα που γράφουμε σε RDD χωρίζεται σε έναν ορισμένο αριθμό κατατμήσεων. Χρησιμοποιούμε αυτόν τον μετασχηματισμό για να βρούμε το αριθμός κατατμήσεων τα δεδομένα χωρίζονται σε.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Θεωρούμε το MapPatitions ως εναλλακτική λύση του Map () καιγια κάθε() μαζί. Χρησιμοποιούμε το mapPartitions εδώ για να βρούμε το αριθμός σειρών έχουμε στο RDD fil μας.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • μειώστε με ():

ΧρησιμοποιούμεΜειώστε() επί Ζεύγη κλειδιού-τιμής . Χρησιμοποιήσαμε αυτόν τον μετασχηματισμό στο δικό μαςcsvαρχείο για να βρείτε το πρόγραμμα αναπαραγωγής με το υψηλότερο άτομο των αγώνων .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • ένωση():

Το όνομα τα εξηγεί όλα, Χρησιμοποιούμε την ένωση μετασχηματισμού είναι λέσχη δύο RDDs μαζί . Εδώ δημιουργούμε δύο RDD, δηλαδή fil και fil2. Το fil RDD περιέχει τις εγγραφές αγώνων IPL 2017 και το fil2 RDD περιέχει εγγραφή αγώνων IPL 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Ας ξεκινήσουμε με το Δράση μέρος όπου παρουσιάζουμε την πραγματική παραγωγή:

  • συλλέγω():

Συλλογή είναι η δράση στην οποία χρησιμοποιούμε εμφανίστε τα περιεχόμενα στο RDD.

τι είναι σειριοποιήσιμο στην Java
val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • μετρώ():

μετρώείναι μια ενέργεια που χρησιμοποιούμε για να μετρήσουμε το αριθμός εγγραφών παρόν στο RDD.Εδώχρησιμοποιούμε αυτήν τη λειτουργία για να μετρήσουμε τον συνολικό αριθμό εγγραφών στο αρχείο match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • παίρνω():

Η λήψη είναι μια ενέργεια δράσης παρόμοια με τη συλλογή, αλλά η μόνη διαφορά είναι ότι μπορεί να εκτυπώσει οποιαδήποτε επιλεκτικός αριθμός σειρών σύμφωνα με το αίτημα του χρήστη. Εδώ εφαρμόζουμε τον ακόλουθο κωδικό για την εκτύπωση του κορυφαίες δέκα κορυφαίες αναφορές.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. λήψη (10) .foreach (println)

  • πρώτα():

Το πρώτο () είναι μια ενέργεια δράσης παρόμοια με τη συλλογή () και τη λήψη ()τοχρησιμοποιείται για την εκτύπωση της κορυφαίας αναφοράς s την έξοδο Εδώ χρησιμοποιούμε την πρώτη () λειτουργία για να βρούμε το μέγιστος αριθμός αγώνων που παίζονται σε μια συγκεκριμένη πόλη και παίρνουμε τη Βομβάη ως έξοδο.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val states = CKfile.map (_. split (',') (2)) val Scount = States.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Για να κάνουμε τη διαδικασία μας μάθηση RDD χρησιμοποιώντας Spark, ακόμη περισσότερο, ενδιαφέρουσα, έχω βρει μια ενδιαφέρουσα περίπτωση χρήσης.

RDD χρησιμοποιώντας Spark: Pokemon Use Case

  • Πρώτα, Ας κατεβάσουμε ένα αρχείο Pokemon.csv και να το φορτώσουμε στο σπινθήρα όπως κάναμε και στο αρχείο Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Τα Pokemons είναι πραγματικά διαθέσιμα σε μεγάλη ποικιλία. Ας βρούμε μερικές ποικιλίες.

  • Κατάργηση σχήματος από το αρχείο Pokemon.csv

Ίσως δεν χρειαζόμαστε Σχέδιο του αρχείου Pokemon.csv. Ως εκ τούτου, το αφαιρούμε.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Εύρεση του αριθμού των χωρίσματα το pokemon.csv διανέμεται στο.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Νερό Pokemon

Βρίσκοντας το αριθμός Pokemon νερού

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Φωτιά Pokemon

Βρίσκοντας το αριθμός Pokemon Fire

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Μπορούμε επίσης να εντοπίσουμε το πληθυσμός διαφορετικού τύπου pokemon χρησιμοποιώντας τη συνάρτηση μέτρησης
WaterRDD.count () FireRDD.count ()

  • Δεδομένου ότι μου αρέσει το παιχνίδι αμυντική στρατηγική ας βρούμε το pokemon με μέγιστη άμυνα.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Γνωρίζουμε το μέγιστο αξία δύναμης άμυνας αλλά δεν ξέρουμε ποιο είναι το pokemon. λοιπόν, ας βρούμε ποιο είναι αυτό ΠΟΚΕΜΟΝ.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Παραγγελία [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Τώρα ας τακτοποιήσουμε το pokemon λιγότερο άμυνα
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Τώρα ας δούμε το Pokemon με ένα λιγότερο αμυντική στρατηγική.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (γραμμή =>! line.equals (Head)) val defWith .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Παραγγελία [Διπλό ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Έτσι, με αυτό, φτάνουμε στο τέλος αυτού του RDD χρησιμοποιώντας το άρθρο Spark. Ελπίζω να ρίξουμε λίγο φως στις γνώσεις σας σχετικά με τα RDD, τα χαρακτηριστικά τους και τους διάφορους τύπους λειτουργιών που μπορούν να εκτελεστούν σε αυτά.

Αυτό το άρθρο βασίζεται σε έχει σχεδιαστεί για να σας προετοιμάζει για τις εξετάσεις πιστοποίησης Cloudera Hadoop και Spark Developer (CCA175). Θα λάβετε μια εις βάθος γνώση για το Apache Spark και το Spark Ecosystem, το οποίο περιλαμβάνει Spark RDD, Spark SQL, Spark MLlib και Spark Streaming. Θα αποκτήσετε ολοκληρωμένες γνώσεις σχετικά με τη γλώσσα προγραμματισμού Scala, HDFS, Sqoop, Flume, Spark GraphX ​​και σύστημα ανταλλαγής μηνυμάτων όπως το Kafka.