Introduction
In today's data-driven world, building efficient and scalable data pipelines is crucial for extracting insights from raw data. One common approach to achieving this is through ETL (Extract, Transform, Load) pipelines, which streamline the process of collecting, processing, and storing data. In this article, I will walk you through creating a data pipeline with Apache Airflow and deploying it on AWS, demonstrating ETL in action from data extraction to transformation and storage.
![figure 1: Graph of sample data pipeline on Apache Airflow](https://static.wixstatic.com/media/81114d_46b624763f574c8db1d9b5e1946632ea~mv2.png/v1/fill/w_980,h_526,al_c,q_90,usm_0.66_1.00_0.01,enc_auto/81114d_46b624763f574c8db1d9b5e1946632ea~mv2.png)
Workflow
*This article uses macOS or Linux (for AWS) terminal CLI commands; please make sure you use the commands relevant to your own development environment.
Full Github code available here
I will be using my personal project for an interactive dashboard to demonstrate the process. We will begin by scraping articles from Medium's archives using Scrapy, a powerful Python library for web crawling (feel free to use a data source of your choice, such as an external API, internal data, etc.). Once we've gathered the data, we will clean and transform it with Pandas and NumPy to make it analysis-ready, and load it into a PostgreSQL database.
I will also briefly go through the process of setting up PostgreSQL and Airflow. You can skip both of these sections and head straight to the data collection part if you already have them set up. At the end, I will also show you an optional step to deploy a simple Airflow pipeline on AWS EC2. If you want to deploy there, I recommend setting up the AWS EC2 instance first and performing the Airflow setup steps inside the instance.
This pipeline forms the foundation for downstream tasks such as data analysis and building dashboards to derive meaningful trends and insights.
![figure 2: System workflow for this project](https://static.wixstatic.com/media/81114d_2f4f9752154446e58fbb7e331f4af009~mv2.png/v1/fill/w_980,h_734,al_c,q_90,usm_0.66_1.00_0.01,enc_auto/81114d_2f4f9752154446e58fbb7e331f4af009~mv2.png)
Set up PostgreSQL
*Postgres and PostgreSQL are used interchangeably in this article; they mean the same thing.
To get started, make sure you already have Postgres available on your local computer. If you haven't installed it yet, you can download it from the official page here.
Once you have Postgres installed on your computer, start the PostgreSQL service:
brew services start postgresql
Access the Postgres CLI:
psql -U postgres
*The -U postgres specifies the default Postgres superuser. It will prompt you for a password. Enter the one you set during installation
Create a database to hold the table you want to create. For example:
CREATE DATABASE my_articles_db;
Switch to the new database:
\c my_articles_db
Define and create your table. For example:
CREATE TABLE articles_db (
id SERIAL PRIMARY KEY, -- Auto-incrementing ID
title TEXT, -- Article title
author VARCHAR(100), -- Author name
...
);
Verify the structure of your table:
\d articles_db
You can query your data with SQL:
SELECT * FROM articles_db;
Exit the PostgreSQL CLI:
\q
Set up Airflow
*In this section, I will show you how to set up Airflow on your local computer; you may also use a virtual machine of your choice or a managed Airflow service. I will go through how to set up your own Airflow on AWS EC2 further down.
** There are a few different ways to install Airflow; the complete list is available here.
Instead of using the default superuser to connect Airflow with Postgres, we will create a new user in Postgres specifically for Airflow with limited privileges that has just enough permissions to perform the necessary operations. This approach enhances security and minimizes potential issues.
Log back in to Postgres:
psql -U postgres
Create a new user:
CREATE USER new_user WITH PASSWORD 'your_password';
Allow the new user to connect to the database:
GRANT CONNECT ON DATABASE my_articles_db TO new_user;
Switch to the database:
\c my_articles_db
Grant the CREATE privileges:
GRANT CREATE ON SCHEMA public TO new_user;
If you want to allow full access to all tables in that particular database:
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO new_user;
If you want to let new_user modify the tables in that particular database:
GRANT INSERT, UPDATE, DELETE, SELECT ON ALL TABLES IN SCHEMA public TO new_user;
Exit out of Postgres.
As noted at the beginning of this section, there is more than one way to install Airflow. In this tutorial, I will be using pip to install Airflow in my local virtual environment. Before showing you the installation steps, I will briefly go through the key folders that you will see later on:
dags/
This is where you store your DAG (Directed Acyclic Graph) definitions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. [1]
Airflow looks in this folder for files containing DAGs to schedule and execute. The default location is shown in your airflow.cfg under the [core] section as 'dags_folder = path/to/dags'; this is where Airflow will automatically look for your DAGs. If a dags folder wasn't created when you installed Airflow, you can manually create a new folder named 'dags' and set its path in airflow.cfg.
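If you ever need to confirm which folder your Airflow installation is scanning, you can also read the setting programmatically once Airflow is installed (a quick sketch, assuming the airflow package is importable in your active environment):

from airflow.configuration import conf

print(conf.get("core", "dags_folder"))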
logs/
This is where Airflow stores log files from your tasks and components (webserver, scheduler, worker).
airflow.cfg
This file is the main configuration file for Airflow. It determines how Airflow behaves, including its connections to the database, logging, scheduling, and other operational settings. For details, check out their documentation here
Now that we understand the basics, let's go ahead and set up Airflow on your local machine:
Create a virtual environment and activate:
python -m venv airflow_venv
source airflow_venv/bin/activate
Install Airflow together with the Postgres provider (this pulls in apache-airflow as a dependency):
pip install apache-airflow-providers-postgres
Install dependencies we will need for this:
pip install psycopg2-binary
pip install SQLAlchemy
*psycopg2 is the PostgreSQL adapter for Python that SQLAlchemy uses to communicate with PostgreSQL databases. Airflow already depends on SQLAlchemy, so it should have been installed when you set up Airflow.
Look for the Airflow folder that was just created on your local machine; on macOS, it usually shows up in your user folder. Change your directory to where the Airflow folder is. For example, if you were in 'path/to/folder/A' and Airflow was installed in 'path/to/folder/B', change your directory to 'path/to/folder/B'. Make sure your virtual environment is still active.
Initialize Airflow to connect to the database:
airflow db init
Open the airflow.cfg file in that folder and find sql_alchemy_conn. This setting specifies the connection string that Airflow uses to connect to its metadata database via SQLAlchemy. The metadata database is central to Airflow's operation, as it stores information about DAGs, tasks, users, logs, and configurations.
The syntax for the sql_alchemy_conn is as follows:
sql_alchemy_conn = dialect+driver://username:password@host:port/database
For example:
sql_alchemy_conn = postgresql+psycopg2://new_user:your_password@localhost:5432/my_articles_db
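Before wiring this into Airflow, you can optionally sanity-check the connection string yourself with SQLAlchemy (a sketch; the user, password, and database are the example values from above):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://new_user:your_password@localhost:5432/my_articles_db")
with engine.connect() as connection:
    print(connection.execute(text("SELECT version();")).scalar())

If this prints the PostgreSQL version string, Airflow should be able to connect using the same sql_alchemy_conn value.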
Create a new Airflow user:
airflow users create --username your_username --password your_password --firstname yourfirstname --lastname yourlastname --role Admin --email email@youremail.com
List all the Airflow users to check:
airflow users list
Run the Airflow scheduler:
airflow scheduler
In a separate terminal, run the Airflow webserver:
airflow webserver # remember to activate virtual env
Access the Airflow UI at http://localhost:8080 (the webserver's default port).
You will see the default example DAGs in the Airflow UI. If you would like to remove them, go to airflow.cfg and change this to False:
load_examples = False
Data Collection
Before collecting the data you need for a project, it's important to consider the project's objectives. For me, as someone who writes technical articles in my free time, I wanted to analyze the currently trending topics in software engineering and its adjacent industries. I was also curious whether longer articles show a pattern of better engagement than shorter ones, and so on. At this stage, it's useful to think about the metrics and the final form you want for your project.
For this purpose, I first drafted the data and visualizations I thought I would need in a blank Word document. I then scraped some initial data and used Jupyter notebooks and Figma to make the end result easier to visualize.
Since the purpose of this article is building a data pipeline with Airflow, not web scraping, I will keep this section short. To scrape the data I needed, I used Doug Grott's Medium scraper repo, available here, with some modifications. He has also written a few articles on his scraping process, available here. [8]
I made some modifications to the code, mainly because I wanted the DAG to run automatically on a monthly basis so I don't have to set the date manually. To do that, I created a calculate_dates function to calculate the start and end dates. It sets the start date to the first day of the previous month and the end date to the last day of the previous month (for example, a run on November 16 targets October 1 through October 31). This ensures the spider always targets data from the most recent full month. I then modified the parse_month and parse_year sections accordingly.
from datetime import datetime, timedelta

def __init__(self, *args, **kwargs):
    super(ArticleSpider, self).__init__(*args, **kwargs)
    self.start_date, self.end_date = self.calculate_dates()

def calculate_dates(self):
    # Get current date
    today = datetime.now()
    start_date = today.replace(day=1) - timedelta(days=1)
    start_date = start_date.replace(day=1)  # Set to the first of last month
    # Calculate end date
    end_date = today.replace(day=1) - timedelta(days=1)
    return start_date, end_date
I also used a CSV item pipeline to save the data I had just collected into a CSV file for further processing in the next section.
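For reference, a minimal CSV item pipeline might look like the sketch below. This is not the exact pipeline from the repo, just an illustration of the idea; the output file name is arbitrary, and the class would be enabled through the ITEM_PIPELINES setting in settings.py:

import csv

class CsvExportPipeline:
    # Minimal Scrapy item pipeline that writes scraped items to a CSV file
    def open_spider(self, spider):
        self.file = open('articles_raw.csv', 'w', newline='', encoding='utf-8')
        self.writer = None

    def process_item(self, item, spider):
        row = dict(item)
        if self.writer is None:
            # Build the CSV header from the first item's fields
            self.writer = csv.DictWriter(self.file, fieldnames=row.keys())
            self.writer.writeheader()
        self.writer.writerow(row)
        return item

    def close_spider(self, spider):
        self.file.close()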
Data Transformation
The purpose of the data transformation section is to clean and transform the data so that it meets the requirements for analysis or storage in the database. For this part, I followed the guidelines I set in the previous section and drafted the visualizations in a Jupyter notebook.
If you're new to data cleaning, I recommend referring to guidelines to ensure thoroughness. For example, Omar Elgabry's article on data cleaning is a great guide.
In my case, I applied standard data cleaning techniques, such as removing incomplete rows that could affect analysis and standardizing data types, among others. Since the raw data only includes the read time, I also calculated the word count by multiplying the read time by an average words-per-minute figure.
Finally, I applied a logarithmic transformation to the claps to reduce skewness and normalize the data for better model interpretability. This minimizes the impact of extreme outliers and helps meet assumptions of normality in my analysis.
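As a rough illustration, the core of these transformations might look like the sketch below. The file names, column names, and words-per-minute figure are assumptions for illustration; adjust them to your own data:

import numpy as np
import pandas as pd

AVG_WORDS_PER_MINUTE = 265  # assumed average reading speed

df = pd.read_csv('raw_articles_2024_10.csv')                # illustrative input file
df = df.dropna(subset=['title', 'claps', 'read_time'])      # drop incomplete rows
df['claps'] = pd.to_numeric(df['claps'], errors='coerce')   # standardize data types
df['word_count'] = df['read_time'] * AVG_WORDS_PER_MINUTE   # derive word count from read time
df['log_claps'] = np.log1p(df['claps'])                     # log-transform claps to reduce skewness
df.to_csv('clean_df_2024_10.csv', index=False)              # follows the naming used in the next section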
Please note that this section is only responsible for converting raw data into a clean, structured, and analyzable format; you don't need to do a complete analysis here.
Data Loading
There are multiple ways to load the data into the database: you can insert rows one at a time, copy the entire CSV with the COPY command, or use a message queue.
In this section, we will begin by connecting to the database and testing the connection. Next, we will insert the transformed CSV data into the database.
Connecting to the database
Before inserting data into the database, it's important to test the connection and validate that the setup works as expected. Start by creating a .env file to securely store your sensitive credentials in the following format:
POSTGRES_DB='your_database_name'
POSTGRES_USER='your_postgres_username'
POSTGRES_PWD='your_postgres_password'
POSTGRES_HOST='localhost' # usually, this would be localhost on your local computer
POSTGRES_PORT='5432' # Postgres's default port
We will then initialize the connection to the database using the psycopg2.connect() function, and call the cursor() method on the connection object to create a cursor object. [2]
import os

import psycopg2
from dotenv import load_dotenv

load_dotenv()

try:
    conn = psycopg2.connect(
        database=os.getenv('POSTGRES_DB'),
        user=os.getenv('POSTGRES_USER'),
        password=os.getenv('POSTGRES_PWD'),
        host=os.getenv('POSTGRES_HOST'),
        port=os.getenv('POSTGRES_PORT')
    )
    print('Connection successful')
except Exception as e:
    print(f"Error connecting to the database: {e}")
The cursor object allows us to interact with our Postgres database and execute SQL queries. To test our connection, we will return one row from the database and print the result in the CLI.
# Test a query
cursor = conn.cursor()
cursor.execute("SELECT * FROM articles LIMIT 1;")
result = cursor.fetchone()
print('Query Result: ', result)
Finally, we will close both the cursor and the connection by adding a finally clause to the try block above; this runs regardless of whether the operation was successful. Closing them is critical to avoid resource leaks that can degrade performance or cause issues over time.
finally:
    if cursor:
        cursor.close()
    if conn:
        conn.close()
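If you prefer context managers over an explicit finally clause, a roughly equivalent pattern is the sketch below: contextlib.closing() guarantees the connection is closed, and the cursor's context manager closes the cursor.

import os
from contextlib import closing

import psycopg2
from dotenv import load_dotenv

load_dotenv()

with closing(psycopg2.connect(
    database=os.getenv('POSTGRES_DB'),
    user=os.getenv('POSTGRES_USER'),
    password=os.getenv('POSTGRES_PWD'),
    host=os.getenv('POSTGRES_HOST'),
    port=os.getenv('POSTGRES_PORT')
)) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM articles LIMIT 1;")
        print('Query Result: ', cursor.fetchone())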
Insert the data to the database
Once we have confirmed a successful connection to the database, we set the input path of the file we want to insert data from. In this example, the file name changes based on the year and month, as intended.
The code below follows the name format specified in the data transformation section.
import pandas as pd
from datetime import datetime, timedelta

current_date = datetime.now()
previous_date = current_date.replace(day=1) - timedelta(days=1)
year = previous_date.year
month = previous_date.month

input_file_path = f'/path/to/data/clean_df_{year}_{month:02}.csv'
df = pd.read_csv(input_file_path)
We will start by setting autocommit to True to enable automatic transaction management. By default, a psycopg2 connection operates in manual transaction mode (you must call conn.commit() to save changes to the database); with autocommit set to True, every SQL statement we execute is committed immediately.
We will then iterate through the rows and insert the data into the database using cursor.execute(), which executes a single SQL query. The first argument is the SQL statement and the second is a tuple of values to replace the placeholders. For example:
cursor.execute("INSERT INTO my_table(col1) VALUES (%s)", ('value1',))
Before executing the insertion, we ensure the data aligns with the database schema to avoid errors. Since autocommit is enabled, every statement is immediately committed, making conn.commit() unnecessary.
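A lightweight way to do that check is to compare the DataFrame's columns against the column list used in the INSERT statement (a sketch; the list matches the articles table used below):

expected_columns = [
    'author', 'title', 'collection', 'read_time', 'claps', 'responses',
    'published_date', 'pub_year', 'pub_month', 'pub_date', 'pub_day',
    'word_count', 'title_cleaned', 'week', 'log_claps', 'word_count_title'
]
assert list(df.columns) == expected_columns, 'CSV columns do not match the target table'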
conn.autocommit = True
cursor = conn.cursor()

# Iterate through rows and insert into the database
for index, row in df.iterrows():
    cursor.execute("""
        INSERT INTO articles (
            author, title, collection, read_time, claps, responses,
            published_date, pub_year, pub_month, pub_date, pub_day,
            word_count, title_cleaned, week, log_claps, word_count_title
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, tuple(row))

print("Data successfully inserted to database.")
In this example, we set autocommit=True, but it is not always recommended for production. See the references for more information. [3] [4]
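As a side note, for larger CSVs a batched insert is usually much faster than one INSERT per row. Below is a sketch using psycopg2's execute_values helper, assuming the same cursor, DataFrame, and table as above:

from psycopg2.extras import execute_values

execute_values(
    cursor,
    """
    INSERT INTO articles (
        author, title, collection, read_time, claps, responses,
        published_date, pub_year, pub_month, pub_date, pub_day,
        word_count, title_cleaned, week, log_claps, word_count_title
    ) VALUES %s
    """,
    [tuple(row) for row in df.itertuples(index=False)]
)
conn.commit()  # only needed if autocommit is not enabled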
Putting it all together on Airflow DAG
Before moving ahead with the process I'm using to run the Airflow DAG, I want to introduce the concept of operators in Airflow. From Airflow's documentation:
An Operator is conceptually a template for a predefined Task, that you can just define declaratively inside your DAG. [5]
Some of the most popular operators are the Bash, Python, and email operators. In this project, I'm mainly using the BashOperator, as I need to activate an external proxy for specific scripts. The DAG consists of three primary tasks, each executed via a BashOperator: the web scraper, followed by data transformation, and finally loading the data into the database.
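For comparison, a task that runs a Python function instead of a shell command would use the PythonOperator. A minimal sketch (the function and task names are just placeholders, and the task would live inside the DAG definition shown below):

from airflow.operators.python import PythonOperator

def print_status():
    print('Pipeline run started')

status_task = PythonOperator(
    task_id='print_status',
    python_callable=print_status
)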
We will first set the default arguments for the DAG. Here I have set the basics, such as the date and time it should first run, allowing 1 retry with a 5-minute delay before the retry attempt. Other options are available, such as email on failure, queue, etc. [6]
from datetime import datetime, timedelta

# Default Arguments for the DAG
default_args = {
    'owner': 'your_username',
    'depends_on_past': False,
    'start_date': datetime(2024, 10, 16, 17, 20, 0),  # (YYYY, MM, DD, HH, MM, SS)
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
Next, we will define the DAG using the DAG class, which orchestrates task execution based on the specified schedule. Here, I used an identifier to name the DAG (you can use any descriptive name), passed in the default args defined above, and set the schedule interval with a cron expression, which translates to execution at 5:20 PM UTC on the 16th of every month.
import os

from airflow import DAG
from airflow.operators.bash import BashOperator

# Define the DAG
with DAG(
    'spider_dag',
    default_args=default_args,
    description='Scrapy Spider DAG',
    schedule_interval='20 17 16 * *'
) as dag:
Finally, we are ready to set up our operators. As mentioned above, we will first run the scraper, followed by the data transformation, and then the data loading into the database. We will use the BashOperator for these tasks; the first example activates the virtual environment and the proxy, then runs the spider.
# Task to activate the virtual environment, activate proxy, and run the spider
run_scraper = BashOperator(
    task_id='run_scraper',
    bash_command='source /path/to/airflow_venv/bin/activate && '
                 'cd /path/to/airflow/scripts/articles_collector/articles_collector && '
                 f'export http_proxy={os.getenv("HTTP_PROXY")} && '
                 f'export https_proxy={os.getenv("HTTPS_PROXY")} && '
                 'scrapy crawl first_spider'
)
We'll do the same with the other two tasks:
run_data_transform = BashOperator(
    task_id='run_data_transform',
    bash_command='cd /path/to/airflow/scripts && '
                 'python transform_data.py'
)

run_copy_to_db = BashOperator(
    task_id='run_copy_to_db',
    bash_command='cd /path/to/airflow/scripts && '
                 'python save_to_db.py'
)
Finally, we will set our task dependencies. [7] Airflow allows us to define task execution order using >> for downstream tasks and << for upstream tasks, instead of explicitly setting dependencies with .set_upstream() or .set_downstream().
Here, we will run the tasks sequentially: start the scraper, follow with data transformation, and finally load the data into the database.
run_scraper >> run_data_transform >> run_copy_to_db
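The same ordering could be written with the explicit methods instead of the >> operator:

# Equivalent to the line above
run_scraper.set_downstream(run_data_transform)
run_data_transform.set_downstream(run_copy_to_db)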
Running the Airflow DAG
At this point, we are ready to run our Airflow DAG. Start by running the scheduler and in a separate terminal, run the webserver:
airflow scheduler
airflow webserver # in a separate terminal
It should look something like this in the terminal:
![Figure 3: Airflow Scheduler on terminal (left), Airflow Webserver (right)](https://static.wixstatic.com/media/81114d_ecbea0487f20454aa83119a76811c71c~mv2.png/v1/fill/w_980,h_259,al_c,q_85,usm_0.66_1.00_0.01,enc_auto/81114d_ecbea0487f20454aa83119a76811c71c~mv2.png)
We can open up the UI for Airflow through http://localhost:8080 in the browser.
The UI should look something like this:
![Figure 4: Airflow homepage accessed through localhost](https://static.wixstatic.com/media/81114d_bb5ecaeae70a4b58a47f9b2da6ec2a81~mv2.png/v1/fill/w_980,h_563,al_c,q_90,usm_0.66_1.00_0.01,enc_auto/81114d_bb5ecaeae70a4b58a47f9b2da6ec2a81~mv2.png)
It will show the available DAGs we have set up. We can now run our DAG manually by pressing the play (▶) button under Actions. The DAG will also run automatically at the scheduled time you set in your DAG, and the UI will show successful and failed runs.
Setting up Airflow on AWS EC2 and connecting to AWS RDS Postgres (optional)
*This article assumes you already have an AWS account set up and know how to launch an EC2 instance; otherwise, check out the following video tutorial to get started: [video]
Set up a Security Group for this project
Look up "Security Group" in the search bar (usually located at the top of the screen). Create a security group, you can set your inbound and outbound rules after the security group has been created, for now, just add in the name and create the security group. For example, we might name it "temp-security"
The inbound rules control incoming traffic to an instance, while outbound rules manage outgoing traffic from it. To allow PostgreSQL connections to and from the instances we will use later, add an inbound rule to this security group with the following configuration:
Type: PostgreSQL
Source: Custom
In the search bar next to it, look for the name "temp-security"
We will also add an SSH rule to allow SSH connections from our local computer to the instances later on:
Type: SSH
Source: My IP
Set Up AWS RDS PostgreSQL
Go to RDS > Databases > Create Database.
This is the configuration we will use in this tutorial:
Database Creation Method: Standard create
Engine: PostgreSQL
Templates: Free Tier
Settings: enter your preferred database name, username, and password
Credentials Management: Self Managed
VPC Security Group: Choose Existing (temp-security, the name you chose for your security group earlier)
Set the availability zone (AZ) closest to you, and make sure that the resources you will need later are always set up in the same AZ.
You can also disable the performance monitoring for now and leave everything else as is.
Set Up AWS EC2 to connect to RDS PostgreSQL and make changes
In the previous steps, we have configured security groups to allow SSH access from our local machine and PostgreSQL connections within the same security group. However, in order to directly manage and interact with the RDS instance, we need to establish a secure communication channel. This is done through SSH tunneling via an EC2 instance, which acts as an intermediary between your local machine and the AWS RDS instance.
Why use EC2 as a Gateway for SSH into RDS?
Security: RDS instances are often not publicly accessible to avoid direct internet exposure. By using EC2 as an intermediary, we keep RDS protected while allowing access through a controlled, secure entry point.
Network Access: EC2 instances, unlike RDS, have public IP addresses or can be configured to access RDS via private subnets. By SSHing into the EC2 instance, we can securely forward traffic to RDS without exposing it to the public internet.
To do that, set up a new EC2 instance using the security group we set up earlier and a free-tier-eligible instance type; in this tutorial, we will be using the Amazon Linux AMI. Once you have launched your EC2 instance, select the instance > Connect > SSH client.
Open your local terminal and change your directory to where your key pair is located. If you haven't already, use the chmod 400 command to ensure that only you (the file owner) can read the key.
Next, create the SSH tunnel from your local machine to the EC2 instance, which then forwards traffic to your RDS instance:
# ssh -i "yourKeyPair.pem" -L <local-unused-port>:<RDS-endpoint>:<rds-port> <ec2-user>@<EC2-public-DNS>
# it should look something like this (ec2-user is the default user on the Amazon Linux AMI)
ssh -i "yourKeyPair.pem" -L 9000:db-name.abcd1234efg.us-west-2.rds.amazonaws.com:5432 ec2-user@ec2-00-00-000-000.compute-2.amazonaws.com
If successful, it should show something like this:
![Figure 5: successfully SSHed into your instance](https://static.wixstatic.com/media/81114d_60d069c8bc65466182f03982903388e7~mv2.png/v1/fill/w_980,h_282,al_c,q_85,usm_0.66_1.00_0.01,enc_auto/81114d_60d069c8bc65466182f03982903388e7~mv2.png)
Install PSQL:
sudo dnf install postgresql15
psql --version
Connect to the RDS instance:
# psql -h <rds-endpoint> -U <rds-username> -d postgres -p 5432
psql -h db-name.abcd1234efg.us-west-2.rds.amazonaws.com -U username -d postgres -p 5432
It will then prompt you for your RDS instance password. Once you've entered it, you can start creating your database and making changes as specified in the Set up PostgreSQL section above.
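As a side note, while the SSH tunnel from earlier is open, scripts on your local machine can also reach the RDS instance through the forwarded port. A sketch with psycopg2 (9000 matches the -L flag in the ssh command above; the credentials are the ones you chose when creating the RDS database):

import psycopg2

conn = psycopg2.connect(
    database='postgres',
    user='username',            # your RDS master username
    password='your_password',   # your RDS master password
    host='localhost',           # traffic is forwarded through the SSH tunnel
    port=9000
)
print(conn.get_dsn_parameters())
conn.close()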
Set Up AWS EC2 for Airflow
Setting up EC2 for Airflow is pretty much the same as in the Set up AWS EC2 to connect to RDS PostgreSQL section, but make sure to use a larger instance (at least t3.medium) and the same security group. You can SSH into this EC2 instance with your IDE to make it easier to organize, create, and change your scripts; I used VSCode with the Remote SSH extension for this.
To do this, go to VSCode > Extensions > install Remote SSH. Restart VSCode, press "Cmd + Shift + P", and enter the SSH details as shown in your EC2 > Connect section, making sure to use the correct path to your key pair. You can then create scripts, make changes, and install Airflow as specified in the Set up Airflow section above.
AWS Clean Up
Once you're done with your instances and don't plan to keep them running, don't forget to stop and terminate them. Also check for volumes, VPCs (security groups), and snapshots that you don't plan to keep, to prevent charges for unused resources.
Conclusion
In this article, we built an ETL data pipeline by creating an Airflow DAG. We started by setting up PostgreSQL and Airflow, which can be done on our local machine or in the cloud. We then fetched the necessary data using Scrapy, cleaned and transformed it using NumPy and Pandas, and loaded it into PostgreSQL with psycopg2, a Python module for PostgreSQL database interaction.
Finally, we put these processes together within an Airflow DAG to automate the ETL workflow. The entire pipeline was then deployed on AWS EC2 and RDS, making it scalable and accessible for future use.
References
[1] Airflow Documentation, Concepts. [Documentation]
[2] Psycopg2 Documentation, Cursor. [Documentation]
[3] A guide to autocommit in JDBC, Baeldung. [Article]
[5] Airflow Documentation, Operators. [Documentation]
[6] Airflow Documentation, Example Pipeline Definition. [Documentation]
[7] Airflow Documentation, Tasks. [Documentation]
[8] Web Scraping Medium Articles with Scrapy, Doug's Blog. [Article]