Tutorial Streaming Spark - Ανάλυση συναισθημάτων με χρήση του Apache Spark



Αυτό το blog Spark Streaming θα σας παρουσιάσει το Spark Streaming, τις δυνατότητες και τα στοιχεία του. Περιλαμβάνει ένα έργο ανάλυσης συναισθημάτων που χρησιμοποιεί το Twitter.

Το Spark Streaming είναι μια επέκταση του πυρήνα Spark API που επιτρέπει την επεκτάσιμη, υψηλής απόδοσης, ανεκτική βλάβη επεξεργασία ροής ζωντανών ροών δεδομένων. Το Spark Streaming μπορεί να χρησιμοποιηθεί για ροή ζωντανών δεδομένων και η επεξεργασία μπορεί να πραγματοποιηθεί σε πραγματικό χρόνο. Η συνεχώς αυξανόμενη βάση χρηστών του Spark Streaming αποτελείται από οικιακά ονόματα όπως Uber, Netflix και Pinterest.

Όσον αφορά το Real Time Data Analytics, το Spark Streaming παρέχει μια μοναδική πλατφόρμα για την απορρόφηση δεδομένων για γρήγορη και ζωντανή επεξεργασία και αποδεικνύει την ικανότητά σας στο ίδιο.Μέσω αυτού του ιστολογίου, θα σας παρουσιάσω σε αυτόν τον νέο συναρπαστικό τομέα του Spark Streaming και θα περάσουμε από μια πλήρη περίπτωση χρήσης, Ανάλυση συναισθημάτων Twitter χρησιμοποιώντας το Spark Streaming.





Τα παρακάτω είναι τα θέματα που θα καλυφθούν σε αυτό το ιστολόγιο:

  1. Τι είναι η ροή;
  2. Γιατί η ροή Spark;
  3. Επισκόπηση ροής σπινθήρων
  4. Λειτουργίες ροής σπινθήρων
  5. Βασικές αρχές ροής σπινθήρων
    5.1 Περιεχόμενο ροής
    5.2 DStream
    5.3 Caching / Ανθεκτικότητα
    5.4 Συσσωρευτές, μεταβλητές μετάδοσης και σημεία ελέγχου
  6. Χρήση περίπτωσης - Ανάλυση συναισθημάτων Twitter

Τι είναι η ροή;

Η ροή δεδομένων είναι μια τεχνική για τη μεταφορά δεδομένων έτσι ώστε να μπορεί να υποστεί επεξεργασία ως σταθερή και συνεχής ροή. Οι τεχνολογίες ροής γίνονται όλο και πιο σημαντικές με την ανάπτυξη του Διαδικτύου.



Τι είναι η ροή - Spark Streaming - EdurekaΦιγούρα: Τι είναι η ροή;

Γιατί η ροή Spark;

Μπορούμε να χρησιμοποιήσουμε τη ροή Spark για ροή δεδομένων σε πραγματικό χρόνο από διάφορες πηγές όπως το Twitter, το χρηματιστήριο και τα γεωγραφικά συστήματα και να εκτελέσουμε ισχυρά αναλυτικά στοιχεία για να βοηθήσουμε τις επιχειρήσεις.

Φιγούρα: Γιατί η ροή Spark;



Επισκόπηση ροής σπινθήρων

Ροή σπινθήρων χρησιμοποιείται για την επεξεργασία δεδομένων ροής σε πραγματικό χρόνο. Είναι μια χρήσιμη προσθήκη στο βασικό API Spark. Το Spark Streaming επιτρέπει την επεξεργασία ροής υψηλής απόδοσης και ανεκτικό σε σφάλματα ζωντανών ροών δεδομένων.

Φιγούρα: Ροές σε ροή Spark

Η βασική μονάδα ροής είναι το DStreamπου είναι βασικά μια σειρά RDD για την επεξεργασία δεδομένων σε πραγματικό χρόνο.

