Using data from Denver.gov, build a data pipeline that allows users to interact with large datasets effectively while accounting for frequent data updates. The full requirements can be found in de_task.md.
- 311 Service: non-emergency calls for help.
- Traffic Data: accident-related data up to the present day.
This is a perfect use-case for Spark for a few reasons:
- Scale. Apache Spark is designed to handle datasets at enterprise scale.
- Speed. Big data can be explored interactively through interfaces such as PySpark.
- Syntax. Spark (in particular PySpark) lets users query and transform datasets with familiar Pandas-like syntax. SQL is also available for those who prefer a declarative style.
- Support. Most major cloud providers offer managed Spark implementations.
- Apache Spark (setup instructions below)
- git
- Python 3.x
- zsh
- Optional (if following the remote code):
  - AWS Account
  - AWS CLI configured
To start using Apache Spark locally, you will need to install it. Fortunately, Python users have an easy way to get started: running `pip install pyspark` in your environment is enough to get a local copy installed. It is recommended to use a virtual environment for this step of the process.
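A minimal sketch of that setup (the environment name `.venv` is just a convention, not something the project mandates):

```shell
python3 -m venv .venv        # create an isolated environment
. .venv/bin/activate         # activate it (works in zsh and bash)
pip install pyspark          # install a local copy of Spark into the env
```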
If you would like to follow the remote code, you will need an active AWS account, preferably with an IAM administrator user set up. In addition, the sample code uses the AWS CLI to perform operations. The easiest way to get your credentials set up is to run `aws configure`.
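For reference, `aws configure` prompts for four values and writes them to two small INI files in your home directory. A sketch of the result, with placeholder values (substitute your own keys and preferred region):

```ini
# ~/.aws/credentials (placeholder values -- use your own keys)
[default]
aws_access_key_id = AKIAEXAMPLEKEYID
aws_secret_access_key = exampleSecretAccessKey123

# ~/.aws/config
[default]
region = us-east-1
output = json
```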
All code can be found under src. It is split by type of operation, with a
local and remote version of the same code. This is to showcase how you could
accomplish each task on your local computer or in the cloud.
Each version includes a directory for tasks 1-3, making it easy to match the logic flow against the brief.
Install required Python packages:

```shell
pip install -r requirements.txt
```

NOTE: All code expects the working directory to be set to the project root. As mentioned above, it is a good idea to create a virtual environment to run the project code.

NOTE 2: The shell code assumes you are using zsh.
The project assumes a daily cadence of data delivery and partitions the data directories by the execution date. Shell was chosen for its ability to handle large data downloads and its simple integration with the AWS API. Alternatively, Python could be used to perform the same operations.
```shell
zsh src/local/task_1/dl_local.sh
```

This will download the datasets to your local machine under the `data/takehome-staging/<exec_date>` directory.
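The partitioning scheme can be sketched as follows (the date format is an assumption, and the actual download command is left as a comment since the dataset URL lives in the script):

```shell
# Sketch of the date-partitioned staging layout used by the download step.
exec_date="$(date +%Y-%m-%d)"                 # today's execution date
staging="data/takehome-staging/${exec_date}"
mkdir -p "${staging}"
# curl -L "<dataset url>" -o "${staging}/traffic_accidents.csv"  # actual download
echo "created ${staging}"
```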
Instead of saving the data to our local machine, the remote version of this code places it into an S3 bucket with a partitioning scheme similar to the local one. The naming conventions used by this project can be found in config.json.
```shell
zsh src/remote/task_1/stream_to_s3.sh
```

Making datasets queryable by Spark is straightforward. Although Spark can read CSVs directly, it is more common to convert data files into an optimized format such as Parquet. Subsequent loads of the data can then take advantage of the format for increased speed and reduced memory usage.
Our local implementation uses Spark to convert the data files into Parquet files located within data/takehome-process.

```shell
spark-submit --master "local[3]" src/local/task_2/create_datastore.py
```

Running the above command in your terminal will transform both files into Parquet format optimized for Spark. Note that `local[3]` uses 3 parallel threads to process the application; low-powered machines may need to adjust that setting.
The main difference in the remote implementation is that we set up a small AWS EMR cluster to run our data jobs. The primary steps are:

- Create additional buckets to hold logs and our PySpark scripts.
- Set up a key pair for use with the project.
- Create and configure the EMR cluster on AWS. This takes ~10 minutes to start up.
- Submit the PySpark scripts to the running EMR cluster.

I've combined the setup steps (1-3) into a helper shell script. We'll keep the EMR cluster running for now, as it will be used in the final task.
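For reference, the cluster-creation step inside the helper script might look roughly like this. The cluster name, key pair, bucket, instance sizes, and EMR release label below are placeholders, not the project's actual values:

```shell
# Hypothetical sketch of the EMR cluster creation (requires a live AWS account).
aws emr create-cluster \
  --name "takehome-cluster" \
  --release-label emr-6.10.0 \
  --applications Name=Spark \
  --use-default-roles \
  --ec2-attributes KeyName=takehome-key \
  --instance-type m5.xlarge \
  --instance-count 3 \
  --log-uri s3://takehome-logs/elasticmapreduce/
```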
```shell
zsh src/remote/task_2/setup/setup.sh
```

Once the setup is complete, you can submit the jobs to the EMR cluster and monitor them in the AWS Management Console.
```shell
python3 src/remote/task_2/add_job_steps.py --job-type process
```

The final task is to query our datasets and provide insights into the data. Our analysis aims to answer these four questions:
- Does the number of traffic accidents vary over a given year? Do they show seasonality?
- Does that also correlate with road conditions? Which times of year are better or worse?
- What geographical areas do 311 Service Calls occur in?
- Do these translate to a similar number of traffic accidents?
The local version of this report is straightforward. Saved to the src/local/task_3 directory is an example analysis of these questions using Jupyter. The Spark implementation here uses a local Spark instance to run the analysis code.
A pattern you will see in this notebook is using Spark SQL to efficiently aggregate the large datasets into tabular formats useful for analysis. From there, you convert the results to a more data-science-friendly format for visualization.

To launch, run the shell code below to open Jupyter. From there, you can click on the report in your browser and run the analysis.
```shell
jupyter notebook src/local/task_3/analysis.ipynb
```

This version takes advantage of a utility called [SparkMagic](https://github.com/jupyter-incubator/sparkmagic). It allows the user to submit Spark jobs from a local Jupyter notebook. The main advantage is that you can use your local environment and avoid setting up a bootstrap file for the cluster.
First, you will need to run the setup script.
```shell
zsh src/remote/task_3/setup/setup.sh
```

This will:
- Create a key pair
- Download and install SparkMagic, which enables local Jupyter notebooks to interact with the cluster
- Enable port forwarding to connect the notebook to the EMR cluster
You will need to leave this terminal window running while interacting with the jupyter notebook. In a separate terminal, you can run the command below to open the analysis notebook.
```shell
jupyter notebook src/remote/task_3/analysis/spark_analysis.ipynb
```

A teardown script is also included. Run it to:
- Stop running EMR clusters
- Delete the take-home project buckets on S3.
- Delete the key pair we created earlier.
```shell
zsh src/teardown.sh
```

For repeated pulls of the data from the website, I would recommend a scheduling service such as Apache Airflow. We avoided it for this project since the data flow is relatively straightforward. The main benefits would be parameterizing the jobs and reducing potential mistakes.
Jupyter is great for sharing analyses between data analysts. However, as showcased here, it takes a bit of work to set up an appropriate environment. In a company setting where resources are shared, I could foresee using a centralized service such as Databricks, which combines computation with a notebook environment, making sharing relatively painless.
The current pipeline assumes the data is correct and will always keep the same shape going forward. However, in the real world that is not always the case. A more robust project would include tests on the data from Denver.gov. The three main testing areas would be:
- End-to-end system testing
  - Given a sample set of data, does the pipeline match the expected output?
  - Runs outside of the normal pipeline logic.
- Data quality testing
  - Checks data output against business rules.
  - Occurs in the pipeline on every run.
- Alerting / monitoring
  - Implement logging of the pipeline to monitor data-quality statistics.
  - Examples include Datadog, or even something like Metabase backed by a database of logs.
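A minimal sketch of the data-quality idea (the column names and business rules below are hypothetical, not taken from the project):

```python
# Illustrative data-quality check: validate one record against business rules.
from datetime import datetime

def check_accident_row(row):
    """Return a list of business-rule violations for one accident record."""
    problems = []
    if not row.get("incident_id"):
        problems.append("missing incident_id")
    try:
        datetime.strptime(row.get("reported_date", ""), "%Y-%m-%d")
    except ValueError:
        problems.append("reported_date not YYYY-MM-DD")
    if row.get("severity") not in {"minor", "major", "fatal"}:
        problems.append("unknown severity")
    return problems

good = {"incident_id": "A1", "reported_date": "2021-06-01", "severity": "minor"}
bad = {"incident_id": "", "reported_date": "06/01/2021", "severity": "huge"}
print(check_accident_row(good))  # []
print(check_accident_row(bad))   # all three rules violated
```

Checks like this would run inside the pipeline on every execution, with the violation counts logged for the alerting layer.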