Xlera8

Εφαρμόστε αργά μεταβαλλόμενες διαστάσεις σε μια λίμνη δεδομένων χρησιμοποιώντας AWS Glue και Delta

Σε μια αποθήκη δεδομένων, α διάσταση είναι μια δομή που κατηγοριοποιεί τα γεγονότα και τα μέτρα προκειμένου να επιτρέπει στους χρήστες να απαντούν σε επιχειρηματικές ερωτήσεις. Για να δείξουμε ένα παράδειγμα, σε έναν τυπικό τομέα πωλήσεων, ο πελάτης, ο χρόνος ή το προϊόν είναι διαστάσεις και οι συναλλαγές πωλήσεων είναι γεγονός. Τα χαρακτηριστικά εντός της διάστασης μπορούν να αλλάξουν με την πάροδο του χρόνου — ένας πελάτης μπορεί να αλλάξει τη διεύθυνσή του, ένας υπάλληλος μπορεί να μετακινηθεί από θέση εργολάβου σε θέση πλήρους απασχόλησης ή ένα προϊόν μπορεί να έχει πολλές αναθεωρήσεις σε αυτό. ΕΝΑ αλλάζει σιγά σιγά διάσταση (SCD) είναι μια έννοια αποθήκευσης δεδομένων που περιέχει σχετικά στατικά δεδομένα που μπορούν να αλλάξουν αργά σε μια χρονική περίοδο. Υπάρχουν τρεις κύριοι τύποι SCD που διατηρούνται στην αποθήκευση δεδομένων: Τύπος 1 (χωρίς ιστορικό), Τύπος 2 (πλήρες ιστορικό) και Τύπος 3 (περιορισμένο ιστορικό). Η σύλληψη δεδομένων αλλαγής (CDC) είναι ένα χαρακτηριστικό μιας βάσης δεδομένων που παρέχει τη δυνατότητα αναγνώρισης των δεδομένων που άλλαξαν μεταξύ δύο φορτώσεων βάσης δεδομένων, έτσι ώστε να μπορεί να εκτελεστεί μια ενέργεια στα δεδομένα που έχουν αλλάξει.

Καθώς οργανισμοί σε όλο τον κόσμο εκσυγχρονίζουν τις πλατφόρμες δεδομένων τους με τις λίμνες δεδομένων Απλή υπηρεσία αποθήκευσης Amazon (Amazon S3), ο χειρισμός SCD σε λίμνες δεδομένων μπορεί να είναι δύσκολος. Γίνεται ακόμη πιο δύσκολο όταν τα συστήματα πηγής δεν παρέχουν μηχανισμό για τον προσδιορισμό των αλλαγμένων δεδομένων για επεξεργασία εντός της λίμνης δεδομένων και καθιστά την επεξεργασία δεδομένων εξαιρετικά περίπλοκη εάν η πηγή δεδομένων είναι ημιδομημένη αντί για βάση δεδομένων. Ο βασικός στόχος κατά τη διαχείριση των SCD τύπου 2 είναι να καθοριστούν με ακρίβεια οι ημερομηνίες έναρξης και λήξης στο σύνολο δεδομένων για την παρακολούθηση των αλλαγών στη λίμνη δεδομένων, επειδή αυτό παρέχει τη δυνατότητα αναφοράς σημείου-σε-χρόνου για τις εφαρμογές που καταναλώνουν.

Σε αυτήν την ανάρτηση, εστιάζουμε στην επίδειξη του τρόπου αναγνώρισης των αλλαγμένων δεδομένων για μια ημιδομημένη πηγή (JSON) και καταγραφής των πλήρους ιστορικών αλλαγών δεδομένων (SCD Type 2) και αποθήκευσης τους σε μια λίμνη δεδομένων S3, χρησιμοποιώντας Κόλλα AWS και μορφή ανοιχτών δεδομένων λίμνης Delta.io. Αυτή η υλοποίηση υποστηρίζει τις ακόλουθες περιπτώσεις χρήσης:

  • SCD τύπου 2 κομματιού με ημερομηνίες έναρξης και λήξης για τον εντοπισμό των τρεχόντων και πλήρεις ιστορικές εγγραφές και μια σημαία για την αναγνώριση των διαγραμμένων εγγραφών στη λίμνη δεδομένων (λογικές διαγραφές)
  • Χρησιμοποιήστε εργαλεία κατανάλωσης όπως π.χ Αμαζόν Αθηνά για την απρόσκοπτη αναζήτηση ιστορικών αρχείων

