# AWS Glue Rules Engine

> Run distributed rule evaluation as managed AWS Glue jobs.

Process large datasets with GoRules on AWS Glue's serverless Spark infrastructure.

## Setup

### 1. Create requirements file

Create `requirements.txt` and upload it to S3:

```
zen-engine
```

### 2. Configure Glue job parameters

| Parameter                     | Value        |
| ----------------------------- | ------------ |
| `--additional-python-modules` | `zen-engine` |

Or, to install from a requirements file stored in S3:

| Parameter                           | Value                               |
| ----------------------------------- | ----------------------------------- |
| `--python-modules-installer-option` | `-r`                                |
| `--additional-python-modules`       | `s3://your-bucket/requirements.txt` |
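
If you create the job programmatically, these parameters go into the job's `DefaultArguments`. Here is a minimal boto3 sketch that assumes you supply your own role ARN and script location. `glue_default_arguments` and `create_rules_job` are hypothetical helper names, and the Glue version, worker type, and worker count are placeholder assumptions:

```python
from typing import Dict, Optional


def glue_default_arguments(requirements_uri: Optional[str] = None) -> Dict[str, str]:
    """Job parameters from the tables above (hypothetical helper)."""
    if requirements_uri is None:
        # Install zen-engine directly from PyPI
        return {"--additional-python-modules": "zen-engine"}
    # Install the packages listed in a requirements.txt stored in S3
    return {
        "--python-modules-installer-option": "-r",
        "--additional-python-modules": requirements_uri,
    }


def create_rules_job(name: str, role_arn: str, script_uri: str,
                     requirements_uri: Optional[str] = None) -> str:
    """Create the Glue job via boto3 (needs AWS credentials); returns the job name."""
    import boto3  # imported here so glue_default_arguments works without boto3

    glue = boto3.client("glue")
    response = glue.create_job(
        Name=name,
        Role=role_arn,
        Command={"Name": "glueetl", "ScriptLocation": script_uri, "PythonVersion": "3"},
        DefaultArguments=glue_default_arguments(requirements_uri),
        GlueVersion="5.0",   # assumption: use the Glue version you run
        WorkerType="G.1X",   # assumption: see "Worker configuration"
        NumberOfWorkers=10,  # assumption
    )
    return response["Name"]
```

For example: `create_rules_job("my-rules-job", "<role-arn>", "s3://your-bucket/scripts/job.py", "s3://your-bucket/requirements.txt")`.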

### 3. IAM permissions

Ensure your Glue job role has:

* Read access to the S3 locations holding your decision files, `requirements.txt`, and input data
* Write access to the S3 output locations
* Access to any other AWS services your job uses
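
As an illustration, the S3 part of those permissions could look like the policy document built below. This sketch rests on a few assumptions. `glue_s3_policy` is a hypothetical helper. It grants bucket-wide access, so narrow the resources to specific prefixes if you can. It also covers only S3. The role still needs the usual Glue service permissions, for example through the AWS managed `AWSGlueServiceRole` policy.

```python
import json
from typing import Any, Dict


def glue_s3_policy(rules_bucket: str, data_bucket: str) -> Dict[str, Any]:
    """S3 statements for the rules job (hypothetical helper; buckets are placeholders)."""
    buckets = sorted({rules_bucket, data_bucket})  # de-duplicate if both are the same bucket
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Read decisions.zip, requirements.txt and input data
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{b}/*" for b in buckets],
            },
            {
                # Spark lists prefixes when it reads a directory of files
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{b}" for b in buckets],
            },
            {
                # Write output; mode("overwrite") also deletes existing files
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{data_bucket}/*"],
            },
        ],
    }


print(json.dumps(glue_s3_policy("your-bucket", "your-bucket"), indent=2))
```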

## Singleton evaluator

Use a singleton with precompiled `ZenDecisionContent`. Each Python worker process then parses every decision once and reuses one engine for all rows:

```python theme={null}
import zen


class ZenEvaluator:
    """
    Singleton for ZenEngine with precompiled ZenDecisionContent.

    Broadcast dict[str, str] (picklable), precompile to dict[str, ZenDecisionContent],
    then engine.evaluate() uses the loader to return precompiled content.
    """
    _engine: zen.ZenEngine | None = None
    _content: dict[str, zen.ZenDecisionContent] = {}
    _raw: dict[str, str] = {}

    @classmethod
    def initialize(cls, loaders: dict[str, str]):
        """Precompile loaders dict to ZenDecisionContent."""
        if cls._raw != loaders:
            cls._raw = loaders
            cls._content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
            cls._engine = None

    @classmethod
    def _get_engine(cls) -> zen.ZenEngine:
        if cls._engine is None:
            def loader(key: str) -> zen.ZenDecisionContent:
                return cls._content[key]
            cls._engine = zen.ZenEngine({"loader": loader})
        return cls._engine

    @classmethod
    def evaluate(cls, key: str, context: dict) -> dict:
        """Evaluate a decision by key with given context."""
        return cls._get_engine().evaluate(key, context)
```

## Basic job

```python theme={null}
import io
import json
import sys
import zipfile

import boto3
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# ZenEvaluator (see "Singleton evaluator") must be defined in this script,
# or imported from a module shipped with --extra-py-files.

# Initialize Glue context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

# Load all decisions from the S3 zip once, on the driver
s3 = boto3.client('s3')
response = s3.get_object(Bucket='your-bucket', Key='decisions.zip')
zip_bytes = response['Body'].read()

# Keys are the entry paths inside the zip; the key passed to evaluate()
# must match one of them exactly
loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith('/'):  # skip directory entries
            loaders[name] = zf.read(name).decode('utf-8')

# Broadcast the plain dict[str, str] (picklable) to all workers
loaders_broadcast = sc.broadcast(loaders)

# UDF: precompiles on the first call in each Python worker, then reuses the engine
@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    if row_json is None:
        return None
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

# Load input; the "data" column is expected to hold one JSON document per row
df = spark.read.parquet("s3://your-bucket/input/")
df = df.repartition(200)

# count() runs a separate Spark job over the input; remove it if not needed
logger.info(f"Processing {df.count()} rows")

result_df = df.withColumn("result", evaluate_rules(col("data")))

result_df.write \
    .mode("overwrite") \
    .parquet("s3://your-bucket/output/")

job.commit()
```
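
The keys in `loaders` are the entry paths inside the zip. The key passed to `evaluate()` must therefore match an entry name exactly. Below is a sketch of the zip-loading step as a standalone helper you can test locally. `load_decisions` and its `strip_extension` option are hypothetical, added here to show how `pricing.json` could be exposed as `pricing`:

```python
import io
import zipfile
from typing import Dict


def load_decisions(zip_bytes: bytes, strip_extension: bool = False) -> Dict[str, str]:
    """Map zip entry paths to decision JSON strings, skipping directory entries."""
    loaders: Dict[str, str] = {}
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            if name.endswith("/"):
                continue  # directory entry
            key = name
            if strip_extension and key.endswith(".json"):
                key = key[: -len(".json")]  # "pricing.json" -> "pricing"
            loaders[key] = zf.read(name).decode("utf-8")
    return loaders


# Build a small zip in memory to check the keys locally
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("pricing.json", '{"nodes": [], "edges": []}')
    zf.writestr("rules/eligibility.json", '{"nodes": [], "edges": []}')

print(sorted(load_decisions(buf.getvalue())))
# ['pricing.json', 'rules/eligibility.json']
print(sorted(load_decisions(buf.getvalue(), strip_extension=True)))
# ['pricing', 'rules/eligibility']
```

If your decision graphs reference other decisions by key, make sure the keys you generate match those references.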

## Processing structured columns

To build the decision input from separate columns instead of a JSON string column, pass the columns to the UDF and assemble the nested context there:

```python theme={null}
# Continues the basic job: reuses df, col, udf, loaders_broadcast and ZenEvaluator
from pyspark.sql.types import StringType

bc = loaders_broadcast

@udf(returnType=StringType())
def evaluate_pricing(customer_tier, years_active, order_subtotal, item_count):
    import json
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {
            "tier": customer_tier,
            "yearsActive": int(years_active) if years_active else 0
        },
        "order": {
            "subtotal": float(order_subtotal) if order_subtotal else 0,
            "items": int(item_count) if item_count else 0
        }
    })
    return json.dumps(result["result"])

result_df = df.withColumn(
    "pricing",
    evaluate_pricing(
        col("customer_tier"),
        col("years_active"),
        col("order_subtotal"),
        col("item_count")
    )
)
```
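
If you keep the context construction in a plain function, you can unit-test the null handling without a Spark session. Here is a sketch with a hypothetical `build_pricing_context` helper that mirrors the UDF above:

```python
from typing import Any, Dict


def build_pricing_context(customer_tier: Any, years_active: Any,
                          order_subtotal: Any, item_count: Any) -> Dict[str, Any]:
    """Nested input for the "pricing" decision; null numbers default to 0."""
    return {
        "customer": {
            "tier": customer_tier,
            "yearsActive": int(years_active) if years_active else 0,
        },
        "order": {
            "subtotal": float(order_subtotal) if order_subtotal else 0,
            "items": int(item_count) if item_count else 0,
        },
    }


print(build_pricing_context("gold", 3, 120.5, None))
# {'customer': {'tier': 'gold', 'yearsActive': 3}, 'order': {'subtotal': 120.5, 'items': 0}}
```

With the helper defined in the job script, the UDF body shrinks to `ZenEvaluator.evaluate("pricing", build_pricing_context(...))`.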

## Error handling

Return a struct with success and error fields. A single bad row is then recorded instead of failing the whole job:

```python theme={null}
from pyspark.sql.types import StructType, StructField, BooleanType, StringType

result_schema = StructType([
    StructField("success", BooleanType(), False),
    StructField("result", StringType(), True),
    StructField("error", StringType(), True)
])

@udf(returnType=result_schema)
def evaluate_rules_safe(row_json):
    import json
    try:
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return (True, json.dumps(result["result"]), None)
    except Exception as e:
        return (False, None, str(e))

# Cache to avoid re-running the UDF for each action below
result_df = df.withColumn("evaluation", evaluate_rules_safe(col("data"))).cache()

# Split successes and failures
success_df = result_df.filter(col("evaluation.success"))
failed_df = result_df.filter(~col("evaluation.success"))

# Log and persist failures
failed_count = failed_df.count()
if failed_count > 0:
    logger.warn(f"{failed_count} rows failed evaluation")
    failed_df.write.mode("overwrite").parquet("s3://your-bucket/failed/")
```
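
You can exercise the same success/error contract locally by passing the evaluator in as a function. This sketch uses a hypothetical `fake_evaluate` stub in place of `ZenEvaluator.evaluate`, and `evaluate_safely` is likewise a hypothetical name:

```python
import json
from typing import Callable, Optional, Tuple

Outcome = Tuple[bool, Optional[str], Optional[str]]


def evaluate_safely(evaluate: Callable[[str, dict], dict], key: str,
                    row_json: Optional[str]) -> Outcome:
    """Return (success, result_json, error) in the shape of result_schema."""
    try:
        input_data = json.loads(row_json)
        result = evaluate(key, input_data)
        return (True, json.dumps(result["result"]), None)
    except Exception as e:  # record the failure on the row instead of failing the job
        return (False, None, str(e))


def fake_evaluate(key: str, context: dict) -> dict:
    """Hypothetical stub standing in for ZenEvaluator.evaluate."""
    if "order" not in context:
        raise ValueError("missing order")
    return {"result": {"discount": 0.1}}


print(evaluate_safely(fake_evaluate, "pricing", '{"order": {}}'))
# (True, '{"discount": 0.1}', None)
print(evaluate_safely(fake_evaluate, "pricing", "{}"))
# (False, None, 'missing order')
```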

## Reading from Glue Data Catalog

```python theme={null}
from awsglue.dynamicframe import DynamicFrame

# Read from cataloged table
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table"
)
df = dynamic_frame.toDF()

result_df = df.withColumn("result", evaluate_rules(col("data")))

# Write results to S3 as Parquet (this does not update the Data Catalog)
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(result_df, glueContext, "result"),
    connection_type="s3",
    connection_options={"path": "s3://your-bucket/output/"},
    format="parquet"
)
```
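
`write_dynamic_frame.from_options` only writes files to S3. It does not register the output in the Data Catalog. To create or update a catalog table as part of the run, Glue offers an S3 sink with catalog updates enabled. This sketch assumes `glueContext` and `result_df` from the job above. `write_and_update_catalog` is a hypothetical helper, and the database and table names you pass are placeholders:

```python
def write_and_update_catalog(glue_context, result_df, path: str,
                             database: str, table: str) -> None:
    """Write Parquet to S3 and create or update the target Data Catalog table."""
    from awsglue.dynamicframe import DynamicFrame  # available inside Glue jobs

    sink = glue_context.getSink(
        connection_type="s3",
        path=path,
        enableUpdateCatalog=True,            # register schema and new partitions
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=[],                    # list partition columns here, if any
    )
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase=database, catalogTableName=table)
    sink.writeFrame(DynamicFrame.fromDF(result_df, glue_context, "result"))
```

For example: `write_and_update_catalog(glueContext, result_df, "s3://your-bucket/output/", "my_database", "my_rules_output")`.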

## Job bookmarks

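Job bookmarks let a job process only data added since its previous successful run. Set the job parameter `--job-bookmark-option` to `job-bookmark-enable`, then give each source and sink a `transformation_ctx`: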

```python theme={null}
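from awsglue.dynamicframe import DynamicFrame

# job.init() (see "Basic job") must run before reading; job.commit() saves bookmark state
args = getResolvedOptions(sys.argv, ['JOB_NAME'])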

# Read with bookmark - only processes new data since last run
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
    transformation_ctx="datasource"  # Required for bookmarks
)
df = dynamic_frame.toDF()

# Process
result_df = df.withColumn("result", evaluate_rules(col("data")))

# Write with bookmark tracking
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(result_df, glueContext, "result"),
    connection_type="s3",
    connection_options={"path": "s3://your-bucket/output/"},
    format="parquet",
    transformation_ctx="datasink"  # Required for bookmarks
)

job.commit()  # Commits bookmark state
```
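
You can also choose bookmark behavior per run through the `--job-bookmark-option` argument. The sketch below starts a run with bookmarks enabled. It can optionally reset the bookmark first so the run reprocesses all data. `bookmark_arguments` and `start_incremental_run` are hypothetical helper names:

```python
from typing import Dict

BOOKMARK_OPTIONS = ("job-bookmark-enable", "job-bookmark-disable", "job-bookmark-pause")


def bookmark_arguments(option: str = "job-bookmark-enable") -> Dict[str, str]:
    """Run arguments selecting the job bookmark behavior."""
    if option not in BOOKMARK_OPTIONS:
        raise ValueError(f"unknown bookmark option: {option}")
    return {"--job-bookmark-option": option}


def start_incremental_run(job_name: str, reset: bool = False) -> str:
    """Start a bookmarked run via boto3 (needs AWS credentials); returns the JobRunId."""
    import boto3  # imported here so bookmark_arguments works without boto3

    glue = boto3.client("glue")
    if reset:
        # Clear the stored bookmark so this run reprocesses all input
        glue.reset_job_bookmark(JobName=job_name)
    run = glue.start_job_run(JobName=job_name, Arguments=bookmark_arguments())
    return run["JobRunId"]
```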

## Multiple rule files

Evaluate several decisions with a single broadcast loaders dict. Every file in `decisions.zip` is loaded, so each UDF only passes a different key to `ZenEvaluator.evaluate`:

```python theme={null}
# Load all decisions from S3 zip at startup
response = s3.get_object(Bucket='your-bucket', Key='decisions.zip')
zip_bytes = response['Body'].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith('/'):
            loaders[name] = zf.read(name).decode('utf-8')

loaders_broadcast = sc.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_pricing(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

@udf(returnType=StringType())
def evaluate_eligibility(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("eligibility", input_data)
    return json.dumps(result["result"])

result_df = (
    df
    .withColumn("pricing", evaluate_pricing(col("data")))
    .withColumn("eligibility", evaluate_eligibility(col("data")))
)
```
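
The two UDFs above differ only in the decision key, and a small factory removes the duplication. `make_decision_udf` is a hypothetical helper, and it assumes the `ZenEvaluator` class is defined in the same job script:

```python
def make_decision_udf(key: str, loaders_broadcast):
    """Return a Spark UDF that evaluates the decision stored under `key`."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    @udf(returnType=StringType())
    def evaluate(row_json):
        import json
        if row_json is None:
            return None
        # ZenEvaluator is the singleton defined earlier in the job script
        ZenEvaluator.initialize(loaders_broadcast.value)
        result = ZenEvaluator.evaluate(key, json.loads(row_json))
        return json.dumps(result["result"])

    return evaluate
```

Usage: `df.withColumn("pricing", make_decision_udf("pricing", loaders_broadcast)(col("data")))`.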

## Parameterized jobs

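Pass the rules location and the input and output paths as job parameters, so one script can run against different rule sets and datasets: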

```python theme={null}
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'rules_bucket',
    'rules_key',
    'input_path',
    'output_path'
])

# Load all decisions from parameterized S3 zip location
response = s3.get_object(Bucket=args['rules_bucket'], Key=args['rules_key'])
zip_bytes = response['Body'].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith('/'):
            loaders[name] = zf.read(name).decode('utf-8')

loaders_broadcast = sc.broadcast(loaders)

# Read from parameterized input
df = spark.read.parquet(args['input_path'])

# ... process ...

# Write to parameterized output
result_df.write.mode("overwrite").parquet(args['output_path'])
```

Run with:

```bash theme={null}
aws glue start-job-run \
  --job-name my-rules-job \
  --arguments '{
    "--rules_bucket": "my-bucket",
    "--rules_key": "decisions.zip",
    "--input_path": "s3://my-bucket/input/",
    "--output_path": "s3://my-bucket/output/"
  }'
```
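
You can start the same run from Python with boto3. In this sketch with hypothetical helper names, argument keys keep the leading `--`, which `getResolvedOptions` strips when the job reads them:

```python
from typing import Dict


def rules_job_arguments(rules_bucket: str, rules_key: str,
                        input_path: str, output_path: str) -> Dict[str, str]:
    """Arguments for start_job_run, matching the getResolvedOptions names above."""
    return {
        "--rules_bucket": rules_bucket,
        "--rules_key": rules_key,
        "--input_path": input_path,
        "--output_path": output_path,
    }


def start_rules_job(job_name: str, **paths: str) -> str:
    """Start a run via boto3 (needs AWS credentials); returns the JobRunId."""
    import boto3  # imported here so rules_job_arguments works without boto3

    glue = boto3.client("glue")
    run = glue.start_job_run(JobName=job_name, Arguments=rules_job_arguments(**paths))
    return run["JobRunId"]
```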

## Performance tuning

### Worker configuration

| Worker Type | DPUs | vCPU | Memory | Recommended Partitions |
| ----------- | ---- | ---- | ------ | ---------------------- |
| G.1X        | 1    | 4    | 16 GB  | 2-4 per worker         |
| G.2X        | 2    | 8    | 32 GB  | 4-8 per worker         |
| G.4X        | 4    | 16   | 64 GB  | 8-16 per worker        |
| G.8X        | 8    | 32   | 128 GB | 16-32 per worker       |

### Repartitioning

```python theme={null}
# Size partitions from job capacity (see "Worker configuration")
num_workers = 10  # Set based on your job configuration
partitions_per_worker = 4
total_partitions = num_workers * partitions_per_worker

df = df.repartition(total_partitions)
```
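
Rather than hard-coding `partitions_per_worker`, you can look up the ranges from the worker table by worker type. This sketch uses a hypothetical `partition_range` helper. The worker count still has to come from your job configuration:

```python
from typing import Dict, Tuple

# Per-worker partition ranges from the "Worker configuration" table
PARTITIONS_PER_WORKER: Dict[str, Tuple[int, int]] = {
    "G.1X": (2, 4),
    "G.2X": (4, 8),
    "G.4X": (8, 16),
    "G.8X": (16, 32),
}


def partition_range(worker_type: str, num_workers: int) -> Tuple[int, int]:
    """Low/high total partition counts for a job's worker type and count."""
    low, high = PARTITIONS_PER_WORKER[worker_type]
    return low * num_workers, high * num_workers


print(partition_range("G.1X", 10))  # (20, 40)
print(partition_range("G.2X", 5))   # (20, 40)
```

For example, `df.repartition(partition_range("G.1X", 10)[1])` repartitions into 40 partitions.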

### Coalesce for output

```python theme={null}
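# Reduce partitions before writing to avoid many small files.
# coalesce() avoids a shuffle, so the rule-evaluation stage upstream also runs
# in only 10 tasks. If that limits throughput, use repartition(10) instead:
# it adds a shuffle but keeps evaluation parallel.
result_df.coalesce(10).write.mode("overwrite").parquet("s3://your-bucket/output/")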
```

## Best practices

**Precompile on initialize.** `ZenEvaluator` converts `dict[str, str]` to `dict[str, ZenDecisionContent]` once per Python worker process. The loader then returns the precompiled content instead of reparsing JSON for every row.

**Broadcast loaders dict.** Broadcast `dict[str, str]` (picklable) to all workers. Each worker precompiles once on first use.

**Use `engine.evaluate` directly.** `evaluate(key, context)` resolves the key through the loader, so there is no need to call `create_decision` first.

**Repartition appropriately.** Scale the partition count with job capacity: typically 2-4 partitions per DPU (for example, 20-40 partitions for 10 G.1X workers).

**Enable job bookmarks.** For incremental processing, use `transformation_ctx` on both reads and writes.

**Store rules in S3.** Keep decision files in S3 so you can update them without redeploying the job. The job loads them at startup, so changes take effect on the next run.

<Note>
  The `ZenEvaluator` precompiles the JSON strings into `ZenDecisionContent` the first time it is initialized in each Python worker process. The engine's loader then returns that precompiled content, so JSON parsing happens once per worker rather than once per row.
</Note>