Λειτουργίες ροής σπινθήρων

  1. Απολέπιση: Το Spark Streaming μπορεί εύκολα να κλιμακωθεί σε εκατοντάδες κόμβους.
  2. Ταχύτητα: Είναι έναχαμηλή καθυστέρηση.
  3. Ανοχή σε σφάλματα: Το Spark έχει την ικανότητα να ενα ανακάμψει αποτελεσματικά από αστοχίες.
  4. Ενσωμάτωση: Το Spark ενσωματώνεται σε επεξεργασία παρτίδας και σε πραγματικό χρόνο.
  5. Επιχειρηματική Ανάλυση: Το Spark Streaming είναι εσείςνα παρακολουθείτε τη συμπεριφορά των πελατών που μπορεί να χρησιμοποιηθεί στην ανάλυση των επιχειρήσεων.

Ροή εργασίας ροής σπινθήρων

Η ροή εργασίας του Spark Streaming έχει τέσσερα στάδια υψηλού επιπέδου. Το πρώτο είναι η ροή δεδομένων από διάφορες πηγές. Αυτές οι πηγές μπορεί να είναι πηγές ροής δεδομένων όπως Akka, Kafka, Flume, AWS ή Parquet για ροή σε πραγματικό χρόνο. Ο δεύτερος τύπος πηγών περιλαμβάνει HBase, MySQL, PostgreSQL, Elastic Search, Mongo DB και Cassandra για στατική / παρτίδα ροή. Μόλις συμβεί αυτό, το Spark μπορεί να χρησιμοποιηθεί για την εκτέλεση μηχανικής εκμάθησης στα δεδομένα μέσω του API MLlib. Επιπλέον, το Spark SQL χρησιμοποιείται για την εκτέλεση περαιτέρω λειτουργιών σε αυτά τα δεδομένα. Τέλος, η έξοδος ροής μπορεί να αποθηκευτεί σε διάφορα συστήματα αποθήκευσης δεδομένων όπως HBase, Cassandra, MemSQL, Kafka, Elastic Search, HDFS και τοπικό σύστημα αρχείων.

Φιγούρα: Επισκόπηση της ροής Spark

Βασικές αρχές ροής σπινθήρων

  1. Περιεχόμενο ροής
  2. DStream
  3. Προσωρινή αποθήκευση
  4. Συσσωρευτές, μεταβλητές μετάδοσης και σημεία ελέγχου

Περιεχόμενο ροής

Περιεχόμενο ροής καταναλώνει μια ροή δεδομένων στο Spark. Καταγράφει ένα Εισαγωγή DStream να παράγει ένα Δέκτης αντικείμενο. Είναι το κύριο σημείο εισόδου για τη λειτουργία του Spark. Το Spark παρέχει μια σειρά προεπιλεγμένων εφαρμογών πηγών όπως το Twitter, το Akka Actor και το ZeroMQ που είναι προσβάσιμα από το περιβάλλον.

Ένα αντικείμενο StreamingContext μπορεί να δημιουργηθεί από ένα αντικείμενο SparkContext. Ένα SparkContext αντιπροσωπεύει τη σύνδεση σε ένα σύμπλεγμα Spark και μπορεί να χρησιμοποιηθεί για τη δημιουργία RDD, συσσωρευτών και μεταβλητών εκπομπής σε αυτό το σύμπλεγμα.

εισαγωγή org.apache.spark._ εισαγωγή org.apache.spark.streaming._ var ssc = νέο StreamingContext (sc, Seconds (1))

DStream

Διακριτική ροή (DStream) είναι η βασική αφαίρεση που παρέχεται από το Spark Streaming. Είναι μια συνεχής ροή δεδομένων. Λαμβάνεται από μια πηγή δεδομένων ή μια επεξεργασμένη ροή δεδομένων που δημιουργείται μετασχηματίζοντας τη ροή εισόδου.

Φιγούρα: Εξαγωγή λέξεων από μια Εισαγωγή DStream

