> ## Documentation Index
> Fetch the complete documentation index at: https://docs.gorules.io/llms.txt
> Use this file to discover all available pages before exploring further.

# PySpark Rules Engine

> Evaluate rules at scale with distributed PySpark DataFrames.

Process large datasets with distributed rule evaluation using PySpark UDFs and broadcast variables.

## Installation

```bash theme={null}
pip install zen-engine pyspark
```

## Singleton evaluator

Use a singleton pattern with precompiled `ZenDecisionContent` for optimal performance:

```python theme={null}
import zen


class ZenEvaluator:
    """
    Singleton for ZenEngine with precompiled ZenDecisionContent.

    Broadcast dict[str, str] (picklable), precompile to dict[str, ZenDecisionContent],
    then engine.evaluate() uses the loader to return precompiled content.
    """
    _engine: zen.ZenEngine | None = None
    _content: dict[str, zen.ZenDecisionContent] = {}
    _raw: dict[str, str] = {}

    @classmethod
    def initialize(cls, loaders: dict[str, str]):
        """Precompile loaders dict to ZenDecisionContent."""
        if cls._raw != loaders:
            cls._raw = loaders
            cls._content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
            cls._engine = None

    @classmethod
    def _get_engine(cls) -> zen.ZenEngine:
        if cls._engine is None:
            def loader(key: str) -> zen.ZenDecisionContent:
                return cls._content[key]
            cls._engine = zen.ZenEngine({"loader": loader})
        return cls._engine

    @classmethod
    def evaluate(cls, key: str, context: dict) -> dict:
        """Evaluate a decision by key with given context."""
        return cls._get_engine().evaluate(key, context)
```

## Basic usage

```python theme={null}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()

# Load decisions into loaders dict and broadcast
with open('./pricing.json') as f:
    loaders = {"pricing": f.read()}

loaders_broadcast = spark.sparkContext.broadcast(loaders)

# Define UDF with cached ZenDecision
@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

# Process data
df = spark.read.parquet("s3://bucket/input.parquet")
result_df = df.withColumn("result", evaluate_rules(col("data")))
result_df.write.mode("overwrite").parquet("s3://bucket/output.parquet")
```

## Cloud storage

Load all decisions from a single zip file at startup for optimal performance.

### AWS S3

```python theme={null}
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
s3 = boto3.client("s3")

# Download and extract all decisions at startup
obj = s3.get_object(Bucket="my-rules-bucket", Key="decisions.zip")
zip_bytes = obj["Body"].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))
```

### Azure Blob Storage

```python theme={null}
from azure.storage.blob import BlobServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import os
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
blob_service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION"])
container = blob_service.get_container_client("rules")

# Download and extract all decisions at startup
blob = container.get_blob_client("decisions.zip")
zip_bytes = blob.download_blob().readall()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))
```

### Google Cloud Storage

```python theme={null}
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import zipfile
import io
import json

spark = SparkSession.builder.appName("RulesEvaluation").getOrCreate()
client = storage.Client()
bucket = client.bucket("my-rules-bucket")

# Download and extract all decisions at startup
blob = bucket.blob("decisions.zip")
zip_bytes = blob.download_as_bytes()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith("/"):
            loaders[name] = zf.read(name).decode("utf-8")

loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing.json", input_data)
    return json.dumps(result["result"])

result_df = df.withColumn("result", evaluate_rules(col("data")))
```

## Processing structured columns

Process data from separate columns:

```python theme={null}
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, DoubleType, BooleanType

bc = loaders_broadcast

@udf(returnType=StringType())
def evaluate_pricing(customer_tier, years_active, order_subtotal, item_count):
    import json
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {
            "tier": customer_tier,
            "yearsActive": int(years_active) if years_active else 0
        },
        "order": {
            "subtotal": float(order_subtotal) if order_subtotal else 0,
            "items": int(item_count) if item_count else 0
        }
    })
    return json.dumps(result["result"])

result_df = df.withColumn(
    "pricing",
    evaluate_pricing(
        col("customer_tier"),
        col("years_active"),
        col("order_subtotal"),
        col("item_count")
    )
)
```

