Commit 90479ed

Merge pull request #39 from DataTalksClub/feature/week2_airflow
Feature/week2 airflow
2 parents 96fae76 + 4294abf commit 90479ed

File tree

5 files changed: +31 −8 lines changed

week_2_data_ingestion/airflow/1_setup.md

Lines changed: 3 additions & 0 deletions

```diff
@@ -11,6 +11,9 @@
 2. You may need to upgrade your docker-compose version to v2.x+, and set the memory for your Docker Engine to minimum 5GB
 (ideally 8GB). If enough memory is not allocated, it might lead to airflow-webserver continuously restarting.
 
+3. Python version: 3.7+
+
+
 ### Airflow Setup
 
 1. Create a new sub-directory called `airflow` in your `project` dir (such as the one we're currently in)
```
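The new "Python version: 3.7+" requirement added in this file can be enforced programmatically before running any of the week-2 scripts. A minimal sketch using only the standard library (the `check_python` helper is ours, not part of the course repo):

```python
import sys

# The setup notes require Python 3.7+; fail fast with a clear message otherwise.
REQUIRED = (3, 7)

def check_python(required=REQUIRED, current=None):
    """Return True if the given (or running) interpreter meets the required version."""
    current = current or sys.version_info[:2]
    return tuple(current) >= tuple(required)

if not check_python():
    raise SystemExit(f"Python {REQUIRED[0]}.{REQUIRED[1]}+ required, found {sys.version}")
```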

week_2_data_ingestion/airflow/2_concepts.md

Lines changed: 16 additions & 1 deletion

```diff
@@ -27,7 +27,22 @@ Backend to the Airflow environment. Used by the scheduler, executor and webserve
 All these services allow you to run Airflow with CeleryExecutor.
 For more information, see [Architecture Overview](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html).
 
-Directories:
+#### Project Structure
 * `./dags` - `DAG_FOLDER` for DAG files
 * `./logs` - contains logs from task execution and scheduler.
 * `./plugins` - for custom plugins
+
+
+#### Workflow components
+* `DAG`: Directed acyclic graph; specifies the dependencies between a set of tasks with an explicit execution order, and has a beginning as well as an end (hence, "acyclic").
+    * `DAG Structure`: DAG Definition, Tasks (e.g. Operators), Task Dependencies (control flow: `>>` or `<<`)
+
+* `Task`: a defined unit of work (aka an operator in Airflow). Tasks describe what to do, be it fetching data, running analysis, triggering other systems, or more.
+    * Common Types: Operators (used in this workshop), Sensors, TaskFlow decorators
+    * Sub-classes of Airflow's BaseOperator
+
+* `DAG Run`: an individual execution/run of a DAG
+    * scheduled or triggered
+
+* `Task Instance`: an individual run of a single task. Task instances also have an indicative state, which could be "running", "success", "failed", "skipped", "up for retry", etc.
+    * Ideally, a task should flow from `none`, to `scheduled`, to `queued`, to `running`, and finally to `success`.
```
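The DAG/Task/Dependency vocabulary introduced above can be illustrated without Airflow itself. The following is a hypothetical pure-Python sketch (the `ToyDag`/`ToyTask` names are ours, not Airflow's) that mimics the `upstream >> downstream` syntax and resolves the dependencies into an execution order, which is exactly what "acyclic with an explicit execution order" buys you:

```python
from collections import defaultdict, deque

class ToyTask:
    """Stand-in for an Airflow task: a named unit of work registered on a DAG."""
    def __init__(self, dag, task_id):
        self.dag, self.task_id = dag, task_id
        dag.tasks[task_id] = self

    def __rshift__(self, other):
        # Mimic Airflow's `upstream >> downstream` dependency syntax.
        self.dag.edges[self.task_id].append(other.task_id)
        return other  # returning `other` lets `a >> b >> c` chain

class ToyDag:
    """Stand-in for a DAG: tasks plus explicit, acyclic dependencies."""
    def __init__(self, dag_id):
        self.dag_id, self.tasks, self.edges = dag_id, {}, defaultdict(list)

    def run_order(self):
        # Kahn's algorithm: a full ordering exists only if the graph is acyclic.
        indeg = {t: 0 for t in self.tasks}
        for dsts in self.edges.values():
            for d in dsts:
                indeg[d] += 1
        queue = deque(t for t, n in indeg.items() if n == 0)
        order = []
        while queue:
            t = queue.popleft()
            order.append(t)
            for d in self.edges[t]:
                indeg[d] -= 1
                if indeg[d] == 0:
                    queue.append(d)
        if len(order) != len(self.tasks):
            raise ValueError("cycle detected: not a DAG")
        return order

# Same shape as the week-2 pipeline's dependency chain:
dag = ToyDag("data_ingestion_gcs_dag")
download, fmt, upload, bq = (ToyTask(dag, t) for t in (
    "download_dataset_task", "format_to_parquet",
    "local_to_gcs_task", "bigquery_external_table_task"))
download >> fmt >> upload >> bq
```

Each call to `run_order()` here corresponds loosely to what a `DAG Run` schedules: one pass over the tasks in dependency order.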

week_2_data_ingestion/airflow/README.md

Lines changed: 4 additions & 0 deletions

````diff
@@ -27,6 +27,10 @@ docker-compose up
 
 5. Run your DAG on the Web Console.
 
+6. On finishing your run, or to shut down the container(s):
+```shell
+docker-compose down
+```
 
 For more info, check out these official docs:
 * https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html
````

week_2_data_ingestion/airflow/dags/data_ingestion_gcs_dag.py

Lines changed: 5 additions & 7 deletions

```diff
@@ -1,5 +1,4 @@
 import os
-from datetime import datetime
 import logging
 
 from airflow import DAG
@@ -12,15 +11,13 @@
 import pyarrow.csv as pv
 import pyarrow.parquet as pq
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "pivotal-surfer-336713")
-BUCKET = os.environ.get("GCP_GCS_BUCKET", "dtc_data_lake_pivotal-surfer-336713")
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
+BUCKET = os.environ.get("GCP_GCS_BUCKET")
 
 dataset_file = "yellow_tripdata_2021-01.csv"
 dataset_url = f"https://s3.amazonaws.com/nyc-tlc/trip+data/{dataset_file}"
 path_to_local_home = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")
 parquet_file = dataset_file.replace('.csv', '.parquet')
-
-path_to_creds = f"{path_to_local_home}/google_credentials.json"
 BIGQUERY_DATASET = os.environ.get("BIGQUERY_DATASET", 'trips_data_all')
 
 
@@ -61,6 +58,7 @@ def upload_to_gcs(bucket, object_name, local_file):
     "retries": 1,
 }
 
+# NOTE: DAG declaration - using a Context Manager (an implicit way)
 with DAG(
     dag_id="data_ingestion_gcs_dag",
     schedule_interval="@daily",
@@ -83,7 +81,7 @@ def upload_to_gcs(bucket, object_name, local_file):
         },
     )
 
-    # TODO: Homework: research and try XCOM to communicate output values between 2 tasks/operators
+    # TODO: Homework - research and try XCOM to communicate output values between 2 tasks/operators
     local_to_gcs_task = PythonOperator(
         task_id="local_to_gcs_task",
         python_callable=upload_to_gcs,
@@ -109,4 +107,4 @@ def upload_to_gcs(bucket, object_name, local_file):
         },
     )
 
-    download_dataset_task >> format_to_parquet >> local_to_gcs_task >> bigquery_external_table_task
+    download_dataset_task >> format_to_parquet >> local_to_gcs_task >> bigquery_external_table_task
```
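The main change in this DAG file is swapping hardcoded fallbacks for environment-driven configuration, with the real values now injected by docker-compose.yaml. The pattern is plain `os.environ.get`; a sketch with placeholder values (the `my-example-*` names are ours, not real project IDs):

```python
import os

# Simulate the variables that docker-compose.yaml injects into the container.
# (Placeholder values for illustration only.)
os.environ.setdefault("GCP_PROJECT_ID", "my-example-project")
os.environ.setdefault("GCP_GCS_BUCKET", "my-example-bucket")

# As in the patched DAG: no hardcoded default, so a missing variable
# surfaces as None instead of silently pointing at someone else's project.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
BUCKET = os.environ.get("GCP_GCS_BUCKET")

# A variable with a safe, generic default can keep its fallback.
BIGQUERY_DATASET = os.environ.get("BIGQUERY_DATASET", "trips_data_all")
```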

week_2_data_ingestion/airflow/docker-compose.yaml

Lines changed: 3 additions & 0 deletions

```diff
@@ -60,12 +60,15 @@ x-airflow-common:
     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
     GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/google_credentials.json
     AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/google_credentials.json'
+    GCP_PROJECT_ID: 'pivotal-surfer-336713'
+    GCP_GCS_BUCKET: "dtc_data_lake_pivotal-surfer-336713"
 
   volumes:
     - ./dags:/opt/airflow/dags
     - ./logs:/opt/airflow/logs
     - ./plugins:/opt/airflow/plugins
     - ~/.google/credentials/:/.google/credentials:ro
+
   user: "${AIRFLOW_UID:-50000}:0"
   depends_on:
     &airflow-common-depends-on
```