Επισκόπηση λύσεων

Αυτή η ανάρτηση δείχνει τη λύση με μια περίπτωση χρήσης από άκρο σε άκρο χρησιμοποιώντας ένα δείγμα δεδομένων εργαζομένων. Το σύνολο δεδομένων αντιπροσωπεύει τα στοιχεία των εργαζομένων, όπως ταυτότητα, όνομα, διεύθυνση, αριθμό τηλεφώνου, εργολάβο ή όχι και άλλα. Για να επιδείξετε την εφαρμογή του SCD, εξετάστε τις ακόλουθες παραδοχές:

  • Η ομάδα μηχανικής δεδομένων λαμβάνει καθημερινά αρχεία που είναι πλήρη στιγμιότυπα εγγραφών και δεν περιέχουν μηχανισμό για τον εντοπισμό αλλαγών στην εγγραφή προέλευσης
  • Η ομάδα έχει επιφορτιστεί με την εφαρμογή της λειτουργικότητας SCD Type 2 για τον εντοπισμό νέων, ενημερωμένων και διαγραμμένων εγγραφών από την πηγή και τη διατήρηση των ιστορικών αλλαγών στη λίμνη δεδομένων
  • Επειδή τα συστήματα πηγής δεν παρέχουν τη δυνατότητα CDC, πρέπει να αναπτυχθεί ένας μηχανισμός για τον εντοπισμό των νέων, ενημερωμένων και διαγραμμένων εγγραφών και τη διατήρηση τους στο επίπεδο λίμνης δεδομένων

Η αρχιτεκτονική υλοποιείται ως εξής:

  • Τα συστήματα πηγής απορροφούν αρχεία στον κάδο προσγείωσης S3 (αυτό το βήμα μιμείται δημιουργώντας τα δείγματα εγγραφών χρησιμοποιώντας το παρεχόμενο AWS Lambda λειτουργία στον κάδο προσγείωσης)
  • Μια εργασία κόλλας AWS (εργασία Delta) επιλέγει το αρχείο δεδομένων προέλευσης και επεξεργάζεται τα αλλαγμένα δεδομένα από την προηγούμενη φόρτωση αρχείου (νέα ένθετα, ενημερώσεις στις υπάρχουσες εγγραφές και διαγραμμένες εγγραφές από την πηγή) στη λίμνη δεδομένων S3 (επεξεργασμένος κάδος επιπέδων)
  • Η αρχιτεκτονική χρησιμοποιεί τη μορφή ανοιχτών δεδομένων λίμνης (Δέλτα) και δημιουργεί τη λίμνη δεδομένων S3 ως λίμνη Δέλτα, η οποία είναι μεταβλητή, επειδή οι νέες αλλαγές μπορούν να ενημερωθούν, μπορούν να προστεθούν νέα ένθετα και οι διαγραφές πηγής μπορούν να εντοπιστούν με ακρίβεια και να επισημανθούν με delete_flag αξία
  • Ένας ανιχνευτής AWS Glue καταγράφει τα δεδομένα, τα οποία μπορεί να ζητηθεί από την Athena

Το παρακάτω διάγραμμα απεικονίζει την αρχιτεκτονική μας.

Προϋποθέσεις

Πριν ξεκινήσετε, βεβαιωθείτε ότι έχετε τις ακόλουθες προϋποθέσεις:

Αναπτύξτε τη λύση

Για αυτήν τη λύση, παρέχουμε ένα πρότυπο CloudFormation που ρυθμίζει τις υπηρεσίες που περιλαμβάνονται στην αρχιτεκτονική, για να επιτρέψει επαναλαμβανόμενες αναπτύξεις. Αυτό το πρότυπο δημιουργεί τους ακόλουθους πόρους:

  • Δύο κάδοι S3: ένας κάδος προσγείωσης για την αποθήκευση δείγματος δεδομένων υπαλλήλων και ένας κάδος επεξεργασμένου στρώματος για τη λίμνη μεταβλητών δεδομένων (Λίμνη Δέλτα)
  • Μια συνάρτηση λάμδα για τη δημιουργία δειγμάτων εγγραφών
  • Μια εργασία εξαγωγής, μετασχηματισμού και φόρτωσης AWS Glue (ETL) για την επεξεργασία των δεδομένων πηγής από τον κάδο προσγείωσης στον επεξεργασμένο κάδο