Εσωτερικά, ένα DStream αντιπροσωπεύεται από μια συνεχή σειρά RDDs και κάθε RDD περιέχει δεδομένα από ένα συγκεκριμένο διάστημα.

Εισαγωγή DStreams: Εισαγωγή DStreams είναι DStreams που αντιπροσωπεύουν τη ροή δεδομένων εισαγωγής που λαμβάνονται από πηγές ροής.

Φιγούρα: Ο δέκτης στέλνει δεδομένα στο Input DStream όπου κάθε παρτίδα περιέχει RDD

Κάθε είσοδος DStream σχετίζεται με ένα αντικείμενο δέκτη που λαμβάνει τα δεδομένα από μια πηγή και τα αποθηκεύει στη μνήμη του Spark για επεξεργασία.

Μετασχηματισμοί σε DStreams:

Κάθε λειτουργία που εφαρμόζεται σε μια ροή DSt μεταφράζεται σε λειτουργίες των υποκείμενων RDD. Οι μετασχηματισμοί επιτρέπουν στα δεδομένα από την είσοδο DStream να τροποποιούνται παρόμοια με τα RDD. Τα DStreams υποστηρίζουν πολλούς από τους διαθέσιμους μετασχηματισμούς σε κανονικά Spark RDDs.

Φιγούρα: Μετασχηματισμοί DStream

Τα παρακάτω είναι μερικοί από τους δημοφιλείς μετασχηματισμούς στο DStreams:

χάρτης( func )χάρτης( func ) επιστρέφει ένα νέο DStream περνώντας κάθε στοιχείο της πηγής DStream μέσω μιας συνάρτησης func.
flatMap ( func )flatMap ( func ) είναι παρόμοιο με το χάρτη ( func ) αλλά κάθε στοιχείο εισαγωγής μπορεί να αντιστοιχιστεί σε 0 ή περισσότερα στοιχεία εξόδου και επιστρέφει ένα νέο DStream περνώντας κάθε στοιχείο προέλευσης μέσω μιας συνάρτησης func.
φίλτρο( func )φίλτρο( func ) επιστρέφει ένα νέο DStream επιλέγοντας μόνο τις εγγραφές της πηγής DStream στην οποία func επιστρέφει αλήθεια.
περιορίζω( func )περιορίζω( func ) επιστρέφει ένα νέο DStream RDD ενός στοιχείου συγκεντρώνοντας τα στοιχεία σε κάθε RDD της πηγής DStream χρησιμοποιώντας μια συνάρτηση func .
groupBy ( func )groupBy ( func ) επιστρέφει το νέο RDD που βασικά αποτελείται από ένα κλειδί και αντίστοιχη λίστα στοιχείων αυτής της ομάδας.

Έξοδος DStreams:

Οι λειτουργίες εξόδου επιτρέπουν την προώθηση των δεδομένων της DStream σε εξωτερικά συστήματα όπως βάσεις δεδομένων ή συστήματα αρχείων. Οι λειτουργίες εξόδου ενεργοποιούν την πραγματική εκτέλεση όλων των μετασχηματισμών DStream.

Φιγούρα: Λειτουργίες εξόδου σε DStreams

Προσωρινή αποθήκευση

DStreams επιτρέψτε στους προγραμματιστές να αποθηκεύουν προσωρινά τα δεδομένα της ροής στη μνήμη. Αυτό είναι χρήσιμο εάν τα δεδομένα στο DStream θα υπολογιστούν πολλές φορές. Αυτό μπορεί να γίνει χρησιμοποιώντας το επιμένω() μέθοδος σε DStream.

Φιγούρα: Προσωρινή αποθήκευση σε 2 κόμβους

Για ροές εισόδου που λαμβάνουν δεδομένα μέσω του δικτύου (όπως Kafka, Flume, Sockets κ.λπ.),Το προεπιλεγμένο επίπεδο επιμονής έχει ρυθμιστεί για να αναπαράγει τα δεδομένα σε δύο κόμβους για ανοχή σφαλμάτων.

