DataCompose's composition system enables you to build complex data transformation pipelines from simple primitives. The `@compose` decorator provides a declarative syntax that improves code readability while maintaining performance.
Why Composition Matters
Building data pipelines traditionally requires nested function calls or intermediate variables that clutter code and reduce maintainability. Composition solves this by providing a linear, readable syntax for transformation sequences.
The @compose Decorator
The `@compose` decorator transforms declarative function bodies into efficient transformation pipelines:
```python
# Using @compose decorator
@text.compose()
def clean_text():
    text.trim()
    text.lowercase()
    text.remove_special_chars()

# Apply the composed pipeline
df.select(clean_text(F.col("input")))

# This is exactly equivalent to:
df.select(
    text.remove_special_chars(
        text.lowercase(
            text.trim(F.col("input"))
        )
    )
)

# Or using sequential application:
col = F.col("input")
col = text.trim(col)
col = text.lowercase(col)
col = text.remove_special_chars(col)
df.select(col)
```
The decorator is a convenience wrapper—it produces the same execution as manual chaining but with better readability.
Choosing Your Style: Decorator vs Traditional
Both approaches are equally valid - it’s entirely your choice! DataCompose supports both decorator-based composition and traditional function style:
```python
# Option 1: Using @compose decorator (declarative style)
@text.compose()
def clean():
    text.trim()
    text.lowercase()
    text.remove_special_chars()

# Option 2: Traditional function style
def clean(col):
    col = text.trim(col)
    col = text.lowercase(col)
    col = text.remove_special_chars(col)
    return col

# Both work exactly the same way:
df.select(clean(F.col("input")))  # Same result with either approach
```
When to Use Each Style
Use the @compose decorator when:
- You prefer declarative, readable syntax
- Building reusable transformation pipelines
- Working with conditional logic (if/else statements)
- Composing multiple namespaces together
Use traditional functions when:
- You’re more comfortable with explicit function calls
- Integrating with existing codebases
- Need full control over the transformation flow
- Debugging specific transformation steps
More Examples: Both Styles
```python
# Email validation - Decorator style
@email.compose()
def validate_email():
    email.lowercase()
    email.trim()
    if email.is_valid():
        email.extract_domain()
    else:
        email.mark_invalid()

# Email validation - Traditional style
def validate_email(col):
    col = email.lowercase(col)
    col = email.trim(col)
    col = F.when(
        email.is_valid(col),
        email.extract_domain(col)
    ).otherwise(
        email.mark_invalid(col)
    )
    return col

# Phone formatting - Decorator style
@phone.compose()
def format_phone():
    phone.remove_non_digits()
    phone.validate_length()
    phone.format_e164()

# Phone formatting - Traditional style
def format_phone(col):
    col = phone.remove_non_digits(col)
    col = phone.validate_length(col)
    col = phone.format_e164(col)
    return col
```
Remember: There’s no performance difference between the two approaches. Choose the style that makes your code most maintainable for your team.
Under the Hood
Pipeline Compilation Process
DataCompose uses AST (Abstract Syntax Tree) analysis to compile pipeline definitions at definition time:
- Static Analysis - Parses the function's AST to extract transformation sequences
- Step Compilation - Converts each primitive call into a `CompiledStep` object
- Runtime Execution - The `StablePipeline` executor applies transformations efficiently
Performance note: Compilation happens once when the pipeline is defined, not during execution.
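As a rough mental model only (this is a hypothetical sketch, not DataCompose's actual compiler), an AST-based approach reads the function's source, walks its body, and records each primitive call as a step:

```python
# Hypothetical sketch of AST-based step extraction - not the real DataCompose internals
import ast
import inspect

def extract_steps(fn):
    """Collect the calls made in fn's body, in source order."""
    tree = ast.parse(inspect.getsource(fn))
    func_def = tree.body[0]  # the `def` node itself
    steps = []
    for node in func_def.body:
        # A bare call such as `text.trim()` shows up as an expression statement
        if isinstance(node, ast.Expr) and isinstance(node.value, ast.Call):
            steps.append(ast.unparse(node.value))
    return steps

def clean_text():
    text.trim()       # `text` is only parsed here, never executed
    text.lowercase()

print(extract_steps(clean_text))  # ['text.trim()', 'text.lowercase()']
```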
Execution Order Matters
Transformations execute sequentially in the order they appear:
```python
@email.compose()
def process_email():
    email.lowercase()       # Step 1: Normalize case
    email.trim()            # Step 2: Remove whitespace
    email.validate()        # Step 3: Verify format
    email.extract_domain()  # Step 4: Extract domain part
```
Best practice: Order transformations from general to specific—clean before validate, validate before extract.
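As a concrete illustration (a sketch reusing the `email` primitives above; exact behavior depends on how `email.validate` treats padding and case), validating before cleaning risks rejecting values that are fixable:

```python
@email.compose()
def risky_order():
    email.validate()    # May fail on " USER@EXAMPLE.COM " before it is normalized
    email.trim()
    email.lowercase()

@email.compose()
def safe_order():
    email.trim()        # Clean first...
    email.lowercase()
    email.validate()    # ...then validate the normalized value
```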
Advanced Features
Conditional Branching
Experimental: Composition supports conditional logic within pipelines using if/else statements:
```python
@validation.compose()
def process_email():
    if validation.is_valid_email():
        email.normalize()
        email.extract_domain()
    else:
        email.mark_invalid()
```
Implementation detail: Conditions compile to Spark's `when`/`otherwise` expressions, maintaining lazy evaluation.
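Conceptually, the conditional pipeline above behaves like the following hand-written PySpark (a sketch, assuming each primitive takes and returns a Column expression):

```python
from pyspark.sql import functions as F

def process_email(col):
    # Rough equivalent of the if/else pipeline above
    return F.when(
        validation.is_valid_email(col),
        email.extract_domain(email.normalize(col))
    ).otherwise(
        email.mark_invalid(col)
    )
```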
Parameterized Pipelines
Configure transformations with specific parameters:
```python
@text.compose()
def clean_with_options():
    text.trim(chars=' \t\n')
    text.truncate(max_length=100)
    text.lowercase()

# Parameters are baked into the pipeline
df.select(clean_with_options(F.col("description")))
```
Use case: Create specialized pipelines for different data sources without code duplication.
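For example, you might keep one pipeline per source, varying only the parameters (a sketch using the primitives shown above; the pipeline and column names are illustrative):

```python
@text.compose()
def clean_title():
    text.trim()
    text.truncate(max_length=50)
    text.lowercase()

@text.compose()
def clean_description():
    text.trim()
    text.truncate(max_length=500)
    text.lowercase()

df.select(
    clean_title(F.col("title")),
    clean_description(F.col("description")),
)
```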
Nested Composition
Build complex pipelines from simpler ones:
```python
@text.compose()
def basic_clean():
    text.trim()
    text.lowercase()

@text.compose()
def full_clean():
    basic_clean()  # Reuse existing pipeline
    text.remove_special_chars()
    text.normalize_unicode()
```
Design principle: Create small, focused pipelines and compose them for complex workflows.
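Reusing `basic_clean` inside `full_clean` should behave like listing its steps inline (a sketch, assuming nested pipelines expand in place):

```python
# Equivalent flat definition of full_clean, with basic_clean's steps inlined
@text.compose()
def full_clean_flat():
    text.trim()
    text.lowercase()
    text.remove_special_chars()
    text.normalize_unicode()
```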
Working with Multiple Namespaces
Composition can work across multiple primitive namespaces:
```python
from transformers.pyspark.emails import emails
from transformers.pyspark.phone_numbers import phones

@customer.compose()
def clean_contact_info():
    emails.validate()
    emails.normalize()
    phones.format_e164()
    customer.standardize_company_name()
```
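In the traditional style, the same cross-namespace pipeline is just sequential calls (a sketch, assuming each primitive takes and returns a Column):

```python
def clean_contact_info(col):
    col = emails.validate(col)
    col = emails.normalize(col)
    col = phones.format_e164(col)
    col = customer.standardize_company_name(col)
    return col
```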
Composition Patterns
Sequential Processing
The most common pattern - apply transformations one after another:
```python
@text.compose()
def standard_text_pipeline():
    text.remove_html_tags()
    text.normalize_whitespace()
    text.trim()
    text.lowercase()
```
Validation and Cleaning
Combine validation with conditional cleaning:
```python
@email.compose()
def smart_email_clean():
    if validation.has_common_typo():
        email.fix_typos()
    email.normalize()
    email.validate()
```
Multi-field Processing
Process related fields together:
```python
@customer.compose()
def process_customer_record():
    customer.standardize_first_name()
    customer.standardize_last_name()
    customer.format_full_name()
    customer.create_customer_id()
```
Inline Lambda Functions
You can include a lambda function directly in the composed function body for one-off transformations:
```python
@text.compose()
def custom_processing():
    text.trim()
    text.lowercase()
    lambda col: F.regexp_replace(col, r'[^\w\s]', '')  # Inline custom transformation
    text.validate_length()
```
This allows you to include simple, pipeline-specific transformations without creating a full primitive.
Customization Hierarchy
When you need custom transformation logic, follow this recommended approach:
1. Use Primitive Parameters (Preferred)
First, try using the built-in parameters of existing primitives:
```python
@text.compose()
def custom_clean():
    text.trim(chars=' \t\n\r')  # Use parameters for customization
    text.truncate(max_length=50)
    text.replace_pattern(pattern=r'\s+', replacement=' ')
```
2. Use Inline Lambdas (For Simple Logic)
If parameters aren’t sufficient, add inline lambda functions for one-off transformations:
```python
@text.compose()
def specialized_clean():
    text.trim()
    lambda col: F.regexp_replace(col, r'[^\w\s-]', '')           # Custom regex
    lambda col: F.when(F.length(col) < 3, None).otherwise(col)   # Custom validation
    text.lowercase()
```
3. Modify Imported Code (For Complex Changes)
Only if neither of the approaches above is sufficient should you modify the DataCompose code directly:
- The code is in your `transformers` directory after running `datacompose add`
- You own this code - modify it to fit your exact needs
- Changes persist since it’s part of your codebase
```python
# In transformers/pyspark/utils.py - modify existing primitives directly
@text.register()
def custom_normalize(col):
    """Your completely custom implementation"""
    return your_custom_logic(col)
```
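For instance, a registered primitive is just a function of a Column built from standard PySpark expressions (the primitive below is hypothetical, shown only to make the pattern concrete):

```python
from pyspark.sql import functions as F

@text.register()
def collapse_whitespace(col):
    """Replace runs of whitespace with a single space, then trim the result."""
    return F.trim(F.regexp_replace(col, r'\s+', ' '))
```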
Best Practices for Composition
- Keep pipelines focused - Each pipeline should have a clear, single purpose
- Use descriptive names - Pipeline names should indicate what transformation they perform
- Order matters - Consider the sequence of transformations carefully
- Test pipelines - Composed pipelines should be tested as complete units
- Document complex logic - Add docstrings explaining the pipeline’s purpose and logic
- Avoid deep nesting - Break complex pipelines into smaller, reusable components
- Follow the customization hierarchy - Use parameters → lambdas → code modification (in that order)
Performance Considerations
- Compilation is one-time - The AST analysis happens only when the pipeline is defined
- No runtime overhead - Composed pipelines execute as efficiently as manual function chaining (see the sketch after this list)
- Lazy evaluation - Spark’s lazy evaluation means transformations are optimized before execution
- Catalyst optimizer - Spark’s Catalyst optimizer can further optimize the composed transformations
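One way to check the "no runtime overhead" point above is to compare the query plans Spark produces for the composed and hand-chained versions (using the `clean_text` pipeline from earlier; both should yield equivalent plans):

```python
# Composed pipeline
df.select(clean_text(F.col("input"))).explain()

# Manual chaining
df.select(
    text.remove_special_chars(text.lowercase(text.trim(F.col("input"))))
).explain()
```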
Debugging Composed Pipelines
Enable debug mode to see each transformation as it’s applied:
```python
@text.compose()
def debug_pipeline():
    text.trim()
    text.lowercase()
    text.validate_length()

# Debug output will show each step during execution
df.select(debug_pipeline(F.col("input")))
```
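If you need to inspect intermediate values without any DataCompose-specific tooling, the traditional style makes it easy to materialize each step (a sketch; `show` triggers a small Spark action per step, so use it on sampled data):

```python
col = F.col("input")
for step in (text.trim, text.lowercase, text.validate_length):
    col = step(col)
    df.select(col.alias("intermediate")).show(5, truncate=False)
```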
Related Documentation
- Getting Started - Learn the basics of DataCompose
- Transformers - Browse available transformers
- Error Handling - Learn about error handling strategies