Data Warehousing with AWS
Project Overview
Sparkify, a music streaming startup, has experienced significant growth in its user base and song database. To leverage this data for insights, they aim to transition their processes and data to the cloud. The existing data resides in Amazon S3, consisting of JSON logs on user activity and JSON metadata about the songs in their app.
The objective of this project is to build an ETL (Extract, Transform, Load) pipeline that extracts data from S3, stages it in AWS Redshift, and transforms it into a set of dimensional tables for the analytics team to derive insights into user listening patterns.
Technical Architecture
Project Structure
```
data_warehousing_aws/
│
├── create_tables.py      # Database and table creation script
├── etl.py                # ETL pipeline script
├── sql_queries.py        # SQL queries for CRUD operations
├── create_cluster.ipynb  # Cloud resources setup and cleanup
├── dwh.cfg               # Credentials for cloud resources
└── README.md             # Project documentation
```
Core Components
• `create_tables.py`
  - Establishes a connection to the Redshift database.
  - Drops existing tables to ensure a fresh start.
  - Creates new tables using the schema definitions.
  - Acts as a reset script for testing (a minimal sketch of this pattern follows this list).
• `etl.py`
  - Implements the ETL pipeline.
  - Reads and processes files from `song_data` and `log_data`.
  - Transforms the JSON data into the appropriate formats.
  - Loads the data into the Redshift tables.
• `sql_queries.py`
  - Contains all SQL queries used throughout the project.
  - Includes CREATE, DROP, INSERT, and SELECT statements.
  - Centralizes query management for maintainability.
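The scripts themselves are not reproduced in this document. As a rough guide, the following is a minimal sketch of the pattern `create_tables.py` typically follows, assuming `sql_queries.py` exposes `create_table_queries` and `drop_table_queries` lists (these names are assumptions, not confirmed by the source):

```python
# create_tables.py -- minimal sketch (assumed structure, not the project's verbatim code)
import psycopg2

# Assumed names: sql_queries.py is taken to expose these two lists of SQL strings.
from sql_queries import create_table_queries, drop_table_queries


def drop_tables(cur, conn):
    """Drop existing tables so every run starts from a clean slate."""
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    """Create the staging, fact, and dimension tables."""
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    # Connection values come from dwh.cfg; see the configparser sketch
    # at the end of the "How to Run" section.
    conn = psycopg2.connect(
        "host=your_redshift_cluster_endpoint dbname=your_database_name "
        "user=your_database_user password=your_database_password port=5439"
    )
    cur = conn.cursor()
    drop_tables(cur, conn)
    create_tables(cur, conn)
    conn.close()


if __name__ == "__main__":
    main()
```

`etl.py` follows the same structure, looping over the `COPY` statements for the staging tables and then over the `INSERT ... SELECT` statements for the analytical tables.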
Database Schema Design
The database uses a star schema optimized for song play analysis. This design prioritizes denormalization and simplifies queries while maintaining data integrity.
Table Overview
| Table | Description |
| --- | --- |
| staging_events | Staging table for raw event (user activity) data |
| staging_songs | Staging table for raw song metadata |
| songplays | Fact table of song plays |
| users | Dimension table of user data |
| songs | Dimension table of song data |
| artists | Dimension table of artist data |
| time | Dimension table of time-related data |
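The two staging tables hold the raw JSON fields before any transformation. Their exact definitions are not shown in this document; the following is a plausible sketch based on the JSON samples in the ETL section below (column names and types are inferred, not confirmed):

```sql
-- Plausible staging definitions (assumed, inferred from the JSON samples below)
CREATE TABLE IF NOT EXISTS staging_events (
    artist          varchar,
    auth            varchar,
    firstName       varchar,
    gender          varchar,
    itemInSession   int,
    lastName        varchar,
    length          float,
    level           varchar,
    location        varchar,
    method          varchar,
    page            varchar,
    registration    float,
    sessionId       int,
    song            varchar,
    status          int,
    ts              bigint,      -- epoch milliseconds
    userAgent       varchar,
    userId          int
);

CREATE TABLE IF NOT EXISTS staging_songs (
    num_songs        int,
    artist_id        varchar,
    artist_latitude  float,
    artist_longitude float,
    artist_location  varchar,
    artist_name      varchar,
    song_id          varchar,
    title            varchar,
    duration         float,
    year             int
);
```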
Fact Table
songplays - Records in event data associated with song plays.
```sql
CREATE TABLE IF NOT EXISTS songplays (
    songplay_id int IDENTITY(0,1) PRIMARY KEY,  -- Redshift has no SERIAL type; IDENTITY generates the surrogate key
    start_time  timestamp NOT NULL,
    user_id     int NOT NULL,
    level       varchar,
    song_id     varchar,
    artist_id   varchar,
    session_id  int,
    location    varchar,
    user_agent  varchar,
    FOREIGN KEY (user_id) REFERENCES users (user_id),
    FOREIGN KEY (song_id) REFERENCES songs (song_id),
    FOREIGN KEY (artist_id) REFERENCES artists (artist_id),
    FOREIGN KEY (start_time) REFERENCES time (start_time)
);
```
Dimension Tables
users - User information.
```sql
CREATE TABLE IF NOT EXISTS users (
    user_id    int PRIMARY KEY,
    first_name varchar,
    last_name  varchar,
    gender     varchar,
    level      varchar
);
```
songs - Song metadata.
```sql
CREATE TABLE IF NOT EXISTS songs (
    song_id   varchar PRIMARY KEY,
    title     varchar,
    artist_id varchar,
    year      int,
    duration  float
);
```
artists - Artist information.
```sql
CREATE TABLE IF NOT EXISTS artists (
    artist_id varchar PRIMARY KEY,
    name      varchar,
    location  varchar,
    latitude  float,
    longitude float
);
```
time - Timestamps of song plays broken down into specific units (hour, day, week, month, year, weekday).
```sql
CREATE TABLE IF NOT EXISTS time (
    start_time timestamp PRIMARY KEY,
    hour       int,
    day        int,
    week       int,
    month      int,
    year       int,
    weekday    int
);
```
ETL Pipeline Implementation
1. Song Data Processing
The ETL pipeline first processes song data from JSON files structured like:
{ "num_songs": 1, "artist_id": "ARD7TVE1187B99BFB1", "artist_latitude": null, "artist_longitude": null, "artist_location": "California - LA", "artist_name": "Casual", "song_id": "SOMZWCG12A8C13C480", "title": "I Didn't Mean To", "duration": 218.93179, "year": 0 }
2. Log Data Processing
Next, it processes user activity logs:
{ "artist": "Muse", "auth": "Logged In", "firstName": "Jordan", "gender": "F", "itemInSession": 3, "lastName": "Hicks", "length": 259.26485, "level": "free", "location": "Salinas, CA", "method": "PUT", "page": "NextSong", "registration": 1540008898796.0, "sessionId": 814, "song": "Supermassive Black Hole", "status": 200, "ts": 1543190563796, "userAgent": "Mozilla/5.0", "userId": "37" }
3. Loading Data into Redshift
The `COPY` command is a powerful feature of Amazon Redshift that allows for efficient loading of large datasets from Amazon S3 into Redshift tables. It is optimized for high throughput and loads data in parallel, making it suitable for big data applications. In this project, `COPY` first loads the raw JSON from S3 into the staging tables, after which the analytical tables are populated with SQL.
Example of the COPY Command
```sql
COPY songplays
FROM 's3://your-bucket/songplays/'
IAM_ROLE 'arn:aws:iam::your-iam-role'
FORMAT AS JSON 'auto';
```
- a. FROM: Specifies the S3 bucket and path where the data files are located.
- b. IAM_ROLE: The IAM role that has permission to access the S3 bucket.
- c. FORMAT AS JSON: Indicates that the source files are JSON; with `'auto'`, Redshift maps JSON fields to table columns by name.
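Once both staging tables have been loaded with `COPY`, the `songplays` fact table is typically populated by joining the events against the staged song metadata on title and artist name. A hedged sketch of such an `INSERT ... SELECT` statement (the project's exact join conditions are not shown here and may differ):

```sql
-- Build the fact table by matching log events to the staged song catalogue
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id,
                       session_id, location, user_agent)
SELECT
    TIMESTAMP 'epoch' + e.ts / 1000 * INTERVAL '1 second' AS start_time,
    e.userId,
    e.level,
    s.song_id,
    s.artist_id,
    e.sessionId,
    e.location,
    e.userAgent
FROM staging_events e
LEFT JOIN staging_songs s
       ON e.song = s.title
      AND e.artist = s.artist_name
WHERE e.page = 'NextSong';
```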
Redshift Capabilities
Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. It allows you to run complex queries and perform analytics on large datasets quickly. Key capabilities include:
- a. Scalability: Easily scale your data warehouse up or down based on your needs.
- b. Performance: Redshift uses columnar storage and data compression to improve query performance (a distribution/sort key sketch follows this list).
- c. Integration: Seamlessly integrates with various AWS services, including S3, for data storage and retrieval.
- d. Security: Provides robust security features, including encryption and IAM roles for access control.
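To make the performance point concrete: Redshift also lets each table declare a distribution style and sort key, which control how rows are spread across nodes and ordered on disk. Below is a hedged variation of the `songplays` DDL showing these clauses; the project's actual schema may not use them:

```sql
-- Possible tuning of the fact table: co-locate rows by song and keep them
-- sorted by event time so time-range scans touch fewer blocks.
CREATE TABLE IF NOT EXISTS songplays (
    songplay_id int IDENTITY(0,1) PRIMARY KEY,
    start_time  timestamp NOT NULL SORTKEY,
    user_id     int NOT NULL,
    level       varchar,
    song_id     varchar DISTKEY,
    artist_id   varchar,
    session_id  int,
    location    varchar,
    user_agent  varchar
);
```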
Example Queries and Results
1. Most Active Users
```sql
SELECT u.first_name, u.last_name, COUNT(*) AS play_count
FROM songplays sp
JOIN users u ON sp.user_id = u.user_id
GROUP BY u.user_id, u.first_name, u.last_name
ORDER BY play_count DESC
LIMIT 5;
```
2. Popular Music Hours
```sql
SELECT EXTRACT(HOUR FROM start_time) AS hour, COUNT(*) AS play_count
FROM songplays
GROUP BY hour
ORDER BY play_count DESC;
```
How to Run
To set up and run the project, follow these steps:
1. Set Up AWS Credentials:
   - Create an IAM role with permissions to access S3 and Redshift.
   - Note the IAM role ARN for use in the `COPY` command.
2. Create and Activate a Python Virtual Environment:
```bash
python -m venv venv
source venv/bin/activate   # On Windows: venv\Scripts\activate
```
3. Install Required Packages:
```bash
pip install -r requirements.txt
```
4. Configure AWS Credentials:
Create a `dwh.cfg` file with your AWS credentials and Redshift cluster details. The file should look like this (a sketch of how the scripts can read this file follows these steps):

```ini
[AWS]
KEY=your_aws_access_key
SECRET=your_aws_secret_key

[CLUSTER]
HOST=your_redshift_cluster_endpoint
DB_NAME=your_database_name
DB_USER=your_database_user
DB_PASSWORD=your_database_password
PORT=5439
```
5. Run create_tables.py to Set Up the Database and Tables:
```bash
python create_tables.py
```
6. Run the ETL Pipeline to Process and Load Data:
```bash
python etl.py
```
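For reference, here is a minimal sketch of how the scripts can read `dwh.cfg` and open the Redshift connection, using Python's standard `configparser` and the section/key names shown in step 4 (the project's actual code may differ):

```python
# Minimal sketch: read dwh.cfg and connect to Redshift via psycopg2.
# Section and key names match the example config in step 4.
import configparser
import psycopg2

config = configparser.ConfigParser()
config.read("dwh.cfg")

conn = psycopg2.connect(
    host=config["CLUSTER"]["HOST"],
    dbname=config["CLUSTER"]["DB_NAME"],
    user=config["CLUSTER"]["DB_USER"],
    password=config["CLUSTER"]["DB_PASSWORD"],
    port=config["CLUSTER"]["PORT"],
)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM songplays;")  # optional sanity check after the ETL run
print(cur.fetchone())
conn.close()
```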
Key Achievements
- a. Designed an optimized star schema for efficient querying of music streaming data.
- b. Built a robust ETL pipeline that successfully processes and transforms JSON data.
- c. Implemented data validation and quality checks throughout the pipeline.
- d. Created a queryable database that enables complex analysis of user listening patterns.
- e. Achieved efficient data loading with minimal duplicate records.
- f. Implemented error handling and logging for pipeline monitoring.
Technologies Used
• Python 3.7+
  - pandas for data manipulation
  - psycopg2 for the PostgreSQL/Redshift connection
  - json for parsing JSON files
• AWS Redshift
  - Columnar storage for efficient data retrieval
  - Scalability for handling large datasets
• SQL
  - DDL for schema definition
  - DML for data manipulation
  - Complex joins and aggregations
Future Improvements
- a. Add data quality checks and constraints.
- b. Implement incremental loading.
- c. Add indexing for performance optimization.
- d. Create an automated testing suite.
- e. Implement logging and monitoring.
- f. Add a data visualization dashboard.
Conclusion
This project successfully demonstrates the process of building a data warehousing solution on AWS for Sparkify. By leveraging AWS Redshift and an efficient ETL pipeline, the analytics team can now easily query and analyze user activity data, leading to valuable insights into user behavior and song popularity.
The complete implementation can be found in the GitHub repository.