Συσσωρευτές, μεταβλητές μετάδοσης και σημεία ελέγχου

Συσσωρευτές: Συσσωρευτές είναι μεταβλητές που προστίθενται μόνο μέσω μιας συσχετιστικής και εναλλακτικής λειτουργίας. Χρησιμοποιούνται για την εφαρμογή μετρητών ή ποσών. Η παρακολούθηση των συσσωρευτών στο περιβάλλον χρήστη μπορεί να είναι χρήσιμη για την κατανόηση της προόδου των σταδίων εκτέλεσης. Το Spark υποστηρίζει εγγενώς αριθμητικούς συσσωρευτές. Μπορούμε να δημιουργήσουμε ονομαστικούς ή ανώνυμους συσσωρευτές.

Μεταβλητές μετάδοσης: Μεταβλητές μετάδοσης επιτρέψτε στον προγραμματιστή να διατηρεί μια μεταβλητή μόνο για ανάγνωση σε κρυφή μνήμη σε κάθε μηχάνημα αντί να στέλνει ένα αντίγραφο της με εργασίες. Μπορούν να χρησιμοποιηθούν για να δώσουν σε κάθε κόμβο ένα αντίγραφο ενός μεγάλου συνόλου δεδομένων εισόδου με αποτελεσματικό τρόπο. Ο Spark προσπαθεί επίσης να διανείμει μεταβλητές μετάδοσης χρησιμοποιώντας αποτελεσματικούς αλγόριθμους μετάδοσης για τη μείωση του κόστους επικοινωνίας.

Σημεία ελέγχου: Σημεία ελέγχου είναι παρόμοια με τα σημεία ελέγχου στο παιχνίδι. Το κάνουν να λειτουργεί 24/7 και το καθιστούν ανθεκτικό σε αστοχίες που δεν σχετίζονται με τη λογική της εφαρμογής.


Φιγούρα:
Χαρακτηριστικά των σημείων ελέγχου

Χρήση περίπτωσης - Ανάλυση συναισθημάτων Twitter

Τώρα που έχουμε κατανοήσει τις βασικές έννοιες του Spark Streaming, ας λύσουμε ένα πραγματικό πρόβλημα χρησιμοποιώντας το Spark Streaming.

Δήλωση προβλήματος: Για να σχεδιάσουμε ένα Σύστημα Ανάλυσης Συναισθημάτων Twitter, όπου συγκεντρώνουμε συναισθήματα σε πραγματικό χρόνο για διαχείριση κρίσεων, προσαρμογή υπηρεσιών και μάρκετινγκ στόχων.

Εφαρμογές ανάλυσης συναισθημάτων:

πίνακας αντικειμένων σε Java
  • Προβλέψτε την επιτυχία μιας ταινίας
  • Προβλέψτε την επιτυχία της πολιτικής εκστρατείας
  • Αποφασίστε εάν θα επενδύσετε σε μια συγκεκριμένη εταιρεία
  • Στοχευμένη διαφήμιση
  • Αξιολογήστε προϊόντα και υπηρεσίες

Εφαρμογή ροής Spark:

Βρείτε τον Ψευδο Κωδικό παρακάτω:

