Process Polars DataFrames efficiently by evaluating ZEN Engine decisions inside map_elements.
Installation
pip install zen-engine polars
Singleton evaluator
Use a singleton that precompiles decision JSON into ZenDecisionContent once per process, so per-row evaluation never re-parses the rules:
import zen
class ZenEvaluator:
"""
Singleton for ZenEngine with precompiled ZenDecisionContent.
Accepts dict[str, str] (picklable), precompiles to dict[str, ZenDecisionContent],
then engine.evaluate() uses the loader to return precompiled content.
"""
_engine: zen.ZenEngine | None = None
_content: dict[str, zen.ZenDecisionContent] = {}
_raw: dict[str, str] = {}
@classmethod
def initialize(cls, loaders: dict[str, str]):
"""Precompile loaders dict to ZenDecisionContent."""
if cls._raw != loaders:
cls._raw = loaders
cls._content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
cls._engine = None
@classmethod
def _get_engine(cls) -> zen.ZenEngine:
if cls._engine is None:
def loader(key: str) -> zen.ZenDecisionContent:
return cls._content[key]
cls._engine = zen.ZenEngine({"loader": loader})
return cls._engine
@classmethod
def evaluate(cls, key: str, context: dict) -> dict:
"""Evaluate a decision by key with given context."""
return cls._get_engine().evaluate(key, context)
Basic usage
import polars as pl
import json
# Load decisions into loaders dict
with open('./pricing.json') as f:
loaders = {"pricing": f.read()}
ZenEvaluator.initialize(loaders)
# Sample data
df = pl.DataFrame({
"id": [1, 2, 3],
"data": [
'{"customer": {"tier": "gold"}, "order": {"subtotal": 150}}',
'{"customer": {"tier": "silver"}, "order": {"subtotal": 50}}',
'{"customer": {"tier": "bronze"}, "order": {"subtotal": 200}}',
]
})
# Process with map_elements (UDF-like)
def evaluate_rules(row_json: str) -> str:
input_data = json.loads(row_json)
result = ZenEvaluator.evaluate("pricing", input_data)
return json.dumps(result["result"])
output_df = df.with_columns(
pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)
print(output_df)
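If downstream steps need typed columns instead of JSON strings, the result can be decoded with Polars' native JSON parsing. A sketch, assuming the pricing decision output includes discount and freeShipping fields:
decoded_df = output_df.with_columns(
    pl.col("result").str.json_decode(
        # field names assumed from the sample pricing decision
        pl.Struct({"discount": pl.Float64, "freeShipping": pl.Boolean})
    ).alias("parsed")
).unnest("parsed")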
Cloud storage
Load all decisions from a single zip file at startup, so no network calls happen during row evaluation.
AWS S3
import boto3
import polars as pl
import zen
import zipfile
import io
import json
s3 = boto3.client("s3")
# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()
loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
for name in zf.namelist():
if not name.endswith("/"):
loaders[name] = zf.read(name).decode("utf-8")
ZenEvaluator.initialize(loaders)
def evaluate_rules(row_json: str) -> str:
input_data = json.loads(row_json)
result = ZenEvaluator.evaluate("pricing.json", input_data)
return json.dumps(result["result"])
output_df = df.with_columns(
pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)
Azure Blob Storage
from azure.storage.blob import BlobServiceClient
import polars as pl
import zen
import zipfile
import io
import os
import json
blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")
# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()
loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
for name in zf.namelist():
if not name.endswith("/"):
loaders[name] = zf.read(name).decode("utf-8")
ZenEvaluator.initialize(loaders)
def evaluate_rules(row_json: str) -> str:
input_data = json.loads(row_json)
result = ZenEvaluator.evaluate("pricing.json", input_data)
return json.dumps(result["result"])
output_df = df.with_columns(
pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)
Google Cloud Storage
from google.cloud import storage
import polars as pl
import zen
import zipfile
import io
import json
client = storage.Client()
bucket = client.bucket("my-rules-bucket")
# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()
loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
for name in zf.namelist():
if not name.endswith("/"):
loaders[name] = zf.read(name).decode("utf-8")
ZenEvaluator.initialize(loaders)
def evaluate_rules(row_json: str) -> str:
input_data = json.loads(row_json)
result = ZenEvaluator.evaluate("pricing.json", input_data)
return json.dumps(result["result"])
output_df = df.with_columns(
pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)
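The zip-extraction logic above is identical across all three providers; one way to avoid repeating it is a small stdlib-only helper (a sketch):
import io
import zipfile

def zip_to_loaders(zip_bytes: bytes) -> dict[str, str]:
    """Extract every file in a decisions zip into a {name: content} loaders dict."""
    loaders = {}
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            if not name.endswith("/"):  # skip directory entries
                loaders[name] = zf.read(name).decode("utf-8")
    return loaders

# e.g. with S3: ZenEvaluator.initialize(zip_to_loaders(obj["Body"].read()))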
Processing structured columns
Use pl.struct to combine multiple columns for evaluation:
import polars as pl
import json
df = pl.DataFrame({
"customer_tier": ["gold", "silver", "bronze"],
"years_active": [3, 2, 0],
"order_subtotal": [150.0, 50.0, 200.0],
"item_count": [5, 2, 10],
})
def evaluate_pricing(row: dict) -> str:
input_data = {
"customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
"order": {"subtotal": row["order_subtotal"], "items": row["item_count"]}
}
result = ZenEvaluator.evaluate("pricing", input_data)
return json.dumps(result["result"])
output_df = df.with_columns(
pl.struct(["customer_tier", "years_active", "order_subtotal", "item_count"])
.map_elements(evaluate_pricing, return_dtype=pl.Utf8)
.alias("result")
)
Extract individual fields from results:
def get_discount(row: dict) -> float:
input_data = {
"customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
"order": {"subtotal": row["order_subtotal"], "items": row["item_count"]}
}
result = ZenEvaluator.evaluate("pricing", input_data)
return float(result["result"]["discount"])
def get_free_shipping(row: dict) -> bool:
input_data = {
"customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
"order": {"subtotal": row["order_subtotal"], "items": row["item_count"]}
}
result = ZenEvaluator.evaluate("pricing", input_data)
return bool(result["result"]["freeShipping"])
struct_col = pl.struct(["customer_tier", "years_active", "order_subtotal", "item_count"])
output_df = df.with_columns([
struct_col.map_elements(get_discount, return_dtype=pl.Float64).alias("discount"),
struct_col.map_elements(get_free_shipping, return_dtype=pl.Boolean).alias("free_shipping"),
])
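Note that get_discount and get_free_shipping each run the decision, so every row is evaluated twice. A single struct-returning UDF does the same work in one pass. A sketch, assuming the same discount and freeShipping output fields:
def get_pricing_fields(row: dict) -> dict:
    input_data = {
        "customer": {"tier": row["customer_tier"], "yearsActive": row["years_active"]},
        "order": {"subtotal": row["order_subtotal"], "items": row["item_count"]},
    }
    # evaluate once, return both fields
    result = ZenEvaluator.evaluate("pricing", input_data)["result"]
    return {"discount": float(result["discount"]), "free_shipping": bool(result["freeShipping"])}

output_df = df.with_columns(
    struct_col.map_elements(
        get_pricing_fields,
        return_dtype=pl.Struct({"discount": pl.Float64, "free_shipping": pl.Boolean}),
    ).alias("pricing_fields")
).unnest("pricing_fields")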
Error handling
Return structured results with success/error information:
def evaluate_rules_safe(row_json: str) -> dict:
try:
input_data = json.loads(row_json)
result = ZenEvaluator.evaluate("pricing", input_data)
return {"success": True, "result": json.dumps(result["result"]), "error": None}
except Exception as e:
return {"success": False, "result": None, "error": str(e)}
output_df = df.with_columns(
pl.col("data").map_elements(evaluate_rules_safe, return_dtype=pl.Struct({
"success": pl.Boolean,
"result": pl.Utf8,
"error": pl.Utf8,
})).alias("evaluation")
)
# Unnest and filter
results = output_df.unnest("evaluation")
success_df = results.filter(pl.col("success"))
failed_df = results.filter(~pl.col("success"))
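The error column can then be inspected row by row, for example (using the id column from the sample data):
for row in failed_df.select("id", "error").iter_rows(named=True):
    print(f"row {row['id']} failed: {row['error']}")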
Lazy evaluation
Filter data before processing:
# Lazy scan with predicates
filtered_df = (
pl.scan_parquet("input.parquet")
.filter(pl.col("status") == "pending")
.filter(pl.col("amount") > 100)
.collect()
)
# Then process filtered data
output_df = filtered_df.with_columns(
pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)
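map_elements also works inside a lazy query, so the filters and the evaluation can stay in a single plan and collect only once (a sketch):
output_df = (
    pl.scan_parquet("input.parquet")
    .filter(pl.col("status") == "pending")
    .filter(pl.col("amount") > 100)
    .with_columns(
        pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
    )
    .collect()
)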
Batch processing
Process large files in batches:
df = pl.read_parquet("large_input.parquet")
batch_size = 10_000
all_batches = []
for i in range(0, df.shape[0], batch_size):
batch_df = df.slice(i, batch_size)
processed_batch = batch_df.with_columns(
pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
)
all_batches.append(processed_batch)
output_df = pl.concat(all_batches)
output_df.write_parquet("output.parquet")
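Equivalently, DataFrame.iter_slices yields the same batches without manual index arithmetic:
output_df = pl.concat([
    batch.with_columns(
        pl.col("data").map_elements(evaluate_rules, return_dtype=pl.Utf8).alias("result")
    )
    for batch in df.iter_slices(n_rows=batch_size)
])
output_df.write_parquet("output.parquet")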
Parallel processing
For CPU-bound workloads, use multiprocessing with the same pattern:
from multiprocessing import Pool, cpu_count
import polars as pl
import zen
import json
# Module-level state; each worker process builds its own copy in init_worker
_mp_engine: zen.ZenEngine | None = None
_mp_content: dict[str, zen.ZenDecisionContent] = {}
_mp_raw: dict[str, str] = {}
def init_worker(loaders: dict[str, str]):
"""Precompile loaders to ZenDecisionContent on worker init."""
global _mp_engine, _mp_content, _mp_raw
_mp_raw = loaders
_mp_content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
_mp_engine = None
def _get_mp_engine() -> zen.ZenEngine:
global _mp_engine, _mp_content
if _mp_engine is None:
def loader(key: str) -> zen.ZenDecisionContent:
return _mp_content[key]
_mp_engine = zen.ZenEngine({"loader": loader})
return _mp_engine
def evaluate_row_worker(row_json: str | None) -> str | None:
if row_json is None:
return None
input_data = json.loads(row_json)
result = _get_mp_engine().evaluate("pricing", input_data)
return json.dumps(result["result"])
# Load rules into loaders dict
with open('./pricing.json') as f:
loaders = {"pricing": f.read()}
df = pl.read_parquet("input.parquet")
rows = df["data"].to_list()
with Pool(cpu_count(), initializer=init_worker, initargs=(loaders,)) as pool:
results = pool.map(evaluate_row_worker, rows)
output_df = df.with_columns(pl.Series("result", results))
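For very large row counts, an explicit chunksize is worth tuning; it trades scheduling granularity for lower inter-process serialization overhead (a sketch with an illustrative value):
with Pool(cpu_count(), initializer=init_worker, initargs=(loaders,)) as pool:
    results = pool.map(evaluate_row_worker, rows, chunksize=1_000)  # tune per workload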
Multiple decisions
Evaluate multiple rule sets from a single loaders dict:
# Load multiple rules
with open('./pricing.json') as f:
pricing_content = f.read()
with open('./eligibility.json') as f:
eligibility_content = f.read()
loaders = {
"pricing": pricing_content,
"eligibility": eligibility_content,
}
ZenEvaluator.initialize(loaders)
def evaluate_pricing(row_json: str) -> str:
input_data = json.loads(row_json)
result = ZenEvaluator.evaluate("pricing", input_data)
return json.dumps(result["result"])
def evaluate_eligibility(row_json: str) -> str:
input_data = json.loads(row_json)
result = ZenEvaluator.evaluate("eligibility", input_data)
return json.dumps(result["result"])
output_df = df.with_columns([
pl.col("data").map_elements(evaluate_pricing, return_dtype=pl.Utf8).alias("pricing"),
pl.col("data").map_elements(evaluate_eligibility, return_dtype=pl.Utf8).alias("eligibility"),
])
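This parses each JSON row twice, once per decision. A single UDF returning a struct parses once and evaluates both decisions in the same pass (a sketch):
def evaluate_all(row_json: str) -> dict:
    input_data = json.loads(row_json)  # parse once, evaluate both decisions
    return {
        "pricing": json.dumps(ZenEvaluator.evaluate("pricing", input_data)["result"]),
        "eligibility": json.dumps(ZenEvaluator.evaluate("eligibility", input_data)["result"]),
    }

output_df = df.with_columns(
    pl.col("data").map_elements(
        evaluate_all,
        return_dtype=pl.Struct({"pricing": pl.Utf8, "eligibility": pl.Utf8}),
    ).alias("decisions")
).unnest("decisions")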
Best practices
Precompile on initialize. The ZenEvaluator converts dict[str, str] to dict[str, ZenDecisionContent] once. The loader returns precompiled content for maximum performance.
Use engine.evaluate directly. There is no need to call create_decision; the engine’s loader supplies the precompiled content by key.
Use a loaders dict. Store rules as dict[str, str] (picklable, so the dict can be shipped to multiprocessing workers) and precompile once on initialization.
Use map_elements for UDF-like behavior. This is Polars’ equivalent of Spark UDFs.
Use pl.struct for multiple columns. Combine columns into a struct before applying map_elements.
Use lazy evaluation for filtering. Apply predicates with scan_parquet before collecting to reduce data processed.
Use multiprocessing for large datasets. The GIL limits threading benefits; use process-based parallelism instead.
The ZenEvaluator precompiles JSON strings to ZenDecisionContent on initialization, and recompiles only when the loaders dict changes. The engine’s loader then returns the precompiled content, so no JSON parsing happens per row; this is what makes high-throughput processing fast.