Για να αναπτύξετε τη λύση, ακολουθήστε τα παρακάτω βήματα:

  1. Επιλέξτε Εκκίνηση στοίβας για να εκκινήσετε τη στοίβα CloudFormation:

  1. Εισαγάγετε ένα όνομα στοίβας.
  2. Αγορά Αναγνωρίζω ότι το AWS CloudFormation μπορεί να δημιουργήσει πόρους IAM με προσαρμοσμένα ονόματα.
  3. Επιλέξτε Δημιουργία στοίβας.

Αφού ολοκληρωθεί η ανάπτυξη της στοίβας CloudFormation, μεταβείτε στην κονσόλα AWS CloudFormation για να σημειώσετε τους ακόλουθους πόρους στο Έξοδοι καρτέλα:

  • Πόροι λίμνης δεδομένων – Οι κάδοι S3 scd-blog-landing-xxxx και scd-blog-processed-xxxx (αναφέρεται ως scd-blog-landing και scd-blog-processed στις επόμενες ενότητες αυτής της ανάρτησης)
  • Συνάρτηση λάμδα γεννήτριας δειγμάτων εγγραφών - SampleDataGenaratorLambda-<CloudFormation Stack Name> (αναφέρεται ως SampleDataGeneratorLambda)
  • Βάση δεδομένων καταλόγου δεδομένων AWS Glue - deltalake_xxxxxx (αναφέρεται ως deltalake)
  • Εργασία AWS Glue Delta - <CloudFormation-Stack-Name>-src-to-processed (αναφέρεται ως src-to-processed)

Λάβετε υπόψη ότι η ανάπτυξη της στοίβας CloudFormation στον λογαριασμό σας επιφέρει χρεώσεις χρήσης AWS.

Δοκιμή εφαρμογής SCD Τύπου 2

Με τη διαθέσιμη υποδομή, είστε έτοιμοι να δοκιμάσετε τη συνολική σχεδίαση λύσεων και να υποβάλετε ερωτήματα σε ιστορικά αρχεία από το σύνολο δεδομένων υπαλλήλων. Αυτή η ανάρτηση έχει σχεδιαστεί για να εφαρμοστεί για μια πραγματική περίπτωση χρήσης πελατών, όπου λαμβάνετε πλήρη δεδομένα στιγμιότυπου σε καθημερινή βάση. Δοκιμάζουμε τις ακόλουθες πτυχές της εφαρμογής SCD:

  • Εκτελέστε μια εργασία κόλλας AWS για το αρχικό φορτίο
  • Προσομοιώστε ένα σενάριο όπου δεν υπάρχουν αλλαγές στην πηγή
  • Προσομοίωση εισαγωγής, ενημέρωσης και διαγραφής σεναρίων προσθέτοντας νέες εγγραφές και τροποποιώντας και διαγράφοντας υπάρχουσες εγγραφές
  • Προσομοιώστε ένα σενάριο όπου η διαγραμμένη εγγραφή επιστρέφει ως νέο ένθετο

Δημιουργήστε ένα δείγμα δεδομένων εργαζομένων

Για να δοκιμάσετε τη λύση και προτού ξεκινήσετε την αρχική απορρόφηση δεδομένων, πρέπει να προσδιοριστεί η πηγή δεδομένων. Για να απλοποιηθεί αυτό το βήμα, έχει αναπτυχθεί μια συνάρτηση Lambda στη στοίβα CloudFormation που μόλις αναπτύξατε.

Ανοίξτε τη συνάρτηση και διαμορφώστε ένα δοκιμαστικό συμβάν, με την προεπιλογή hello-world συμβάν προτύπου JSON όπως φαίνεται στο παρακάτω στιγμιότυπο οθόνης. Δώστε ένα όνομα συμβάντος χωρίς αλλαγές στο πρότυπο και αποθηκεύστε το δοκιμαστικό συμβάν.

Επιλέξτε Δοκιμή για να καλέσετε ένα συμβάν δοκιμής, το οποίο καλεί τη συνάρτηση Lambda για να δημιουργήσει τα δείγματα εγγραφών.

Όταν η συνάρτηση Lambda ολοκληρώσει την επίκλησή της, θα μπορείτε να δείτε το ακόλουθο δείγμα δεδομένων εργαζομένων στον κάδο προσγείωσης.

Εκτελέστε την εργασία κόλλας AWS