// Εισαγάγετε τα απαραίτητα πακέτα στο Spark Program import org.apache.spark.streaming. {Seconds, StreamingContext} import org.apache.spark.SparkContext._ ... import java.io.File αντικείμενο twitterSentiment {def main (args : Array [String]) {if (args.length<4) { System.err.println('Usage: TwitterPopularTags ' + ' []') System.exit(1) } StreamingExamples.setStreamingLogLevels() //Passing our Twitter keys and tokens as arguments for authorization val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) val filters = args.takeRight(args.length - 4) // Set the system properties so that Twitter4j library used by twitter stream // Use them to generate OAuth credentials System.setProperty('twitter4j.oauth.consumerKey', consumerKey) ... System.setProperty('twitter4j.oauth.accessTokenSecret', accessTokenSecret) val sparkConf = new SparkConf().setAppName('twitterSentiment').setMaster('local[2]') val ssc = new Streaming Context val stream = TwitterUtils.createStream(ssc, None, filters) //Input DStream transformation using flatMap val tags = stream.flatMap { status =>Λήψη κειμένου από το Hashtags} // Μετασχηματισμός RDD χρησιμοποιώντας το sortBy και, στη συνέχεια, αντιστοιχίστε τη συνάρτηση tags.countByValue () .foreachRDD {rdd => val now = Λάβετε την τρέχουσα ώρα κάθε Tweet rdd. (x, τώρα)) // Αποθήκευση της εξόδου μας στον κατάλογο ~ / twitter / .saveAsTextFile (s '~ / twitter / $ now')} // Μετασχηματισμός DStream χρησιμοποιώντας λειτουργίες φίλτρου και χάρτη val tweets = stream.filter {t => ετικέτες val = t. Split On Spaces .filter (_. Έναρξη με ('#')). Μετατροπή σε πεζά tags.exists {x => true}} val data = tweets.map {status => val sentiment = SentimentAnalysisUtils.detectSentiment (status.getText) val tags = status.getHashtagEntities.map (_. GetText.toLowerCase) (status.getText, sentiment.toString, tagss.toString ())} data.print () // Αποθήκευση της εξόδου μας στο ~ / με ονόματα αρχείων που ξεκινούν όπως twitters data.saveAsTextFiles ('~ / twitters', '20000') ssc. έναρξη () ssc.awaitTermination ()}}

Αποτελέσματα:

Ακολουθούν τα αποτελέσματα που εμφανίζονται στο Eclipse IDE κατά την εκτέλεση του προγράμματος Twitter Sentiment Streaming.

Φιγούρα: Έξοδος ανάλυσης συναισθημάτων στο Eclipse IDE

Όπως μπορούμε να δούμε στο στιγμιότυπο οθόνης, όλα τα tweets κατηγοριοποιούνται σε θετικά, ουδέτερα και αρνητικά σύμφωνα με το συναίσθημα του περιεχομένου των tweets.

Η έξοδος των συναισθημάτων των Tweets αποθηκεύεται σε φακέλους και αρχεία ανάλογα με το χρόνο που δημιουργήθηκαν. Αυτή η έξοδος μπορεί να αποθηκευτεί στο τοπικό σύστημα αρχείων ή HDFS, όπως απαιτείται. Ο κατάλογος εξόδου μοιάζει με αυτό:

Φιγούρα: Φάκελοι εξόδου μέσα στο φάκελο έργου «twitter»

Εδώ, μέσα στον κατάλογο twitter, μπορούμε να βρούμε τα ονόματα χρήστη των χρηστών του Twitter μαζί με τη χρονική σήμανση για κάθε tweet όπως φαίνεται παρακάτω:

Φιγούρα: Αρχείο εξόδου που περιέχει ονόματα χρήστη Twitter με χρονική σήμανση

Τώρα που έχουμε τα ονόματα χρήστη και τη χρονική σήμανση του Twitter, ας δούμε τα Sentiments και τα tweets που είναι αποθηκευμένα στον κύριο κατάλογο. Εδώ, κάθε tweet ακολουθείται από το συναίσθημα του συναισθήματος. Αυτό το συναίσθημα που αποθηκεύεται χρησιμοποιείται περαιτέρω για την ανάλυση ενός τεράστιου πλήθους πληροφοριών από εταιρείες.

Φιγούρα: Αρχείο εξόδου που περιέχει tweets με συναισθήματα

Κωδικός Tweaking:

Τώρα, ας τροποποιήσουμε λίγο τον κώδικα μας για να λάβουμε συναισθήματα για συγκεκριμένα hashtag (θέματα). Επί του παρόντος, ο Ντόναλντ Τραμπ, ο Πρόεδρος των Ηνωμένων Πολιτειών είναι δημοφιλής στα κανάλια ειδήσεων και στα διαδικτυακά κοινωνικά μέσα. Ας δούμε τα συναισθήματα που σχετίζονται με τη λέξη-κλειδί « Ατού «.

