Generating CSV Files from MySQL Database using Apache Airflow
In this tutorial, we will explore how to generate CSV files from a MySQL database using Apache Airflow. Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. We will create a workflow that connects to a MySQL database, fetches data, and generates CSV files on a scheduled basis.
Prerequisites:
- Basic knowledge of SQL and MySQL.
- Familiarity with Python and Apache Airflow concepts.
- Apache Airflow installed and configured on your system.
Set Up Your Environment
Make sure you have Apache Airflow installed. You can install it using pip:
pip install apache-airflow
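The standalone script in the next step also needs the MySQL Connector/Python driver, which provides the mysql.connector module. If it is not already installed, add it with pip as well:
pip install mysql-connector-python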
Create a Python Script for Generating CSV
Create a Python script, let’s call it mysql_to_csv.py, where we will write the logic to fetch data from the MySQL database and generate CSV files. Here’s a basic example:
import csv
import mysql.connector
# MySQL connection details
db_config = {
    'host': 'your_mysql_host',
    'user': 'your_mysql_user',
    'password': 'your_mysql_password',
    'database': 'your_database_name',
}

# Fetch data from MySQL and generate CSV
def generate_csv():
    connection = mysql.connector.connect(**db_config)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM your_table_name")
        data = cursor.fetchall()

        with open('output.csv', 'w', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            # Write the header row using the column names from cursor.description
            csv_writer.writerow([column[0] for column in cursor.description])
            # Write the data rows
            csv_writer.writerows(data)
    finally:
        connection.close()

if __name__ == '__main__':
    generate_csv()
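Before wiring the script into Airflow, it is worth running it once by hand (with the connection details and table name filled in) to confirm that output.csv looks as expected. On Linux or macOS, for example:
python mysql_to_csv.py
head -n 5 output.csv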
Create an Apache Airflow DAG
Now, let’s create an Apache Airflow DAG (Directed Acyclic Graph) to schedule the execution of our Python script at specified intervals. Create a Python script named mysql_to_csv_dag.py:
from datetime import datetime, timedelta
import subprocess

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

default_args = {
    'owner': 'your_name',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 6),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'mysql_to_csv_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # Adjust as needed
    catchup=False,  # Do not backfill runs between start_date and now
)

def generate_csv_task():
    # Run the standalone script; check=True makes the Airflow task fail if the script errors out
    subprocess.run(['python', '/path/to/mysql_to_csv.py'], check=True)

generate_csv_operator = PythonOperator(
    task_id='generate_csv_task',
    python_callable=generate_csv_task,
    dag=dag,
)
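If mysql_to_csv.py is importable from the scheduler's environment (for example, because it sits next to the DAG file in your DAGs folder), a slightly tighter variant is to import generate_csv and hand it to the PythonOperator directly instead of shelling out to a second interpreter. A minimal sketch, assuming that import works:
from airflow.operators.python import PythonOperator

# Assumes mysql_to_csv.py is on the scheduler's Python path,
# e.g. placed alongside this DAG file in the DAGs folder.
from mysql_to_csv import generate_csv

generate_csv_operator = PythonOperator(
    task_id='generate_csv_task',
    python_callable=generate_csv,  # run the function in-process, no subprocess needed
    dag=dag,
)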
Start Apache Airflow Web Server and Scheduler
Start the Apache Airflow web server and scheduler using the following commands:
airflow webserver -p 8080
airflow scheduler
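By default a newly added DAG starts out paused. Once the web server and scheduler are running, you can unpause and trigger it from the UI at http://localhost:8080, or with the Airflow 2.x CLI:
airflow dags unpause mysql_to_csv_dag
airflow dags trigger mysql_to_csv_dag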
Congratulations! You’ve successfully set up a workflow using Apache Airflow to generate CSV files from a MySQL database on a scheduled basis. This tutorial provides a basic example, and you can further enhance it by handling errors, customizing the CSV output, and incorporating additional tasks into your DAG.
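As one small example of customizing the output, Python's csv module lets you control the dialect of the generated file. The snippet below is an illustrative sketch (the semicolon delimiter, quoting choice, and placeholder rows are assumptions, not requirements) showing how the writer in mysql_to_csv.py could be configured:
import csv

# Illustrative dialect customization: semicolon-delimited, every field quoted.
# Replace the placeholder rows with the data fetched from MySQL.
rows = [('id', 'name'), (1, 'alice'), (2, 'bob')]

with open('output.csv', 'w', newline='') as csv_file:
    writer = csv.writer(csv_file, delimiter=';', quoting=csv.QUOTE_ALL)
    writer.writerows(rows)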