Επιβεβαιώστε εάν βλέπετε το σύνολο δεδομένων υπαλλήλου στη διαδρομή s3://scd-blog-landing/dataset/employee/. Μπορείτε να κατεβάσετε το σύνολο δεδομένων και να το ανοίξετε σε ένα πρόγραμμα επεξεργασίας κώδικα όπως το VS Code. Το παρακάτω είναι ένα παράδειγμα του συνόλου δεδομένων:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas SpringsnDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson TunnelnMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert ViewsnWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott ValleynGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce MeadowsnLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks EstatesnEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah CreeknWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes PlainnHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Κατεβάστε το σύνολο δεδομένων και διατηρήστε το έτοιμο, γιατί θα τροποποιήσετε το σύνολο δεδομένων για μελλοντικές περιπτώσεις χρήσης για να προσομοιώσετε τα ένθετα, τις ενημερώσεις και τις διαγραφές. Το δείγμα δεδομένων που δημιουργείται για εσάς θα είναι εντελώς διαφορετικό από αυτό που βλέπετε στο προηγούμενο παράδειγμα.

Για να εκτελέσετε την εργασία, ολοκληρώστε τα παρακάτω βήματα:

  1. Στην κονσόλα κόλλας AWS, επιλέξτε Θέσεις εργασίας στο παράθυρο πλοήγησης.
  2. Επιλέξτε τη δουλειά src-to-processed.
  3. Στις Τρέχει καρτέλα, επιλέξτε τρέξιμο.

Όταν η εργασία AWS Glue εκτελείται για πρώτη φορά, η εργασία διαβάζει το σύνολο δεδομένων υπαλλήλου από τη διαδρομή του κάδου προσγείωσης και απορροφά τα δεδομένα στον επεξεργασμένο κάδο ως πίνακας Delta.

Όταν ολοκληρωθεί η εργασία, μπορείτε να δημιουργήσετε ένα πρόγραμμα ανίχνευσης για να δείτε την αρχική φόρτωση δεδομένων. Το ακόλουθο στιγμιότυπο οθόνης δείχνει τη βάση δεδομένων που είναι διαθέσιμη στο Βάσεις Δεδομένων .

  1. Επιλέξτε Ανιχνευτές στο παράθυρο πλοήγησης.
  2. Επιλέξτε Δημιουργία ανιχνευτή.

  1. Ονομάστε τον ανιχνευτή σας delta-lake-crawler, κατόπιν επιλέξτε Επόμενο.

  1. Αγορά Οχι ακόμα για δεδομένα που έχουν ήδη αντιστοιχιστεί σε πίνακες AWS Glue.
  2. Επιλέξτε Προσθέστε μια πηγή δεδομένων.

  1. Στις Πηγή δεδομένων αναπτυσσόμενο μενού, επιλέξτε Λίμνη Δέλτα.
  2. Εισαγάγετε τη διαδρομή προς τον πίνακα Delta.
  3. Αγορά Δημιουργήστε εγγενείς πίνακες.
  4. Επιλέξτε Προσθέστε μια πηγή δεδομένων Delta Lake.

  1. Επιλέξτε Επόμενο.

  1. Επιλέξτε τον ρόλο που δημιουργήθηκε από το πρότυπο CloudFormation και, στη συνέχεια, επιλέξτε Επόμενο.

  1. Επιλέξτε τη βάση δεδομένων που δημιουργήθηκε από το πρότυπο CloudFormation και, στη συνέχεια, επιλέξτε Επόμενο.

  1. Επιλέξτε Δημιουργία ανιχνευτή.

  1. Επιλέξτε το πρόγραμμα ανίχνευσης και επιλέξτε τρέξιμο.

Ρωτήστε τα δεδομένα

Αφού ολοκληρωθεί ο ανιχνευτής, μπορείτε να δείτε τον πίνακα που δημιούργησε.

Για να αναζητήσετε τα δεδομένα, ολοκληρώστε τα παρακάτω βήματα:

  1. Επιλέξτε τον πίνακα εργαζομένων και στο Δράσεις μενού, επιλέξτε Προβολή δεδομένων.

