Getting Started with DataCompose

Transform your data pipelines in minutes with production-ready PySpark code.

Quick Start with Playground

Get started quickly by cloning the DataCompose Playground repository, which comes with all dependencies pre-configured:

# Clone the playground repo
git clone https://github.com/datacompose/datacompose-playground
cd datacompose-playground

# Start the environment (requires Docker and Docker Compose)
docker-compose up --build -d

Access Jupyter Notebook

  • Open http://localhost:8888 in your browser
  • Navigate to the notebooks directory
  • Open the example notebooks to see DataCompose in action

The playground provides a complete PySpark environment with DataCompose pre-installed and example notebooks to get you started immediately.

Prerequisites: Docker and Docker Compose must be installed on your system.
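
To check that the containers are up, or to shut the playground down when you're done, the standard Docker Compose commands apply (this assumes the playground's compose file follows the usual conventions):

# Check that the playground containers are running
docker-compose ps

# Stop the environment when you're finished
docker-compose down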

Quick Start (Local Installation)

Follow these three simple steps to start transforming your data with DataCompose.

Step 1: Initialize Your Project

# Initialize DataCompose in your project
datacompose init

# Or auto-accept defaults
datacompose init --yes

This creates a datacompose.json configuration file with default settings; see the Configuration section below for an example of its contents.

Step 2: Add Transformers

Generate production-ready PySpark transformation code:

# Add email cleaning transformers
datacompose add emails

# Add address standardization transformers
datacompose add addresses

# Add phone number formatting transformers
datacompose add phone_numbers

Each command generates fully-functional transformation primitives in your transformers/pyspark/ directory.
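
The exact files depend on which transformers you add, but a layout consistent with the imports used in the next step looks roughly like this (illustrative, not an exact listing):

transformers/
  pyspark/
    emails          # module providing the emails registry
    addresses       # module providing the addresses registry
    phone_numbers   # module providing the phones registry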

Step 3: Use the Generated Code

Integrate the generated primitives into your existing pipeline:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Import the transformer primitives
from transformers.pyspark.emails import emails
from transformers.pyspark.addresses import addresses
from transformers.pyspark.phone_numbers import phones

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataCompose Example") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Load your data
df = spark.read.csv("customers.csv", header=True)

# Apply email transformations
df_validated = df.withColumn(
    "email_clean",
    emails.standardize_email(F.col("customer_email"))
).withColumn(
    "email_domain",
    emails.extract_domain(F.col("email_clean"))
).withColumn(
    "email_is_valid",
    emails.is_valid_email(F.col("email_clean"))
)

# Filter to valid emails
df_valid = df_validated.filter(F.col("email_is_valid"))

# Show results
df_valid.select(
    "customer_email",
    "email_clean",
    "email_domain",
    "email_is_valid"
).show(5, truncate=False)
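
From here the validated DataFrame behaves like any other, so you can persist it for downstream use. The output path below is illustrative:

# Write the validated records out (illustrative path)
df_valid.write.mode("overwrite").parquet("output/customers_clean")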

Building Complex Pipelines

Chain multiple transformers to build comprehensive data cleaning pipelines:

from pyspark.sql import functions as F
from transformers.pyspark.emails import emails
from transformers.pyspark.phone_numbers import phones
from transformers.pyspark.addresses import addresses

# Apply all transformations
df_cleaned = (
    df
    # Email cleaning
    .withColumn("email_clean", emails.standardize_email(F.col("email")))
    .withColumn("email_valid", emails.is_valid_email(F.col("email_clean")))
    .withColumn("email_domain", emails.extract_domain(F.col("email_clean")))
    
    # Phone formatting
    .withColumn("phone_clean", phones.standardize_phone(F.col("phone")))
    .withColumn("phone_formatted", phones.format_nanp(F.col("phone_clean")))
    .withColumn("phone_valid", phones.is_valid_nanp(F.col("phone_clean")))
    
    # Address standardization
    .withColumn("state_code", addresses.standardize_state(F.col("state")))
    .withColumn("zip_valid", addresses.is_valid_zip_code(F.col("zip")))
    
    # Filter to valid records only
    .filter(F.col("email_valid") & F.col("phone_valid"))
)

# Create a reusable pipeline function
def clean_customer_data(df):
    """Apply all customer data transformations."""
    return (
        df
        .withColumn("email", emails.standardize_email(F.col("email")))
        .withColumn("phone", phones.format_nanp(phones.standardize_phone(F.col("phone"))))
        .withColumn("state", addresses.standardize_state(F.col("state")))
        .filter(emails.is_valid_email(F.col("email")))
        .filter(phones.is_valid_nanp(F.col("phone")))
    )

# Apply the pipeline
df_final = clean_customer_data(df)
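
To sanity-check the reusable pipeline without loading a full dataset, you can run it against a small in-memory DataFrame. The sample values and column names below are illustrative and assume the columns the pipeline expects:

# Quick check on a tiny in-memory DataFrame (illustrative values)
sample = spark.createDataFrame(
    [("ALICE@Example.COM ", "(555) 123-4567", "california", "94105")],
    ["email", "phone", "state", "zip"],
)
clean_customer_data(sample).show(truncate=False)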

Using Compose for Pipelines

DataCompose supports creating composed pipelines using decorators:

from pyspark.sql import functions as F

from datacompose.operators.primitives import PrimitiveRegistry

# Create a custom registry
custom = PrimitiveRegistry("custom")

# Register custom transformations
@custom.register()
def normalize_name(col):
    return F.initcap(F.trim(col))

@custom.register()
def remove_special_chars(col):
    return F.regexp_replace(col, r'[^a-zA-Z0-9\s]', '')

# Create a composed pipeline
@custom.compose()
def clean_names():
    custom.remove_special_chars()
    custom.normalize_name()

# Use the pipeline
df = df.withColumn("name_clean", clean_names(F.col("name")))
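
Registered primitives can presumably also be applied on their own, in the same way the generated registries are used earlier; treating that as an assumption, an individual step could be reused outside the composed pipeline:

# Assumes a registered primitive is callable directly, like the generated registries above
df = df.withColumn("name_normalized", custom.normalize_name(F.col("name")))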

Configuration

Customize DataCompose behavior with datacompose.json:

{
  "version": "1.0.0",
  "targets": {
    "pyspark": {
      "output": "./transformers/pyspark",
      "generator": "SparkPandasUDFGenerator"
    }
  },
  "templates": {
    "directory": "src/transformers/templates"
  }
}
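
For example, if you point output at a different directory (assuming the generator honors this setting on subsequent runs), re-running an add command should regenerate the code into that location:

# Regenerate transformers into the output path configured in datacompose.json
datacompose add emails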

Next Steps

  • Frequently Asked Questions