Getting Started with DataCompose
Transform your data pipelines in minutes with production-ready PySpark code.
Quick Start with Playground
Get started quickly by cloning the DataCompose Playground repository, which comes with all dependencies pre-configured:
# Clone the playground repo
git clone https://github.com/datacompose/datacompose-playground
cd datacompose-playground
# Start the environment (requires Docker and Docker Compose)
docker-compose up --build -d
Access Jupyter Notebook
- Open http://localhost:8888 in your browser
- Navigate to the notebooks directory
- Open the example notebooks to see DataCompose in action
The playground provides a complete PySpark environment with DataCompose pre-installed and example notebooks to get you started immediately.
Prerequisites: Docker and Docker Compose must be installed on your system.
Quick Start (Local Installation)
Follow these three simple steps to start transforming your data with DataCompose.
Step 1: Initialize Your Project
# Initialize DataCompose in your project
datacompose init
# Or auto-accept defaults
datacompose init --yes
This creates a datacompose.json configuration file with default settings.
Step 2: Add Transformers
Generate production-ready PySpark transformation code:
# Add email cleaning transformers
datacompose add emails
# Add address standardization transformers
datacompose add addresses
# Add phone number formatting transformers
datacompose add phone_numbers
Each command generates fully-functional transformation primitives in your transformers/pyspark/ directory.
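If you want to confirm that generation worked before wiring anything into a pipeline, a quick import from a Python shell in your project root is enough (the module and object names below match the add commands above and the examples later in this guide):

# Sanity check from a Python shell in the project root:
# the generated modules should import cleanly.
from transformers.pyspark.emails import emails
from transformers.pyspark.addresses import addresses
from transformers.pyspark.phone_numbers import phones

print(emails, addresses, phones)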
Step 3: Use the Generated Code
Integrate the generated primitives into your existing pipeline:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Import the transformer primitives
from transformers.pyspark.emails import emails
from transformers.pyspark.addresses import addresses
from transformers.pyspark.phone_numbers import phones
# Initialize Spark session
spark = (
    SparkSession.builder
    .appName("DataCompose Example")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)
# Load your data
df = spark.read.csv("customers.csv", header=True)
# Apply email transformations
df_validated = df.withColumn(
    "email_clean",
    emails.standardize_email(F.col("customer_email"))
).withColumn(
    "email_domain",
    emails.extract_domain(F.col("email_clean"))
).withColumn(
    "email_is_valid",
    emails.is_valid_email(F.col("email_clean"))
)

# Filter to valid emails
df_valid = df_validated.filter(F.col("email_is_valid"))

# Show results
df_valid.select(
    "customer_email",
    "email_clean",
    "email_domain",
    "email_is_valid"
).show(5, truncate=False)
Building Complex Pipelines
Chain multiple transformers to build comprehensive data cleaning pipelines:
from pyspark.sql import functions as F
from transformers.pyspark.emails import emails
from transformers.pyspark.phone_numbers import phones
from transformers.pyspark.addresses import addresses
# Apply all transformations
df_cleaned = (
    df
    # Email cleaning
    .withColumn("email_clean", emails.standardize_email(F.col("email")))
    .withColumn("email_valid", emails.is_valid_email(F.col("email_clean")))
    .withColumn("email_domain", emails.extract_domain(F.col("email_clean")))
    # Phone formatting
    .withColumn("phone_clean", phones.standardize_phone(F.col("phone")))
    .withColumn("phone_formatted", phones.format_nanp(F.col("phone_clean")))
    .withColumn("phone_valid", phones.is_valid_nanp(F.col("phone_clean")))
    # Address standardization
    .withColumn("state_code", addresses.standardize_state(F.col("state")))
    .withColumn("zip_valid", addresses.is_valid_zip_code(F.col("zip")))
    # Filter to valid records only
    .filter(F.col("email_valid") & F.col("phone_valid"))
)
# Create a reusable pipeline function
def clean_customer_data(df):
    """Apply all customer data transformations."""
    return (
        df
        .withColumn("email", emails.standardize_email(F.col("email")))
        .withColumn("phone", phones.format_nanp(phones.standardize_phone(F.col("phone"))))
        .withColumn("state", addresses.standardize_state(F.col("state")))
        .filter(emails.is_valid_email(F.col("email")))
        .filter(phones.is_valid_nanp(F.col("phone")))
    )
# Apply the pipeline
df_final = clean_customer_data(df)
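Before pointing the pipeline at production data, it helps to smoke-test it on a small in-memory DataFrame. The sample rows below are made up for illustration; the column names match the pipeline above and the spark session from Step 3:

# Smoke test with a few made-up rows; rows failing validation should be filtered out.
sample = spark.createDataFrame(
    [
        ("Jane Doe", "JANE.DOE@Example.com", "(555) 123-4567", "California", "94105"),
        ("Bad Row", "not-an-email", "123", "CA", "00000"),
    ],
    ["name", "email", "phone", "state", "zip"],
)

clean_customer_data(sample).show(truncate=False)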
Using Compose for Pipelines
DataCompose supports creating composed pipelines using decorators:
from pyspark.sql import functions as F

from datacompose.operators.primitives import PrimitiveRegistry

# Create a custom registry
custom = PrimitiveRegistry("custom")

# Register custom transformations
@custom.register()
def normalize_name(col):
    return F.initcap(F.trim(col))

@custom.register()
def remove_special_chars(col):
    return F.regexp_replace(col, r'[^a-zA-Z0-9\s]', '')

# Create a composed pipeline
@custom.compose()
def clean_names():
    custom.remove_special_chars()
    custom.normalize_name()
# Use the pipeline
df = df.withColumn("name_clean", clean_names(F.col("name")))
Configuration
Customize DataCompose behavior with datacompose.json:
{
  "version": "1.0.0",
  "targets": {
    "pyspark": {
      "output": "./transformers/pyspark",
      "generator": "SparkPandasUDFGenerator"
    }
  },
  "templates": {
    "directory": "src/transformers/templates"
  }
}
Next Steps
- Browse the full documentation
- Check out example notebooks
- View the API reference
- Join the community discussions
Frequently Asked Questions
Do I need to know PySpark to use DataCompose?
Basic PySpark knowledge is helpful but not required. DataCompose is designed to be approachable for different skill levels:
- Beginners: Use generated code as-is with simple configuration
- Intermediate: Customize transformers to fit your needs
- Advanced: Build complex pipelines and contribute transformers
The generated code includes extensive comments and follows PySpark best practices, making it an excellent learning resource.
Can I modify the generated code?
Absolutely! This is the core philosophy of DataCompose. The generated code is:
- Yours to own: No vendor lock-in or black boxes
- Version controlled: Track changes with Git
- Fully customizable: Modify any aspect to fit your needs
- Well documented: Clear code with comprehensive comments
Think of DataCompose as a smart code generator, not a restrictive framework.
Is the generated code production-ready?
Yes! DataCompose generates production-grade code with:
- Zero runtime dependencies beyond PySpark
- Comprehensive error handling with configurable strategies
- Type hints for better IDE support and type checking
- Performance optimizations using Spark best practices
- No UDFs - uses native Spark functions for optimal performance
- Test coverage with generated test templates
Many organizations use DataCompose-generated code in production pipelines processing billions of records daily.
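The generated test templates are a starting point; if you prefer writing your own, a minimal pytest-style check against a local Spark session might look like the sketch below. It assumes the emails primitives from datacompose add emails and that is_valid_email returns a boolean column that flags these two inputs as valid and invalid respectively:

# test_emails.py -- a hand-written sketch, not a generated template.
# Assumes the primitives generated by `datacompose add emails`.
import pytest
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from transformers.pyspark.emails import emails


@pytest.fixture(scope="module")
def spark():
    session = SparkSession.builder.master("local[1]").appName("emails-test").getOrCreate()
    yield session
    session.stop()


def test_is_valid_email(spark):
    df = spark.createDataFrame([("alice@example.com",), ("not-an-email",)], ["email"])
    rows = df.select(emails.is_valid_email(F.col("email")).alias("ok")).collect()
    assert [r["ok"] for r in rows] == [True, False]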
Where can I run the generated code?
DataCompose works anywhere PySpark runs:
Cloud Platforms
- AWS: EMR, Glue, SageMaker
- Azure: Synapse, Databricks, HDInsight
- GCP: Dataproc, Dataflow, Vertex AI
- Databricks: All runtime versions
- Snowflake: Snowpark integration
On-Premise
- Cloudera Data Platform
- Hortonworks HDP
- Apache Spark clusters
- Kubernetes (Spark Operator)
- Local development
How does DataCompose handle errors and bad data?
DataCompose provides multiple error handling strategies:
- Permissive mode: Log errors and continue processing
- Strict mode: Fail fast on any error
- Custom handlers: Define your own error handling logic
- Null handling: Configurable strategies for null values
- Data quality metrics: Track and report data quality issues
All error handling is configurable through the datacompose.json configuration file.
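As a rough sketch of what the permissive strategy looks like at the DataFrame level, using the email validation primitive shown earlier (the quarantine step here is illustrative, not generated configuration):

# Permissive pattern sketch: keep invalid rows aside instead of failing the job.
# `df` is your input DataFrame.
from pyspark.sql import functions as F
from transformers.pyspark.emails import emails

df_flagged = df.withColumn("email_is_valid", emails.is_valid_email(F.col("email")))
df_clean = df_flagged.filter(F.col("email_is_valid"))
df_quarantine = df_flagged.filter(~F.col("email_is_valid"))

# A strict-style check, by contrast, fails fast as soon as any row is invalid:
if df_quarantine.count() > 0:
    raise ValueError("Invalid email addresses found; inspect the quarantine DataFrame")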
How fast is the generated code?
DataCompose generates highly optimized PySpark code:
- No UDFs: Uses native Spark functions (10-100x faster)
- Catalyst optimized: Leverages Spark's query optimizer
- Minimal shuffles: Smart partitioning strategies
- Predicate pushdown: Filters data early in the pipeline
- Column pruning: Only processes required columns
- Adaptive execution: Supports Spark 3.0+ AQE features
Benchmark results show 5-10x performance improvement over hand-written naive implementations.
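For intuition on the no-UDF point, here is an illustrative comparison (not DataCompose's generated code): the same trim-and-lowercase cleanup written as a Python UDF versus as native column expressions that Catalyst can optimize.

# Illustrative comparison only -- not the code DataCompose generates.
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Python UDF: each value is shipped to a Python worker and is opaque to Catalyst.
clean_udf = F.udf(lambda s: s.strip().lower() if s is not None else None, StringType())
df_udf = df.withColumn("email_clean", clean_udf(F.col("email")))

# Native functions: stay in the JVM and benefit from Catalyst optimizations.
df_native = df.withColumn("email_clean", F.lower(F.trim(F.col("email"))))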
How do I update generated code when a new DataCompose version is released?
DataCompose provides several update strategies:
- Regenerate: Use datacompose add --force to overwrite
- Merge: Generate to a temporary location and manually merge changes
- Extend: Create wrapper functions that call generated code
- Fork: Copy and rename for complete independence
Best Practice: Always use version control and review changes before merging.
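For the Extend strategy, a small wrapper keeps project-specific behaviour out of the generated files, so regeneration stays safe. The fallback logic below is a hypothetical example layered on the standardize_email primitive shown earlier:

# Hypothetical wrapper illustrating the "Extend" strategy: custom logic lives
# here, while the generated primitive can be regenerated at any time.
from pyspark.sql import functions as F
from transformers.pyspark.emails import emails


def standardize_email_with_fallback(col):
    """Standardize an email, substituting a placeholder for blank values."""
    cleaned = emails.standardize_email(col)
    return F.when(F.trim(col) == "", F.lit("unknown@example.com")).otherwise(cleaned)


df = df.withColumn("email_clean", standardize_email_with_fallback(F.col("email")))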