Ανακατευθυνθείτε στην κονσόλα Athena. Εάν δεν έχετε τον πιο πρόσφατο κινητήρα Athena, δημιουργήστε μια νέα ομάδα εργασίας Athena με τον πιο πρόσφατο κινητήρα Athena.

  1. Κάτω από Διαχείριση στο παράθυρο πλοήγησης, επιλέξτε Ομάδες εργασίας.

  1. Επιλέξτε Δημιουργία ομάδας εργασίας.

  1. Δώστε ένα όνομα για την ομάδα εργασίας, όπως π.χ DeltaWorkgroup.
  2. Αγορά Athena SQL ως κινητήρας και επιλέξτε κινητήρας Athena έκδοση 3 for Έκδοση μηχανής ερωτήματος.

  1. Επιλέξτε Δημιουργία ομάδας εργασίας.

  1. Αφού δημιουργήσετε την ομάδα εργασίας, επιλέξτε την ομάδα εργασίας (DeltaWorkgroup) στο αναπτυσσόμενο μενού στο πρόγραμμα επεξεργασίας ερωτημάτων Athena.

  1. Εκτελέστε το ακόλουθο ερώτημα στο employee τραπέζι:
SELECT * FROM "deltalake_2438fbd0"."employee";

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από τις εξόδους CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

Μπορείτε να παρατηρήσετε ότι το employee ο πίνακας έχει 25 εγγραφές. Το ακόλουθο στιγμιότυπο οθόνης δείχνει το σύνολο των εγγραφών των εργαζομένων με ορισμένα δείγματα εγγραφών.

Ο πίνακας Delta αποθηκεύεται με ένα emp_key, το οποίο είναι μοναδικό για κάθε αλλαγή και χρησιμοποιείται για την παρακολούθηση των αλλαγών. ο emp_key δημιουργείται για κάθε εισαγωγή, ενημέρωση και διαγραφή και μπορεί να χρησιμοποιηθεί για την εύρεση όλων των αλλαγών που σχετίζονται με ένα μεμονωμένο emp_id.

Η emp_key δημιουργείται χρησιμοποιώντας τον αλγόριθμο κατακερματισμού SHA256, όπως φαίνεται στον ακόλουθο κώδικα:

df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"), col("phone_number"), col("isContractor")), 256))

Εκτελέστε εισαγωγές, ενημερώσεις και διαγραφές

Πριν κάνετε αλλαγές στο σύνολο δεδομένων, ας εκτελέσουμε την ίδια εργασία άλλη μια φορά. Υποθέτοντας ότι το τρέχον φορτίο από την πηγή είναι το ίδιο με το αρχικό φορτίο χωρίς αλλαγές, η εργασία AWS Glue δεν θα πρέπει να κάνει αλλαγές στο σύνολο δεδομένων. Αφού ολοκληρωθεί η εργασία, εκτελέστε την προηγούμενη Select κάντε ερώτημα στο πρόγραμμα επεξεργασίας ερωτημάτων Athena και επιβεβαιώστε ότι υπάρχουν ακόμα 25 ενεργές εγγραφές με τις ακόλουθες τιμές:

  • Και οι 25 εγγραφές με τη στήλη isCurrent=true
  • Και οι 25 εγγραφές με τη στήλη end_date=Null
  • Και οι 25 εγγραφές με τη στήλη delete_flag=false

Αφού επιβεβαιώσετε την προηγούμενη εργασία που εκτελέστηκε με αυτές τις τιμές, ας τροποποιήσουμε το αρχικό μας σύνολο δεδομένων με τις ακόλουθες αλλαγές:

  1. Αλλαξε το isContractor σημαία προς false (αλλάξτε το σε true εάν το σύνολο δεδομένων σας εμφανίζεται ήδη false) Για emp_id=12.
  2. Διαγράψτε ολόκληρη τη σειρά όπου emp_id=8 (Φροντίστε να αποθηκεύσετε την εγγραφή σε ένα πρόγραμμα επεξεργασίας κειμένου, επειδή χρησιμοποιούμε αυτήν την εγγραφή σε άλλη περίπτωση χρήσης).
  3. Αντιγράψτε τη σειρά για emp_id=25 και εισάγετε μια νέα σειρά. Αλλαξε το emp_id να είναι 26, και φροντίστε να αλλάξετε τις τιμές και για άλλες στήλες.

Αφού κάνουμε αυτές τις αλλαγές, το σύνολο δεδομένων πηγής υπαλλήλου μοιάζει με τον ακόλουθο κώδικα (για αναγνωσιμότητα, έχουμε συμπεριλάβει μόνο τις αλλαγμένες εγγραφές όπως περιγράφεται στα τρία προηγούμενα βήματα):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce MeadowsnLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes PlainnHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}

  1. Τώρα, ανεβάστε το αλλαγμένο fake_emp_data.json αρχείο στο ίδιο πρόθεμα πηγής.

  1. Αφού ανεβάσετε το αλλαγμένο σύνολο δεδομένων υπαλλήλων στο Amazon S3, μεταβείτε στην κονσόλα AWS Glue και εκτελέστε την εργασία.
  2. Όταν ολοκληρωθεί η εργασία, εκτελέστε το ακόλουθο ερώτημα στο πρόγραμμα επεξεργασίας ερωτημάτων Athena και επιβεβαιώστε ότι υπάρχουν συνολικά 27 εγγραφές με τις ακόλουθες τιμές:
