Αθροιστικός κρατικός μετασχηματισμός στο Apache Spark Streaming



Αυτή η ανάρτηση ιστολογίου ασχολείται με μετασχηματισμούς στο Spark Streaming. Μάθετε τα πάντα σχετικά με τη σωρευτική παρακολούθηση και τις δεξιότητες για μια καριέρα Hadoop Spark.

Συνεισφορά από τον Prithviraj Bose

Στο προηγούμενο ιστολόγιό μου, έχω συζητήσει κρατικούς μετασχηματισμούς χρησιμοποιώντας την εκπληκτική ιδέα του Apache Spark Streaming. Μπορείτε να το διαβάσετε εδώ .





Σε αυτήν την ανάρτηση θα συζητήσω αθροιστικές κρατικές λειτουργίες στο Apache Spark Streaming. Αν είστε νέοι στο Spark Streaming, σας συνιστώ ανεπιφύλακτα να διαβάσετε το προηγούμενο ιστολόγιό μου για να κατανοήσετε πώς λειτουργεί το Window.

Τύποι μετασχηματισμού Stateful σε ροή σπινθήρων (συνέχεια)

> Αθροιστική παρακολούθηση

Είχαμε χρησιμοποιήσει το μείωσηByKeyAndWindow (…) API για την παρακολούθηση των καταστάσεων των κλειδιών, ωστόσο, το παράθυρο δημιουργεί περιορισμούς για ορισμένες περιπτώσεις χρήσης. Τι γίνεται αν θέλουμε να συσσωρεύσουμε τις καταστάσεις των πλήκτρων καθ 'όλη την παρά να το περιορίσουμε σε ένα χρονικό παράθυρο; Σε αυτήν την περίπτωση θα πρέπει να χρησιμοποιήσουμε ενημέρωσηStateByKey (…) ΦΩΤΙΑ.



Αυτό το API παρουσιάστηκε στο Spark 1.3.0 και ήταν πολύ δημοφιλές. Ωστόσο, αυτό το API έχει κάποια γενική απόδοση, η απόδοσή του υποβαθμίζεται καθώς το μέγεθος των καταστάσεων αυξάνεται με την πάροδο του χρόνου. Έχω γράψει ένα δείγμα για να δείξω τη χρήση αυτού του API. Μπορείτε να βρείτε τον κωδικό εδώ .

πώς να δοκιμάσετε μια βάση δεδομένων

Το Spark 1.6.0 παρουσίασε ένα νέο API mapWithState (…) που επιλύει τα γενικά έξοδα απόδοσης που θέτει η ενημέρωσηStateByKey (…) . Σε αυτό το blog θα συζητήσω αυτό το συγκεκριμένο API χρησιμοποιώντας ένα δείγμα προγράμματος που έχω γράψει. Μπορείτε να βρείτε τον κωδικό εδώ .

Πριν βυθιστώ σε έναν κώδικα, ας αφήσουμε μερικές λέξεις στο σημείο ελέγχου. Για κάθε κατάσταση μετασχηματισμού, το σημείο ελέγχου είναι υποχρεωτικό. Το Checkpointing είναι ένας μηχανισμός για την επαναφορά της κατάστασης των κλειδιών σε περίπτωση αποτυχίας του προγράμματος οδήγησης. Κατά την επανεκκίνηση του προγράμματος οδήγησης, η κατάσταση των κλειδιών αποκαθίσταται από τα αρχεία ελέγχου. Οι τοποθεσίες σημείων ελέγχου είναι συνήθως HDFS ή Amazon S3 ή οποιαδήποτε αξιόπιστη αποθήκευση. Κατά τη δοκιμή του κώδικα, μπορεί κανείς να αποθηκεύσει και στο τοπικό σύστημα αρχείων.



