Process large datasets with distributed rule evaluation using PySpark UDFs and broadcast variables.

Installation

pip install zen-engine pyspark

Singleton evaluator

Use a singleton pattern with precompiled ZenDecisionContent for optimal performance:
import zen


class ZenEvaluator:
    """
    Singleton for ZenEngine with precompiled ZenDecisionContent.

    The broadcast dict[str, str] is picklable; each worker precompiles it to
    dict[str, ZenDecisionContent] once, and engine.evaluate() then resolves keys
    through the loader to the precompiled content.
    """
    _engine: zen.ZenEngine | None = None
    _content: dict[str, zen.ZenDecisionContent] = {}
    _raw: dict[str, str] = {}

    @classmethod
    def initialize(cls, loaders: dict[str, str]):
        """Precompile loaders dict to ZenDecisionContent."""
        if cls._raw != loaders:
            cls._raw = loaders
            cls._content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
            cls._engine = None

    @classmethod
    def _get_engine(cls) -> zen.ZenEngine:
        if cls._engine is None:
            def loader(key: str) -> zen.ZenDecisionContent:
                return cls._content[key]
            cls._engine = zen.ZenEngine({"loader": loader})
        return cls._engine

    @classmethod
    def evaluate(cls, key: str, context: dict) -> dict:
        """Evaluate a decision by key with given context."""
        return cls._get_engine().evaluate(key, context)
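
Before wiring the evaluator into Spark, you can sanity-check it on the driver. The sketch below assumes a local pricing.json decision and a hypothetical input shape; adjust both to your own decision graph:

# Driver-side sanity check (hypothetical decision file and input shape)
with open("./pricing.json") as f:
    ZenEvaluator.initialize({"pricing": f.read()})

result = ZenEvaluator.evaluate("pricing", {"customer": {"tier": "gold", "yearsActive": 3}})
print(result["result"])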

Basic usage

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()

# Load decisions into loaders dict and broadcast
with open('./pricing.json') as f:
    loaders = {"pricing": f.read()}

loaders_broadcast = spark.sparkContext.broadcast(loaders)

# Define UDF that evaluates rules through the cached ZenEvaluator
@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

# Process data
df = spark.read.parquet("s3://bucket/input.parquet")
result_df = df.withColumn("result", evaluate_rules(col("data")))
result_df.write.mode("overwrite").parquet("s3://bucket/output.parquet")
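
Because the UDF returns the result as a JSON string, you can optionally parse it back into typed columns with from_json. This sketch assumes the pricing decision returns discount and freeShipping fields (the same fields used in the extraction examples further down); adapt the schema to your decision's output:

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, DoubleType, BooleanType

# Assumed output shape of the pricing decision; adjust to your own fields
pricing_schema = StructType([
    StructField("discount", DoubleType(), True),
    StructField("freeShipping", BooleanType(), True),
])

typed_df = result_df.withColumn("result_struct", from_json(col("result"), pricing_schema))
typed_df.select("result_struct.discount", "result_struct.freeShipping").show()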

Cloud storage

Load all decisions from a single zip file at startup: the archive is downloaded once on the driver, extracted into a loaders dict, and broadcast to the workers.

AWS S3

import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
s3 = boto3.client("s3")

# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

# df is read as in Basic usage, e.g. spark.read.parquet(...)
result_df = df.withColumn("result", evaluate_rules(col("data")))
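
If you also need to produce the decisions.zip consumed above, a minimal sketch (assuming your decision files live in a local ./rules directory and you have write access to the bucket) looks like this:

import glob
import io
import os
import zipfile

import boto3

# Package local decision files into an in-memory zip and upload it
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    for path in glob.glob("./rules/*.json"):
        zf.write(path, arcname=os.path.basename(path))

buf.seek(0)
boto3.client("s3").put_object(Bucket="my-rules-bucket", Key="decisions.zip", Body=buf.getvalue())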

Azure Blob Storage

from azure.storage.blob import BlobServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import os
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")

# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))

Google Cloud Storage

from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
client = storage.Client()
bucket = client.bucket("my-rules-bucket")

# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))

Processing structured columns

Process data from separate columns:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, DoubleType, BooleanType

# Short alias so the UDF closures capture the broadcast variable
bc = loaders_broadcast

@udf(returnType=StringType())
def evaluate_pricing(customer_tier, years_active, order_subtotal, item_count):
    import json
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {
            "tier": customer_tier,
            "yearsActive": int(years_active) if years_active else 0
        },
        "order": {
            "subtotal": float(order_subtotal) if order_subtotal else 0,
            "items": int(item_count) if item_count else 0
        }
    })
    return json.dumps(result["result"])

result_df = df.withColumn(
    "pricing",
    evaluate_pricing(
        col("customer_tier"),
        col("years_active"),
        col("order_subtotal"),
        col("item_count")
    )
)

Extracting result fields

Return specific result fields directly:
@udf(returnType=DoubleType())
def get_discount(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })
    return float(result["result"]["discount"])

@udf(returnType=BooleanType())
def get_free_shipping(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })
    return bool(result["result"]["freeShipping"])

result_df = df.withColumn(
    "discount",
    get_discount(col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count"))
).withColumn(
    "free_shipping",
    get_free_shipping(col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count"))
)
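
Each of the UDFs above runs the pricing decision independently, so every row is evaluated twice. A possible refinement, sketched below, is a single UDF that returns a struct so each row is evaluated once; the discount and freeShipping fields are assumed to match your decision's output:

from pyspark.sql.types import StructType, StructField, DoubleType, BooleanType

pricing_result_schema = StructType([
    StructField("discount", DoubleType(), True),
    StructField("freeShipping", BooleanType(), True),
])

@udf(returnType=pricing_result_schema)
def evaluate_pricing_struct(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })["result"]
    return (float(result["discount"]), bool(result["freeShipping"]))

result_df = df.withColumn(
    "pricing",
    evaluate_pricing_struct(col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count"))
).select("*", col("pricing.discount").alias("discount"), col("pricing.freeShipping").alias("free_shipping"))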

Error handling

Return structured results with success/error information:
from pyspark.sql.types import StructType, StructField, BooleanType, StringType

result_schema = StructType([
    StructField("success", BooleanType(), False),
    StructField("result", StringType(), True),
    StructField("error", StringType(), True)
])

@udf(returnType=result_schema)
def evaluate_rules_safe(row_json):
    import json
    try:
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return (True, json.dumps(result["result"]), None)
    except Exception as e:
        return (False, None, str(e))

result_df = df.withColumn("evaluation", evaluate_rules_safe(col("data")))

# Filter successful and failed evaluations
success_df = result_df.filter(col("evaluation.success"))
failed_df = result_df.filter(~col("evaluation.success"))
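
Failed rows can then be set aside for inspection, for example by persisting the input plus the error message to a separate location (the output path below is just a placeholder):

failed_rows = failed_df.select(col("data"), col("evaluation.error").alias("error"))
failed_rows.write.mode("overwrite").parquet("s3://bucket/evaluation-errors.parquet")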

Multiple decisions

Evaluate multiple rule sets from a single loaders dict:
# Load multiple rules into loaders dict
with open('./pricing.json') as f:
    pricing_content = f.read()
with open('./eligibility.json') as f:
    eligibility_content = f.read()

loaders = {
    "pricing": pricing_content,
    "eligibility": eligibility_content,
}
loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_pricing(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

@udf(returnType=StringType())
def evaluate_eligibility(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("eligibility", input_data)
    return json.dumps(result["result"])

result_df = (
    df
    .withColumn("pricing", evaluate_pricing(col("data")))
    .withColumn("eligibility", evaluate_eligibility(col("data")))
)
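
Instead of opening each file by hand as above, you can build the loaders dict from a directory. This sketch assumes the decision files sit in a local ./rules directory and keys each decision by its filename without the extension:

import glob
import os

loaders = {}
for path in glob.glob("./rules/*.json"):
    key = os.path.splitext(os.path.basename(path))[0]  # e.g. "pricing", "eligibility"
    with open(path) as f:
        loaders[key] = f.read()

loaders_broadcast = spark.sparkContext.broadcast(loaders)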

Performance tuning

Repartition for parallelism

# Repartition for better parallelism (2-4x number of cores)
df = df.repartition(200)

result_df = df.withColumn("result", evaluate_rules(col("data")))

# Coalesce to reduce partitions for output
result_df.coalesce(10).write.parquet("output.parquet")

Helper function for UDF creation

def make_evaluate_udf(loaders_broadcast):
    """Create a UDF that uses broadcast loaders with cached ZenDecision."""
    @udf(returnType=StringType())
    def evaluate_rules(row_json):
        import json
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return json.dumps(result["result"])
    return evaluate_rules

# Usage
evaluate_rules = make_evaluate_udf(loaders_broadcast)
result_df = df.withColumn("result", evaluate_rules(col("data")))
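
For very high row counts, a pandas UDF can reduce per-row serialization overhead by handing the UDF whole batches. This is an alternative sketch, not part of the examples above; it assumes Spark 3.x with pyarrow installed on the cluster and reuses the same ZenEvaluator:

import json

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def evaluate_rules_batch(rows: pd.Series) -> pd.Series:
    # Initialize once per batch; repeated calls on the same worker are no-ops
    ZenEvaluator.initialize(loaders_broadcast.value)

    def run(row_json: str) -> str:
        result = ZenEvaluator.evaluate("pricing", json.loads(row_json))
        return json.dumps(result["result"])

    return rows.apply(run)

result_df = df.withColumn("result", evaluate_rules_batch(col("data")))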

Best practices

- Precompile on initialize. The ZenEvaluator converts dict[str, str] to dict[str, ZenDecisionContent] once, then the loader returns precompiled content for maximum performance.
- Broadcast the loaders dict. Broadcast dict[str, str] (picklable) to all workers. Each worker precompiles once on first use.
- Use engine.evaluate directly. There is no need to call create_decision; the engine's loader handles everything.
- Repartition for parallelism. Match partition count to cluster parallelism (typically 2-4x cores).
- Use Parquet for I/O. Parquet provides efficient columnar storage and predicate pushdown.
The ZenEvaluator precompiles JSON strings to ZenDecisionContent on first initialization per worker. The engine’s loader then returns precompiled content, avoiding repeated JSON parsing. This provides optimal performance for high-throughput processing.