SELECT * FROM "deltalake_2438fbd0"."employee";

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από την έξοδο CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

  1. Εκτελέστε ένα άλλο ερώτημα στο πρόγραμμα επεξεργασίας ερωτημάτων Athena και επιβεβαιώστε ότι επιστράφηκαν 4 εγγραφές με τις ακόλουθες τιμές:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από την έξοδο CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

Θα δείτε δύο εγγραφές για emp_id=12:

  • Ένας emp_id=12 εγγραφή με τις ακόλουθες τιμές (για την εγγραφή που απορροφήθηκε ως μέρος του αρχικού φορτίου):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • Ενα δεύτερο emp_id=12 εγγραφή με τις ακόλουθες τιμές (για την εγγραφή που απορροφήθηκε ως μέρος της αλλαγής στην πηγή):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (ή κενή συμβολοσειρά)

Το ρεκόρ για emp_id=8 που διαγράφηκε στην πηγή ως μέρος αυτής της εκτέλεσης θα εξακολουθεί να υπάρχει, αλλά με τις ακόλουθες αλλαγές στις τιμές:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

Η νέα εγγραφή υπαλλήλου θα εισαχθεί με τις ακόλουθες τιμές:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (ή κενή συμβολοσειρά)
  • delete_flag=false

Σημειώστε ότι το emp_key Οι τιμές στον πραγματικό σας πίνακα μπορεί να είναι διαφορετικές από αυτές που παρέχονται εδώ ως παράδειγμα.

  1. Για τις διαγραφές, ελέγχουμε το emp_id από τον βασικό πίνακα μαζί με το νέο αρχείο προέλευσης και την εσωτερική ένωση του emp_key.
  2. Εάν η συνθήκη αξιολογηθεί ως αληθής, τότε ελέγχουμε εάν ο πίνακας βάσης υπαλλήλου emp_key ισούται με τις νέες ενημερώσεις emp_key και λαμβάνουμε την τρέχουσα, μη διαγραμμένη εγγραφή (isCurrent=true και delete_flag=false).
  3. Συγχωνεύουμε τις αλλαγές διαγραφής από το νέο αρχείο με τον βασικό πίνακα για όλες τις αντίστοιχες σειρές συνθηκών διαγραφής και ενημερώνουμε τα ακόλουθα:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

Δείτε τον ακόλουθο κώδικα:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true" base_tbl.alias("employee") .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond) .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false", "end_date": current_date(), "delete_flag": "true"}).execute()

  1. Τόσο για τις ενημερώσεις όσο και για τα ένθετα, ελέγχουμε την κατάσταση εάν ο βασικός πίνακας employee.emp_id είναι ίσο με το new changes.emp_id και την employee.emp_key είναι ίσο με new changes.emp_key, ενώ ανακτά μόνο τις τρέχουσες εγγραφές.
  2. Εάν αυτή η συνθήκη αξιολογηθεί σε true, τότε παίρνουμε την τρέχουσα εγγραφή (isCurrent=true και delete_flag=false).
  3. Συγχωνεύουμε τις αλλαγές ενημερώνοντας τα ακόλουθα:
    1. Εάν η δεύτερη συνθήκη αξιολογείται σε true:
      1. isCurrent=false
      2. end_date=current_date
    2. Ή εισάγουμε ολόκληρη τη σειρά ως εξής, εάν η δεύτερη συνθήκη αξιολογείται ως false:
      1. emp_id=new record’s emp_key
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. address=new record’s address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (ή κενή συμβολοσειρά)
      10. isCurrent=true
      11. delete_flag=false

Δείτε τον ακόλουθο κώδικα:

upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false" base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond) .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false", "end_date": current_date() }) .whenNotMatchedInsert( values={ "isCurrent": "true", "emp_id": "employeeUpdates.emp_id", "first_name": "employeeUpdates.first_name", "last_name": "employeeUpdates.last_name", "Address": "employeeUpdates.Address", "phone_number": "employeeUpdates.phone_number", "isContractor": "employeeUpdates.isContractor", "emp_key": "employeeUpdates.emp_key", "start_date": current_date(), "delete_flag": "employeeUpdates.delete_flag", "end_date": "null" }) .execute()

