A complete end-to-end data pipeline that scrapes websites, processes data through a lakehouse architecture, and provides RAG (Retrieval-Augmented Generation) capabilities.
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Scraper   │────▶│    MinIO    │────▶│    Spark    │────▶│    Delta    │
│             │     │  (Storage)  │     │    (ETL)    │     │    Lake     │
└─────────────┘     └─────────────┘     └─────────────┘     └──────┬──────┘
┌─────────────┐     ┌─────────────┐     ┌─────────────┐            │
│   RAG API   │◀────│   Chroma    │◀────│ Embeddings  │◀───────────┘
│  (FastAPI)  │     │ (Vector DB) │     │             │
└─────────────┘     └─────────────┘     └─────────────┘
- Web Scraping: Robust scraper with crawling capabilities
- Data Lake: MinIO-based storage with bronze/silver/gold layers
- ETL Pipeline: Spark-powered transformations with Delta Lake
- Quality Monitoring: Comprehensive data quality checks
- Vector Search: Chroma-based embeddings and retrieval
- RAG API: FastAPI service for question answering
- Orchestration: Airflow DAGs for pipeline automation
- Monitoring: Detailed logging and statistics
lakehouse-to-rag/
├── README.md                 # This file
├── docker-compose.yaml       # Service orchestration
├── Makefile                  # Development commands
├── architecture.png          # System architecture diagram
│
├── src/                      # Source code
│   ├── scraper/              # Web scraping module
│   │   ├── scraper.py        # Main scraper class
│   │   ├── minio_utils.py    # MinIO integration
│   │   ├── __main__.py       # CLI interface
│   │   ├── requirements.txt  # Dependencies
│   │   └── Dockerfile        # Container definition
│   │
│   ├── api/                  # FastAPI service
│   │   ├── main.py           # API endpoints
│   │   ├── requirements.txt  # Dependencies
│   │   └── Dockerfile        # Container definition
│   │
│   ├── helpers/              # Shared utilities
│   │   ├── duckdb_queries.py # Query utilities
│   │   ├── delta_queries.py  # Delta table queries
│   │   └── requirements.txt  # Dependencies
│   │
│   └── tests/                # Test suite
│       ├── test_scraper.py   # Scraper tests
│       ├── test_etl.py       # ETL tests
│       └── test_api.py       # API tests
│
├── airflow/                  # Airflow configuration
│   └── dags/                 # Pipeline DAGs
│       ├── scrape_etl_dag.py # Main ETL pipeline
│       ├── etl_utils.py      # ETL utilities
│       └── requirements.txt  # DAG dependencies
│
├── config/                   # Configuration files
│   ├── scraper/              # Scraper configurations
│   │   └── example.yaml      # Example scraper config
│   ├── etl/                  # ETL configurations
│   └── api/                  # API configurations
│
├── data/                     # Data storage
│   ├── delta/                # Delta Lake tables
│   │   ├── bronze/           # Raw data
│   │   ├── silver/           # Cleaned data
│   │   └── gold/             # Final data
│   └── lineage/              # Data lineage logs
│
├── tests/                    # Integration tests
│   ├── test_pipeline.py      # End-to-end tests
│   └── test_integration.py   # Service integration tests
- Docker and Docker Compose
- Python 3.10+
- 4GB+ RAM available
- Clone and set up:
  git clone <repository-url>
  cd lakehouse-to-rag
- Start services:
  docker compose up -d
- Access services:
  - Airflow UI: http://localhost:8080 (admin/admin)
  - MinIO Console: http://localhost:9001 (minioadmin/minioadmin)
  - FastAPI: http://localhost:8001
  - Chroma: http://localhost:8000
- Create a virtual environment:
  python -m venv .venv
  source .venv/bin/activate    # Linux/Mac
  # or: .venv\Scripts\activate  # Windows
- Install dependencies:
  pip install -r src/scraper/requirements.txt
  pip install -r src/api/requirements.txt
  pip install -r src/helpers/requirements.txt
- Run tests:
  make test
Using CLI:
# Single page scraping
python -m scraper --url https://example.com \
--selectors '{"title": "h1", "content": ".main"}'
# Site crawling
python -m scraper --config config/scraper/example.yaml \
    --crawl --max-pages 50 --stats

Using Airflow:
- Set Airflow Variables for configuration
- Trigger the lakehouse_etl_pipeline DAG (or use the REST API, as sketched below)
- Monitor progress in the Airflow UI
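If you prefer scripting to the UI, the DAG can also be triggered through Airflow's stable REST API. The sketch below assumes the basic-auth API backend is enabled and that the admin/admin credentials from the Quick Start are valid; adjust the host and auth for your deployment.

```python
# Minimal sketch: trigger the lakehouse_etl_pipeline DAG via the Airflow 2.x
# stable REST API. Assumes basic auth is enabled and admin/admin is valid.
import requests

AIRFLOW_API = "http://localhost:8080/api/v1"
DAG_ID = "lakehouse_etl_pipeline"

resp = requests.post(
    f"{AIRFLOW_API}/dags/{DAG_ID}/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {}},  # optional per-run configuration overrides
    timeout=30,
)
resp.raise_for_status()
print("Started run:", resp.json()["dag_run_id"])
```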
The ETL pipeline processes data through three stages:
- Bronze: Raw scraped data with basic cleaning
- Silver: Cleaned and normalized data
- Gold: Deduplicated and enriched data
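As an illustration of the silver stage, the sketch below shows what a bronze → silver step could look like with PySpark and Delta Lake. The table paths, the content column, and the length threshold are placeholders for the example, not the project's actual schema; see src/ and airflow/dags/ for the real transformations.

```python
# Illustrative bronze -> silver step: cleaning and normalization only
# (deduplication and enrichment are left to the gold stage).
# Requires the delta-spark package; paths and columns are placeholders.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession, functions as F

builder = (
    SparkSession.builder.appName("bronze-to-silver")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Bronze: raw records written by the scraper.
bronze = spark.read.format("delta").load("data/delta/bronze/pages")

# Silver: cleaned and normalized rows.
silver = (
    bronze
    .withColumn("content", F.trim(F.col("content")))  # normalize whitespace
    .filter(F.col("content").isNotNull())              # drop missing content
    .filter(F.length("content") >= 100)                # drop near-empty pages
)

silver.write.format("delta").mode("overwrite").save("data/delta/silver/pages")
```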
Query the knowledge base:
curl -X POST "http://localhost:8001/ask" \
-H "Content-Type: application/json" \
-d '{"question": "What is the main topic?"}'The project includes comprehensive configuration examples in sample_config/:
Basic Example (sample_config/example.yaml):
site_url: "https://example.com"
selectors:
  title: "h1, .title, .page-title"
  content: ".content, .main-content, .article-content"
  author: ".author, .byline"
  date: ".date, .published-date"
advanced:
  rate_limit: 1.0
  timeout: 30
  max_retries: 3
  min_content_length: 100
  respect_robots: true
  max_pages: 50

Comprehensive Examples (sample_config/config.yaml):
- Blog/News sites
- Documentation sites
- E-commerce pages
- Academic papers
- API documentation
- Forum/Community sites
- Portfolio sites
Usage Examples:
# Use basic example (Project Gutenberg)
python -m scraper --config sample_config/example.yaml --crawl
# List available examples
python -m scraper --config sample_config/config.yaml --list-examples
# Use specific Project Gutenberg example
python -m scraper --config sample_config/config.yaml --example gutenberg_classics --crawl
# Use catalog browsing
python -m scraper --config sample_config/config.yaml --example gutenberg_catalog --crawl --max-pages 10
# Use author pages
python -m scraper --config sample_config/config.yaml --example gutenberg_authors --crawl
# Create custom configuration
cp sample_config/example.yaml my_gutenberg_config.yaml
# Edit my_gutenberg_config.yaml with your preferences
python -m scraper --config my_gutenberg_config.yaml --crawl

Test the Configuration:
# Test the Project Gutenberg configuration
python src/scraper/test_config.py

Set these in Airflow UI → Admin → Variables:
- scraper_site_url: Target website URL
- scraper_selectors: YAML string of CSS selectors
- scraper_max_pages: Maximum pages to crawl
- scraper_crawl: Enable/disable crawling
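For reference, a DAG task might read these Variables roughly as follows. This is a sketch, not the DAG's actual code: the YAML parsing of scraper_selectors and the default values are assumptions.

```python
# Sketch of reading the Variables inside a DAG task; the defaults and the
# YAML parsing of scraper_selectors are assumptions, not the DAG's code.
import yaml
from airflow.models import Variable

site_url = Variable.get("scraper_site_url")
selectors = yaml.safe_load(Variable.get("scraper_selectors"))
max_pages = int(Variable.get("scraper_max_pages", default_var="50"))
crawl = Variable.get("scraper_crawl", default_var="true").lower() == "true"
```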
# Test scraper
python -m pytest src/tests/test_scraper.py
# Test ETL
python -m pytest src/tests/test_etl.py
# Test API
python -m pytest src/tests/test_api.py

# End-to-end pipeline test
python -m pytest tests/test_pipeline.py
# Service integration test
python -m pytest tests/test_integration.py

The pipeline includes comprehensive data quality checks:
- Record counts at each stage
- Content length analysis
- Missing value detection
- Duplicate identification
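A sketch of how such checks could be computed over a silver table with PySpark is shown below; the table path and the url/content columns are assumptions about the schema, not the pipeline's actual check code.

```python
# Illustrative data quality summary; requires delta-spark, and the path and
# column names are placeholders rather than the project's actual schema.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession, functions as F

builder = (
    SparkSession.builder.appName("quality-checks")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

df = spark.read.format("delta").load("data/delta/silver/pages")

df.agg(
    F.count("*").alias("records"),                                          # record count
    F.avg(F.length("content")).alias("avg_content_length"),                 # content length
    F.sum(F.col("content").isNull().cast("int")).alias("missing_content"),  # missing values
    (F.count("*") - F.countDistinct("url")).alias("duplicate_urls"),        # duplicates
).show()
```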
- Scraper: Progress and error logs
- ETL: Transformation statistics
- API: Request/response logs
- Airflow: Task execution logs
- Environment Variables (see the MinIO client sketch after this list):
  export MINIO_ENDPOINT=your-minio-endpoint
  export MINIO_ACCESS_KEY=your-access-key
  export MINIO_SECRET_KEY=your-secret-key
- Docker Compose:
  docker compose -f docker-compose.prod.yaml up -d
- Monitoring:
  - Set up Prometheus/Grafana for metrics
  - Configure alerting for pipeline failures
  - Monitor resource usage
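As a quick sanity check for the environment variables above, a connection can be built with the official minio Python client. This is a sketch that mirrors, but is not, the project's src/scraper/minio_utils.py.

```python
# Sketch: build a MinIO client from the environment variables above and list
# the available buckets. Not the project's actual minio_utils.py.
import os
from minio import Minio

client = Minio(
    os.environ["MINIO_ENDPOINT"],               # host:port, no scheme
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=True,                                # set to False for plain HTTP
)
print([bucket.name for bucket in client.list_buckets()])
```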
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
- Follow PEP 8 style guide
- Add type hints to all functions
- Include docstrings for all classes and methods
- Write tests for new features
- Update documentation as needed
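For example, a new helper in the expected style might look like this (an illustrative function, not part of the codebase):

```python
def truncate_content(text: str, max_length: int = 200) -> str:
    """Return a preview of scraped content limited to max_length characters.

    Args:
        text: Raw page text.
        max_length: Maximum number of characters to keep.

    Returns:
        The original text, or a shortened copy ending in an ellipsis.
    """
    if len(text) <= max_length:
        return text
    return text[: max_length - 3] + "..."
```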
This project is licensed under the MIT License - see the LICENSE file for details.
- Issues: Create an issue on GitHub
- Documentation: Check the docs/ directory
- Community: Join our discussions
Built with ❤️ for the data community