Install the ZEN Engine and evaluate your first decision in Python.

Installation

pip install zen-engine

Basic usage

import zen

with open('./pricing-rules.json') as f:
    content = f.read()

engine = zen.ZenEngine()
decision = engine.create_decision(content)

response = decision.evaluate({
    "customer": {"tier": "gold", "yearsActive": 3},
    "order": {"subtotal": 150, "items": 5}
})

print(response["result"])
# => {'discount': 0.15, 'freeShipping': True}

Loader

The loader pattern lets the engine fetch decisions on demand from any storage backend. Wrapping the loaded content in ZenDecisionContent pre-compiles it, so an application that evaluates many decisions parses each one only once instead of on every evaluation.

File system

import zen

cache = {}

def loader(key):
    if key in cache:
        return cache[key]

    with open(f"./rules/{key}") as f:
        content = zen.ZenDecisionContent(f.read())
        cache[key] = content
        return content

engine = zen.ZenEngine({"loader": loader})

response = engine.evaluate("pricing.json", {"amount": 100})
print(response["result"])
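The caching loader above can also be written as a small factory, which keeps the cache out of module scope and makes the loader testable without the engine. The following is a minimal sketch, not part of the ZEN API: `make_caching_loader` and its `compile_content` parameter are hypothetical names, and the check that rejects keys pointing outside the rules directory is an added assumption about what a file-system loader should guard against.

```python
from pathlib import Path
from typing import Any, Callable, Dict


def make_caching_loader(
    rules_dir: str,
    compile_content: Callable[[str], Any],
) -> Callable[[str], Any]:
    """Return a loader that reads and compiles each decision file at most once.

    `compile_content` is a hypothetical injection point; in real use it would
    be zen.ZenDecisionContent.
    """
    root = Path(rules_dir).resolve()
    cache: Dict[str, Any] = {}

    def loader(key: str) -> Any:
        if key in cache:
            return cache[key]

        path = (root / key).resolve()
        # Assumption: keys such as "../secrets.json" should never be served.
        if root not in path.parents:
            raise ValueError(f"decision key outside rules directory: {key!r}")

        cache[key] = compile_content(path.read_text(encoding="utf-8"))
        return cache[key]

    return loader
```

Under these assumptions, the engine from the example above would be created with `zen.ZenEngine({"loader": make_caching_loader("./rules", zen.ZenDecisionContent)})`.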

AWS S3

import boto3
import zen
import zipfile
import io

s3 = boto3.client("s3")
rules = {}

# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()

with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})

response = engine.evaluate("pricing.json", {"amount": 100})

Azure Blob Storage

from azure.storage.blob import BlobServiceClient
import zen
import zipfile
import io
import os

blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")
rules = {}

# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()

with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})

response = engine.evaluate("pricing.json", {"amount": 100})

Google Cloud Storage

from google.cloud import storage
import zen
import zipfile
import io

client = storage.Client()
bucket = client.bucket("my-rules-bucket")
rules = {}

# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()

with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})

response = engine.evaluate("pricing.json", {"amount": 100})
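The three cloud examples above differ only in how the archive bytes are downloaded; the extraction loop is identical. One way to share it is a helper like the sketch below. `load_rules_from_zip` and `compile_content` are hypothetical names, not part of the ZEN API, and skipping entries that do not end in `.json` is an added assumption about the archive layout.

```python
import io
import zipfile
from typing import Any, Callable, Dict


def load_rules_from_zip(
    zip_bytes: bytes,
    compile_content: Callable[[str], Any],
) -> Dict[str, Any]:
    """Compile every JSON decision in an in-memory zip, keyed by archive path.

    `compile_content` is a hypothetical injection point; in real use it would
    be zen.ZenDecisionContent.
    """
    rules: Dict[str, Any] = {}
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for info in zf.infolist():
            # Skip directory entries and (by assumption) non-JSON files.
            if info.is_dir() or not info.filename.endswith(".json"):
                continue
            rules[info.filename] = compile_content(zf.read(info).decode("utf-8"))
    return rules
```

With this helper, each backend would reduce to fetching `zip_bytes` and calling `rules = load_rules_from_zip(zip_bytes, zen.ZenDecisionContent)` before creating the engine with the same `lambda key: rules.get(key)` loader.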

Async support

Use async_evaluate for non-blocking evaluation:

import asyncio
import zen

async def loader(key):
    with open(f"./rules/{key}") as f:
        return f.read()

engine = zen.ZenEngine({"loader": loader})

async def main():
    # Evaluate multiple decisions concurrently
    results = await asyncio.gather(
        engine.async_evaluate("pricing.json", {"amount": 100}),
        engine.async_evaluate("eligibility.json", {"score": 750}),
        engine.async_evaluate("shipping.json", {"weight": 5})
    )

    for response in results:
        print(response["result"])

asyncio.run(main())
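The async loader above reads the file with a blocking `open` call inside a coroutine, so the event loop waits while the file is read. A possible variant, sketched here with only the standard library, moves the read to a worker thread with `asyncio.to_thread` (Python 3.9+). `make_async_loader` is a hypothetical helper name, and whether the extra thread hop pays off depends on file sizes and storage latency, which the source does not discuss.

```python
import asyncio
from pathlib import Path


def make_async_loader(rules_dir: str):
    """Return an async loader that reads decision files off the event loop."""
    root = Path(rules_dir)

    async def loader(key: str) -> str:
        # Path.read_text blocks, so run it in a worker thread and await it.
        return await asyncio.to_thread((root / key).read_text, encoding="utf-8")

    return loader
```

The returned coroutine function has the same shape as the loader above, so it could be passed as `zen.ZenEngine({"loader": make_async_loader("./rules")})`.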

Error handling

try:
    response = decision.evaluate(input_data)
    print(response["result"])
except Exception as e:
    print(f"Evaluation failed: {e}")

Tracing

Enable tracing to inspect decision execution:

response = decision.evaluate(input_data, {"trace": True})

print(response["trace"])
# Each node's input, output, and performance timing

print(response["performance"])
# Total evaluation time

Expression utilities

Evaluate ZEN expressions outside of a decision context:

import zen

# Standard expressions
result = zen.evaluate_expression("a + b", {"a": 5, "b": 3})
# => 8

total = zen.evaluate_expression("sum(items)", {"items": [1, 2, 3, 4]})
# => 10

# Unary expressions (comparison against $)
is_valid = zen.evaluate_unary_expression(">= 5", {"$": 10})
# => True

in_list = zen.evaluate_unary_expression("'US', 'CA', 'MX'", {"$": "US"})
# => True

Compile expressions for repeated evaluation:

import zen

expr = zen.compile_expression("a * b + c")

result1 = expr.evaluate({"a": 2, "b": 3, "c": 4})
# => 10

result2 = expr.evaluate({"a": 5, "b": 6, "c": 7})
# => 37

Spark integration

For distributed processing at scale, see PySpark and AWS Glue.

Best practices

Use ZenDecisionContent for caching. Pre-compiling decisions avoids repeated parsing overhead. Cache compiled content in a dict keyed by decision name.

Initialize the engine once. Create a single ZenEngine instance at application startup and reuse it for all evaluations.

Implement a loader for dynamic decisions. The loader pattern centralizes decision loading logic and enables caching at the source.

Use async evaluation for concurrent workloads. When evaluating multiple decisions, use async_evaluate with asyncio.gather for better throughput.