Ως τελευταίο βήμα, ας επαναφέρουμε τη διαγραμμένη εγγραφή από την προηγούμενη αλλαγή στο σύνολο δεδομένων προέλευσης και ας δούμε πώς εισάγεται ξανά στο employee πίνακα στη λίμνη δεδομένων και παρατηρήστε πώς διατηρείται το πλήρες ιστορικό.

Ας τροποποιήσουμε το αλλαγμένο σύνολο δεδομένων μας από το προηγούμενο βήμα και ας κάνουμε τις ακόλουθες αλλαγές.

  1. Προσθέστε τα διαγραμμένα emp_id=8 πίσω στο σύνολο δεδομένων.

Μετά την πραγματοποίηση αυτών των αλλαγών, το σύνολο δεδομένων πηγής των υπαλλήλων μου μοιάζει με τον ακόλουθο κώδικα (για αναγνωσιμότητα, έχουμε συμπεριλάβει μόνο την προστιθέμενη εγγραφή όπως περιγράφεται στο προηγούμενο βήμα):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott ValleynGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  1. Μεταφορτώστε το αλλαγμένο αρχείο δεδομένων υπαλλήλου στο ίδιο πρόθεμα πηγής.
  2. Αφού ανεβάσετε το αλλαγμένο fake_emp_data.json δεδομένων στο Amazon S3, μεταβείτε στην κονσόλα AWS Glue και εκτελέστε ξανά την εργασία.
  3. Όταν ολοκληρωθεί η εργασία, εκτελέστε το ακόλουθο ερώτημα στο πρόγραμμα επεξεργασίας ερωτημάτων Athena και επιβεβαιώστε ότι υπάρχουν συνολικά 28 εγγραφές με τις ακόλουθες τιμές:
SELECT * FROM "deltalake_2438fbd0"."employee";

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από την έξοδο CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

  1. Εκτελέστε το ακόλουθο ερώτημα και επιβεβαιώστε ότι υπάρχουν 5 εγγραφές:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από την έξοδο CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

Θα δείτε δύο εγγραφές για emp_id=8:

  • Ένας emp_id=8 εγγραφή με τις ακόλουθες τιμές (η παλιά εγγραφή που διαγράφηκε):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • deleted_flag=true
    • end_date=’2023-03-02
  • Άλλος emp_id=8 εγγραφή με τις ακόλουθες τιμές (η νέα εγγραφή που εισήχθη στην τελευταία εκτέλεση):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • deleted_flag=false
    • end_date=NULL (ή κενή συμβολοσειρά)

Η emp_key Οι τιμές στον πραγματικό σας πίνακα μπορεί να είναι διαφορετικές από αυτές που παρέχονται εδώ ως παράδειγμα. Σημειώστε επίσης ότι επειδή αυτή είναι η ίδια διαγραμμένη εγγραφή που εισήχθη ξανά στην επόμενη φόρτωση χωρίς αλλαγές, δεν θα υπάρξει καμία αλλαγή στο emp_key.

Δείγματα ερωτημάτων τελικού χρήστη