## Extracting result fields

Return specific result fields directly:

```python theme={null}
@udf(returnType=DoubleType())
def get_discount(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })
    return float(result["result"]["discount"])

@udf(returnType=BooleanType())
def get_free_shipping(customer_tier, years_active, order_subtotal, item_count):
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {"tier": customer_tier, "yearsActive": int(years_active or 0)},
        "order": {"subtotal": float(order_subtotal or 0), "items": int(item_count or 0)}
    })
    return bool(result["result"]["freeShipping"])

result_df = df.withColumn(
    "discount",
    get_discount(col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count"))
).withColumn(
    "free_shipping",
    get_free_shipping(col("customer_tier"), col("years_active"), col("order_subtotal"), col("item_count"))
)
```

## Error handling

Return structured results with success/error information:

```python theme={null}
from pyspark.sql.types import StructType, StructField, BooleanType, StringType

result_schema = StructType([
    StructField("success", BooleanType(), False),
    StructField("result", StringType(), True),
    StructField("error", StringType(), True)
])

@udf(returnType=result_schema)
def evaluate_rules_safe(row_json):
    import json
    try:
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return (True, json.dumps(result["result"]), None)
    except Exception as e:
        return (False, None, str(e))

result_df = df.withColumn("evaluation", evaluate_rules_safe(col("data")))

# Filter successful and failed evaluations
success_df = result_df.filter(col("evaluation.success") == True)
failed_df = result_df.filter(col("evaluation.success") == False)
```

## Multiple decisions

Evaluate multiple rule sets from a single loaders dict:

```python theme={null}
# Load multiple rules into loaders dict
with open('./pricing.json') as f:
    pricing_content = f.read()
with open('./eligibility.json') as f:
    eligibility_content = f.read()

loaders = {
    "pricing": pricing_content,
    "eligibility": eligibility_content,
}
loaders_broadcast = spark.sparkContext.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_pricing(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

@udf(returnType=StringType())
def evaluate_eligibility(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("eligibility", input_data)
    return json.dumps(result["result"])

result_df = (
    df
    .withColumn("pricing", evaluate_pricing(col("data")))
    .withColumn("eligibility", evaluate_eligibility(col("data")))
)
```

## Performance tuning

### Repartition for parallelism

```python theme={null}
# Repartition for better parallelism (2-4x number of cores)
df = df.repartition(200)

result_df = df.withColumn("result", evaluate_rules(col("data")))

# Coalesce to reduce partitions for output
result_df.coalesce(10).write.parquet("output.parquet")
```

### Helper function for UDF creation

```python theme={null}
def make_evaluate_udf(loaders_broadcast):
    """Create a UDF that uses broadcast loaders with cached ZenDecision."""
    @udf(returnType=StringType())
    def evaluate_rules(row_json):
        import json
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return json.dumps(result["result"])
    return evaluate_rules

# Usage
evaluate_rules = make_evaluate_udf(loaders_broadcast)
result_df = df.withColumn("result", evaluate_rules(col("data")))
```

## Best practices

**Precompile on initialize.** The `ZenEvaluator` converts `dict[str, str]` to `dict[str, ZenDecisionContent]` once, then the loader returns precompiled content for maximum performance.

**Broadcast loaders dict.** Broadcast `dict[str, str]` (picklable) to all workers. Each worker precompiles once on first use.

**Use engine.evaluate directly.** No need to call `create_decision` - the engine's loader handles everything.

**Repartition for parallelism.** Match partition count to cluster parallelism (typically 2-4x cores).

**Use Parquet for I/O.** Parquet provides efficient columnar storage and predicate pushdown.

<Note>
  The `ZenEvaluator` precompiles JSON strings to `ZenDecisionContent` on first initialization per worker. The engine's loader then returns precompiled content, avoiding repeated JSON parsing. This provides optimal performance for high-throughput processing.
</Note>
