> ## Documentation Index
> Fetch the complete documentation index at: https://docs.gorules.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Python Rules Engine

> Integrate GoRules into your Python application.

Install the ZEN Engine and evaluate your first decision in Python.

## Installation

<CodeGroup>
  ```bash pip theme={null}
  pip install zen-engine
  ```

  ```bash poetry theme={null}
  poetry add zen-engine
  ```

  ```bash uv theme={null}
  uv add zen-engine
  ```
</CodeGroup>

## Basic usage

```python theme={null}
import zen

with open('./pricing-rules.json') as f:
    content = f.read()

engine = zen.ZenEngine()
decision = engine.create_decision(content)

response = decision.evaluate({
    "customer": {"tier": "gold", "yearsActive": 3},
    "order": {"subtotal": 150, "items": 5}
})

print(response["result"])
# => {'discount': 0.15, 'freeShipping': True}
```

## Loader

The loader pattern enables dynamic decision loading from any storage backend. Combined with `ZenDecisionContent` for pre-compilation, this provides optimal performance for multi-decision applications.

### File system

```python theme={null}
import zen

cache = {}

def loader(key):
    if key in cache:
        return cache[key]

    with open(f"./rules/{key}") as f:
        content = zen.ZenDecisionContent(f.read())
        cache[key] = content
        return content

engine = zen.ZenEngine({"loader": loader})

response = engine.evaluate("pricing.json", {"amount": 100})
print(response["result"])
```

### AWS S3

```python theme={null}
import boto3
import zen
import zipfile
import io

s3 = boto3.client("s3")
rules = {}

# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()

with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})

response = engine.evaluate("pricing.json", {"amount": 100})
```

### Azure Blob Storage

```python theme={null}
from azure.storage.blob import BlobServiceClient
import zen
import zipfile
import io
import os

blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")
rules = {}

# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()

with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})

response = engine.evaluate("pricing.json", {"amount": 100})
```

### Google Cloud Storage

```python theme={null}
from google.cloud import storage
import zen
import zipfile
import io

client = storage.Client()
bucket = client.bucket("my-rules-bucket")
rules = {}

# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()

with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            rules[name] = zen.ZenDecisionContent(zf.read(name).decode("utf-8"))

engine = zen.ZenEngine({"loader": lambda key: rules.get(key)})

response = engine.evaluate("pricing.json", {"amount": 100})
```

## Async support

Use `async_evaluate` for non-blocking evaluation:

```python theme={null}
import asyncio
import zen

async def loader(key):
    with open(f"./rules/{key}") as f:
        return f.read()

engine = zen.ZenEngine({"loader": loader})

async def main():
    # Evaluate multiple decisions concurrently
    results = await asyncio.gather(
        engine.async_evaluate("pricing.json", {"amount": 100}),
        engine.async_evaluate("eligibility.json", {"score": 750}),
        engine.async_evaluate("shipping.json", {"weight": 5})
    )

    for response in results:
        print(response["result"])

asyncio.run(main())
```

## Error handling

```python theme={null}
try:
    response = decision.evaluate(input_data)
    print(response["result"])
except Exception as e:
    print(f"Evaluation failed: {e}")
```

## Tracing

Enable tracing to inspect decision execution:

```python theme={null}
response = decision.evaluate(input_data, {"trace": True})

print(response["trace"])
# Each node's input, output, and performance timing

print(response["performance"])
# Total evaluation time
```

## Expression utilities

Evaluate ZEN expressions outside of a decision context:

```python theme={null}
import zen

# Standard expressions
result = zen.evaluate_expression("a + b", {"a": 5, "b": 3})
# => 8

total = zen.evaluate_expression("sum(items)", {"items": [1, 2, 3, 4]})
# => 10

# Unary expressions (comparison against $)
is_valid = zen.evaluate_unary_expression(">= 5", {"$": 10})
# => True

in_list = zen.evaluate_unary_expression("'US', 'CA', 'MX'", {"$": "US"})
# => True
```

Compile expressions for repeated evaluation:

```python theme={null}
import zen

expr = zen.compile_expression("a * b + c")

result1 = expr.evaluate({"a": 2, "b": 3, "c": 4})
# => 10

result2 = expr.evaluate({"a": 5, "b": 6, "c": 7})
# => 37
```

## Spark integration

For distributed processing at scale, see [PySpark](/developers/integrations/pyspark) and [AWS Glue](/developers/integrations/aws-glue).

## Best practices

**Use `ZenDecisionContent` for caching.** Pre-compiling decisions avoids repeated parsing overhead. Cache compiled content in a dict keyed by decision name.

**Initialize the engine once.** Create a single `ZenEngine` instance at application startup and reuse it for all evaluations.

**Implement a loader for dynamic decisions.** The loader pattern centralizes decision loading logic and enables caching at the source.

**Use async evaluation for concurrent workloads.** When evaluating multiple decisions, use `async_evaluate` with `asyncio.gather` for better throughput.
