Process large datasets with GoRules on AWS Glue’s serverless Spark infrastructure.
Setup
1. Create requirements file
Create a requirements.txt containing zen-engine and upload it to S3 (only needed for the requirements-file install option below).
2. Configure job parameters
To install zen-engine directly, add the following job parameter:

| Parameter | Value |
|---|---|
| --additional-python-modules | zen-engine |

Or, to install from the requirements file in S3:

| Parameter | Value |
|---|---|
| --python-modules-installer-option | -r |
| --additional-python-modules | s3://your-bucket/requirements.txt |
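If you define the job programmatically, the same parameters go into DefaultArguments. A minimal boto3 sketch (the job name, role ARN, script location, and capacity settings are placeholders):

import boto3

glue = boto3.client('glue')

# Hypothetical job definition; adjust names, role, and capacity to your environment
glue.create_job(
    Name='rules-evaluation-job',
    Role='arn:aws:iam::123456789012:role/GlueJobRole',
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://your-bucket/scripts/rules_job.py',
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--additional-python-modules': 'zen-engine'
    },
    GlueVersion='4.0',
    WorkerType='G.1X',
    NumberOfWorkers=10
)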
3. IAM permissions
Ensure your Glue job role has access to the following (a minimal policy sketch follows the list):
- S3 buckets containing rules and data
- Any other AWS services your rules might reference
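As an illustration, an inline policy granting the job role read/write access to the rules and data bucket could be attached with boto3. The bucket, role, and policy names below are placeholders:

import json
import boto3

iam = boto3.client('iam')

# Hypothetical inline policy; widen or narrow actions and resources to match your job
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": [
                "arn:aws:s3:::your-bucket",
                "arn:aws:s3:::your-bucket/*"
            ]
        }
    ]
}

iam.put_role_policy(
    RoleName='GlueJobRole',
    PolicyName='rules-and-data-access',
    PolicyDocument=json.dumps(policy)
)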
Singleton evaluator
Use a singleton pattern with precompiled ZenDecisionContent for optimal performance:
import zen

class ZenEvaluator:
    """
    Singleton for ZenEngine with precompiled ZenDecisionContent.

    Broadcast dict[str, str] (picklable), precompile to dict[str, ZenDecisionContent],
    then engine.evaluate() uses the loader to return precompiled content.
    """

    _engine: zen.ZenEngine | None = None
    _content: dict[str, zen.ZenDecisionContent] = {}
    _raw: dict[str, str] = {}

    @classmethod
    def initialize(cls, loaders: dict[str, str]):
        """Precompile loaders dict to ZenDecisionContent."""
        if cls._raw != loaders:
            cls._raw = loaders
            cls._content = {k: zen.ZenDecisionContent(v) for k, v in loaders.items()}
            cls._engine = None

    @classmethod
    def _get_engine(cls) -> zen.ZenEngine:
        if cls._engine is None:
            def loader(key: str) -> zen.ZenDecisionContent:
                return cls._content[key]
            cls._engine = zen.ZenEngine({"loader": loader})
        return cls._engine

    @classmethod
    def evaluate(cls, key: str, context: dict) -> dict:
        """Evaluate a decision by key with given context."""
        return cls._get_engine().evaluate(key, context)
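Before wiring the evaluator into a Glue job, you can sanity-check it locally with a single decision. This sketch assumes a locally exported JDM file named pricing.json and an input shape matching your decision:

# Hypothetical local export of your decision graph
with open("pricing.json") as f:
    pricing_json = f.read()

ZenEvaluator.initialize({"pricing": pricing_json})
result = ZenEvaluator.evaluate("pricing", {
    "customer": {"tier": "gold", "yearsActive": 3},
    "order": {"subtotal": 120.0, "items": 2}
})
print(result["result"])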
Basic job
import sys
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Initialize Glue context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

# Load all decisions from S3 zip at startup
import zipfile
import io

s3 = boto3.client('s3')
response = s3.get_object(Bucket='your-bucket', Key='decisions.zip')
zip_bytes = response['Body'].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith('/'):
            loaders[name] = zf.read(name).decode('utf-8')

# Broadcast loaders dict (picklable) to all workers
loaders_broadcast = sc.broadcast(loaders)

# Define UDF - precompiles on first call, then reuses
@udf(returnType=StringType())
def evaluate_rules(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

# Load and process data
df = spark.read.parquet("s3://your-bucket/input/")
df = df.repartition(200)

logger.info(f"Processing {df.count()} rows")

result_df = df.withColumn("result", evaluate_rules(col("data")))

result_df.write \
    .mode("overwrite") \
    .parquet("s3://your-bucket/output/")

job.commit()
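This example assumes the input already has a data column containing JSON. If your source table is columnar, one way to build that column is to_json over a struct; the nested field names below are assumptions that should match what your decision expects:

from pyspark.sql.functions import col, struct, to_json

# Build a JSON "data" column from structured columns (names are placeholders)
df = df.withColumn(
    "data",
    to_json(struct(
        struct(
            col("customer_tier").alias("tier"),
            col("years_active").alias("yearsActive")
        ).alias("customer"),
        struct(
            col("order_subtotal").alias("subtotal"),
            col("item_count").alias("items")
        ).alias("order")
    ))
)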
Processing structured columns
Process data from separate columns rather than JSON:
from pyspark.sql.types import StringType, DoubleType, BooleanType

bc = loaders_broadcast

@udf(returnType=StringType())
def evaluate_pricing(customer_tier, years_active, order_subtotal, item_count):
    import json
    ZenEvaluator.initialize(bc.value)
    result = ZenEvaluator.evaluate("pricing", {
        "customer": {
            "tier": customer_tier,
            "yearsActive": int(years_active) if years_active else 0
        },
        "order": {
            "subtotal": float(order_subtotal) if order_subtotal else 0,
            "items": int(item_count) if item_count else 0
        }
    })
    return json.dumps(result["result"])

result_df = df.withColumn(
    "pricing",
    evaluate_pricing(
        col("customer_tier"),
        col("years_active"),
        col("order_subtotal"),
        col("item_count")
    )
)
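If downstream steps need the pricing output as real columns rather than a JSON string, the UDF result can be parsed with from_json. The schema below is an assumption about what the pricing decision returns; adjust it to your actual output:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# Hypothetical output shape of the "pricing" decision
pricing_schema = StructType([
    StructField("total", DoubleType(), True),
    StructField("discount", DoubleType(), True),
    StructField("tierApplied", StringType(), True)
])

parsed_df = (
    result_df
    .withColumn("pricing_parsed", from_json(col("pricing"), pricing_schema))
    .withColumn("total", col("pricing_parsed.total"))
    .withColumn("discount", col("pricing_parsed.discount"))
    .drop("pricing_parsed")
)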
Error handling
Return structured results with success/error information:
from pyspark.sql.types import StructType, StructField, BooleanType, StringType

result_schema = StructType([
    StructField("success", BooleanType(), False),
    StructField("result", StringType(), True),
    StructField("error", StringType(), True)
])

@udf(returnType=result_schema)
def evaluate_rules_safe(row_json):
    import json
    try:
        ZenEvaluator.initialize(loaders_broadcast.value)
        input_data = json.loads(row_json)
        result = ZenEvaluator.evaluate("pricing", input_data)
        return (True, json.dumps(result["result"]), None)
    except Exception as e:
        return (False, None, str(e))

result_df = df.withColumn("evaluation", evaluate_rules_safe(col("data")))

# Filter and handle failures
success_df = result_df.filter(col("evaluation.success") == True)
failed_df = result_df.filter(col("evaluation.success") == False)

# Log failures
failed_count = failed_df.count()
if failed_count > 0:
    logger.warn(f"{failed_count} rows failed evaluation")
    failed_df.write.mode("overwrite").parquet("s3://your-bucket/failed/")
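To keep the main output clean, successful rows can be flattened back to a plain result column before writing; a short sketch continuing the example above:

# Expose the successful result as a plain column and drop the wrapper struct
clean_df = (
    success_df
    .withColumn("result", col("evaluation.result"))
    .drop("evaluation")
)

clean_df.write.mode("overwrite").parquet("s3://your-bucket/output/")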
Reading from Glue Data Catalog
from awsglue.dynamicframe import DynamicFrame

# Read from cataloged table
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table"
)

df = dynamic_frame.toDF()
result_df = df.withColumn("result", evaluate_rules(col("data")))

# Write results back to S3
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(result_df, glueContext, "result"),
    connection_type="s3",
    connection_options={"path": "s3://your-bucket/output/"},
    format="parquet"
)
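The write above does not modify the Data Catalog. If you want the results table created or updated in the catalog as part of the job, one option is the getSink API with enableUpdateCatalog; a sketch, assuming a target table named my_results:

# Sketch: write results and create/update the catalog table in the same job
sink = glueContext.getSink(
    connection_type="s3",
    path="s3://your-bucket/output/",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[]
)
sink.setCatalogInfo(catalogDatabase="my_database", catalogTableName="my_results")
sink.setFormat("glueparquet")
sink.writeFrame(DynamicFrame.fromDF(result_df, glueContext, "result"))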
Job bookmarks
Enable job bookmarks (set the --job-bookmark-option job parameter to job-bookmark-enable) so each run processes only data added since the previous run:
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

# Read with bookmark - only processes new data since last run
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
    transformation_ctx="datasource"  # Required for bookmarks
)

df = dynamic_frame.toDF()

# Process
result_df = df.withColumn("result", evaluate_rules(col("data")))

# Write with bookmark tracking
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(result_df, glueContext, "result"),
    connection_type="s3",
    connection_options={"path": "s3://your-bucket/output/"},
    format="parquet",
    transformation_ctx="datasink"  # Required for bookmarks
)

job.commit()  # Commits bookmark state
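Bookmarks only take effect when enabled on the job or the run. One way to enable them at run time, sketched with boto3 (the job name is a placeholder):

import boto3

glue = boto3.client('glue')

# Enable bookmarks for this run; this can also be set as a default argument on the job
glue.start_job_run(
    JobName='my-rules-job',
    Arguments={'--job-bookmark-option': 'job-bookmark-enable'}
)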
Multiple rule files
Evaluate multiple decision files with a single loaders dict; all decisions are extracted from the same decisions.zip:
# Load all decisions from S3 zip at startup
response = s3.get_object(Bucket='your-bucket', Key='decisions.zip')
zip_bytes = response['Body'].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith('/'):
            loaders[name] = zf.read(name).decode('utf-8')

loaders_broadcast = sc.broadcast(loaders)

@udf(returnType=StringType())
def evaluate_pricing(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("pricing", input_data)
    return json.dumps(result["result"])

@udf(returnType=StringType())
def evaluate_eligibility(row_json):
    import json
    ZenEvaluator.initialize(loaders_broadcast.value)
    input_data = json.loads(row_json)
    result = ZenEvaluator.evaluate("eligibility", input_data)
    return json.dumps(result["result"])

result_df = (
    df
    .withColumn("pricing", evaluate_pricing(col("data")))
    .withColumn("eligibility", evaluate_eligibility(col("data")))
)
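If you evaluate many decisions, the per-decision UDFs can be generated by a small factory instead of duplicating each body; a sketch building on the broadcast and evaluator above:

import json
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def make_decision_udf(decision_key):
    """Return a UDF that evaluates the given decision key against a JSON column."""
    @udf(returnType=StringType())
    def evaluate(row_json):
        ZenEvaluator.initialize(loaders_broadcast.value)
        result = ZenEvaluator.evaluate(decision_key, json.loads(row_json))
        return json.dumps(result["result"])
    return evaluate

result_df = (
    df
    .withColumn("pricing", make_decision_udf("pricing")(col("data")))
    .withColumn("eligibility", make_decision_udf("eligibility")(col("data")))
)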
Parameterized jobs
Pass the rules location and data paths as job parameters:
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'rules_bucket',
    'rules_key',
    'input_path',
    'output_path'
])

# Load all decisions from parameterized S3 zip location
response = s3.get_object(Bucket=args['rules_bucket'], Key=args['rules_key'])
zip_bytes = response['Body'].read()

loaders = {}
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    for name in zf.namelist():
        if not name.endswith('/'):
            loaders[name] = zf.read(name).decode('utf-8')

loaders_broadcast = sc.broadcast(loaders)

# Read from parameterized input
df = spark.read.parquet(args['input_path'])

# ... process ...

# Write to parameterized output
result_df.write.mode("overwrite").parquet(args['output_path'])
Run with:
aws glue start-job-run \
    --job-name my-rules-job \
    --arguments '{
        "--rules_bucket": "my-bucket",
        "--rules_key": "decisions.zip",
        "--input_path": "s3://my-bucket/input/",
        "--output_path": "s3://my-bucket/output/"
    }'
Worker configuration
| Worker Type | vCPU | Memory | Recommended Partitions |
|---|---|---|---|
| G.1X | 4 | 16 GB | 2-4 per worker |
| G.2X | 8 | 32 GB | 4-8 per worker |
| G.4X | 16 | 64 GB | 8-16 per worker |
| G.8X | 32 | 128 GB | 16-32 per worker |
Repartitioning
# Calculate optimal partitions
num_workers = 10 # Set based on your job configuration
partitions_per_worker = 4
total_partitions = num_workers * partitions_per_worker
df = df.repartition(total_partitions)
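Rather than hard-coding worker counts, the partition count can also be derived from the cluster's default parallelism, which typically equals the total executor cores; a sketch:

# Derive partitions from the cluster instead of hard-coding worker counts
total_partitions = sc.defaultParallelism * 2  # 2-4x is a reasonable starting range
df = df.repartition(total_partitions)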
Coalesce for output
# Reduce partitions before writing to avoid small files
result_df.coalesce(10).write.mode("overwrite").parquet("s3://your-bucket/output/")
Best practices
Precompile on initialize. The ZenEvaluator converts dict[str, str] to dict[str, ZenDecisionContent] once, then the loader returns precompiled content for maximum performance.
Broadcast loaders dict. Broadcast dict[str, str] (picklable) to all workers. Each worker precompiles once on first use.
Use engine.evaluate directly. There is no need to call create_decision; the engine’s loader resolves each decision key to its precompiled content.
Repartition appropriately. Match partition count to worker count (typically 2-4x the number of DPUs).
Enable job bookmarks. For incremental processing, use transformation_ctx on both reads and writes.
Store rules in S3. Keep decision files in S3 for easy updates without redeploying the job.
The ZenEvaluator precompiles JSON strings to ZenDecisionContent on first initialization per worker. The engine’s loader then returns precompiled content, avoiding repeated JSON parsing. This provides optimal performance for high-throughput processing.