Core Principles
Null Safety
Primitives handle null values gracefully using PySpark's when().otherwise() pattern:
@email.register()
def filter_valid_emails(col: Column) -> Column:
    """Return email only if valid, otherwise return null."""
    return F.when(is_valid_email(col), col).otherwise(F.lit(None))
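Applied to a DataFrame, invalid or null inputs simply propagate as nulls in the output column. A minimal usage sketch (the column names are illustrative):
# raw_email / clean_email are hypothetical column names
df = df.withColumn("clean_email", email.filter_valid_emails(F.col("raw_email")))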
No Exceptions
Primitives don’t throw exceptions - they return nulls or default values for invalid data:
# Instead of throwing an error for invalid phone numbers,
# this returns null for invalid inputs
@phone.register()
def clean_phone(col: Column) -> Column:
    """Clean and validate phone number, returning null for invalid numbers."""
    digits = extract_digits(col)
    return F.when(
        F.length(digits) == 10,  # Valid NANP
        format_nanp(digits)
    ).otherwise(F.lit(None))
Validation Patterns
Pre-validation
Use validation primitives to check data before transformation:
@text.compose()
def safe_email_processing():
    if validation.is_valid_email():
        email.normalize()
        email.extract_domain()
    else:
        # Handle invalid emails differently
        lambda col: F.lit("INVALID_EMAIL")
Filter Invalid Data
Use filter primitives to separate valid from invalid data:
# Process only valid emails; invalid inputs become null and are filtered out
df.select(
    # Keep the original column name so the follow-up filter can reference it
    email.filter_valid_emails(F.col("email_input")).alias("email_input")
).filter(F.col("email_input").isNotNull())
Default Values
Primitives can accept default values for handling edge cases:
@customer.register()
def extract_account_type(col, default_type='STANDARD'):
    """Extract account type with a fallback to the default."""
    return (
        F.when(F.substring(col, 1, 2) == 'PR', F.lit('PREMIUM'))
        .when(F.substring(col, 1, 2) == 'EN', F.lit('ENTERPRISE'))
        .otherwise(F.lit(default_type))  # Safe default
    )
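The default can be overridden at the call site. A minimal sketch, assuming the generated namespace forwards keyword arguments to the registered function and that a customer_code column exists:
# customer_code is a hypothetical column; default_type overrides the fallback value
df = df.withColumn(
    "account_type",
    customer.extract_account_type(F.col("customer_code"), default_type="BASIC"),
)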
Handling Different Error Scenarios
Missing Data
# Handle missing required fields
@customer.compose()
def handle_missing_data():
    lambda col: F.when(col.isNull(), F.lit("MISSING")).otherwise(col)
    customer.standardize()
Invalid Format
# Mark invalid formats instead of failing
@phone.compose()
def mark_invalid_phones():
    lambda col: F.when(
        phone.validate_nanp(col),
        phone.format_e164(col)
    ).otherwise(F.concat(F.lit("INVALID:"), col))
Data Quality Issues
# Track data quality while processing
df = df.withColumn(
    "email_quality",
    F.when(email.is_valid_email(F.col("email")), "valid")
    .when(F.col("email").isNull(), "missing")
    .otherwise("invalid")
)
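The quality column also doubles as a lightweight profiling tool; for example, a per-bucket count:
# Summarize how many rows fall into each quality bucket
df.groupBy("email_quality").count().show()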
Best Practices for Error Handling
- Return nulls, not exceptions - Let Spark handle null propagation
- Use filter primitives - Separate valid/invalid data cleanly
- Provide defaults - Always have sensible fallback values
- Track quality - Add columns to track data quality issues
- Validate early - Check data validity before complex transformations
- Document behavior - Clearly state how primitives handle edge cases
- Test edge cases - Include null, empty, and invalid data in tests (see the sketch after this list)
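To illustrate the last point, here is a minimal pytest-style sketch for the clean_phone primitive above, assuming a local SparkSession and that registered primitives are invoked as namespace.function(col):
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

def test_clean_phone_edge_cases():
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame(
        [("5551234567",), ("",), (None,), ("not-a-number",)],
        ["raw_phone"],
    )
    # `phone` is the namespace used with @phone.register() above;
    # the import path depends on your generated transformers
    rows = df.select(phone.clean_phone(F.col("raw_phone")).alias("phone")).collect()
    # The valid 10-digit number is formatted; empty, null, and invalid inputs become null
    assert rows[0]["phone"] is not None
    assert all(row["phone"] is None for row in rows[1:])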
Advanced Features
Error Tracking Pattern
Create columns to track transformation status:
# Track data quality while processing
df = df.withColumn(
    "processing_status",
    F.struct(
        F.col("email").alias("original"),
        email.normalize(F.col("email")).alias("processed"),
        F.when(email.is_valid_email(F.col("email")), "valid")
        .otherwise("invalid").alias("status")
    )
)
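Downstream steps can then route rows by the tracked status, for example quarantining invalid records:
# Split on the status field of the struct column
valid_df = df.filter(F.col("processing_status.status") == "valid")
invalid_df = df.filter(F.col("processing_status.status") == "invalid")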
Multi-field Validation
Validate multiple fields together:
@customer.compose()
def validate_customer_record():
    # Check if all required fields are present
    lambda col: F.when(
        F.col("email").isNotNull() &
        F.col("phone").isNotNull() &
        F.col("address").isNotNull(),
        F.lit("complete")
    ).otherwise(F.lit("incomplete"))
Conditional Error Recovery
Apply different recovery strategies based on error type:
@email.compose()
def smart_error_recovery():
    # Try to fix common typos first
    if email.has_common_typo():
        email.fix_typos()
    # Then validate and mark if still invalid
    if email.is_valid_email():
        email.normalize()
    else:
        lambda col: F.concat(F.lit("INVALID:"), col)
Compilation Error Handling
The PipelineCompiler in DataCompose handles compilation errors gracefully:
AST Compilation Fallback
The @compose decorator has multiple fallback strategies:
@namespace.compose()
def my_pipeline():
    namespace.transform()

# If AST compilation fails:
# 1. PipelineCompiler attempts advanced compilation
# 2. Falls back to _fallback_compose for sequential extraction
# 3. Ultimate fallback returns identity function
Compilation Warnings
The compiler logs warnings without failing:
# From PipelineCompiler
if self.debug:
    logger.warning(f"Failed to compile '{func.__name__}': {e}")
    logger.debug("Compilation error details:", exc_info=True)
# Returns empty pipeline as fallback
return StablePipeline([], self.debug)
Step Validation
Each CompiledStep validates itself:
# CompiledStep validation
def validate(self):
    if self.step_type == "transform":
        if not callable(self.action):
            raise ValueError(f"Transform step requires callable action")
    elif self.step_type == "conditional":
        if not callable(self.condition):
            raise ValueError(f"Conditional step requires callable condition")
Runtime Error Handling
StablePipeline Execution
The StablePipeline executor handles errors during execution:
def _execute_steps(self, steps, col):
    result = col
    for step in steps:
        if self.debug:
            # Log each step for debugging
            self.logger.debug(f"Executing step: {step_name}")
        if step.step_type == "transform":
            if callable(step.action):
                result = step.action(result)
            # Gracefully handles non-callable actions
Fallback Behavior
When compilation completely fails:
# Ultimate fallback - identity function
def pipeline(col):
    return col  # Returns input unchanged

pipeline.__doc__ = f"Failed to compile {func.__name__}"
Debugging Failed Transformations
Enable debug mode to trace issues:
@text.compose()
def debug_pipeline():
    text.trim()
    lambda col: F.when(F.length(col) == 0, F.lit("EMPTY_AFTER_TRIM")).otherwise(col)
    text.lowercase()
    text.validate()

# Debug output shows:
# - Successful compilation message with step count
# - Each step execution with name
# - Any compilation warnings or errors
Debug Logging Levels
import logging
# Enable detailed debugging
logging.getLogger("datacompose.operators.primitives").setLevel(logging.DEBUG)
# Now compilation and execution will show:
# - AST parsing attempts
# - Fallback strategies used
# - Step-by-step execution traces
Related Documentation
- Primitives - Core primitive concepts
- Composition - Building robust pipelines