DataCompose Smart Primitive Design: Technical Architecture

DataCompose uses the shadcn copy-to-own model to solve a fundamental problem: data cleaning is business logic, and business logic belongs in your codebase.

The architecture splits into two zones: mutable components that live in your repository and can be edited, and a fixed IO layer that provides a stable interface boundary. When you run datacompose add addresses, the CLI copies primitives into your project structure. From that point forward, you own the transformation logic completely.

The CLI’s job is simple: generate code into your project and let you choose which primitives to include. Once generated, it steps aside.

Mutable Primitives

Primitives are mutable by design. Because they live inside your codebase, you can change them directly.

Primitives: Pure Functions with Smart Wrappers

When you run datacompose add addresses, the CLI generates two types of code into your project:

  1. Primitive functions - The actual transformation logic (mutable)
  2. Registry infrastructure - The SmartPrimitive and PrimitiveRegistry classes (fixed)

The primitive functions are yours to modify. Here’s what one looks like after generation:

# transformers/pyspark/addresses/primitives.py
from pyspark.sql import functions as F

# PrimitiveRegistry is part of the generated registry infrastructure
addresses = PrimitiveRegistry("addresses")

@addresses.register()
def extract_street_number(col):
    """Extract street number from address string."""
    pattern = r'^(\d+)\s+'
    return F.regexp_extract(col, pattern, 1)

@addresses.register()
def standardize_state(col, mapping=None):
    """Standardize state abbreviations."""
    default_mapping = {'California': 'CA', 'New York': 'NY'}
    mapping = mapping or default_mapping
    # ... transformation logic

These functions live in your repository. If your business logic requires different street number extraction—maybe you handle ranges like “123-125”—just edit the function. No forking, no pull requests, no waiting.

The registry infrastructure, however, shouldn’t be modified. The SmartPrimitive class and PrimitiveRegistry are stable components that enable a key pattern: partial application.

Flexible Usage Patterns

SmartPrimitive wraps your functions to support two usage modes:

from transformers.pyspark.addresses import addresses

# Direct usage - apply transformation immediately
df = df.withColumn("street_num",
    addresses.extract_street_number(F.col("address")))

# Pre-configured usage - set parameters, use later
custom_state_map = {'TX': 'Texas', 'CA': 'California'}
expand_states = addresses.standardize_state(mapping=custom_state_map)

df = df.withColumn("state_full", expand_states(F.col("state")))

This flexibility is what makes composition work. The compose decorator (which we’ll cover next) expects functions that can be called with a single column argument. Pre-configured primitives provide exactly that interface.
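
Conceptually, the wrapper’s job is small. Here is a minimal sketch of how a dual-mode wrapper could work (not the shipped SmartPrimitive), assuming the mode is chosen by checking whether a Column was passed positionally:

from functools import update_wrapper
from pyspark.sql import Column

class SmartPrimitive:
    """Illustrative wrapper supporting direct and pre-configured calls."""

    def __init__(self, func):
        self.func = func
        update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        # Direct usage: a Column arrives as the first positional argument
        if args and isinstance(args[0], Column):
            return self.func(*args, **kwargs)
        # Pre-configured usage: capture parameters now and return a
        # single-column callable for later use or composition
        return lambda col: self.func(col, *args, **kwargs)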

The mutability boundary is intentional: you control the transformation logic, but the wrapper infrastructure remains stable. This ensures that primitives can reliably compose together while giving you complete freedom over the actual data cleaning rules.

Composition: Pipelines Through AST Parsing

Individual primitives handle single transformations. Real data cleaning requires chaining them together. The composition layer provides a declarative syntax for building pipelines:

from transformers.pyspark.addresses import addresses

@addresses.compose()
def clean_address():
    addresses.normalize_whitespace()
    addresses.extract_street_number()
    addresses.standardize_abbreviations()

This looks like a normal function body, but it’s not executed as Python code. The compose decorator parses the function’s AST at runtime, extracts the transformation calls, and builds an execution pipeline.

How AST Parsing Works

When you define a composed pipeline, the PipelineCompiler class does the following:

  1. Extract source code - Uses inspect.getsource() to get the function definition as a string
  2. Parse AST - Converts the source into an Abstract Syntax Tree with ast.parse()
  3. Walk the tree - Identifies function calls like addresses.normalize_whitespace()
  4. Resolve references - Looks up the actual primitive functions from registered namespaces
  5. Compile steps - Creates CompiledStep objects with the callable primitives
  6. Build pipeline - Returns a StablePipeline that executes steps in sequence

Here’s a simplified view of what happens:

# Your code
@addresses.compose()
def clean_phone():
    addresses.normalize_whitespace()
    addresses.extract_digits()

# What the compiler sees in the AST
FunctionDef(name='clean_phone', body=[
    Expr(Call(func=Attribute(value=Name('addresses'), attr='normalize_whitespace'))),
    Expr(Call(func=Attribute(value=Name('addresses'), attr='extract_digits')))
])

# What gets compiled
StablePipeline(steps=[
    CompiledStep(step_type='transform', action=<normalize_whitespace callable>),
    CompiledStep(step_type='transform', action=<extract_digits callable>)
])
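
The extraction itself (steps 1 through 3 above) can be sketched roughly like this. It is an illustration under simplifying assumptions (a flat function body, a single namespace), not the actual PipelineCompiler code:

import ast
import inspect
import textwrap

def extract_call_names(func, namespace):
    """Return the primitive names called on `namespace` in func's body."""
    source = textwrap.dedent(inspect.getsource(func))  # 1. extract source code
    func_def = ast.parse(source).body[0]               # 2. parse AST (the FunctionDef node)
    names = []
    for stmt in func_def.body:                         # 3. walk the tree
        if (isinstance(stmt, ast.Expr)
                and isinstance(stmt.value, ast.Call)
                and isinstance(stmt.value.func, ast.Attribute)
                and isinstance(stmt.value.func.value, ast.Name)
                and stmt.value.func.value.id == namespace):
            names.append(stmt.value.func.attr)         # e.g. 'extract_digits'
    return names

Resolving those names back to callables (steps 4 and 5) is then a lookup against the registered namespace, and the resulting CompiledStep objects are handed to the pipeline.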

The compiled pipeline is a callable that takes a PySpark Column and applies each transformation in sequence. Runtime execution is straightforward:

def _execute_steps(steps, col):
    result = col
    for step in steps:
        result = step.action(result)
    return result
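
Assuming the decorator returns the compiled pipeline directly, applying it looks just like applying a single primitive:

# Apply the composed pipeline like any other single-column callable
df = df.withColumn("address_clean", clean_address(F.col("address")))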

Why AST Instead of Direct Chaining?

You might wonder why we parse the AST instead of just chaining functions directly:

# Why not just do this?
def clean_phone(col):
    return addresses.extract_digits(
        addresses.normalize_whitespace(col)
    )

Three reasons:

1. Readability

The composed version reads top-to-bottom, matching execution order. Nested calls read inside-out.

2. Conditional Logic

AST parsing enables if/else branching in pipelines:

@addresses.compose()
def clean_address():
    addresses.normalize_whitespace()
    if addresses.is_po_box():
        addresses.extract_po_box()
    else:
        addresses.extract_street_number()

The compiler recognizes ast.If nodes and creates conditional steps that use PySpark’s F.when().otherwise() under the hood.
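
Roughly, a compiled conditional step ends up doing something like the sketch below, where predicate, if_true, and if_false stand in for the already-resolved branch primitives (illustrative, not the compiler’s actual output):

from pyspark.sql import functions as F

def conditional_step(col, predicate, if_true, if_false):
    # predicate(col) must yield a boolean Column (e.g. is_po_box);
    # each branch is a single-column primitive resolved at compile time
    return F.when(predicate(col), if_true(col)).otherwise(if_false(col))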

3. Future Portability

The AST representation is backend-agnostic. Right now it generates PySpark execution plans, but the same pipeline definition could compile to Pandas, Polars, or SQL. The transformation logic (primitives) would need backend-specific implementations, but the composition logic wouldn’t change.

The Mutability Question

The composition layer is mutable—it lives in your codebase and you can modify it. But why would you?

The most common reason is custom validation. The default compiler validates that all function calls resolve to registered primitives. If your domain requires additional checks—maybe ensuring certain transformations always happen together—you can extend the PipelineCompiler class.

You might also:

  • Add logging between steps
  • Measure execution time per transformation
  • Implement domain-specific optimizations

Because the compilation logic is in your repository, these customizations are straightforward.
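
As one concrete example, per-step timing can be folded into the execution loop shown earlier. The logger name is an illustrative choice, and because PySpark Columns are lazy, this measures expression-building time rather than Spark job execution:

import logging
import time

logger = logging.getLogger("datacompose.pipeline")

def _execute_steps(steps, col):
    result = col
    for step in steps:
        start = time.perf_counter()
        result = step.action(result)
        elapsed = time.perf_counter() - start
        name = getattr(step.action, "__name__", "step")
        logger.info("built %s in %.4fs", name, elapsed)
    return result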

The compose decorator and compiler infrastructure are designed to be modified. They’re not sealed abstractions—they’re starting points that you can adapt as your pipeline complexity grows.