Generating CSV Files from MySQL Database using Apache Airflow

In this tutorial, we will explore how to generate CSV files from a MySQL database using Apache Airflow. Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. We will create a workflow that connects to a MySQL database, fetches data, and generates CSV files on a scheduled basis.

Prerequisites:

  1. Basic knowledge of SQL and MySQL.
  2. Familiarity with Python and Apache Airflow concepts.
  3. Apache Airflow installed and configured on your system.

Set Up Your Environment

Make sure you have Apache Airflow installed. You can install it using pip:

pip install apache-airflow
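In practice, the Airflow project recommends pinning a version and installing against its constraints file so dependency resolution is reproducible, and you need to initialize the metadata database before the scheduler can run. The exact version numbers below are examples; substitute the Airflow and Python versions you are actually using:

```shell
# Example: pin Airflow 2.9.3 for Python 3.8 (adjust versions to your setup)
pip install "apache-airflow==2.9.3" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.8.txt"

# One-time setup: initialize the metadata database and create an admin user
airflow db init
airflow users create --username admin --firstname Admin --lastname User \
  --role Admin --email admin@example.com
```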

Create a Python Script for Generating CSV

Create a Python script, let’s call it mysql_to_csv.py, where we will write the logic to fetch data from the MySQL database and generate a CSV file. The script uses the mysql-connector-python package, which you can install with pip install mysql-connector-python. Here’s a basic example:

import csv
import mysql.connector

# MySQL connection details
db_config = {
    'host': 'your_mysql_host',
    'user': 'your_mysql_user',
    'password': 'your_mysql_password',
    'database': 'your_database_name'
}

# Fetch data from MySQL and generate CSV
def generate_csv():
    connection = mysql.connector.connect(**db_config)
    cursor = connection.cursor()

    try:
        query = "SELECT * FROM your_table_name"
        cursor.execute(query)
        data = cursor.fetchall()

        with open('output.csv', 'w', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)

            # Write header using the column names from the cursor metadata
            csv_writer.writerow([column[0] for column in cursor.description])

            # Write data rows
            csv_writer.writerows(data)
    finally:
        # Always release the cursor and connection, even if the query fails
        cursor.close()
        connection.close()

if __name__ == '__main__':
    generate_csv()
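The header trick in the script above, deriving column names from cursor.description, is worth seeing in isolation. The sketch below mimics it with in-memory sample data instead of a live MySQL connection, so the column names and rows here are made up purely for illustration:

```python
import csv
import io

# Simulated cursor.description: per the DB-API, each entry is a tuple
# whose first element is the column name
description = [('id',), ('name',), ('email',)]
rows = [
    (1, 'Alice', 'alice@example.com'),
    (2, 'Bob', 'bob@example.com'),
]

buffer = io.StringIO()
writer = csv.writer(buffer)

# Header row from the column metadata, then the data rows
writer.writerow([col[0] for col in description])
writer.writerows(rows)

print(buffer.getvalue())
```

Writing to an in-memory buffer like this is also a convenient way to unit-test the CSV logic without touching the database.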

Create an Apache Airflow DAG

Now, let’s create an Apache Airflow DAG (Directed Acyclic Graph) to schedule the execution of our Python script at specified intervals. Create a Python script named mysql_to_csv_dag.py:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os

default_args = {
    'owner': 'your_name',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 6),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'mysql_to_csv_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # Adjust as needed
    catchup=False,  # Skip backfilling runs between start_date and now
)

def generate_csv_task():
    import subprocess

    # check=True makes the Airflow task fail if the script exits with an
    # error, which os.system would silently ignore
    subprocess.run(['python', '/path/to/mysql_to_csv.py'], check=True)

generate_csv_operator = PythonOperator(
    task_id='generate_csv_task',
    python_callable=generate_csv_task,
    dag=dag,
)

# With a single task, there are no dependencies to declare.

Start Apache Airflow Web Server and Scheduler

Start the Apache Airflow web server and scheduler, each in its own terminal, using the following commands:

airflow webserver -p 8080
airflow scheduler
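Before waiting for the scheduler to pick up the DAG, you can exercise a single run from the command line. The dags test subcommand executes the DAG for a given logical date without involving the scheduler (the date below is just an example):

```shell
airflow dags test mysql_to_csv_dag 2023-08-06
```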

Congratulations! You’ve successfully set up a workflow using Apache Airflow to generate CSV files from a MySQL database on a scheduled basis. This tutorial provides a basic example, and you can further enhance it by handling errors, customizing the CSV output, and incorporating additional tasks into your DAG.
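One simple enhancement along those lines: write each run to a timestamped file instead of overwriting output.csv on every execution. A minimal sketch, where the base directory and filename prefix are placeholders to adapt:

```python
import os
from datetime import datetime

def timestamped_csv_path(base_dir='/tmp', prefix='output'):
    """Build a per-run CSV path such as /tmp/output_20230806.csv."""
    stamp = datetime.now().strftime('%Y%m%d')
    return os.path.join(base_dir, f'{prefix}_{stamp}.csv')

path = timestamped_csv_path()
print(path)
```

In generate_csv(), you would pass this path to open() in place of the hard-coded 'output.csv'.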