Φιγούρα: Εκτέλεση ανάλυσης συναισθημάτων σε Tweets με τη λέξη-κλειδί «Trump»

Προχωρώντας:

Όπως έχουμε δει από την επίδειξη ανάλυσης συναισθημάτων, μπορούμε να εξαγάγουμε συναισθήματα συγκεκριμένων θεμάτων, όπως κάναμε για το «Trump». Ομοίως, το Sentiment Analytics μπορεί να χρησιμοποιηθεί στη διαχείριση κρίσεων, στην προσαρμογή υπηρεσιών και στοχευόμενο μάρκετινγκ από εταιρείες σε όλο τον κόσμο.

Οι εταιρείες που χρησιμοποιούν Spark Streaming για ανάλυση συναισθημάτων έχουν εφαρμόσει την ίδια προσέγγιση για να επιτύχουν τα εξής:

  1. Βελτίωση της εμπειρίας των πελατών
  2. Κερδίζοντας ανταγωνιστικό πλεονέκτημα
  3. Απόκτηση επιχειρηματικής ευφυΐας
  4. Αναζωογόνηση μιας χαμένης μάρκας

Με αυτό, φτάσαμε στο τέλος αυτού Εκμάθηση ροής Spark ιστολόγιο. Μέχρι τώρα, πρέπει να έχετε αποκτήσει καλή κατανόηση για το τι είναι το Spark Streaming. Η περίπτωση χρήσης Twitter Sentiment Analysis θα σας δώσει την απαιτούμενη αυτοπεποίθηση για να εργαστείτε σε μελλοντικά έργα που συναντάτε στο Spark Streaming και στο Apache Spark. Η εξάσκηση είναι το κλειδί για την εξάσκηση οποιουδήποτε θέματος και ελπίζω ότι αυτό το ιστολόγιο έχει δημιουργήσει αρκετό ενδιαφέρον για να εξερευνήσετε περαιτέρω το Apache Spark.

Συνιστούμε να ακολουθήσετε το παρακάτω Tutorial Spark Streaming YouTube από το Edureka:

Ροή σπινθήρων | Παράδειγμα ανάλυσης συναισθημάτων Twitter | Έντρεκα

Αυτή η σειρά βίντεο στο Spark Tutorial παρέχει ένα πλήρες υπόβαθρο στα στοιχεία μαζί με τις περιπτώσεις χρήσης της πραγματικής ζωής όπως Ανάλυση συναισθημάτων Twitter , Ανάλυση πρόβλεψης παιχνιδιού NBA , Σύστημα Ανίχνευσης Σεισμών , Ανάλυση δεδομένων πτήσης και Συστήματα σύστασης ταινιών . Έχουμε σχεδιάσει προσωπικά τις περιπτώσεις χρήσης έτσι ώστε να παρέχουμε μια ολοκληρωμένη τεχνογνωσία σε οποιονδήποτε εκτελεί τον κώδικα.

Έχετε μια ερώτηση για εμάς; Παρακαλώ αναφέρετέ το στην ενότητα σχολίων και θα επικοινωνήσουμε μαζί σας το νωρίτερο. Αν θέλετε να μάθετε το Spark και να δημιουργήσετε μια καριέρα στον τομέα του Spark και να αποκτήσετε εξειδίκευση για να εκτελέσετε επεξεργασία δεδομένων μεγάλης κλίμακας χρησιμοποιώντας RDD, Spark Streaming, SparkSQL, MLlib, GraphX ​​και Scala με πραγματικές περιπτώσεις χρήσης, δείτε τις διαδραστικές μας Σε σύνδεση εδώ, που έρχεται με υποστήριξη 24 * 7 για να σας καθοδηγήσει καθ 'όλη τη διάρκεια της μαθησιακής σας περιόδου.