February 3, 2025

CSV to PostgreSQL - Data Pipeline

Source code: https://github.com/sahil-172002/CSV-to-PostgreSQL-Data-Pipeline

Don't forget to star the repo if you find it helpful!

Live demo here

Introduction to Data Pipeline Development

If you're already familiar with Python and PostgreSQL basics, you can skip ahead to the implementation section.

Data pipelines are essential components of modern data engineering. Whether you're working with small datasets or handling massive data warehouses, knowing how to efficiently move data between different systems is crucial. In this guide, we'll build a robust data pipeline that reads CSV files and loads them into PostgreSQL using Python.

Prerequisites

Before we dive in, make sure you have:

  • Python 3.8+ installed
  • PostgreSQL server running
  • Basic understanding of SQL
  • Sample CSV data to work with

Setting Up the Environment

First, let's set up our Python environment with the necessary packages:

pip install psycopg2-binary pandas python-dotenv

We'll use:

  • psycopg2 for PostgreSQL connection
  • pandas for efficient CSV handling
  • python-dotenv for environment variables

Implementation

1. Project Structure

Let's organize our code properly:

csv-to-postgresql/
├── .env
├── data/
│   └── sample.csv
├── src/
│   ├── __init__.py
│   ├── config.py
│   ├── database.py
│   └── pipeline.py
└── main.py

2. Configuration Setup

First, create a .env file to store your database credentials:

DB_HOST=localhost
DB_PORT=5432
DB_NAME=your_database
DB_USER=your_username
DB_PASSWORD=your_password

Create config.py to load these environment variables:

from dotenv import load_dotenv
import os
 
load_dotenv()
 
DB_CONFIG = {
    'host': os.getenv('DB_HOST'),
    'port': os.getenv('DB_PORT'),
    'database': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD')
}
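
Since os.getenv returns None for anything that isn't set, you may also want to fail fast on missing credentials. A small optional check you could append to config.py:

# Optional: stop early if any credential is missing from the environment
missing = [key for key, value in DB_CONFIG.items() if value is None]
if missing:
    raise RuntimeError(f"Missing environment variables for: {', '.join(missing)}")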

3. Database Connection Handler

Create database.py for managing database connections:

import psycopg2
from psycopg2.extras import execute_values
from contextlib import contextmanager
 
class DatabaseConnection:
    def __init__(self, config):
        self.config = config
 
    @contextmanager
    def get_connection(self):
        conn = None
        try:
            conn = psycopg2.connect(**self.config)
            yield conn
        finally:
            if conn is not None:
                conn.close()
 
    def create_table(self, table_name, columns):
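        # Note: the table and column names are interpolated directly into the
        # SQL string, so they must come from trusted code, never from user input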
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id SERIAL PRIMARY KEY,
            {', '.join([f'{name} {dtype}' for name, dtype in columns.items()])}
        )
        """
        
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(create_table_query)
                conn.commit()
 
    def bulk_insert(self, table_name, data, columns):
        insert_query = f"""
        INSERT INTO {table_name} ({', '.join(columns)})
        VALUES %s
        """
        
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                execute_values(cur, insert_query, data)
                conn.commit()
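
The class also works on its own. A minimal sketch of using it directly (the table name, columns, and rows below are made up for illustration):

from src.config import DB_CONFIG
from src.database import DatabaseConnection

db = DatabaseConnection(DB_CONFIG)

# Create a small table; the id SERIAL PRIMARY KEY column is added automatically
db.create_table('demo_events', {'name': 'TEXT', 'score': 'INTEGER'})

# Insert two rows in a single round trip via execute_values
db.bulk_insert('demo_events', [('alice', 10), ('bob', 7)], ['name', 'score'])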

4. Data Pipeline Implementation

Create pipeline.py to handle the CSV processing:

import pandas as pd
from typing import Dict
 
class DataPipeline:
    def __init__(self, db_connection):
        self.db = db_connection
 
    def infer_sql_types(self, df: pd.DataFrame) -> Dict[str, str]:
        """Infer PostgreSQL column types from pandas DataFrame"""
        type_mapping = {
            'object': 'TEXT',
            'int64': 'INTEGER',
            'float64': 'FLOAT',
            'datetime64[ns]': 'TIMESTAMP',
            'bool': 'BOOLEAN'
        }
        
        return {
            col: type_mapping.get(str(dtype), 'TEXT')
            for col, dtype in df.dtypes.items()
        }
 
    def process_csv(self, file_path: str, table_name: str, chunk_size: int = 1000):
        """Process CSV file and load into PostgreSQL"""
        # Read CSV in chunks
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            # Clean column names
            chunk.columns = [col.lower().replace(' ', '_') for col in chunk.columns]
            
            # Create table if it doesn't exist
            if not hasattr(self, 'table_created'):
                column_types = self.infer_sql_types(chunk)
                self.db.create_table(table_name, column_types)
                self.table_created = True
            
            # Convert the DataFrame to native-Python tuples: NaN/NaT become NULL
            # and numpy scalars (which psycopg2 can't adapt) become plain values
            columns = chunk.columns.tolist()
            clean = chunk.astype(object).where(chunk.notna(), None)
            values = [tuple(row) for row in clean.values.tolist()]
            
            # Bulk insert data
            self.db.bulk_insert(table_name, values, columns)
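
A quick way to sanity-check the type inference on its own (the sample values here are made up):

import pandas as pd
from src.pipeline import DataPipeline

# No database is needed just to inspect type inference
pipeline = DataPipeline(db_connection=None)

df = pd.DataFrame({
    'name': ['alice', 'bob'],      # object  -> TEXT
    'age': [31, 45],               # int64   -> INTEGER
    'score': [9.5, 7.25],          # float64 -> FLOAT
    'active': [True, False],       # bool    -> BOOLEAN
})

print(pipeline.infer_sql_types(df))
# {'name': 'TEXT', 'age': 'INTEGER', 'score': 'FLOAT', 'active': 'BOOLEAN'}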

5. Putting It All Together

Create main.py to orchestrate the entire process:

from src.config import DB_CONFIG
from src.database import DatabaseConnection
from src.pipeline import DataPipeline
import logging
import time
 
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
 
def main():
    try:
        # Initialize database connection
        db = DatabaseConnection(DB_CONFIG)
        
        # Create pipeline
        pipeline = DataPipeline(db)
        
        # Process CSV file
        start_time = time.time()
        
        logging.info("Starting CSV import...")
        pipeline.process_csv(
            file_path='data/sample.csv',
            table_name='imported_data',
            chunk_size=1000
        )
        
        end_time = time.time()
        logging.info(f"Import completed in {end_time - start_time:.2f} seconds")
        
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
        raise
 
if __name__ == "__main__":
    main()

Advanced Features

1. Data Validation

Let's add data validation to our pipeline:

def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """Validate and clean data before insertion"""
    # Remove rows where every value is NULL
    df = df.dropna(how='all')

    # Try to parse object columns as datetimes; leave a column unchanged
    # if it doesn't parse cleanly
    for col in df.select_dtypes(include=['object']).columns:
        try:
            df[col] = pd.to_datetime(df[col])
        except (ValueError, TypeError):
            pass

    return df
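
Assuming validate_data is added as a method on DataPipeline, a quick check with a couple of made-up rows:

import pandas as pd
from src.pipeline import DataPipeline

pipeline = DataPipeline(db_connection=None)

raw = pd.DataFrame({
    'name': ['alice', None, 'bob'],
    'joined': ['2024-01-15', None, '2024-02-03'],
})

cleaned = pipeline.validate_data(raw)
print(len(cleaned))              # 2 -- the all-NULL middle row is dropped
print(cleaned['joined'].dtype)   # datetime64[ns]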

2. Error Handling

Add comprehensive error handling:

class DataPipelineError(Exception):
    """Custom exception for pipeline errors"""
    pass
 
def safe_process_chunk(self, chunk: pd.DataFrame, table_name: str):
    """Safely process a chunk of data"""
    try:
        # Validate data
        chunk = self.validate_data(chunk)
        
        # Convert the DataFrame to native-Python tuples: NaN/NaT become NULL
        # and numpy scalars (which psycopg2 can't adapt) become plain values
        columns = chunk.columns.tolist()
        clean = chunk.astype(object).where(chunk.notna(), None)
        values = [tuple(row) for row in clean.values.tolist()]
        
        # Bulk insert data
        self.db.bulk_insert(table_name, values, columns)
        
    except Exception as e:
        raise DataPipelineError(f"Error processing chunk: {str(e)}")
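
With these pieces in place, process_csv can delegate the per-chunk work to safe_process_chunk. A sketch of the revised loop (validation, conversion, and error wrapping all happen inside safe_process_chunk):

def process_csv(self, file_path: str, table_name: str, chunk_size: int = 1000):
    """Process CSV file and load into PostgreSQL, one validated chunk at a time"""
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # Clean column names
        chunk.columns = [col.lower().replace(' ', '_') for col in chunk.columns]

        # Create table if it doesn't exist
        if not hasattr(self, 'table_created'):
            self.db.create_table(table_name, self.infer_sql_types(chunk))
            self.table_created = True

        # Validation and insert now live in one place
        self.safe_process_chunk(chunk, table_name)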

3. Performance Monitoring

Add performance metrics:

import time

class PerformanceMonitor:
    def __init__(self):
        self.start_time = time.time()
        self.records_processed = 0
        
    def update(self, chunk_size: int):
        self.records_processed += chunk_size
        
    def get_metrics(self):
        elapsed_time = time.time() - self.start_time
        records_per_second = self.records_processed / elapsed_time
        return {
            'elapsed_time': elapsed_time,
            'records_processed': self.records_processed,
            'records_per_second': records_per_second
        }
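
One way to hook the monitor into the chunk loop (the file name, table name, and log format are only for illustration, and the target table is assumed to already exist):

import logging
import pandas as pd

# pipeline comes from the earlier setup
monitor = PerformanceMonitor()

for chunk in pd.read_csv('data/sample.csv', chunksize=1000):
    chunk.columns = [col.lower().replace(' ', '_') for col in chunk.columns]
    pipeline.safe_process_chunk(chunk, 'imported_data')
    monitor.update(len(chunk))

metrics = monitor.get_metrics()
logging.info(
    "Processed %d records in %.2fs (%.0f records/s)",
    metrics['records_processed'],
    metrics['elapsed_time'],
    metrics['records_per_second'],
)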

Best Practices

  1. Memory Management

    • Process data in chunks to handle large files
    • Use generators where possible
    • Clean up resources properly
  2. Error Handling

    • Implement proper exception handling
    • Log errors with context
    • Use transactions for data consistency
  3. Performance

    • Use bulk inserts instead of single inserts
    • Index important columns (see the sketch after this list)
    • Monitor memory usage
  4. Security

    • Use environment variables for sensitive data
    • Implement proper access controls
    • Sanitize input data
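
For example, indexing an important column after the load finishes might look like this (the table and column names are placeholders for your own schema):

# db is the DatabaseConnection from earlier
with db.get_connection() as conn:
    with conn.cursor() as cur:
        # Index a column that your queries filter or join on
        cur.execute(
            "CREATE INDEX IF NOT EXISTS idx_imported_data_name "
            "ON imported_data (name)"
        )
        conn.commit()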

Demo

Let's see the pipeline in action with a sample dataset:

# Sample usage
from src.pipeline import DataPipeline
from src.database import DatabaseConnection
from src.config import DB_CONFIG
 
# Initialize
db = DatabaseConnection(DB_CONFIG)
pipeline = DataPipeline(db)
 
# Process a sample CSV file
pipeline.process_csv(
    file_path='data/sample.csv',
    table_name='employees',
    chunk_size=1000
)

Example output:

2024-03-20 10:15:30 - INFO - Starting CSV import...
2024-03-20 10:15:31 - INFO - Processing chunk 1 of 5...
2024-03-20 10:15:32 - INFO - Processing chunk 2 of 5...
2024-03-20 10:15:33 - INFO - Processing chunk 3 of 5...
2024-03-20 10:15:34 - INFO - Processing chunk 4 of 5...
2024-03-20 10:15:35 - INFO - Processing chunk 5 of 5...
2024-03-20 10:15:35 - INFO - Import completed in 5.23 seconds
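
The per-chunk messages above assume that process_csv logs its progress, which the code shown earlier doesn't do yet. A minimal way to get similar output is to enumerate the chunks inside process_csv (file_path and chunk_size are its arguments; reporting "of 5" would additionally require counting the rows up front, which this sketch skips):

import logging
import pandas as pd

for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size), start=1):
    logging.info("Processing chunk %d...", i)
    # ... clean column names, create the table on the first pass, bulk insert ...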

Conclusion

We've built a robust data pipeline that can efficiently handle CSV data and load it into PostgreSQL. The solution includes:

  • Efficient chunk processing for large files
  • Proper error handling and logging
  • Performance monitoring
  • Data validation
  • Security best practices

This pipeline can be extended further based on specific needs, such as:

  • Adding more data transformations
  • Implementing parallel processing
  • Adding more validation rules
  • Supporting different file formats

Remember to always test thoroughly with sample data before processing production data!

If you have any questions or suggestions, feel free to reach out to me on Twitter or create an issue in the GitHub repository.

Happy coding! 🚀