The generate_extract_config function takes a config object and processes it into a list of event payloads for the extract pipelines. It first pulls the extract-related information out of the config object and sets up some defaults, then iterates over the tables in the extract section of the config, building a new configuration for each table. Each new configuration carries information about the source, the specific extract table, the job type and source type, the database engine and secret, the port, the database name, and the extract-related parameters: the partition column, the lower and upper bounds, the extract type, the high-watermark column name and type, the low- and high-watermark values, the number of partitions, the fetch size, the worker number and type, and whether to repartition the dataframe or extract S3 partitions. Finally, the function appends each new configuration to a list of event arguments and returns the list.
The default closure is defined within the generate_extract_config function and handles default arguments within the configuration. It takes a single parameter, item, the name of an extract-related parameter. The closure first checks whether the table_config object, defined in the enclosing scope and holding the configuration for the table currently being processed, contains a value for item. If it does, that value is returned; otherwise the closure falls back to the value for that parameter in the defaults object, also defined in the enclosing scope, which holds the default arguments for the entire extract configuration.
This lets generate_extract_config handle both table-specific and default arguments for each table in the extract configuration. For example, if a specific table sets num_partitions in its configuration, that value is used; otherwise the default value from defaults applies.
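A minimal sketch of how generate_extract_config and its default closure might look, assuming the config structure used in the example below; the actual implementation may differ:

def generate_extract_config(config):
    # Flatten the extract section of a config object into a list of
    # per-table event payloads for the extract pipelines.
    extract = config["extract"]
    defaults = extract.get("default_arguments", {})
    event_arguments = []

    for table_name, table_config in extract.get("tables", {}).items():

        def default(item):
            # Prefer the table-specific value; fall back to the shared default.
            return table_config.get(item, defaults.get(item))

        event_arguments.append({
            "source_name": config["source_name"],
            "extract_table": table_name,
            "job_type": extract["job_type"],
            "source_type": extract["source_type"],
            "db_engine": extract["db_engine"],
            "db_secret": extract["db_secret"],
            "db_port": extract["db_port"],
            "db_name": extract["db_name"],
            "partition_column": default("partition_column"),
            "lower_bound": default("lower_bound"),
            "upper_bound": default("upper_bound"),
            "extract_type": default("extract_type"),
            "hwm_col_name": default("hwm_col_name"),
            "hwm_column_type": default("hwm_column_type"),
            "lwm_value": default("lwm_value"),
            "hwm_value": default("hwm_value"),
            "num_partitions": default("num_partitions"),
            "fetchsize": default("fetchsize"),
            "worker_no": default("worker_no"),
            "worker_type": default("worker_type"),
            "repartition_dataframe": default("repartition_dataframe"),
            "extract_s3_partitions": default("extract_s3_partitions"),
        })

    return event_arguments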
config = {
    "source_name": "my_source",
    "extract": {
        "job_type": "incremental",
        "source_type": "rdbms",
        "db_engine": "postgres",
        "db_secret": "my_secret",
        "db_port": "5432",
        "db_name": "mydb",
        "default_arguments": {
            "partition_column": "id",
            "lower_bound": "1",
            "upper_bound": "10000",
            "extract_type": "PE",
            "hwm_col_name": "created_at",
            "hwm_column_type": "TimestampType",
            "lwm_value": "2022-01-01",
            "hwm_value": "2022-02-01",
            "repartition_dataframe": "True",
            "extract_s3_partitions": "True",
            "num_partitions": "10",
            "fetchsize": "1000",
            "worker_no": "4",
            "worker_type": "m4.large"
        },
        "tables": {
            "mydb.schema.table1": {
                "hwm_col_name": "updated_at",
                "num_partitions": "8"
            },
            "mydb.schema.table2": {
                "extract_type": "FE",
                "repartition_dataframe": "False"
            },
            "mydb.schema.table3": {}
        }
    }
}
event_arguments = generate_extract_config(config)
print(event_arguments)
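Based on the sketch above, the first entry of the printed list, for mydb.schema.table1, would merge that table's overrides with the shared defaults, roughly:

# Illustrative shape of event_arguments[0]:
{
    "source_name": "my_source",
    "extract_table": "mydb.schema.table1",
    "job_type": "incremental",
    "hwm_col_name": "updated_at",   # table-level override
    "num_partitions": "8",          # table-level override
    "partition_column": "id",       # from default_arguments
    "fetchsize": "1000",            # from default_arguments
    # ... remaining fields carried over from the extract section and defaults
}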
The configuration layer stores this structure as YAML, and the extract process begins with an AWS Lambda function that makes use of the method above.
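For illustration, the same configuration expressed as YAML might look like the following (shortened; this layout is an assumption, not the project's actual file):

source_name: my_source
extract:
  job_type: incremental
  source_type: rdbms
  db_engine: postgres
  db_secret: my_secret
  db_port: "5432"
  db_name: mydb
  default_arguments:
    partition_column: id
    extract_type: PE
    hwm_col_name: created_at
    num_partitions: "10"
    # ... remaining defaults as in the dict above
  tables:
    mydb.schema.table1:
      hwm_col_name: updated_at
      num_partitions: "8"
    mydb.schema.table2:
      extract_type: FE
      repartition_dataframe: "False"
    mydb.schema.table3: {}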
The lambda_handler function is an AWS Lambda function that takes an event payload and a context. The event payload contains two key-value pairs: source_name, a string naming the config file stored in an S3 bucket, and extract_tables, a list of tables from the config file that are to be run in this extract Glue job.
The function is responsible for triggering an extract job by sending a payload to an AWS Step Function. It first logs the event payload, then uses the source_name from the payload to retrieve the config file from the S3 bucket, and calls the generate_extract_config method to flatten the config into the event payloads for the extract pipelines.
The function then builds two lists: _tables_to_extract and _tables_not_to_extract. The _tables_to_extract list is populated with the extract items whose tables appear in the extract_tables list from the event payload, and additional data such as extract_s3_uri and tracking_table_name is added to each item. The _tables_not_to_extract list collects the tables that were not requested in the event payload.
The Lambda function returns a dictionary with two keys: status_code and tables_to_extract. status_code is 200, indicating the function executed successfully; tables_to_extract is a list of dictionaries, one per table to be extracted, each including the additional data needed for the extract job (extract_s3_uri, tracking_table_name, and so on). The tables in the list are those named in the extract_tables key of the event payload. The function also logs the tables that will not be extracted in this run.
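Putting the pieces together, a minimal sketch of lambda_handler under stated assumptions: the bucket name, S3 key layout, extract_s3_uri format, and tracking_table_name convention below are all placeholders, and the config is assumed to be YAML parsed with PyYAML:

import json
import logging

import boto3
import yaml  # PyYAML, assumed to be packaged with the Lambda

logger = logging.getLogger()
logger.setLevel(logging.INFO)

CONFIG_BUCKET = "my-config-bucket"  # placeholder bucket name

def lambda_handler(event, context):
    logger.info("Received event: %s", json.dumps(event))

    source_name = event["source_name"]
    _extract_tables = event["extract_tables"]

    # Retrieve the YAML config for this source from S3 (key layout assumed).
    s3 = boto3.client("s3")
    obj = s3.get_object(Bucket=CONFIG_BUCKET, Key=f"configs/{source_name}.yaml")
    config = yaml.safe_load(obj["Body"].read())

    # Flatten the config into one event payload per table.
    extract_items = generate_extract_config(config)

    _tables_to_extract = []
    _tables_not_to_extract = []
    for item in extract_items:
        if item["extract_table"] in _extract_tables:
            # Enrich the payload with run-specific data (formats assumed).
            item["extract_s3_uri"] = f"s3://my-data-bucket/{source_name}/{item['extract_table']}"
            item["tracking_table_name"] = f"{source_name}_tracking"
            _tables_to_extract.append(item)
        else:
            _tables_not_to_extract.append(item["extract_table"])

    logger.info("Tables not extracted in this run: %s", _tables_not_to_extract)

    return {
        "status_code": 200,
        "tables_to_extract": _tables_to_extract,
    }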
The function's return value is a JSON object with two key-value pairs, status_code and tables_to_extract. It is produced by the Lambda function, which is triggered by an event object containing the source_name and extract_tables keys; the function retrieves the configuration file from an S3 bucket and generates the list of tables to extract, together with the corresponding data needed to execute the extract job. That list is returned as the value of the tables_to_extract key and is sent as the payload to the AWS Step Function, which triggers the extract job based on the job_type specified in the configuration file.
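An illustrative return payload (the URI and tracking-table values here are placeholders, and most per-table fields are omitted for brevity):

{
    "status_code": 200,
    "tables_to_extract": [
        {
            "source_name": "my_source",
            "extract_table": "mydb.schema.table1",
            "job_type": "incremental",
            "extract_s3_uri": "s3://my-data-bucket/my_source/mydb.schema.table1",
            "tracking_table_name": "my_source_tracking",
            "num_partitions": "8"
        }
    ]
}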
Finally, this payload is submitted to the Extract Job Run.
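One way that submission could look, assuming the payload is handed to Step Functions directly via boto3 (the state-machine ARN and payload shape are placeholders):

import json
import boto3

# Payload returned by lambda_handler above (shape illustrative).
payload = {"status_code": 200, "tables_to_extract": []}

sfn = boto3.client("stepfunctions")
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:extract-pipeline",  # placeholder
    input=json.dumps(payload),
)
print(response["executionArn"])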
Please note that the above code and associated documentation are provided as a reference and are not guaranteed to be error-free. This solution is a work in progress and may be subject to change. It is the responsibility of the user to thoroughly test and verify the correctness of the code before using it in production environments.