Installation
pip install zen-engine pyspark
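A quick import check confirms both packages are visible to the Python interpreter Spark will use (a minimal sanity check; worker nodes need the same packages installed as the driver):

import zen
import pyspark

# Both imports succeeding means the driver environment is ready.
print(pyspark.__version__)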
Singleton evaluator
Use a singleton pattern with precompiled ZenDecisionContent for optimal performance:
import zen

class ZenEvaluator:
    """
    Singleton for ZenEngine with precompiled ZenDecisionContent.
    Broadcast dict[str, str] (picklable), precompile to dict[str, ZenDecisionContent],
    then engine.evaluate() uses the loader to return precompiled content.
    """
    _engine: zen.ZenEngine | None = None
    _content: dict[str, zen.ZenDecisionContent] = {}
    _raw: dict[str, str] = {}

    @classmethod
    def initialize(cls, loaders: dict[str, str]):
        """Precompile loaders dict to ZenDecisionContent."""
        if cls._raw != loaders:
            cls._raw = loaders
            cls._content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
            cls._engine = None

    @classmethod
    def _get_engine(cls) -> zen.ZenEngine:
        if cls._engine is None:
            def loader(key: str) -> zen.ZenDecisionContent:
                return cls._content[key]
            cls._engine = zen.ZenEngine({"loader": loader})
        return cls._engine

    @classmethod
    def evaluate(cls, key: str, context: dict) -> dict:
        """Evaluate a decision by key with given context."""
        return cls._get_engine().evaluate(key, context)
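A quick smoke test of the singleton outside Spark can catch problems before a cluster run (a sketch; the ./pricing.json path and the input fields are illustrative):

# Load one decision and evaluate it twice; the second call reuses the
# cached engine and precompiled content.
with open("./pricing.json") as f:
    ZenEvaluator.initialize({"pricing": f.read()})

print(ZenEvaluator.evaluate("pricing", {"customer": {"tier": "gold"}})["result"])
print(ZenEvaluator.evaluate("pricing", {"customer": {"tier": "silver"}})["result"])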
Basic usage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()

# Load decisions into a loaders dict and broadcast it
with open('./pricing.json') as f:
    loaders = {"pricing": f.read()}
loaders_broadcast = spark.sparkContext.broadcast(loaders)

# Define a UDF that evaluates rules via the singleton evaluator
@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

# Process data
df = spark.read.parquet("s3://bucket/input.parquet")
result_df = df.withColumn("result", evaluate_rules(col("data")))
result_df.write.mode("overwrite").parquet("s3://bucket/output.parquet")
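If downstream steps need typed columns rather than a JSON string, the result column can be parsed back with from_json (a sketch; the discount and freeShipping fields are the ones assumed in the pricing examples later in this guide):

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, DoubleType, BooleanType

pricing_schema = StructType([
    StructField("discount", DoubleType(), True),
    StructField("freeShipping", BooleanType(), True)
])
typed_df = result_df.withColumn("pricing", from_json(col("result"), pricing_schema))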
Cloud storage
Load all decisions from a single zip file at startup for optimal performance.
AWS S3
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
s3 = boto3.client("s3")

# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))
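Because each zip entry name becomes a loader key, a quick check on the driver that the expected decisions were extracted can catch packaging mistakes before the job runs (the key name is illustrative):

# Fail fast if an expected decision is missing from decisions.zip.
expected_keys = {"pricing.json"}
missing = expected_keys - set(loaders)
if missing:
    raise ValueError(f"Missing decisions in decisions.zip: {missing}")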
Azure Blob Storage
from azure.storage.blob import BlobServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import os
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")

# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))
Google Cloud Storage
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
client = storage.Client()
bucket = client.bucket("my-rules-bucket")

# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))
Processing structured columns
Process data from separate columns:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, DoubleType, BooleanType

bc = loaders_broadcast

@udf(returnType=StringType())
def evaluate_pricing(customer_tier, years_active, order_subtotal, item_count):
    import json
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {
            "tier": customer_tier,
            "yearsActive": int(years_active) if years_active else 0
        },
        "order": {
            "subtotal": float(order_subtotal) if order_subtotal else 0,
            "items": int(item_count) if item_count else 0
        }
    })
    return json.dumps(result["result"])

result_df = df.withColumn(
    "pricing",
    evaluate_pricing(
        col("customer_tier"),
        col("years_active"),
        col("order_subtotal"),
        col("item_count")
    )
)
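Alternatively, the columns can be serialized with to_json(struct(...)) and fed to the JSON-based UDF from the basic example, which keeps the UDF signature stable as inputs grow (a sketch; column and field names match the illustrative ones above):

from pyspark.sql.functions import struct, to_json

json_df = df.withColumn(
    "data",
    to_json(struct(
        struct(
            col("customer_tier").alias("tier"),
            col("years_active").alias("yearsActive")
        ).alias("customer"),
        struct(
            col("order_subtotal").alias("subtotal"),
            col("item_count").alias("items")
        ).alias("order")
    ))
)
result_df = json_df.withColumn("result", evaluate_rules(col("data")))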
Extracting result fields
Return specific result fields directly:
@udf(returnType=DoubleType())
def get_discount(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })
    return float(result["result"]["discount"])

@udf(returnType=BooleanType())
def get_free_shipping(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })
    return bool(result["result"]["freeShipping"])

result_df = df.withColumn(
    "discount",
    get_discount(col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count"))
).withColumn(
    "free_shipping",
    get_free_shipping(col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count"))
)
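Each of the UDFs above runs the same decision once per row, so every row is evaluated twice. The evaluation can also happen once in a single struct-returning UDF, with the fields split out afterwards (a sketch using the same assumed field names):

from pyspark.sql.types import StructType, StructField, DoubleType, BooleanType

pricing_fields_schema = StructType([
    StructField("discount", DoubleType(), True),
    StructField("freeShipping", BooleanType(), True)
])

@udf(returnType=pricing_fields_schema)
def evaluate_pricing_fields(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })["result"]
    return (float(result["discount"]), bool(result["freeShipping"]))

result_df = (
    df
    .withColumn("fields", evaluate_pricing_fields(
        col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count")
    ))
    .withColumn("discount", col("fields.discount"))
    .withColumn("free_shipping", col("fields.freeShipping"))
    .drop("fields")
)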
Error handling
Return structured results with success/error information:
from pyspark.sql.types import StructType, StructField, BooleanType, StringType

result_schema = StructType([
    StructField("success", BooleanType(), False),
    StructField("result", StringType(), True),
    StructField("error", StringType(), True)
])

@udf(returnType=result_schema)
def evaluate_rules_safe(row_json):
    import json
    try:
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return (True, json.dumps(result["result"]), None)
    except Exception as e:
        return (False, None, str(e))

result_df = df.withColumn("evaluation", evaluate_rules_safe(col("data")))

# Filter successful and failed evaluations
success_df = result_df.filter(col("evaluation.success") == True)
failed_df = result_df.filter(col("evaluation.success") == False)
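Successful rows can then be flattened back to a plain result column before writing (a short usage sketch):

clean_df = (
    success_df
    .withColumn("result", col("evaluation.result"))
    .drop("evaluation")
)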
Multiple decisions
Evaluate multiple rule sets from a single loaders dict:
# Load multiple rules into loaders dict
with open('./pricing.json') as f:
    pricing_content = f.read()
with open('./eligibility.json') as f:
    eligibility_content = f.read()

loaders = {
    "pricing": pricing_content,
    "eligibility": eligibility_content,
}
loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_pricing(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

@udf(returnType=StringType())
def evaluate_eligibility(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("eligibility", input_data)
    return json.dumps(result["result"])

result_df = (
    df
    .withColumn("pricing", evaluate_pricing(col("data")))
    .withColumn("eligibility", evaluate_eligibility(col("data")))
)
Performance tuning
Repartition for parallelism
# Repartition for better parallelism (2-4x number of cores)
df = df.repartition(200)
result_df = df.withColumn("result", evaluate_rules(col("data")))
# Coalesce to reduce partitions for output
result_df.coalesce(10).write.parquet("output.parquet")
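The 2-4x target can be derived from the cluster itself rather than hard-coded (a sketch using Spark's reported default parallelism):

# defaultParallelism reflects the total cores Spark sees across executors.
target_partitions = spark.sparkContext.defaultParallelism * 3
df = df.repartition(target_partitions)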
Helper function for UDF creation
def make_evaluate_udf(loaders_broadcast):
    """Create a UDF that uses broadcast loaders via the cached singleton evaluator."""
    @udf(returnType=StringType())
    def evaluate_rules(row_json):
        import json
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return json.dumps(result["result"])
    return evaluate_rules

# Usage
evaluate_rules = make_evaluate_udf(loaders_broadcast)
result_df = df.withColumn("result", evaluate_rules(col("data")))
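The same factory can take the decision key as a parameter, so one function covers every entry in the loaders dict (a variation on the helper above, not part of the original example):

def make_decision_udf(loaders_broadcast, decision_key):
    """Create a UDF that evaluates the given decision key against a JSON column."""
    @udf(returnType=StringType())
    def evaluate(row_json):
        import json
        ZenEvaluator.initialize(loaders_broadcast.value)
        result = ZenEvaluator.evaluate(decision_key, json.loads(row_json))
        return json.dumps(result["result"])
    return evaluate

# Usage
evaluate_pricing = make_decision_udf(loaders_broadcast, "pricing")
result_df = df.withColumn("pricing", evaluate_pricing(col("data")))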
Best practices
Precompile on initialize. The ZenEvaluator converts dict[str, str] to dict[str, ZenDecisionContent] once; the loader then returns precompiled content for maximum performance.
Broadcast the loaders dict. Broadcast dict[str, str] (picklable) to all workers. Each worker precompiles once on first use.
Use engine.evaluate directly. There is no need to call create_decision; the engine's loader handles everything.
Repartition for parallelism. Match the partition count to cluster parallelism (typically 2-4x the number of cores).
Use Parquet for I/O. Parquet provides efficient columnar storage and predicate pushdown (see the sketch after this list).
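A minimal illustration of the Parquet point above: selecting only the needed columns and filtering early lets Parquet prune columns and row groups before any rows reach the UDF (the path and the region column are illustrative):

df = (
    spark.read.parquet("s3://bucket/input.parquet")
    .select("data", "region")
    .filter(col("region") == "EU")
)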
The ZenEvaluator precompiles JSON strings to ZenDecisionContent on first initialization per worker. The engine's loader then returns precompiled content, avoiding repeated JSON parsing. This provides optimal performance for high-throughput processing.