Apache Spark και Amazon S3 - Gotchas και βέλτιστες πρακτικές

Το S3 είναι ένα αντικείμενο αποθήκευσης και όχι ένα σύστημα αρχείων, εξ ου και τα ζητήματα που προκύπτουν από ενδεχόμενη συνέπεια, οι μη ατομικές μετονομασίες πρέπει να αντιμετωπίζονται στον κώδικα εφαρμογής. Ο κατάλογος διακομιστή σε ένα σύστημα αρχείων έχει αντικατασταθεί από αλγόριθμο κατακερματισμού του ονόματος αρχείου. Αυτό είναι κακό για την καταχώρηση των πραγμάτων, τις λειτουργίες καταλόγων, τη διαγραφή και τη μετονομασία (αντιγραφή και διαγραφή, καθώς τεχνικά δεν υπάρχει μετονομασία σε καταστήματα αντικειμένων)

Ξεκινήστε τη χρήση του S3A (σχήμα URI: s3a: //) - Hadoop 2.7+. Το S3a είναι το συνιστώμενο S3 Client για το Hadoop 2.7 και αργότερα το S3a είναι πιο αποδοτικό και υποστηρίζει μεγαλύτερα αρχεία (upto 5TB) και έχει υποστήριξη για upload multipart. Όλα τα αντικείμενα που είναι προσβάσιμα από το s3n: // URL θα πρέπει επίσης να είναι προσβάσιμα από το s3a απλώς αντικαθιστώντας το σχήμα URL. Οι περισσότερες αναφορές σφαλμάτων κατά του S3N θα κλείσουν ως WONTFIX

Δημιουργία Spark 2.0.1 με S3a Για Spark 2.0.1 χρησιμοποιήστε hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar στο ταξικό σας ταξίδι. μην ξεχάσετε να ενημερώσετε το spark-default.conf με τα πλήκτρα AWS και το S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Χρησιμοποιήστε σίγουρα το Dataframes ως αναδιάταξη ερωτήσεων και πιέστε κάτω από το κουτί και είναι έτσι διαθέσιμα λιγότερα δεδομένα και τελικά επιταχύνετε τα ερωτήματά σας

Εάν διαβάζετε τα ίδια δεδομένα πολλές φορές, δοκιμάστε να χρησιμοποιήσετε το .cache ή το s3distcp για να μεταφέρετε τα αρχεία στο τοπικό σας σύμπλεγμα EMR για να επωφεληθείτε από την καλύτερη απόδοση ανάγνωσης αρχείων ενός πραγματικού συστήματος αρχείων. Η επιλογή groupBy του s3distcp είναι μια μεγάλη επιλογή για την επίλυση του προβλήματος του μικρού αρχείου, με τη συγχώνευση μεγάλου αριθμού μικρών αρχείων.

Αυτό με φέρνει στο θέμα της ανάγνωσης μεγάλου αριθμού μικρών αρχείων. Εάν η συγχώνευση των αρχείων με χρήση ενός εργαλείου δεν είναι επιλογή, δοκιμάστε τον ακόλουθο κώδικα ο οποίος λειτουργεί αποτελεσματικά γύρω από τη συμφόρηση του slow directory listing S3

εισαγωγή com.amazonaws.services.s3._, model._
    εισαγωγή com.amazonaws.auth.BasicAWSCredentials

    request request = new ListObjectsRequest ()
    request.setBucketName (κάδος)
    request.setPrefix (πρόθεμα)
    request.setMaxKeys (σελίδαLength)
    def s3 = νέο AmazonS3Client (νέο BasicAWSCredentials (κλειδί, μυστικό))

    val objs = s3.listObjects (αίτημα) // Σημειώστε ότι αυτή η μέθοδος επιστρέφει τα περικομμένα δεδομένα εάν είναι μεγαλύτερο από το "pageLength" παραπάνω. Ίσως χρειαστεί να το αντιμετωπίσετε.
    sc.parallelize (objs.getObjectSummaries.map (_. getKey) .toList)
        .flatMap {key => Source.fromInputStream (s3.getObject (κουβά, κλειδί) .getObjectContent: InputStream) .getLines}

Βεβαιωθείτε ότι η επιλογή spark.sql.parquet.filterPushdown είναι αληθής και το spark.sql.parquet.mergeSchema είναι ψευδές (για να αποφύγετε συγχώνευση σχήματος κατά τη διάρκεια των εγγραφών που πραγματικά επιβραδύνει να γράφετε στάδιο). Ευτυχώς το Spark 2.0 έχει τη σωστή προεπιλογή

Έχετε αναρωτηθεί γιατί ακριβώς τη στιγμή που μια εργασία είναι έτοιμη να ολοκληρωθεί, τίποτα δεν γράφεται στα αρχεία καταγραφής και όλες οι εργασίες σπινθήρων φαίνεται ότι έχουν σταματήσει αλλά τα αποτελέσματα δεν είναι ακόμα στον κατάλογο εξόδου του S3 ... τι συμβαίνει; Καλά κάθε φορά που οι εκτελεστές γράφουν το αποτέλεσμα της δουλειάς, κάθε ένας από αυτούς γράφει σε έναν προσωρινό κατάλογο εκτός του κεντρικού καταλόγου όπου έπρεπε να γραφτούν τα αρχεία και αφού εκτελεστούν όλοι οι εκτελεστές, γίνεται μια μετονομασία για να επιτευχθεί ατομική αποκλειστικότητα. Αυτό είναι όλα ωραία σε ένα τυποποιημένο σύστημα αρχείων, όπως το hdfs όπου οι μετοχές είναι στιγμιαίες, αλλά σε ένα κατάστημα αντικειμένων όπως το S3, αυτό δεν είναι ευνοϊκό καθώς οι μετονομασίες σε S3 γίνονται στα 6MB / s.

Αν είναι δυνατόν, γράψτε την έξοδο των εργασιών στο EMR hdfs (για να επωφεληθείτε από τις σχεδόν στιγμιαίες μετονομασίες και το καλύτερο αρχείο IO των τοπικών hdfs) και προσθέστε ένα βήμα dstcp για να μετακινήσετε τα αρχεία στο S3, για να εξοικονομήσετε όλα τα προβλήματα χειρισμού των εσωτερικών ένα κατάστημα αντικειμένων που προσπαθεί να είναι ένα σύστημα αρχείων. Επίσης, η εγγραφή στα τοπικά hdfs θα σας επιτρέψει να ενεργοποιήσετε την κερδοσκοπία για να ελέγξετε τις εκλεπτυσμένες εργασίες χωρίς να πέσετε στις παγίδες αδιεξόδου που σχετίζονται με το DirectOutputCommiter.

Εάν πρέπει να χρησιμοποιήσετε τον S3 ως τον κατάλογο εξόδου, βεβαιωθείτε ότι έχουν ρυθμιστεί οι ακόλουθες ρυθμίσεις Spark

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Σημείωση: Το DirectParquetOutputCommitter αφαιρείται από το Spark 2.0 λόγω της πιθανότητας απώλειας δεδομένων. Δυστυχώς έως ότου έχουμε βελτιώσει τη συνέπεια από το S3a, πρέπει να εργαστούμε με τους εναλλακτικούς τρόπους αντιμετώπισης. Τα πράγματα βελτιώνονται με το Hadoop 2.8

Αποφύγετε τα ονόματα κλειδιών με λεξικογραφική σειρά. Κάποιος θα μπορούσε να χρησιμοποιήσει hashing / τυχαία προθέματα ή αντίστροφη ημερομηνία-χρόνο για να get.The γύρω τέχνασμα είναι να ονομάσετε τα κλειδιά σας ιεραρχικά, θέτοντας τα πιο συνηθισμένα πράγματα που φιλτράρετε από την αριστερή πλευρά του κλειδιού σας. Και ποτέ δεν έχουν υπογράμμιση στα ονόματα κουτιών λόγω των ζητημάτων DNS.

Ενεργοποιώντας το fs.s3a.fast.upload να φορτώνει παράλληλα τμήματα ενός αρχείου στο Amazon S3

Λοιπόν αυτό ήταν το χωριό των εγκεφάλων των ζητημάτων στην παραγωγή που έχω λύσει πρόσφατα για να κάνει το Spark να δουλέψει με το S3. Μείνετε συντονισμένοι για περισσότερα σχετικά με αυτό καθώς σκάβω βαθύτερα στην επόμενη θέση ...