Data preparation is a critical step in any data analytics or machine learning project. This lecture covers essential techniques for sourcing data and cleaning it for analysis.
> [!WARNING] Can you use any data?
> Always check data sources for restrictions before use!
Common licenses:
Characteristics:
Advantages:
Examples:
Characteristics:
Sources:
Challenges:
```python
from bs4 import BeautifulSoup

html_content = """
<html><body><ul>
<li class="shoe-item">Air Jordans</li>
<li class="shoe-item">Light up Sketchers</li>
</ul></body></html>
"""

soup = BeautifulSoup(html_content, 'html.parser')
for item in soup.find_all(attrs={'class': 'shoe-item'}):
    print(item.text)

# Output:
# Air Jordans
# Light up Sketchers
```
- ⚠ Rate limiting: servers may block excessive requests
- ⚠ IP blocking: you may get banned for aggressive scraping
- ⚠ Legal issues: scraping may violate terms of service
- ⚠ Fragile code: scrapers break when the HTML structure changes
- ⚠ Dynamic content: JavaScript-rendered content requires special handling
Best Practice: Use official APIs when available instead of scraping
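To illustrate the contrast with scraping: a JSON API returns structured data directly, with no fragile HTML parsing. The payload below is hypothetical (real endpoints and schemas vary); with a real API you would fetch it over HTTP first.

```python
import json

# Hypothetical JSON payload, as a shoe-listing API might return it.
# (Illustrative only - real APIs define their own schemas.)
response_body = '{"items": [{"name": "Air Jordans"}, {"name": "Light up Sketchers"}]}'

data = json.loads(response_body)
names = [item["name"] for item in data["items"]]
print(names)  # ['Air Jordans', 'Light up Sketchers']
```

No CSS selectors, no HTML structure to break: the field names are part of the API contract.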
Data quality issues include:
Goal: Ensure data is as representative and accurate as possible for machine learning algorithms
Data should be in a normalized format:
| Before | After |
| --- | --- |
| `Name: "John Smith"` | `first_name: "John"` |
| `Age: "25 years"` | `last_name: "Smith"` |
| `City: "NYC"` | `age: 25` |
| | `city: "New York City"` |
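A minimal sketch of that normalization in plain Python (the city expansion table is illustrative; a real pipeline would use a proper lookup):

```python
# Raw record in the "Before" shape
record = {"Name": "John Smith", "Age": "25 years", "City": "NYC"}

first_name, last_name = record["Name"].split(" ", 1)
normalized = {
    "first_name": first_name,
    "last_name": last_name,
    "age": int(record["Age"].split()[0]),  # "25 years" -> 25
    "city": {"NYC": "New York City"}.get(record["City"], record["City"]),
}
print(normalized)
```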
```python
import pandas as pd

# Drop rows with any missing values
df_clean = df.dropna()

# Drop rows where a specific column is missing
df_clean = df.dropna(subset=['age'])

# Drop columns missing in more than half the rows
# (thresh = minimum number of non-null values required, must be an int)
df_clean = df.dropna(axis=1, thresh=int(len(df) * 0.5))
```
Considerations:
```python
# Fill with mean
df['age'] = df['age'].fillna(df['age'].mean())

# Fill with median (better for skewed data)
df['salary'] = df['salary'].fillna(df['salary'].median())

# Fill with mode (for categorical data)
df['category'] = df['category'].fillna(df['category'].mode()[0])
```
Considerations:
```python
# Forward fill (use previous value)
df['temperature'] = df['temperature'].ffill()

# Backward fill (use next value)
df['temperature'] = df['temperature'].bfill()
```
Use case: Time series data
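For example, on a small made-up series of sensor readings with a gap in the middle:

```python
import pandas as pd
import numpy as np

# Hypothetical temperature readings with two missing values
temps = pd.Series([20.0, np.nan, np.nan, 23.0])

print(temps.ffill().tolist())  # [20.0, 20.0, 20.0, 23.0] - gap takes the previous value
print(temps.bfill().tolist())  # [20.0, 23.0, 23.0, 23.0] - gap takes the next value
```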
```python
# Linear interpolation
df['value'] = df['value'].interpolate(method='linear')

# Polynomial interpolation (requires SciPy)
df['value'] = df['value'].interpolate(method='polynomial', order=2)
```
```python
import numpy as np

# Sample replacement values from the column's observed distribution
mean = df['age'].mean()
std = df['age'].std()
missing_count = df['age'].isnull().sum()
df.loc[df['age'].isnull(), 'age'] = np.random.normal(mean, std, missing_count)
```
Some algorithms handle missing values:
Advantage: Minimal bias introduction
```python
# Find duplicate rows
duplicates = df.duplicated()

# Find duplicates based on specific columns
duplicates = df.duplicated(subset=['name', 'email'])

# View all duplicate rows (keep=False marks every occurrence)
df[df.duplicated(keep=False)]
```

```python
# Remove duplicates, keeping the first occurrence (the default)
df_clean = df.drop_duplicates()
df_clean = df.drop_duplicates(keep='first')

# Keep the last occurrence instead
df_clean = df.drop_duplicates(keep='last')

# Deduplicate based on specific columns
df_clean = df.drop_duplicates(subset=['user_id'])
```
> [!IMPORTANT] Duplicates can have meaning!
> - Multiple purchases by same customer
> - Repeated sensor readings
> - Time-series data
Questions to ask:
The problem domain specifies what values are valid:
```python
from datetime import datetime

# Example: age must be positive
df = df[df['age'] > 0]

# Example: percentage must be 0-100
df = df[(df['percentage'] >= 0) & (df['percentage'] <= 100)]

# Example: date must be in the past
df = df[df['date'] <= datetime.now()]
```
```python
import numpy as np
from scipy import stats

# Z-score method: keep values within 3 standard deviations
z_scores = np.abs(stats.zscore(df['value']))
df_clean = df[z_scores < 3]

# IQR method: keep values within 1.5 * IQR of the quartiles
Q1 = df['value'].quantile(0.25)
Q3 = df['value'].quantile(0.75)
IQR = Q3 - Q1
df_clean = df[(df['value'] >= Q1 - 1.5 * IQR) &
              (df['value'] <= Q3 + 1.5 * IQR)]
```
Tip: Keep them around - they may be useful later!
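One way to follow that tip is to flag outliers in a separate column instead of dropping the rows (the data here is made up):

```python
import pandas as pd

df = pd.DataFrame({"value": [10, 12, 11, 13, 500]})  # 500 looks suspicious

Q1, Q3 = df["value"].quantile([0.25, 0.75])
IQR = Q3 - Q1
df["is_outlier"] = (df["value"] < Q1 - 1.5 * IQR) | (df["value"] > Q3 + 1.5 * IQR)
# Rows stay in the frame; downstream steps can filter on df["is_outlier"]
```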
```text
┌─────────────┐
│ Data Source │
└──────┬──────┘
       │
┌──────▼──────┐
│   Extract   │ (Get data from source)
└──────┬──────┘
       │
┌──────▼──────┐
│  Transform  │ (Clean, normalize, enrich)
└──────┬──────┘
       │
┌──────▼──────┐
│    Load     │ (Store in destination)
└──────┬──────┘
       │
┌──────▼──────┐
│  Validate   │ (Check quality)
└─────────────┘
```
```python
import pandas as pd

def data_pipeline(input_file, output_file):
    # 1. Extract
    df = pd.read_csv(input_file)

    # 2. Transform
    # Handle missing values
    df['age'] = df['age'].fillna(df['age'].median())
    df['category'] = df['category'].fillna('Unknown')

    # Remove duplicates
    df = df.drop_duplicates(subset=['user_id'])

    # Handle invalid values
    df = df[df['age'] > 0]
    df = df[df['age'] < 120]

    # Normalize formats
    df['email'] = df['email'].str.lower().str.strip()
    df['phone'] = df['phone'].str.replace(r'\D', '', regex=True)

    # 3. Load
    df.to_csv(output_file, index=False)

    # 4. Validate
    print(f"Original rows: {len(pd.read_csv(input_file))}")
    print(f"Cleaned rows: {len(df)}")
    print(f"Missing values:\n{df.isnull().sum()}")

    return df

# Run pipeline
clean_data = data_pipeline('raw_data.csv', 'clean_data.csv')
```
```bash
# Copy to HDFS
hdfs dfs -copyFromLocal local_file.csv /data/input/

# Or use Spark
spark-submit --master yarn load_data.py
```
```python
import pandas as pd
from sqlalchemy import create_engine

# Create connection
engine = create_engine('postgresql://user:pass@localhost/db')

# Load data
df.to_sql('table_name', engine, if_exists='replace', index=False)
```
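For local experimentation without a database server, the same `to_sql` call works against SQLite from the standard library (the table and data below are made up):

```python
import sqlite3
import pandas as pd

df = pd.DataFrame({"user_id": [1, 2], "age": [25, 31]})

# An in-memory SQLite database; pass a file path for a persistent one
with sqlite3.connect(":memory:") as conn:
    df.to_sql("users", conn, if_exists="replace", index=False)
    count = pd.read_sql("SELECT COUNT(*) AS n FROM users", conn)["n"][0]

print(count)  # 2
```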
```python
# AWS S3
import boto3

s3 = boto3.client('s3')
s3.upload_file('local_file.csv', 'bucket-name', 'data/file.csv')

# Google Cloud Storage
from google.cloud import storage

client = storage.Client()
bucket = client.bucket('bucket-name')
blob = bucket.blob('data/file.csv')
blob.upload_from_filename('local_file.csv')
```
For very large datasets (> 1 PB):
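At petabyte scale, distributed engines such as Spark are the norm. For datasets that are merely too large for memory, the same streaming idea can be sketched with pandas chunked reading; here a small in-memory CSV stands in for a huge file (with a real file you would pass its path to `read_csv`):

```python
import io
import pandas as pd

# Stand-in for a file far too large to load at once
csv_data = io.StringIO("user_id,age\n1,25\n2,\n3,31\n4,28\n")

total_rows = 0
for chunk in pd.read_csv(csv_data, chunksize=2):  # process 2 rows at a time
    chunk = chunk.dropna(subset=["age"])          # clean each chunk independently
    total_rows += len(chunk)

print(total_rows)  # 3 rows survive cleaning
```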
```python
# Keep track of transformations
cleaning_log = {
    'missing_values_filled': ['age', 'salary'],
    'duplicates_removed': 150,
    'outliers_removed': 23,
    'invalid_values_removed': 5,
}
```
```python
def validate_data(df, stage):
    """Print basic quality checks at a given pipeline stage."""
    print(f"\n=== Validation at {stage} ===")
    print(f"Shape: {df.shape}")
    print(f"Missing values:\n{df.isnull().sum()}")
    print(f"Duplicates: {df.duplicated().sum()}")
    print(f"Data types:\n{df.dtypes}")
```
```python
# Never modify the original
df_original = pd.read_csv('data.csv')
df_working = df_original.copy()

# Work on the copy
df_working = clean_data(df_working)
```
```python
def clean_column(series, strategy='mean'):
    """Reusable cleaning function."""
    if strategy == 'mean':
        return series.fillna(series.mean())
    elif strategy == 'median':
        return series.fillna(series.median())
    elif strategy == 'mode':
        return series.fillna(series.mode()[0])
    return series

# Usage: df['age'] = clean_column(df['age'], strategy='median')
```
Data sourcing and cleaning are critical steps in the data pipeline:
Remember: Data scientists spend 60-80% of their time on data preparation - it’s worth doing well!