Core Concepts
What Makes a Primitive?
A primitive is a PySpark column transformation function wrapped in DataCompose’s SmartPrimitive
class, which enables:
- Direct invocation - Apply transformations immediately to columns
- Partial application - Pre-configure transformations with parameters for reuse
- Pipeline composition - Chain multiple primitives into complex workflows
The SmartPrimitive Pattern
# Direct usage - apply transformation immediately
result = primitive(col)
# Partial application - create a configured variant
configured_primitive = primitive(param=value)
result = configured_primitive(col)
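As a concrete illustration, here is a short sketch using the text.trim primitive that appears later on this page; it assumes F is pyspark.sql.functions, df is an existing DataFrame, and the text namespace has already been created:
from pyspark.sql import functions as F

# Direct invocation: the transformation is applied to the column right away
df.select(text.trim(F.col("description")))

# Partial application: pre-configure the parameter once, then reuse the variant
trim_tabs = text.trim(chars='\t')
df.select(trim_tabs(F.col("description")), trim_tabs(F.col("notes")))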
Types of Primitives
DataCompose primitives follow consistent naming conventions based on their purpose:
Extract Primitives (extract_*) - Pull specific components from data
- extract_domain - Gets domain from email address
- extract_area_code - Gets area code from phone number
- extract_street_name - Gets street name from address
Validation Primitives (is_valid_*, is_*, has_*) - Check data against rules
- is_valid_email - Ensures email format is correct
- is_valid_zip_code - Checks ZIP code format
- is_valid_nanp - Validates North American phone numbers
Get Primitives (get_*) - Retrieve metadata or classifications
- get_email_provider - Identifies email service provider
- get_phone_type - Classifies phone number type (mobile, landline)
- get_state_name - Gets full state name from abbreviation
Transform Primitives (format_*, normalize_*, clean_*) - Modify data format or structure
- format_e164 - Converts phone to international format
- normalize_gmail - Standardizes Gmail address variations
- clean_phone - Removes formatting from phone numbers
Filter Primitives (filter_*) - Return data only if it meets criteria, otherwise null
- filter_valid_emails - Returns the email or null if invalid
- filter_corporate_emails - Returns the email or null if not corporate
- filter_non_disposable_emails - Returns the email or null if disposable
This categorization helps you quickly understand what each primitive does and find the right one for your use case.
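For example, a single select can mix several categories. The snippet below is only a sketch: the emails and phones namespace names are hypothetical, and the primitives actually available depend on which transformer libraries you add.
from pyspark.sql import functions as F

# Hypothetical namespaces; actual names depend on the transformers you add
df.select(
    emails.extract_domain(F.col("email")),       # Extract
    emails.is_valid_email(F.col("email")),       # Validation
    emails.get_email_provider(F.col("email")),   # Get
    phones.format_e164(F.col("phone")),          # Transform
    emails.filter_valid_emails(F.col("email")),  # Filter
)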
Architecture
PrimitiveRegistry
The PrimitiveRegistry class is the core organizational structure for primitives. It serves as a namespace container that groups related transformations and makes them accessible through a clean, intuitive API.
Key Features
- Namespace Organization - Groups related primitives under a common namespace
- Automatic SmartPrimitive Wrapping - Functions registered with @register() are automatically wrapped as SmartPrimitives
- Attribute Access - Registered primitives become attributes of the registry instance
- Composition Support - Includes the compose() method for building pipelines
- Conditional Support - Can register conditional primitives with is_conditional=True
How It Works
When you register a function with a PrimitiveRegistry:
text = PrimitiveRegistry("text")
@text.register()
def lowercase(col):
    return F.lower(col)
The registry performs the following steps:
- Wraps the function in a SmartPrimitive instance
- Stores it in the internal _primitives dictionary
- Makes it accessible as text.lowercase
- Enables both direct usage and partial application
Advanced: Internal Structure
class PrimitiveRegistry:
    def __init__(self, namespace_name: str):
        self.namespace_name = namespace_name
        self._primitives = {}    # Regular transformations
        self._conditionals = {}  # Conditional primitives
The registry maintains separate dictionaries for regular and conditional primitives, providing appropriate access through the __getattr__ method.
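As a rough sketch (not the library's actual code), that __getattr__ lookup might route attribute access to those dictionaries like this:
class PrimitiveRegistry:
    def __init__(self, namespace_name: str):
        self.namespace_name = namespace_name
        self._primitives = {}
        self._conditionals = {}

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, i.e. for
        # registered primitive names rather than real attributes
        if name in self._primitives:
            return self._primitives[name]
        if name in self._conditionals:
            return self._conditionals[name]
        raise AttributeError(f"{self.namespace_name} has no primitive {name}")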
Working with Primitives
Once primitives are registered in a namespace, they can be used in multiple ways:
# Direct usage on columns
df.select(text.lowercase(F.col("name")))
# With partial application for configuration
trim_tabs = text.trim(chars='\t')
df.select(trim_tabs(F.col("description")))
# Chaining primitives manually
col = F.col("input")
col = text.trim(col)
col = text.lowercase(col)
df.select(col)
# Or using composition (see Composition Tutorial)
@text.compose()
def clean_text():
    text.trim()
    text.lowercase()
For more details on building pipelines with primitives, see the Composition Tutorial.
Design Principles
1. Single Responsibility
Each primitive performs exactly one transformation. This ensures:
- Clear, predictable behavior
- Easy testing and debugging
- Maximum reusability
2. Pure Functions
Primitives are pure functions that:
- Never modify input data
- Always return new Column objects
- Have no side effects
- Produce deterministic results
3. Composability
Primitives are designed to work together:
- Uniform input/output interface (PySpark Columns)
- Chainable through manual application or the compose decorator
- Support for conditional logic and branching
4. Zero Runtime Dependencies
Once created, primitive code:
- Runs on pure PySpark without DataCompose
- Can be copied and modified freely
- Has no external dependencies
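In other words, a generated primitive's body is plain PySpark. As a sketch, you could copy the logic into a project that never imports DataCompose and it still runs (df is an assumed existing DataFrame):
from pyspark.sql import functions as F

# Same logic as the registered lowercase primitive, no DataCompose import needed
def lowercase(col):
    return F.lower(col)

df.select(lowercase(F.col("name")))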
Implementation Patterns
Basic Transformation Primitive
@namespace.register()
def standardize_text(col):
"""Standardizes text to lowercase with trimmed whitespace"""
return F.trim(F.lower(col))
Parameterized Primitive
@namespace.register()
def truncate(col, max_length=100):
"""Truncates text to specified maximum length"""
return F.substring(col, 1, max_length)
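Parameterized primitives pair naturally with partial application. A short sketch using the namespace registry from this example (df is an assumed DataFrame):
# Configure once, reuse across columns
truncate_50 = namespace.truncate(max_length=50)
df.select(truncate_50(F.col("title")), truncate_50(F.col("summary")))

# Or pass the column and parameter together in a direct call
df.select(namespace.truncate(F.col("title"), max_length=20))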
Conditional Primitive
@namespace.register(is_conditional=True)
def is_valid_length(col, min_length=1, max_length=255):
"""Checks if text length is within specified bounds"""
return (F.length(col) >= min_length) & (F.length(col) <= max_length)
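Because a conditional primitive evaluates to a boolean Column, it can be used directly in a filter or an F.when branch. A sketch using is_valid_length from above:
# Keep only rows whose value length is within bounds
df.filter(namespace.is_valid_length(F.col("name"), max_length=50))

# Or branch on the condition, nulling out values that fail the check
df.select(
    F.when(namespace.is_valid_length(F.col("name")), F.col("name")).otherwise(F.lit(None))
)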
Testing Primitives
Every primitive should have comprehensive unit tests:
def test_mask_ssn():
    # Create test data
    df = spark.createDataFrame(
        [("123-45-6789",), ("987-65-4321",)],
        ["ssn"]
    )
    # Apply primitive
    result = df.select(customer.mask_ssn(F.col("ssn")))
    # Verify results
    expected = [("XXX-XX-6789",), ("XXX-XX-4321",)]
    assert result.collect() == expected
Creating Custom Primitives
While DataCompose includes extensive pre-built primitive libraries, you can easily create custom primitives for your specific domain needs.
Note: After running datacompose add, the code will be placed in a transformers directory in your project, making primitives available via from transformers.pyspark.utils import PrimitiveRegistry.
from transformers.pyspark.utils import PrimitiveRegistry
from pyspark.sql import functions as F
# Create a namespace for your domain
customer = PrimitiveRegistry("customer")
# Register custom primitives
@customer.register()
def mask_ssn(col):
    """Masks SSN keeping only last 4 digits"""
    return F.concat(F.lit("XXX-XX-"), F.substring(col, -4, 4))

@customer.register()
def standardize_company_name(col):
    """Standardizes company names for matching"""
    return F.upper(F.regexp_replace(col, r'\s+(INC|LLC|LTD|CORP)\.?$', ''))

@customer.register()
def extract_account_type(col, default_type='STANDARD'):
    """Extracts account type from account number prefix"""
    return (
        F.when(F.substring(col, 1, 2) == 'PR', F.lit('PREMIUM'))
        .when(F.substring(col, 1, 2) == 'EN', F.lit('ENTERPRISE'))
        .otherwise(F.lit(default_type))
    )
# Use your custom primitives
df.select(customer.mask_ssn(F.col("ssn")))
df.select(customer.extract_account_type(F.col("account_id"), default_type='BASIC'))
# Combine with composition for complex pipelines
# See the Composition Tutorial for details on @compose
Best Practices
- Keep primitives focused - One transformation per primitive
- Use descriptive names - Function name should clearly indicate the transformation
- Document parameters - Include docstrings with parameter descriptions
- Leverage partial application - Create reusable configured variants
- Compose complex logic - Combine primitives using the composition system (see Composition Tutorial)
- Test thoroughly - Each primitive should have comprehensive unit tests
Pre-built Primitive Libraries
DataCompose includes extensive pre-built primitive libraries for common data types:
- Email Primitives - Validation, normalization, domain extraction, typo correction
- Phone Primitives - NANP/international validation, formatting, carrier detection
- Address Primitives - Parsing, standardization, geocoding preparation
Each library follows the same primitive patterns described above, ensuring consistency across all transformations. All primitives are accessible after importing the relevant namespace and can be composed together for complex data cleaning pipelines.