February 3, 2025
CSV to PostgreSQL - Data Pipeline


Source code: https://github.com/sahil-172002/CSV-to-PostgreSQL-Data-Pipeline
Don't forget to star the repo if you find it helpful!
Introduction to Data Pipeline Development
If you're already familiar with Python and PostgreSQL basics, you can skip ahead to the Implementation section.
Data pipelines are essential components of modern data engineering. Whether you're working with small datasets or handling massive data warehouses, knowing how to efficiently move data between different systems is crucial. In this guide, we'll build a robust data pipeline that reads CSV files and loads them into PostgreSQL using Python.
Prerequisites
Before we dive in, make sure you have:
- Python 3.8+ installed
- PostgreSQL server running
- Basic understanding of SQL
- Sample CSV data to work with
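If you don't have a dataset handy, a tiny data/sample.csv like the one below is enough to follow along (the column names and values are invented purely for illustration):

name,age,department,salary
Alice,30,Engineering,55000
Bob,25,Marketing,48000
Carol,41,Engineering,72000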
Setting Up the Environment
First, let's set up our Python environment with the necessary packages:
pip install psycopg2-binary pandas python-dotenv
We'll use:
- psycopg2 for PostgreSQL connections
- pandas for efficient CSV handling
- python-dotenv for environment variables
Implementation
1. Project Structure
Let's organize our code properly:
csv-to-postgresql/
├── .env
├── data/
│   └── sample.csv
├── src/
│   ├── __init__.py
│   ├── config.py
│   ├── database.py
│   └── pipeline.py
└── main.py
2. Configuration Setup
First, create a .env file to store your database credentials:
DB_HOST=localhost
DB_PORT=5432
DB_NAME=your_database
DB_USER=your_username
DB_PASSWORD=your_password
Create config.py to load these environment variables:
from dotenv import load_dotenv
import os

load_dotenv()

DB_CONFIG = {
    'host': os.getenv('DB_HOST'),
    'port': os.getenv('DB_PORT'),
    'database': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD')
}
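Optionally, a small sanity check right after building DB_CONFIG can catch a missing or misspelled variable before any connection attempt. This guard is not part of the original files, just a suggested addition:

# Optional guard: fail fast if any credential was not loaded from .env
missing = [key for key, value in DB_CONFIG.items() if value is None]
if missing:
    raise RuntimeError(f"Missing database settings: {', '.join(missing)}")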
3. Database Connection Handler
Create database.py for managing database connections:
import psycopg2
from psycopg2.extras import execute_values
from contextlib import contextmanager

class DatabaseConnection:
    def __init__(self, config):
        self.config = config

    @contextmanager
    def get_connection(self):
        conn = None
        try:
            conn = psycopg2.connect(**self.config)
            yield conn
        finally:
            if conn is not None:
                conn.close()

    def create_table(self, table_name, columns):
        create_table_query = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id SERIAL PRIMARY KEY,
                {', '.join([f'{name} {dtype}' for name, dtype in columns.items()])}
            )
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(create_table_query)
            conn.commit()

    def bulk_insert(self, table_name, data, columns):
        insert_query = f"""
            INSERT INTO {table_name} ({', '.join(columns)})
            VALUES %s
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                execute_values(cur, insert_query, data)
            conn.commit()
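As a quick standalone smoke test of the handler (the table and column names below are placeholders, not part of the pipeline), you can create a small table and insert a couple of rows directly:

from src.config import DB_CONFIG
from src.database import DatabaseConnection

db = DatabaseConnection(DB_CONFIG)

# Create a throwaway table and insert two rows to verify connectivity
db.create_table('smoke_test', {'name': 'TEXT', 'age': 'INTEGER'})
db.bulk_insert('smoke_test', [('Alice', 30), ('Bob', 25)], ['name', 'age'])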
4. Data Pipeline Implementation
Create pipeline.py to handle the CSV processing:
import pandas as pd
from typing import List, Dict, Any

class DataPipeline:
    def __init__(self, db_connection):
        self.db = db_connection

    def infer_sql_types(self, df: pd.DataFrame) -> Dict[str, str]:
        """Infer PostgreSQL column types from pandas DataFrame"""
        type_mapping = {
            'object': 'TEXT',
            'int64': 'INTEGER',
            'float64': 'FLOAT',
            'datetime64[ns]': 'TIMESTAMP',
            'bool': 'BOOLEAN'
        }
        return {
            col: type_mapping.get(str(dtype), 'TEXT')
            for col, dtype in df.dtypes.items()
        }

    def process_csv(self, file_path: str, table_name: str, chunk_size: int = 1000):
        """Process CSV file and load into PostgreSQL"""
        # Read CSV in chunks
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            # Clean column names
            chunk.columns = [col.lower().replace(' ', '_') for col in chunk.columns]
            # Create table if it doesn't exist
            if not hasattr(self, 'table_created'):
                column_types = self.infer_sql_types(chunk)
                self.db.create_table(table_name, column_types)
                self.table_created = True
            # Convert DataFrame to list of tuples
            columns = chunk.columns.tolist()
            values = [tuple(row) for row in chunk.values]
            # Bulk insert data
            self.db.bulk_insert(table_name, values, columns)
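To make the type inference concrete, here is roughly what infer_sql_types returns for a small hand-built DataFrame (the column names are made up; db is the connection handler from the previous section):

import pandas as pd

df = pd.DataFrame({
    'name': ['Alice', 'Bob'],      # object  -> TEXT
    'age': [30, 25],               # int64   -> INTEGER
    'salary': [55000.0, 48000.0],  # float64 -> FLOAT
})

pipeline = DataPipeline(db)
print(pipeline.infer_sql_types(df))
# {'name': 'TEXT', 'age': 'INTEGER', 'salary': 'FLOAT'}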
5. Putting It All Together
Create main.py to orchestrate the entire process:
from src.config import DB_CONFIG
from src.database import DatabaseConnection
from src.pipeline import DataPipeline
import logging
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def main():
    try:
        # Initialize database connection
        db = DatabaseConnection(DB_CONFIG)
        # Create pipeline
        pipeline = DataPipeline(db)
        # Process CSV file
        start_time = time.time()
        logging.info("Starting CSV import...")
        pipeline.process_csv(
            file_path='data/sample.csv',
            table_name='imported_data',
            chunk_size=1000
        )
        end_time = time.time()
        logging.info(f"Import completed in {end_time - start_time:.2f} seconds")
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
        raise

if __name__ == "__main__":
    main()
Advanced Features
1. Data Validation
Let's add data validation to our pipeline:
def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """Validate and clean data before insertion"""
    # Remove rows with all NULL values
    df = df.dropna(how='all')
    # Try to convert text columns that hold dates to datetime;
    # columns that fail to parse are simply left unchanged
    date_columns = df.select_dtypes(include=['object']).columns
    for col in date_columns:
        try:
            df[col] = pd.to_datetime(df[col])
        except Exception:
            pass
    return df
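One way to plug validate_data into the earlier process_csv loop is sketched below, assuming validate_data has been added as a method of DataPipeline. The ValidatedDataPipeline name is purely illustrative; in practice you would likely just edit process_csv in place:

class ValidatedDataPipeline(DataPipeline):
    """Variant of DataPipeline that cleans each chunk before insertion."""

    def process_csv(self, file_path: str, table_name: str, chunk_size: int = 1000):
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            chunk.columns = [col.lower().replace(' ', '_') for col in chunk.columns]
            # Drop empty rows and parse date-like text columns before inserting
            chunk = self.validate_data(chunk)
            if not hasattr(self, 'table_created'):
                self.db.create_table(table_name, self.infer_sql_types(chunk))
                self.table_created = True
            values = [tuple(row) for row in chunk.values]
            self.db.bulk_insert(table_name, values, chunk.columns.tolist())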
2. Error Handling
Add comprehensive error handling:
class DataPipelineError(Exception):
    """Custom exception for pipeline errors"""
    pass

def safe_process_chunk(self, chunk: pd.DataFrame, table_name: str):
    """Safely process a chunk of data"""
    try:
        # Validate data
        chunk = self.validate_data(chunk)
        # Convert DataFrame to list of tuples
        columns = chunk.columns.tolist()
        values = [tuple(row) for row in chunk.values]
        # Bulk insert data
        self.db.bulk_insert(table_name, values, columns)
    except Exception as e:
        raise DataPipelineError(f"Error processing chunk: {str(e)}")
3. Performance Monitoring
Add performance metrics:
import time

class PerformanceMonitor:
    def __init__(self):
        self.start_time = time.time()
        self.records_processed = 0

    def update(self, chunk_size: int):
        self.records_processed += chunk_size

    def get_metrics(self):
        elapsed_time = time.time() - self.start_time
        records_per_second = self.records_processed / elapsed_time
        return {
            'elapsed_time': elapsed_time,
            'records_processed': self.records_processed,
            'records_per_second': records_per_second
        }
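A possible way to use the monitor (a sketch; nothing here is required by the pipeline) is to update it once per chunk and log the metrics when the loop finishes:

import logging
import pandas as pd

monitor = PerformanceMonitor()
for chunk in pd.read_csv('data/sample.csv', chunksize=1000):
    # ...process and insert the chunk as in process_csv...
    monitor.update(len(chunk))

metrics = monitor.get_metrics()
logging.info(
    "Processed %d records in %.2fs (%.0f records/s)",
    metrics['records_processed'],
    metrics['elapsed_time'],
    metrics['records_per_second'],
)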
Best Practices
- Memory Management
  - Process data in chunks to handle large files
  - Use generators where possible
  - Clean up resources properly
- Error Handling
  - Implement proper exception handling
  - Log errors with context
  - Use transactions for data consistency
- Performance
  - Use bulk inserts instead of single inserts
  - Index important columns
  - Monitor memory usage
- Security
  - Use environment variables for sensitive data
  - Implement proper access controls
  - Sanitize input data
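To illustrate the indexing recommendation above: after a load you can add an index on a frequently filtered column through the same connection handler (the table and column names here are only examples):

from src.config import DB_CONFIG
from src.database import DatabaseConnection

db = DatabaseConnection(DB_CONFIG)

# Index a commonly queried column on the imported table
with db.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute(
            "CREATE INDEX IF NOT EXISTS idx_imported_data_name "
            "ON imported_data (name)"
        )
    conn.commit()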
Demo
Let's see the pipeline in action with a sample dataset:
# Sample usage
from src.pipeline import DataPipeline
from src.database import DatabaseConnection
from src.config import DB_CONFIG

# Initialize
db = DatabaseConnection(DB_CONFIG)
pipeline = DataPipeline(db)

# Process a sample CSV file
pipeline.process_csv(
    file_path='data/sample.csv',
    table_name='employees',
    chunk_size=1000
)
Example output:
2024-03-20 10:15:30 - INFO - Starting CSV import...
2024-03-20 10:15:31 - INFO - Processing chunk 1 of 5...
2024-03-20 10:15:32 - INFO - Processing chunk 2 of 5...
2024-03-20 10:15:33 - INFO - Processing chunk 3 of 5...
2024-03-20 10:15:34 - INFO - Processing chunk 4 of 5...
2024-03-20 10:15:35 - INFO - Processing chunk 5 of 5...
2024-03-20 10:15:35 - INFO - Import completed in 5.23 seconds
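To double-check that the rows actually landed, a quick count against the employees table (using the same connection handler) is enough:

from src.config import DB_CONFIG
from src.database import DatabaseConnection

db = DatabaseConnection(DB_CONFIG)

# Count the rows loaded by the demo run
with db.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM employees")
        print(cur.fetchone()[0])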
Conclusion
We've built a robust data pipeline that can efficiently handle CSV data and load it into PostgreSQL. The solution includes:
- Efficient chunk processing for large files
- Proper error handling and logging
- Performance monitoring
- Data validation
- Security best practices
This pipeline can be extended further based on specific needs, such as:
- Adding more data transformations
- Implementing parallel processing
- Adding more validation rules
- Supporting different file formats
Remember to always test thoroughly with sample data before processing production data!
If you have any questions or suggestions, feel free to reach out to me on Twitter or create an issue in the GitHub repository.
Happy coding! 🚀