Ακολουθούν ορισμένα δείγματα ερωτημάτων τελικού χρήστη για να δείξουν πώς μπορεί να διασχιστεί το ιστορικό δεδομένων αλλαγών του υπαλλήλου για αναφορά:

  • Ερώτημα 1 – Ανακτήστε μια λίστα με όλους τους υπαλλήλους που αποχώρησαν από τον οργανισμό τον τρέχοντα μήνα (για παράδειγμα, Μάρτιος 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από την έξοδο CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

Το προηγούμενο ερώτημα θα επέστρεφε δύο αρχεία υπαλλήλων που αποχώρησαν από τον οργανισμό.

  • Ερώτημα 2 – Ανακτήστε μια λίστα νέων υπαλλήλων που εντάχθηκαν στον οργανισμό τον τρέχοντα μήνα (για παράδειγμα, Μάρτιος 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από την έξοδο CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

Το προηγούμενο ερώτημα θα επέστρεφε 23 αρχεία ενεργών υπαλλήλων που εντάχθηκαν στον οργανισμό.

  • Ερώτημα 3 – Βρείτε το ιστορικό οποιουδήποτε συγκεκριμένου υπαλλήλου στον οργανισμό (στην περίπτωση αυτή υπάλληλος 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Σημείωση: Ενημερώστε το σωστό όνομα βάσης δεδομένων από την έξοδο CloudFormation πριν εκτελέσετε το παραπάνω ερώτημα.

Στο προηγούμενο ερώτημα, μπορούμε να παρατηρήσουμε ότι ο υπάλληλος 18 είχε δύο αλλαγές στα αρχεία των υπαλλήλων του πριν φύγει από τον οργανισμό.

Σημειώστε ότι τα αποτελέσματα δεδομένων που παρέχονται σε αυτό το παράδειγμα είναι διαφορετικά από αυτά που θα δείτε στις συγκεκριμένες εγγραφές σας με βάση τα δείγματα δεδομένων που δημιουργούνται από τη συνάρτηση Lambda.

εκκαθάριση

Όταν ολοκληρώσετε τον πειραματισμό με αυτήν τη λύση, καθαρίστε τους πόρους σας, για να αποτρέψετε την επιβολή χρεώσεων AWS:

  1. Αδειάστε τους κάδους S3.
  2. Διαγράψτε τη στοίβα από την κονσόλα AWS CloudFormation.

Συμπέρασμα

Σε αυτήν την ανάρτηση, δείξαμε πώς να αναγνωρίζουμε τα αλλαγμένα δεδομένα για μια ημιδομημένη πηγή δεδομένων και να διατηρούμε τις ιστορικές αλλαγές (SCD Type 2) σε μια λίμνη δέλτα S3, όταν τα συστήματα πηγής δεν είναι σε θέση να παρέχουν τη δυνατότητα λήψης δεδομένων αλλαγής, με AWS Κόλλα. Μπορείτε να επεκτείνετε περαιτέρω αυτήν τη λύση για να επιτρέψετε στις μεταγενέστερες εφαρμογές να δημιουργήσουν πρόσθετες προσαρμογές από δεδομένα CDC που καταγράφονται στη λίμνη δεδομένων.

Επιπλέον, μπορείτε να επεκτείνετε αυτήν τη λύση ως μέρος μιας ενορχήστρωσης χρησιμοποιώντας Λειτουργίες βημάτων AWS ή άλλους κοινώς χρησιμοποιούμενους ενορχηστρωτές με τους οποίους γνωρίζει ο οργανισμός σας. Μπορείτε επίσης να επεκτείνετε αυτήν τη λύση προσθέτοντας διαμερίσματα όπου χρειάζεται. Μπορείτε επίσης να διατηρήσετε τον πίνακα δέλτα με συμπίεση τα μικρά αρχεία.


Σχετικά με τους συγγραφείς

Νιθ Γκοβιντάσιβαν, είναι αρχιτέκτονας Data Lake με τις Επαγγελματικές Υπηρεσίες AWS, όπου βοηθά τους πελάτες να ενσωματωθούν στο ταξίδι τους στη σύγχρονη αρχιτεκτονική δεδομένων μέσω της εφαρμογής λύσεων Big Data & Analytics. Εκτός δουλειάς, ο Nith είναι φανατικός θαυμαστής του κρίκετ, παρακολουθεί σχεδόν οποιοδήποτε κρίκετ στον ελεύθερο χρόνο του και απολαμβάνει μεγάλες διαδρομές και ταξιδεύει διεθνώς.

Vijay Velpula είναι αρχιτέκτονας δεδομένων με AWS Professional Services. Βοηθά τους πελάτες να εφαρμόσουν Big Data και Analytics Solutions. Εκτός δουλειάς, του αρέσει να περνά χρόνο με την οικογένεια, τα ταξίδια, την πεζοπορία και την ποδηλασία.

Sriharsh Adari είναι Senior Solutions Architect στο Amazon Web Services (AWS), όπου βοηθά τους πελάτες να εργαστούν αντίστροφα από τα επιχειρηματικά αποτελέσματα για να αναπτύξουν καινοτόμες λύσεις στο AWS. Με τα χρόνια, έχει βοηθήσει πολλούς πελάτες σε μετασχηματισμούς πλατφόρμας δεδομένων σε κάθε κλάδο. Ο βασικός τομέας εξειδίκευσής του περιλαμβάνει Τεχνολογική Στρατηγική, Ανάλυση Δεδομένων και Επιστήμη Δεδομένων. Στον ελεύθερο χρόνο του, του αρέσει να παίζει αθλήματα, να παρακολουθεί τηλεοπτικές εκπομπές και να παίζει Tabla.

Συνομιλία με μας

Γεια σου! Πώς μπορώ να σε βοηθήσω?