
Commit 8eb2e79

Author: dinsajwa (committed)
fix(bda): updated blueprint schema, added support for file upload
1 parent e3d0e92 | commit 8eb2e79

File tree: 11 files changed, +137 −56 lines

lambda/aws-bedrock-data-automation/bda_blueprint/create_schema.py

Lines changed: 13 additions & 5 deletions
@@ -10,9 +10,9 @@
 # and limitations under the License.
 #
 from typing import Annotated, List, Dict, Type
-from custom_blueprint_schema import create_schema_fields,custom_blue_print
+from custom_blueprint_schema import create_schema_fields, custom_blue_print
 
-def create_schema(fields: List[Dict[str, str]]) -> Type[custom_blue_print]:
+def create_schema(fields: List[Dict[str, str]], document_class: str = "", document_description: str = "") -> Type[custom_blue_print]:
     """
     Create a schema class based on a list of field configurations.
 
@@ -25,6 +25,8 @@ def create_schema(fields: List[Dict[str, str]]) -> Type[custom_blue_print]:
                 "alias": "field alias"
             }
         ]
+        document_class (str): The class of the document (default: "Notice of Assessment")
+        document_description (str): Description of the document (default: "Notice of Assessment Document")
 
     Returns:
         Type[BaseBlueprintSchema]: A new schema class
@@ -35,18 +37,24 @@
     for field in fields:
         field_name = field["name"]
         annotations[field_name] = Annotated[
-            List[str],
+            str,
             create_schema_fields(
                 description=field["description"],
                 alias=field["alias"]
             )
         ]
 
     # Create a new schema class dynamically
-    return type(
+    schema_class = type(
         "DynamicSchema",
         (custom_blue_print,),
         {
             "__annotations__": annotations
         }
-    )
+    )
+
+    # Set the class and description in the model_config
+    schema_class.model_config["json_schema_extra"]["class"] = document_class
+    schema_class.model_config["json_schema_extra"]["description"] = document_description
+
+    return schema_class

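For context, a minimal sketch of how the extended `create_schema` signature might be used; the import path, field values, and class/description strings below are illustrative assumptions, not taken from the commit:

```python
import json

# Hypothetical import path; in the repo this function lives in
# lambda/aws-bedrock-data-automation/bda_blueprint/create_schema.py.
from create_schema import create_schema

fields = [
    {
        "name": "Total income",
        "description": "Extract the total income reported on the notice of assessment.",
        "alias": "Total income",
    }
]

schema_class = create_schema(
    fields,
    document_class="Notice of Assessment",
    document_description="A document issued by tax authorities that summarizes tax assessment details",
)

# The dynamic Pydantic class renders to the blueprint JSON schema that the
# Lambda handler serializes and passes on to create_blueprint.
print(json.dumps(schema_class.model_json_schema(), indent=2))
```
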
lambda/aws-bedrock-data-automation/bda_blueprint/custom_blueprint_schema.py

Lines changed: 11 additions & 9 deletions
@@ -31,8 +31,8 @@ class custom_blue_print(BaseModel):
         "json_schema_extra": {
             # Specify JSON Schema version and default metadata
             "$schema": "http://json-schema.org/draft-07/schema#",
-            "description": "default",
-            "documentClass": "default",
+            "description": "",
+            "class": "",
         },
     }
 
@@ -53,6 +53,9 @@ def model_json_schema(cls, *args, **kwargs) -> dict:
 
         # Remove top-level title if present
         schema.pop("title", None)
+
+        # Remove required field if present
+        schema.pop("required", None)
 
         # Process each field in the model
         properties = {}
@@ -67,21 +70,20 @@
 
 def create_schema_fields(description: str, alias: str | None = None):
     """
-    Creates a field configuration for extractive schema fields.
+    Creates a field configuration for explicit schema fields.
 
     Args:
-        description (str): Description of what should be extracted from the document
+        description (str): Instruction of what should be extracted from the document
         alias (str | None): Optional alternative name for the field in the output
 
     Returns:
-        Field: Configured Pydantic Field with extractive properties
+        Field: Configured Pydantic Field with explicit properties
     """
     return Field(
-        default_factory=list,
-        description=description,
         alias=alias,
         json_schema_extra={
-            "inferenceType": "extractive",
-            "items": {"type": "string"},
+            "type": "string",
+            "inferenceType": "explicit",
+            "instruction": description
         },
     )

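A rough sketch of what the reworked `create_schema_fields` now emits, assuming Pydantic v2 where `Field(...)` returns a `FieldInfo` carrying the `json_schema_extra` metadata (import path and values are illustrative):

```python
from custom_blueprint_schema import create_schema_fields  # hypothetical import path

field = create_schema_fields(
    description="Extract the taxable income value.",
    alias="Taxable Income",
)

# The field metadata now describes an explicit-inference string with an
# instruction, rather than an extractive list of strings.
assert field.json_schema_extra["inferenceType"] == "explicit"
assert field.json_schema_extra["instruction"] == "Extract the taxable income value."
```
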
lambda/aws-bedrock-data-automation/bda_blueprint/lambda.py

Lines changed: 24 additions & 3 deletions
@@ -158,13 +158,19 @@ def handler(event, context: LambdaContext):
         case "create":
             logger.info("create blueprint")
 
+            # Check if schema_file_name is provided
             if 'schema_file_name' in blueprint_details:
                 input_key = blueprint_details['schema_file_name']
 
                 logger.info(f"Retrieving schema from S3: {input_bucket}/{input_key}")
                 schema_content = get_schema(input_bucket, input_key)
 
-            if 'schema_fields' in blueprint_details:
+                # Convert schema_content to a JSON string if it's a dictionary
+                if isinstance(schema_content, dict):
+                    schema_content = json.dumps(schema_content)
+
+            # Only use schema_fields if schema_file_name is not provided
+            elif 'schema_fields' in blueprint_details:
                 schema_fields = blueprint_details['schema_fields']
 
                 # Validate schema_fields format
@@ -178,16 +184,31 @@ def handler(event, context: LambdaContext):
 
                 # Create schema using AWS BDA format
                 try:
-                    schema_content = create_schema(schema_fields)
+                    # Get document class and description from blueprint_details if provided
+                    document_class = blueprint_details.get('document_class', "")
+                    document_description = blueprint_details.get('document_description', "")
+
+                    # Create schema with the provided fields, class, and description
+                    schema_class = create_schema(
+                        schema_fields,
+                        document_class=document_class,
+                        document_description=document_description
+                    )
+
+                    # Convert the schema class to a JSON schema dictionary and then to a JSON string
+                    schema_json = schema_class.model_json_schema()
+                    schema_content = json.dumps(schema_json)
                 except Exception as e:
-                    print("Error creating schema")
+                    logger.error("Error creating schema", extra={"error": str(e)})
                     return {
                         'statusCode': 500,
                         'body': json.dumps({
                             'message': 'Error creating schema',
                             'error': str(e)
                         })
                     }
+            else:
+                raise ValueError("Either schema_file_name or schema_fields must be provided")
 
 
             response= create_blueprint(schema_content,blueprint_details)

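Condensed, the handler's new resolution order looks roughly like the helper below. `get_schema` and `create_schema` come from the surrounding module; the helper itself is only an illustration of the branching, not code from the commit:

```python
import json

def resolve_schema_content(blueprint_details: dict, input_bucket: str) -> str:
    """Prefer an uploaded schema file; otherwise build one from schema_fields."""
    # get_schema and create_schema are assumed to be imported in the real module.
    if 'schema_file_name' in blueprint_details:
        schema_content = get_schema(input_bucket, blueprint_details['schema_file_name'])
        # The S3 helper may return a dict; the blueprint API expects a JSON string.
        return json.dumps(schema_content) if isinstance(schema_content, dict) else schema_content
    elif 'schema_fields' in blueprint_details:
        schema_class = create_schema(
            blueprint_details['schema_fields'],
            document_class=blueprint_details.get('document_class', ""),
            document_description=blueprint_details.get('document_description', ""),
        )
        return json.dumps(schema_class.model_json_schema())
    raise ValueError("Either schema_file_name or schema_fields must be provided")
```
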
lambda/aws-bedrock-data-automation/bda_project/lambda.py

Lines changed: 0 additions & 2 deletions
@@ -135,5 +135,3 @@ def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
                 'error': str(e)
             })
         }
-
-

lambda/aws-bedrock-data-automation/bda_project/project_config.py

Lines changed: 16 additions & 7 deletions
@@ -107,20 +107,29 @@ def __init__(self, project_details: Dict[str, Any]):
 
     def _validate_required_fields(self) -> None:
         """Validate required fields are present in project_details"""
-        required_fields = ['projectName','modality']
+        required_fields = ['project_name', 'modality']
         missing_fields = [field for field in required_fields
                           if not self.project_details.get(field)]
         if missing_fields:
             raise ValueError(f"Missing required fields: {', '.join(missing_fields)}")
 
         # Validate project stage
-        if self.project_details['projectStage'] not in [e.value for e in ProjectStage]:
-            raise ValueError(f"Invalid projectStage. Must be one of: {[e.value for e in ProjectStage]}")
+        project_stage = self.project_details.get('project_stage')
+        if project_stage and project_stage not in [e.value for e in ProjectStage]:
+            raise ValueError(f"Invalid project_stage. Must be one of: {[e.value for e in ProjectStage]}")
 
     def _get_standard_output_config(self) -> Dict[str, Any]:
         """Get standard output configuration if present"""
         config = self.project_details.get('standardOutputConfiguration', {})
         modality = self.project_details.get('modality')
+
+        # If modality is not provided, return empty dict
+        if not modality:
+            return {}
+
+        # Convert modality to lowercase for case-insensitive comparison
+        modality = modality.lower()
+
         if modality == Modality.DOCUMENT.value:
             return {'document': self._get_document_config(config.get('document', {}))}
         elif modality == Modality.IMAGE.value:
@@ -234,9 +243,9 @@ def _get_audio_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
     def project_config(self) -> Dict[str, Any]:
         """Get complete project configuration"""
         config = {
-            'projectName': self.project_details['projectName'],
-            'projectDescription': self.project_details.get('projectDescription','sample description'),
-            'projectStage': self.project_details.get('projectStage','LIVE')
+            'projectName': self.project_details.get('project_name'),
+            'projectDescription': self.project_details.get('project_description', 'sample description'),
+            'projectStage': self.project_details.get('project_stage', 'LIVE')
         }
 
         # Add standard output configuration if present
@@ -268,7 +277,7 @@ def project_config(self) -> Dict[str, Any]:
 
     def __str__(self) -> str:
         """String representation of project configuration"""
-        return f"ProjectConfig(name={self.project_details['projectName']}, stage={self.project_details['projectStage']})"
+        return f"ProjectConfig(name={self.project_details.get('project_name')}, stage={self.project_details.get('project_stage')})"
 
 class ListProjectsConfig:
     """Configuration class for listing Bedrock Data Automation projects"""

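For reference, a hypothetical `project_details` payload using the new snake_case keys; the values and the `ProjectConfig(...)` construction are assumptions inferred from the diff, not verbatim repo code:

```python
project_details = {
    "project_name": "claims-processing",        # required
    "modality": "DOCUMENT",                     # required; lower-cased before the Modality comparison
    "project_description": "BDA project for claims documents",
    "project_stage": "LIVE",                    # optional; validated against ProjectStage when present
    "standardOutputConfiguration": {"document": {}},
}

# Assumes the wrapper class shown in __str__ is constructed this way.
config = ProjectConfig(project_details)
print(config)  # ProjectConfig(name=claims-processing, stage=LIVE)
```
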
lambda/aws-bedrock-data-automation/data_processing/data_processing.py

Lines changed: 6 additions & 0 deletions
@@ -100,6 +100,7 @@ def invoke_data_automation_async(
         notification_config: Optional[NotificationConfig] = None,
         data_automation_profile_arn: Optional[str] = None,
         tags: Optional[list] = None,
+        **kwargs  # To handle any additional parameters that might be passed but not used
     ) -> Dict[str, Any]:
         """
         Invoke data automation asynchronously
@@ -162,7 +163,11 @@ def invoke_data_automation_async(
 
         # Add data automation profile ARN if provided
         if data_automation_profile_arn:
+            # The AWS documentation shows this parameter as dataAutomationProfileArn
             request_params['dataAutomationProfileArn'] = data_automation_profile_arn
+            logger.info("Added data automation profile ARN", extra={
+                "data_automation_profile_arn": data_automation_profile_arn
+            })
 
         # Add encryption configuration if provided
         if encryption_config:
@@ -191,6 +196,7 @@ def invoke_data_automation_async(
         logger.info("Invoking data automation", extra={
             "request_params": request_params
         })
+
         response = self.client.invoke_data_automation_async(**request_params)
 
         logger.info("Successfully invoked data automation", extra={

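The reason for the new `**kwargs` catch-all, shown with a toy function rather than the real client wrapper: keys forwarded from an event payload that the method does not recognize are absorbed instead of raising `TypeError`:

```python
def invoke(data_automation_profile_arn=None, tags=None, **kwargs):
    # Unknown keyword arguments land in kwargs and are simply ignored.
    return {"profile_arn": data_automation_profile_arn, "ignored_keys": sorted(kwargs)}

print(invoke(
    data_automation_profile_arn="arn:aws:bedrock:us-east-1:111122223333:data-automation-profile/us.data-automation-v1",
    some_future_option=True,  # tolerated rather than fatal
))
```
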
lambda/aws-bedrock-data-automation/data_processing/lambda.py

Lines changed: 4 additions & 2 deletions
@@ -75,7 +75,7 @@ def get_env_var(var_name: str, default: str = None) -> str:
         raise ValueError(f"Environment variable {var_name} is not set")
     return value
 
-#@logger.inject_lambda_context
+@logger.inject_lambda_context
 def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
     """
     Lambda handler to process EventBridge events and invoke data automation
@@ -200,7 +200,7 @@ def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
                 "notification_config": configs['notification_config']
             })
 
-        # Check and add data_automation_profile_arn
+        # Add data_automation_profile_arn to configs if provided
         if profile_arn := detail.get('data_automation_profile_arn'):
             configs['data_automation_profile_arn'] = profile_arn
             logger.info("Data automation profile ARN added", extra={
@@ -258,3 +258,5 @@ def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
                 'error': str(e)
             }
         }
+
+

lambda/aws-bedrock-data-automation/data_result/data_automation_result.py

Lines changed: 11 additions & 6 deletions
@@ -150,12 +150,17 @@ async def get_results(self, invocation_arn: str, wait: bool = False) -> Dict[str
                     "path": segment["custom_output_path"]
                 })
 
-                # Add standard output
-                standard_result = self._read_s3_json(segment["standard_output_path"])
-                results.append(standard_result)
-                logger.debug("Added standard output", extra={
-                    "path": segment["standard_output_path"]
-                })
+                # Add standard output if path exists
+                if "standard_output_path" in segment:
+                    standard_result = self._read_s3_json(segment["standard_output_path"])
+                    results.append(standard_result)
+                    logger.debug("Added standard output", extra={
+                        "path": segment["standard_output_path"]
+                    })
+                else:
+                    logger.warning("Missing standard_output_path in segment", extra={
+                        "segment_id": segment.get("segment_id", "unknown")
+                    })
 
         logger.info("Successfully retrieved results", extra={
             "invoke_arn": invocation_arn,

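With this guard, `get_results` now tolerates segment metadata with or without a standard output path; an illustrative (not real) pair of segments:

```python
segments = [
    {   # both outputs present: custom and standard results are read from S3
        "segment_id": "0",
        "custom_output_path": "s3://output-bucket/custom/0.json",
        "standard_output_path": "s3://output-bucket/standard/0.json",
    },
    {   # standard output missing: a warning is logged instead of a KeyError
        "segment_id": "1",
        "custom_output_path": "s3://output-bucket/custom/1.json",
    },
]
```
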
lambda/aws-bedrock-data-automation/data_result/lambda.py

Lines changed: 2 additions & 0 deletions
@@ -150,3 +150,5 @@ def handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
     Main Lambda handler that wraps the async handler
     """
     return asyncio.run(data_automation_status(event, context))
+
+

layer/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
-boto3>=1.37.5
+boto3>=1.38.1
 botocore>=1.35.5
 

src/patterns/gen-ai/aws-bedrock-data-automation/README.md

Lines changed: 49 additions & 21 deletions
@@ -155,6 +155,10 @@ const blueprintEventbridge = new EventbridgeToLambda(this, 'CreateBlueprintEvent
     }
   },
 });
+
+// print input bucket
+new cdk.CfnOutput(this, 'inputbucketname', { value: bdaConstruct.bdaInputBucket!.bucketName });
+new cdk.CfnOutput(this, 'outputbucketname', { value: bdaConstruct.bdaOutputBucket!.bucketName });
 ```
 
 Python
@@ -200,33 +204,56 @@ Create a bp_event.json file with following event in your project directory.
       "Source": "custom.bedrock.blueprint",
       "DetailType": "Bedrock Blueprint Request",
       "Detail": {
-        "blueprint_name": "noa_bp",
-        "blueprint_type": "DOCUMENT",
-        "blueprint_stage": "LIVE",
-        "operation": "CREATE",
-        "schema_fields": [ // This is a sample schema, replace this with your expected blueprint schema.
-          {
-            "name": "Total income",
-            "description": "Please analyze the following Notice of assesment report and extract information about Total income.",
-            "alias": "Total income"
-          },
-          {
-            "name": "Taxable Income",
-            "description": "Please analyze the following Notice of assesment report and extract information about Taxable income.",
-            "alias": "Taxable Income"
-          },
-          {
-            "name": "Tax payable",
-            "description": "Please analyze the following Notice of assesment report and extract information about Tax payable.",
-            "alias": "Tax payable"
+        "blueprint_name": "custom_blueprint",
+        "blueprint_type": "DOCUMENT",
+        "blueprint_stage": "LIVE",
+        "operation": "CREATE",
+        "document_class": "Notice of Assessment", // sample file and fields, replace it with your file
+        "document_description": "A document issued by tax authorities that summarizes tax assessment details",
+        "schema_fields": [
+          {
+            "name": "Total income",
+            "description": "Please analyze the following Notice of assesment report and extract information about Total income.",
+            "alias": "Total income"
+          },
+          {
+            "name": "Taxable Income",
+            "description": "Please analyze the following Notice of assesment report and extract information about Taxable income.",
+            "alias": "Taxable Income"
+          },
+          {
+            "name": "Tax payable",
+            "description": "Please analyze the following Notice of assesment report and extract information about Tax payable.",
+            "alias": "Tax payable"
+          }
+        ]
       }
-        ]
-      }
     }
   ]
 }
 ```
 
+OR you can also upload your schema json file and create following event
+
+```json
+{
+  "Entries": [
+    {
+      "Source": "custom.bedrock.blueprint",
+      "DetailType": "Bedrock Blueprint Request",
+      "Detail": {
+        "blueprint_name": "custom_bp",
+        "blueprint_type": "DOCUMENT",
+        "blueprint_stage": "LIVE",
+        "operation": "CREATE",
+        "schema_file_name": "claims_form.json" // upload this schema file to input s3 bucket
+      }
+    }
+  ]
+}
+```
+
+
 send event bridge event using below command
 
 ```
@@ -483,6 +510,7 @@ Create a bda_event.json file using below event and then use following cli comman
       "stage":"LIVE"
     }],
     "dataAutomationProfileArn":"BDA-CRIS profile",
+    "dataAutomationProfileArn":"arn:aws:bedrock:{region_name}:{account_id}:data-automation-profile/us.data-automation-v1"
   }
 }
 ]

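The README's events are sent with the AWS CLI; if you prefer Python, a boto3 sketch along these lines should be equivalent for the schema-file blueprint event (region, credentials, and any custom event bus name are assumptions you would need to supply):

```python
import json
import boto3

events = boto3.client("events")
events.put_events(
    Entries=[
        {
            "Source": "custom.bedrock.blueprint",
            "DetailType": "Bedrock Blueprint Request",
            # Detail must be a JSON-encoded string
            "Detail": json.dumps({
                "blueprint_name": "custom_bp",
                "blueprint_type": "DOCUMENT",
                "blueprint_stage": "LIVE",
                "operation": "CREATE",
                "schema_file_name": "claims_form.json",
            }),
            # "EventBusName": "your-bus-name",  # only if the construct listens on a custom bus
        }
    ]
)
```
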