This repository presents an ETL pipeline built on Amazon Reviews data. Its main purpose is practice and showcasing a data engineering approach to data transformation. The pipeline outputs a data mart with the top 5 rated Amazon product categories for each month from 1994 to 2014.
Data pipeline implementation for processing Amazon product reviews using the SNAP dataset (2018):
- Reviews source
- Metadata source
- Source: Stanford SNAP Project (http://jmcauley.ucsd.edu/data/amazon/)
This pipeline performs ETL operations for category-based review analysis, focusing on top product categories and their performance metrics.
- `/airflow/` - Airflow workflow orchestration
- `/dags/` - DAG files for data pipeline steps
- `/plugins/` - Custom plugins and utilities
- `/source_data/` - Local storage for processed data
- `/dbt/` - dbt models and transformations
- `/images/` - Images folder with screenshots
- `Dockerfile` - Airflow webserver container definition
- `docker-compose.yml` - Multi-container orchestration
- `entrypoint.sh` - Initialization script for Airflow
- `requirements.txt` - Python dependencies
- `data_sources.tsv` - Source dataset URLs in GCS transfer format
- `local.env` - Environment variables for local setup
- `output.csv` - Output data mart of top rated categories
- `README.md` - Project documentation
This project is a multi-service data processing platform orchestrated with Docker Compose. It integrates:
- Apache Airflow for workflow orchestration
- PostgreSQL as the relational database for Airflow metadata
- BigQuery for analytical processing
- Google Cloud Storage for object storage
- Dataflow for decompression of the source data
- dbt for data modeling and transformation
- Docker Compose for service management
| Software | Name and version |
|---|---|
| Operating System | macOS 15.3 (Apple Silicon) |
| CPU configuration | Apple M1 Pro - 16 GB RAM |
| Container host | OrbStack |
| Container software | Docker (version 2.7.3) |
| Container orchestration | Docker Compose (version 3.8) |
| Orchestrator software | Apache Airflow (version 2.7.3) |
| Database | PostgreSQL (version 13) |
| Analytical database | BigQuery (version 24.10.4) |
| Data transformation tool | dbt (version 1.8.0) |
- Ensure Docker and Docker Compose are installed.
- Install the Google Cloud SDK by following the instructions at https://cloud.google.com/sdk/docs/install.
- Create a GCP project in the Google Cloud Console: https://console.cloud.google.com/projectcreate. You can use free trial credits if available.
- Set the project ID for the Google Cloud SDK:
gcloud config set project <your-project-id>
- Enable Required APIs
Enable the necessary Google Cloud APIs:
gcloud services enable dataflow.googleapis.com
gcloud services enable bigquery.googleapis.com
gcloud services enable storage.googleapis.com
- Clone this repository.
- For a local test you can limit the number of batches to be processed by changing the `NUMBER_STAG_BATCHES` variable in the `local.env` file, which also holds the GCP settings:
  - `BUCKET_NAME=<your-bucket-name>`
  - `PROJECT_ID=<your-project-id>`
  - `SERVICE_ACCOUNT=<your-service-account>@<your-project>.iam.gserviceaccount.com`
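A minimal sketch of how these values might be read inside a DAG or plugin (the default batch count below is an assumption, not the project's actual default):

```python
import os

# Illustrative only: how the local.env values could be consumed in a DAG or plugin.
# The variable names come from local.env; the fallback batch count is an assumption.
BUCKET_NAME = os.environ["BUCKET_NAME"]
PROJECT_ID = os.environ["PROJECT_ID"]
SERVICE_ACCOUNT = os.environ["SERVICE_ACCOUNT"]
NUMBER_STAG_BATCHES = int(os.environ.get("NUMBER_STAG_BATCHES", "1"))
```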
- Create Service Account Key
Create a service account key for authentication:
gcloud iam service-accounts keys create OUTPUT_FILE --iam-account <your-service-account>@<your-project>.iam.gserviceaccount.com
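Optionally, you can sanity-check the key locally. A minimal sketch, assuming the `google-cloud-storage` package is installed and `key.json` is the path of the key file created above (both are placeholders):

```python
import os

from google.cloud import storage  # pip install google-cloud-storage

# Point the client at the service account key created above
# ("key.json" stands in for your OUTPUT_FILE path).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "key.json"

# Listing buckets only succeeds if the key and project are set up correctly.
client = storage.Client(project="<your-project-id>")
print([bucket.name for bucket in client.list_buckets()])
```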
- Build and start all services:
docker-compose up -d --build
- Monitor containers and logs as needed.
- Access the Airflow web interface at http://localhost:8080 with the credentials from `entrypoint.sh` and trigger the `000_ingestion_gcs` DAG.
The data processing pipeline consists of the following Airflow DAGs:
- Data Ingestion: `000_ingestion_gcs.py` creates a GCS bucket, uploads the data source list, initiates the transfer from the public URLs to GCS, waits for the files to appear, then triggers decompression.
  Note: if the ingestion transfer has trouble at this stage, make sure the service account has the Storage Object Admin role.
- Data Decompression: `010_decompression.py` uses Google Cloud Dataflow to decompress the gzipped files stored in GCS, monitors for completion, then triggers the staging load.
- Data Loading: `020_load_staging.py` creates the BigQuery dataset, loads raw data from GCS into BigQuery tables, then triggers dbt processing (see the sketch after this list).
- Data Transformation: `030_dbt_processing.py` runs dbt models to transform raw data into analytical tables and data marts.
- Data Access: `040_flush_source_data.py` downloads processed data from GCS to local storage for further analysis.
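For illustration, a minimal sketch of the staging-load step using the Airflow Google provider; the bucket, object paths, and table names are assumptions rather than the project's actual values:

```python
import pendulum
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Hypothetical sketch, not the project's actual 020_load_staging.py.
with DAG(
    dag_id="example_load_staging",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    # Load a decompressed raw reviews file from GCS into a BigQuery staging table.
    load_reviews = GCSToBigQueryOperator(
        task_id="load_reviews_to_staging",
        bucket="<your-bucket-name>",                     # from local.env
        source_objects=["decompressed/reviews_*.json"],  # assumed object layout
        destination_project_dataset_table="<your-project-id>.staging.raw_reviews",
        source_format="NEWLINE_DELIMITED_JSON",          # the SNAP dumps are JSON lines
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
    )
```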
Each DAG is designed to run sequentially, with automatic triggering of the next step upon successful completion.
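That hand-off can be pictured as a sensor followed by a `TriggerDagRunOperator`; a minimal sketch assuming hypothetical object paths and DAG ids:

```python
import pendulum
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

# Hypothetical sketch of the "wait for data, then trigger the next step" pattern.
with DAG(
    dag_id="example_ingestion_handoff",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    # Wait until the transferred file shows up in the bucket.
    wait_for_file = GCSObjectExistenceSensor(
        task_id="wait_for_reviews_file",
        bucket="<your-bucket-name>",
        object="raw/reviews.json.gz",  # assumed object path
        poke_interval=60,
    )

    # On success, kick off the next DAG in the chain.
    trigger_decompression = TriggerDagRunOperator(
        task_id="trigger_decompression",
        trigger_dag_id="010_decompression",  # assumed downstream DAG id
    )

    wait_for_file >> trigger_decompression
```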
- Add asynchronous processing to the ingestion and staging DAGs to improve throughput and latency.
- Add more models according to the business requirements.
- Add a data quality framework such as Great Expectations for data profiling and analysis.
#TODO:
- Test run with time estimation - 2 hours
- Presentation
- Adjust README.md
- Reflect on learning points prepared by Sergei Romanov