Στο δείγμα προγράμματος, ακούμε ροή κειμένου υποδοχής στο host = localhost και port = 9999. Συμβολίζει την εισερχόμενη ροή σε (λέξεις, αριθμός συμβάντων) και παρακολουθεί τον αριθμό λέξεων χρησιμοποιώντας το 1.6.0 API mapWithState (…) . Επιπλέον, τα κλειδιά χωρίς ενημερώσεις καταργούνται χρησιμοποιώντας StateSpec.timeout API. Έχουμε έλεγχο σε HDFS και η συχνότητα ελέγχου είναι κάθε 20 δευτερόλεπτα.

Ας δημιουργήσουμε πρώτα μια περίοδο λειτουργίας ροής Spark,

Spark-streaming-session

Δημιουργούμε ένα σημείο ελέγχου Ντιρ στο HDFS και, στη συνέχεια, καλέστε τη μέθοδο αντικειμένου getOrCreate (…) . ο getOrCreate Το API ελέγχει το σημείο ελέγχου Ντιρ για να δείτε εάν υπάρχουν προηγούμενες καταστάσεις για επαναφορά, εάν υπάρχει, τότε δημιουργεί ξανά τη συνεδρία Spark Streaming και ενημερώνει τις καταστάσεις των κλειδιών από τα δεδομένα που είναι αποθηκευμένα στα αρχεία πριν προχωρήσετε με νέα δεδομένα. Διαφορετικά δημιουργεί μια νέα συνεδρία Spark Streaming.

ο getOrCreate παίρνει το όνομα καταλόγου σημείου ελέγχου και μια συνάρτηση (την οποία έχουμε ονομάσει δημιουργίαFunc ) του οποίου η υπογραφή πρέπει να είναι () => StreamingContext .

Ας εξετάσουμε τον κώδικα μέσα δημιουργίαFunc .

Γραμμή 2: Δημιουργούμε ένα περιβάλλον ροής με όνομα εργασίας στο 'TestMapWithStateJob' και μεσοδιαστήματα = 5 δευτερόλεπτα.

Γραμμή # 5: Ορίστε τον κατάλογο σημείων ελέγχου.

Γραμμή # 8: Ορίστε τις προδιαγραφές κατάστασης χρησιμοποιώντας την κλάση org.apache.streaming.StateSpec αντικείμενο. Αρχικά ορίσαμε τη συνάρτηση που θα παρακολουθεί την κατάσταση και μετά θα ορίσουμε τον αριθμό των κατατμήσεων για τα προκύπτοντα DStreams που θα δημιουργηθούν κατά τη διάρκεια μεταγενέστερων μετασχηματισμών. Τέλος, ορίζουμε το χρονικό όριο (σε 30 δευτερόλεπτα) όπου εάν δεν ληφθεί ενημέρωση για ένα κλειδί σε 30 δευτερόλεπτα τότε η κατάσταση κλειδιού θα αφαιρεθεί.

Γραμμή 12 #: Ρύθμιση της ροής υποδοχής, ισοπέδωση των εισερχόμενων δεδομένων παρτίδας, δημιουργία ζεύγους κλειδιού-τιμής, κλήση mapWithState , ορίστε το διάστημα ελέγχου στα 20s και τέλος εκτυπώστε τα αποτελέσματα.

Το πλαίσιο Spark καλεί th e createFunc για κάθε κλειδί με την προηγούμενη τιμή και την τρέχουσα κατάσταση. Υπολογίζουμε το άθροισμα και ενημερώνουμε την κατάσταση με το αθροιστικό άθροισμα και τέλος επιστρέφουμε το άθροισμα για το κλειδί.

πώς να χρησιμοποιήσετε το iterator java

Πηγές Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Έχετε μια ερώτηση για εμάς; Παρακαλώ αναφέρετέ το στην ενότητα σχολίων και θα επικοινωνήσουμε μαζί σας.

Σχετικές αναρτήσεις:

Ξεκινήστε με το Apache Spark & ​​Scala

Stateful Transformations with Windowing in Spark Streaming