Install the ZEN Engine and evaluate your first decision in Python.
Installation
pip install zen-engine
Basic usage
import zen
# Read the decision JSON exported from the editor
with open('./pricing-rules.json') as f:
    content = f.read()

engine = zen.ZenEngine()
decision = engine.create_decision(content)

response = decision.evaluate({
    "customer": {"tier": "gold", "yearsActive": 3},
    "order": {"subtotal": 150, "items": 5}
})
print(response["result"])
# => {'discount': 0.15, 'freeShipping': True}
Loader
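The loader pattern lets the engine load decisions on demand from any storage backend: when a decision is evaluated by key, the engine calls the loader with that key. Wrapping the loaded JSON in ZenDecisionContent pre-compiles it, so a cached decision is parsed only once. This matters most in applications that evaluate many different decisions.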
File system
import zen
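cache = {}

def loader(key):
    # Return the pre-compiled decision if it was loaded before
    if key in cache:
        return cache[key]

    # Otherwise read it from disk, pre-compile it, and cache it
    with open(f"./rules/{key}") as f:
        content = zen.ZenDecisionContent(f.read())
    cache[key] = content
    return content
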
engine = zen.ZenEngine({"loader": loader})
response = engine.evaluate("pricing.json", {"amount": 100})
print(response["result"])
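The caching in the file system loader above does not depend on where the decision comes from, so it can be factored into a reusable wrapper. The following is a minimal sketch, not part of the ZEN API: `cached_loader`, `fetch`, and `compile_content` are hypothetical names, and a stand-in backend is used so the snippet runs without `zen` installed. In real use, `compile_content` would be `zen.ZenDecisionContent`.

```python
from typing import Callable, Dict, Optional, TypeVar

T = TypeVar("T")


def cached_loader(
    fetch: Callable[[str], Optional[str]],
    compile_content: Callable[[str], T],
) -> Callable[[str], Optional[T]]:
    """Wrap a backend fetch function with compile-once caching.

    fetch: hypothetical callable returning raw decision JSON for a key,
           or None when the key does not exist.
    compile_content: applied once per key (e.g. zen.ZenDecisionContent).
    """
    cache: Dict[str, T] = {}

    def loader(key: str) -> Optional[T]:
        if key in cache:
            return cache[key]
        raw = fetch(key)
        if raw is None:
            return None  # misses are not cached, so the key can appear later
        compiled = compile_content(raw)
        cache[key] = compiled
        return compiled

    return loader


# Stand-in backend and compile step (assumptions for illustration only)
calls = []

def fake_fetch(key):
    calls.append(key)
    return '{"nodes": [], "edges": []}' if key == "pricing.json" else None

loader = cached_loader(fake_fetch, compile_content=len)
first = loader("pricing.json")    # fetched and compiled
second = loader("pricing.json")   # served from the cache
missing = loader("unknown.json")  # backend has no such key
```

The returned `loader` has the same shape as the one passed to `zen.ZenEngine({"loader": loader})` above, so swapping the backend only means swapping `fetch`.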
AWS S3
import boto3
import zen
import zipfile
import io
s3 = boto3.client("s3")
rules = {}
# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        # Skip directory entries; compile each decision file once
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})
response = engine.evaluate("pricing.json", {"amount": 100})
Azure Blob Storage
from azure.storage.blob import BlobServiceClient
import zen
import zipfile
import io
import os
blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")
rules = {}
# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        # Skip directory entries; compile each decision file once
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})
response = engine.evaluate("pricing.json", {"amount": 100})
Google Cloud Storage
from google.cloud import storage
import zen
import zipfile
import io
client = storage.Client()
bucket = client.bucket("my-rules-bucket")
rules = {}
# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        # Skip directory entries; compile each decision file once
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})
response = engine.evaluate("pricing.json", {"amount": 100})
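The S3, Azure, and Google Cloud examples differ only in how `zip_bytes` is downloaded; the extraction step is identical. A minimal sketch of that shared step as a helper follows. `load_rules_from_zip` and `compile_content` are hypothetical names, and `json.loads` stands in for `zen.ZenDecisionContent` so the snippet runs on the standard library alone.

```python
import io
import json
import zipfile
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


def load_rules_from_zip(
    zip_bytes: bytes,
    compile_content: Callable[[str], T],
) -> Dict[str, T]:
    """Extract every file in the archive and compile it once, keyed by archive path."""
    rules: Dict[str, T] = {}
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue  # directory entries carry no decision content
            rules[info.filename] = compile_content(zf.read(info).decode("utf-8"))
    return rules


# Build a small archive in memory to exercise the helper
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("pricing.json", '{"nodes": []}')
    zf.writestr("nested/shipping.json", '{"edges": []}')

# json.loads is a stand-in; in real use pass zen.ZenDecisionContent
rules = load_rules_from_zip(buffer.getvalue(), compile_content=json.loads)
```

Note that keys keep their archive path (for example `nested/shipping.json`), so the key passed to `engine.evaluate` has to match the path inside the zip.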
Async support
Use async_evaluate for non-blocking evaluation:
import asyncio
import zen

async def loader(key):
    with open(f"./rules/{key}") as f:
        return f.read()

engine = zen.ZenEngine({"loader": loader})

async def main():
    # Evaluate multiple decisions concurrently
    results = await asyncio.gather(
        engine.async_evaluate("pricing.json", {"amount": 100}),
        engine.async_evaluate("eligibility.json", {"score": 750}),
        engine.async_evaluate("shipping.json", {"weight": 5})
    )
    for response in results:
        print(response["result"])

asyncio.run(main())
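With a plain `asyncio.gather`, the first evaluation that raises propagates its exception out of `main`, and the other results are lost to the caller. One possible variation, sketched below, passes `return_exceptions=True` so each key maps to either its response or its exception. `evaluate_all` is a hypothetical helper, and `fake_evaluate` is a stand-in for `engine.async_evaluate` so the snippet runs without `zen` installed.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


async def evaluate_all(
    evaluate: Callable[[str, Dict[str, Any]], Awaitable[Any]],
    requests: List[Tuple[str, Dict[str, Any]]],
) -> Dict[str, Any]:
    """Run all evaluations concurrently; map each key to its response or exception."""
    outcomes = await asyncio.gather(
        *(evaluate(key, context) for key, context in requests),
        return_exceptions=True,  # collect failures instead of raising the first one
    )
    return {key: outcome for (key, _), outcome in zip(requests, outcomes)}


# Stand-in for engine.async_evaluate (assumption for illustration only)
async def fake_evaluate(key, context):
    await asyncio.sleep(0)
    if key == "missing.json":
        raise FileNotFoundError(key)
    return {"result": {"key": key, **context}}


results = asyncio.run(evaluate_all(fake_evaluate, [
    ("pricing.json", {"amount": 100}),
    ("missing.json", {}),
]))

for key, outcome in results.items():
    if isinstance(outcome, Exception):
        print(f"{key}: failed with {outcome!r}")
    else:
        print(f"{key}: {outcome['result']}")
```

In real use, `evaluate` would be `engine.async_evaluate`. The sketch assumes each key appears once in `requests`, since results are keyed by decision name.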
Error handling
try:
    response = decision.evaluate(input_data)
    print(response["result"])
except Exception as e:
    print(f"Evaluation failed: {e}")
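In a service, printing the error is rarely enough; a common approach is to log the failure and fall back to a safe default. The sketch below illustrates that idea under stated assumptions: `evaluate_or_default` is a hypothetical helper, the stub classes stand in for a real decision object, and it catches the broad `Exception` as the snippet above does, because the specific exception types raised by the engine are not covered here.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger("decisions")


def evaluate_or_default(
    decision: Any,
    input_data: Dict[str, Any],
    default: Dict[str, Any],
) -> Dict[str, Any]:
    """Return the decision result, or `default` if evaluation raises."""
    try:
        return decision.evaluate(input_data)["result"]
    except Exception:
        # logger.exception records the traceback along with the message
        logger.exception("Evaluation failed for input %r", input_data)
        return default


# Stub decisions (assumptions for illustration only)
class WorkingDecision:
    def evaluate(self, input_data):
        return {"result": {"discount": 0.1}}


class FailingDecision:
    def evaluate(self, input_data):
        raise ValueError("bad input")


no_discount = {"discount": 0.0}
ok = evaluate_or_default(WorkingDecision(), {"amount": 100}, default=no_discount)
fallback = evaluate_or_default(FailingDecision(), {"amount": 100}, default=no_discount)
```

Whether a silent fallback is appropriate depends on the decision: a default discount of zero is usually harmless, while a default eligibility answer may not be.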
Tracing
Enable tracing to inspect decision execution:
response = decision.evaluate(input_data, {"trace": True})
print(response["trace"])
# Each node's input, output, and performance timing
print(response["performance"])
# Total evaluation time
Expression utilities
Evaluate ZEN expressions outside of a decision context:
import zen
# Standard expressions
result = zen.evaluate_expression("a + b", {"a": 5, "b": 3})
# => 8
total = zen.evaluate_expression("sum(items)", {"items": [1, 2, 3, 4]})
# => 10
# Unary expressions (comparison against $)
is_valid = zen.evaluate_unary_expression(">= 5", {"$": 10})
# => True
in_list = zen.evaluate_unary_expression("'US', 'CA', 'MX'", {"$": "US"})
# => True
Compile expressions for repeated evaluation:
import zen
expr = zen.compile_expression("a * b + c")
result1 = expr.evaluate({"a": 2, "b": 3, "c": 4})
# => 10
result2 = expr.evaluate({"a": 5, "b": 6, "c": 7})
# => 37
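When expression strings arrive at runtime (for example from configuration), the same string may be compiled many times. A minimal sketch of memoizing the compile step with `functools.lru_cache` follows. `make_cached_compiler` is a hypothetical helper, and `fake_compile` stands in for `zen.compile_expression` so the snippet runs without `zen` installed.

```python
from functools import lru_cache
from typing import Any, Callable


def make_cached_compiler(
    compile_fn: Callable[[str], Any],
    maxsize: int = 256,
) -> Callable[[str], Any]:
    """Memoize a compile function so each distinct expression string is compiled once."""
    return lru_cache(maxsize=maxsize)(compile_fn)


# Stand-in compiler that records what it was asked to compile
compiled_sources = []

def fake_compile(source):
    compiled_sources.append(source)
    return ("compiled", source)


compile_cached = make_cached_compiler(fake_compile)
first = compile_cached("a * b + c")   # compiled
second = compile_cached("a * b + c")  # returned from the cache
other = compile_cached("a + b")       # a different string is compiled separately
```

In real use this would be `make_cached_compiler(zen.compile_expression)`. The bounded `maxsize` keeps memory in check if the set of expressions is open-ended.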
Spark integration
For distributed processing at scale, see PySpark and AWS Glue.
Best practices
Use ZenDecisionContent for caching. Pre-compiling decisions avoids repeated parsing overhead. Cache compiled content in a dict keyed by decision name.
Initialize the engine once. Create a single ZenEngine instance at application startup and reuse it for all evaluations.
Implement a loader for dynamic decisions. The loader pattern centralizes decision loading logic and enables caching at the source.
Use async evaluation for concurrent workloads. When evaluating multiple decisions, use async_evaluate with asyncio.gather for better throughput.