Process DataFrames efficiently using the ZEN Engine with Polars map_elements.

Installation

pip install zen-engine polars

Singleton evaluator

Use a singleton pattern with precompiled ZenDecisionContent for optimal performance:
import zen


class ZenEvaluator:
    """
    Singleton for ZenEngine with precompiled ZenDecisionContent.

    Accepts dict[str, str] (picklable), precompiles to dict[str, ZenDecisionContent],
    then engine.evaluate() uses the loader to return precompiled content.
    """
    _engine: zen.ZenEngine | None = None
    _content: dict[str, zen.ZenDecisionContent] = {}
    _raw: dict[str, str] = {}

    @classmethod
    def initialize(cls, loaders: dict[str, str]):
        """Precompile loaders dict to ZenDecisionContent."""
        if cls._raw != loaders:
            cls._raw = loaders
            cls._content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
            cls._engine = None

    @classmethod
    def _get_engine(cls) -> zen.ZenEngine:
        if cls._engine is None:
            def loader(key: str) -> zen.ZenDecisionContent:
                return cls._content[key]
            cls._engine = zen.ZenEngine({"loader": loader})
        return cls._engine

    @classmethod
    def evaluate(cls, key: str, context: dict) -> dict:
        """Evaluate a decision by key with given context."""
        return cls._get_engine().evaluate(key, context)

Basic usage

import polars as pl
import json

# Load decisions into loaders dict
with open('./pricing.json') as f:
    loaders = {"pricing": f.read()}

ZenEvaluator.initialize(loaders)

# Sample data
df = pl.DataFrame({
    "id": [1, 2, 3],
    "data": [
        '{"customer": {"tier": "gold"}, "order": {"subtotal": 150}}',
        '{"customer": {"tier": "silver"}, "order": {"subtotal": 50}}',
        '{"customer": {"tier": "bronze"}, "order": {"subtotal": 200}}',
    ]
})

# Process with map_elements (UDF-like)
def evaluate_rules(row_json: str) -> str:
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

output_df = df.with_columns(
    pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)

print(output_df)
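
If you prefer typed columns over a JSON string, recent Polars versions can decode the result column into a struct. A minimal sketch, assuming the pricing decision returns discount and freeShipping fields and that your Polars version provides str.json_decode:

# Decode the JSON result string into a struct, then unnest it into columns
# (the field names are assumptions about the pricing decision's output)
typed_df = output_df.with_columns(
    pl.col("result").str.json_decode(
        pl.Struct({"discount": pl.Float64, "freeShipping": pl.Boolean})
    )
).unnest("result")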

Cloud storage

Load all decisions from a single zip file at startup for optimal performance.
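
The examples below assume decisions.zip contains one JSON decision file per entry (for example pricing.json). A minimal sketch of how such an archive might be produced from a local decisions/ directory (the paths are illustrative):

import pathlib
import zipfile

# Bundle every JSON decision in ./decisions into decisions.zip
with zipfile.ZipFile("decisions.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    for path in pathlib.Path("decisions").glob("*.json"):
        zf.write(path, arcname=path.name)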

AWS S3

import boto3
import polars as pl
import zen
import zipfile
import io
import json

s3 = boto3.client("s3")

# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

ZenEvaluator.initialize(loaders)

def evaluate_rules(row_json: str) -> str:
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

output_df = df.with_columns(
    pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)

Azure Blob Storage

from azure.storage.blob import BlobServiceClient
import polars as pl
import zen
import zipfile
import io
import os
import json

blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")

# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

ZenEvaluator.initialize(loaders)

def evaluate_rules(row_json: str) -> str:
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

output_df = df.with_columns(
    pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)

Google Cloud Storage

from google.cloud import storage
import polars as pl
import zen
import zipfile
import io
import json

client = storage.Client()
bucket = client.bucket("my-rules-bucket")

# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

ZenEvaluator.initialize(loaders)

def evaluate_rules(row_json: str) -> str:
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

output_df = df.with_columns(
    pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)

Processing structured columns

Use pl.struct to combine multiple columns for evaluation:
import polars as pl
import json

df = pl.DataFrame({
    "customer_tier": ["gold", "silver", "bronze"],
    "years_active": [3, 2, 0],
    "order_subtotal": [150.0, 50.0, 200.0],
    "item_count": [5, 2, 10],
})

def evaluate_pricing(row: dict) -> str:
    input_data = {
        "customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
        "order": {"subtotal": row["order_subtotal"], "items": row["item_count"]}
    }
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

output_df = df.with_columns(
    pl.struct(["customer_tier", "years_active", "order_subtotal", "item_count"])
    .map_elements(evaluate_pricing, return_dtype=pl.Utf8)
    .alias("result")
)

Extracting result fields

Extract individual fields from results:
def get_discount(row: dict) -> float:
    input_data = {
        "customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
        "order": {"subtotal": row["order_subtotal"], "items": row["item_count"]}
    }
    result = ZenEvaluator.evaluate("pricing", input_data)
    return float(result["result"]["discount"])

def get_free_shipping(row: dict) -> bool:
    input_data = {
        "customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
        "order": {"subtotal": row["order_subtotal"], "items": row["item_count"]}
    }
    result = ZenEvaluator.evaluate("pricing", input_data)
    return bool(result["result"]["freeShipping"])

struct_col = pl.struct(["customer_tier", "years_active", "order_subtotal", "item_count"])

output_df = df.with_columns([
    struct_col.map_elements(get_discount, return_dtype=pl.Float64).alias("discount"),
    struct_col.map_elements(get_free_shipping, return_dtype=pl.Boolean).alias("free_shipping"),
])
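
Each UDF above re-runs the decision, so this example evaluates every row twice. If that overhead matters, a single evaluation can return both fields as a struct and be unnested afterwards. A sketch reusing the columns above (the field names assume the same pricing output):

def get_pricing_fields(row: dict) -> dict:
    input_data = {
        "customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
        "order": {"subtotal": row["order_subtotal"], "items": row["item_count"]}
    }
    result = ZenEvaluator.evaluate("pricing", input_data)["result"]
    # One evaluation per row; both fields come back in a single struct
    return {"discount": float(result["discount"]), "free_shipping": bool(result["freeShipping"])}

output_df = df.with_columns(
    struct_col.map_elements(
        get_pricing_fields,
        return_dtype=pl.Struct({"discount": pl.Float64, "free_shipping": pl.Boolean}),
    ).alias("pricing")
).unnest("pricing")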

Error handling

Return structured results with success/error information:
def evaluate_rules_safe(row_json: str) -> dict:
    try:
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return {"success": True, "result": json.dumps(result["result"]), "error": None}
    except Exception as e:
        return {"success": False, "result": None, "error": str(e)}

output_df = df.with_columns(
    pl.col("data").map_elements(evaluate_rules_safe, return_dtype=pl.Struct({
        "success": pl.Boolean,
        "result": pl.Utf8,
        "error": pl.Utf8,
    })).alias("evaluation")
)

# Unnest and filter
results = output_df.unnest("evaluation")
success_df = results.filter(pl.col("success"))
failed_df = results.filter(~pl.col("success"))

Lazy evaluation

Filter data before processing:
# Lazy scan with predicates
filtered_df = (
    pl.scan_parquet("input.parquet")
    .filter(pl.col("status") == "pending")
    .filter(pl.col("amount") > 100)
    .collect()
)

# Then process filtered data
output_df = filtered_df.with_columns(
    pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)
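
map_elements also works inside a lazy query, so the evaluation can be folded into the same pipeline and run only on the rows that survive the filters. A sketch assuming the same evaluate_rules function and input file:

output_df = (
    pl.scan_parquet("input.parquet")
    .filter(pl.col("status") == "pending")
    .filter(pl.col("amount") > 100)
    .with_columns(
        pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
    )
    .collect()
)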

Batch processing

Process large files in batches:
df = pl.read_parquet("large_input.parquet")
batch_size = 10_000
all_batches = []

for i in range(0, df.shape[0], batch_size):
    batch_df = df.slice(i, batch_size)
    processed_batch = batch_df.with_columns(
        pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
    )
    all_batches.append(processed_batch)

output_df = pl.concat(all_batches)
output_df.write_parquet("output.parquet")
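
If the concatenated result does not fit in memory, each batch can be written out as soon as it is processed instead of being accumulated. A sketch, assuming an output_parts/ directory is acceptable:

import os

os.makedirs("output_parts", exist_ok=True)

for i in range(0, df.shape[0], batch_size):
    batch_df = df.slice(i, batch_size)
    processed_batch = batch_df.with_columns(
        pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
    )
    # Persist each batch immediately to keep memory usage flat
    processed_batch.write_parquet(f"output_parts/part_{i // batch_size:05d}.parquet")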

Parallel processing

For CPU-bound workloads, use multiprocessing with the same pattern:
from multiprocessing import Pool, cpu_count
import polars as pl
import zen
import json

# Global for multiprocessing
_mp_engine: zen.ZenEngine | None = None
_mp_content: dict[str, zen.ZenDecisionContent] = {}
_mp_raw: dict[str, str] = {}


def init_worker(loaders: dict[str, str]):
    """Precompile loaders to ZenDecisionContent on worker init."""
    global _mp_engine, _mp_content, _mp_raw
    _mp_raw = loaders
    _mp_content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
    _mp_engine = None


def _get_mp_engine() -> zen.ZenEngine:
    global _mp_engine, _mp_content
    if _mp_engine is None:
        def loader(key: str) -> zen.ZenDecisionContent:
            return _mp_content[key]
        _mp_engine = zen.ZenEngine({"loader": loader})
    return _mp_engine


def evaluate_row_worker(row_json):
    if row_json is None:
        return None
    input_data = json.loads(row_json)
    result = _get_mp_engine().evaluate("pricing", input_data)
    return json.dumps(result["result"])


# Load rules into loaders dict
with open('./pricing.json') as f:
    loaders = {"pricing": f.read()}

df = pl.read_parquet("input.parquet")
rows = df["data"].to_list()

with Pool(cpu_count(), initializer=init_worker, initargs=(loaders,)) as pool:
    results = pool.map(evaluate_row_worker, rows)

output_df = df.with_columns(pl.Series("result", results))
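
For very large row counts, passing a chunksize to pool.map reduces inter-process communication overhead by sending rows to workers in larger slices. The value below is illustrative; tune it for your row size:

with Pool(cpu_count(), initializer=init_worker, initargs=(loaders,)) as pool:
    # chunksize batches rows per task instead of one IPC round-trip per row
    results = pool.map(evaluate_row_worker, rows, chunksize=1_000)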

Multiple decisions

Evaluate multiple rule sets from a single loaders dict:
# Load multiple rules
with open('./pricing.json') as f:
    pricing_content = f.read()
with open('./eligibility.json') as f:
    eligibility_content = f.read()

loaders = {
    "pricing": pricing_content,
    "eligibility": eligibility_content,
}
ZenEvaluator.initialize(loaders)

def evaluate_pricing(row_json: str) -> str:
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

def evaluate_eligibility(row_json: str) -> str:
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("eligibility", input_data)
    return json.dumps(result["result"])

output_df = df.with_columns([
    pl.col("data").map_elements(evaluate_pricing, return_dtype=pl.Utf8).alias("pricing"),
    pl.col("data").map_elements(evaluate_eligibility, return_dtype=pl.Utf8).alias("eligibility"),
])

Best practices

- Precompile on initialize. The ZenEvaluator converts dict[str, str] to dict[str, ZenDecisionContent] once, and the loader returns precompiled content for maximum performance.
- Use engine.evaluate directly. There is no need to call create_decision; the engine’s loader handles everything.
- Use a loaders dict. Store rules as dict[str, str] (picklable for multiprocessing) and precompile them once on initialization.
- Use map_elements for UDF-like behavior. This is Polars’ equivalent of Spark UDFs.
- Use pl.struct for multiple columns. Combine columns into a struct before applying map_elements.
- Use lazy evaluation for filtering. Apply predicates with scan_parquet before collecting to reduce the data processed.
- Use multiprocessing for large datasets. The GIL limits threading benefits; use process-based parallelism instead.
The ZenEvaluator precompiles JSON strings to ZenDecisionContent on first initialization. The engine’s loader returns precompiled content, avoiding repeated JSON parsing. This provides optimal performance for high-throughput processing.