The Spark script is a PySpark Glue job that extracts data from a JDBC source and saves it to a DynamoDB table using non-parallel methods. The script supports full and partial extracts, can continue an extract on the next run, and can reingest or rerun based on the success of the pipeline. The supported JDBC engines are postgres and mysql.
The script starts by importing the necessary modules, such as GlueContext and transforms from the awsglue package, SparkContext and DataFrame from pyspark, and functions and other required modules from the codebase package.
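For orientation, the imports might look roughly like the following sketch; the codebase.* paths are taken from the references in this document and the exact names are assumptions, not verified against the repository.

```python
# Sketch of the imports described above. The awsglue/pyspark imports are the
# standard Glue job boilerplate; the codebase.* paths are assumed from this
# document.
import sys

from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import DataFrame

from codebase.aws.ddb import get_tracking_table_item, update_extract_tracking_table
from codebase.etl.extract import (
    determine_extract_plan,
    get_jdbc_url,
    get_sql_where_condition,
    convert_db_namespaces,
    get_pushdown_query,
    create_db_table_schema,
)
```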
The script defines a main() function that starts by fetching the tracking table item using the get_tracking_table_item function from the codebase.aws.ddb package. It then calls the determine_extract_plan function from the codebase.etl.extract package to determine the extract plan, passing it the necessary parameters such as provided_hwm_value, provided_lwm_value, extract_type, and the tracking_table.
The script then updates the tracking table with new or existing data using the update_extract_tracking_table function from the codebase.aws.ddb package. A row_count of -1 means the pipeline is either still running or failed during the last run.
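A minimal sketch of these first steps inside main(), assuming the function signatures implied by the description above (the actual codebase signatures may differ):

```python
# Sketch only: parameter names, signatures, and return shapes are inferred
# from this document, not from the actual codebase package.
tracking_table = get_tracking_table_item(tracking_table_name, extract_table)

# Work out which rows this run should extract (full or partial extract,
# resume after a failure, or reingest from the beginning).
# Return shape assumed: the effective low/high watermarks for this run.
lwm_value, hwm_value = determine_extract_plan(
    provided_hwm_value=hwm_value,
    provided_lwm_value=lwm_value,
    extract_type=extract_type,
    tracking_table=tracking_table,
)

# Mark the run as in progress: a row_count of -1 means the pipeline is either
# still running or failed during its last attempt.
update_extract_tracking_table(
    tracking_table_name,
    extract_table,
    row_count=-1,
    extract_successful="N",
)
```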
It then creates a JDBC URL using the get_jdbc_url function from the codebase.etl.extract package, and a SQL WHERE condition using the get_sql_where_condition function from the same package. It also converts the database namespaces using the convert_db_namespaces function and gets the pushdown query using the get_pushdown_query function.
The script then creates the JDBC parameters, which include the URL, properties, and table, and assigns them to _jdbc_params. The properties include the user, password, driver, encrypt, trustServerCertificate, and applicationIntent.
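Continuing the sketch, the helper calls and the _jdbc_params dictionary could look like this; the helper signatures and the exact property values are assumptions based on the description above.

```python
# Helper signatures below are assumed from this document rather than taken
# from the codebase package.
jdbc_url = get_jdbc_url(db_engine, db_host, db_port, db_name)
sql_where_condition = get_sql_where_condition(extract_type, hwm_col_name, lwm_value, hwm_value)
extract_table_namespace = convert_db_namespaces(extract_table, db_name, db_engine)
pushdown_query = get_pushdown_query(extract_table_namespace, sql_where_condition)

_jdbc_params = {
    "url": jdbc_url,
    "table": pushdown_query,
    "properties": {
        "user": db_user,
        "password": db_password,
        # Standard driver class names for the two supported engines.
        "driver": "org.postgresql.Driver" if db_engine == "postgres" else "com.mysql.cj.jdbc.Driver",
        "encrypt": "true",
        "trustServerCertificate": "true",
        "applicationIntent": "ReadOnly",
    },
}
```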
The script then creates the source schema using the create_db_table_schema function from the codebase.etl.extract package and assigns it to source_schema.
The script then creates a Glue DynamicFrame using the glueContext.create_dynamic_frame.from_options method, passing it the JDBC parameters and the source schema.
The script then repartitions the resulting frame when the repartition_dataframe parameter is set, using the num_partitions parameter to determine the number of partitions.
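Putting the schema creation, the Glue read, and the optional repartition together, a sketch might look like this; the create_db_table_schema arguments are assumed, the connection_options keys are the standard Glue JDBC ones, and how the source schema is applied is not shown.

```python
# Sketch of the read and repartition steps; create_db_table_schema arguments
# are assumed from this document.
source_schema = create_db_table_schema(db_engine, extract_table)

dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql" if db_engine == "postgres" else "mysql",
    connection_options={
        "url": _jdbc_params["url"],
        "user": _jdbc_params["properties"]["user"],
        "password": _jdbc_params["properties"]["password"],
        "dbtable": _jdbc_params["table"],
    },
)

# Convert to a DataFrame and optionally repartition it.
extracted_dataframe = dynamic_frame.toDF()
if repartition_dataframe:
    extracted_dataframe = extracted_dataframe.repartition(num_partitions)
```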
The main() function in this Spark JDBC extract code is responsible for extracting data from a JDBC source and saving it to a DynamoDB table. The extract can be either a full extract or a partial extract, and can be resumed on the next run or rerun based on the success of the pipeline.
The function first retrieves the tracking table item from DynamoDB, and then determines the extract plan based on the provided extract type, high and low watermark values, and the tracking table. The tracking table is then updated with new or existing data, with a default value of -1 for the number of rows extracted, indicating that the pipeline is either running or failed during the last run.
The tracking table also has an extract_successful field that is either Y or N. It is set to N while the pipeline is running or when the last attempt has failed, so the next run accounts for the previous failure; for example, a PE run would reuse the previous high watermark values and not increment them until it has succeeded. If the extract_successful field is Y, the last run was successful.
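For illustration, a tracking table item might carry attributes along these lines; the attribute names are assumptions based on the description above, not the actual table schema.

```python
# Illustrative only: attribute names are inferred from this description.
tracking_item = {
    "extract_table": "schema_name.table_name",
    "lwm_value": "2023-01-01 00:00:00",   # low watermark for the next run
    "hwm_value": "2023-01-31 23:59:59",   # high watermark reached so far
    "row_count": -1,                      # -1 while running or after a failure
    "extract_successful": "N",            # set to "Y" only after a successful run
}
```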
The JDBC URL, SQL WHERE condition, extract table namespace, and pushdown query are then obtained using helper functions. The JDBC parameters, including the URL, user credentials, and driver, are set in a dictionary.
The source schema is created by inferring the data types of the columns in the extract table, and the data is then read from the JDBC source using the read.jdbc() method of the DataFrameReader class, with the JDBC parameters and schema passed as arguments. The number of partitions is calculated based on the sample data, and the data is repartitioned using the repartition() method of the DataFrame class.
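A sketch of the partitioned read this paragraph describes, using the standard DataFrameReader.jdbc() signature and assuming spark is the job's SparkSession; variable names mirror the job arguments listed below.

```python
# Partitioned JDBC read; column/lowerBound/upperBound/numPartitions split the
# extract range across parallel tasks, and fetchsize tunes driver round trips.
extracted_dataframe = spark.read.jdbc(
    url=_jdbc_params["url"],
    table=_jdbc_params["table"],
    column=partition_column,
    lowerBound=lower_bound,
    upperBound=upper_bound,
    numPartitions=num_partitions,
    properties={**_jdbc_params["properties"], "fetchsize": str(fetchsize)},
)
```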
Finally, the data is written to DynamoDB using the write.dynamodb() method of the DataFrameWriter class, with the target table and mode specified as arguments. The number of rows extracted is then updated in the tracking table, along with a flag indicating that the extract was successful.
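The write.dynamodb() call mentioned above appears to be a codebase helper rather than a standard Spark API; as a rough equivalent, the standard Glue DynamoDB connector can be used as in the sketch below, where the target table name and the tracking update arguments are assumptions.

```python
# Rough equivalent of the write step using the standard Glue DynamoDB
# connector; the write.dynamodb() helper above is assumed to wrap something
# similar. target_table_name is a placeholder.
from awsglue.dynamicframe import DynamicFrame

output_dyf = DynamicFrame.fromDF(extracted_dataframe, glueContext, "output_dyf")
glueContext.write_dynamic_frame_from_options(
    frame=output_dyf,
    connection_type="dynamodb",
    connection_options={"dynamodb.output.tableName": target_table_name},
)

# Record the outcome in the tracking table.
row_count = extracted_dataframe.count()
update_extract_tracking_table(
    tracking_table_name,
    extract_table,
    row_count=row_count,
    extract_successful="Y",
)
```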
Argument | Type | Description |
---|---|---|
extract_table | str | The name of the table to extract. This can be "schema_name.table_name" or just "table_name". |
db_engine | str | The database engine. Must be either 'mysql' or 'postgres'. |
db_host | str | The hostname of the database server provided by the AWS Secret. |
db_port | int | The port number of the database server. |
db_name | str | The name of the database. |
db_user | str | The username to use for connecting to the database provided by the AWS Secret. |
db_password | str | The password to use for connecting to the database provided by the AWS Secret. |
hwm_col_name | str | The name of the high watermark column. |
hwm_column_type | str | The data type of the high watermark column. Must be either 'int' or 'timestamp'. |
extract_type | str | The type of extract. Must be either 'FE' for full extract or 'PE' for partial extract. |
tracking_table_name | str | The name of the tracking table to use for storing the high and low watermark values. |
reingest | bool | Flag indicating whether to reingest data. If True, the extract will read data from the beginning of the table. If False, the extract will read data from the last high watermark value. |
hwm_value | Any | The high watermark value to use for the extract. If extract_type is 'PE', this value will be ignored in favor of the value in the tracking table. If extract_type is 'FE', this value will be used to determine the end of the extract. If hwm_column_type is 'int', this must be an int. If hwm_column_type is 'timestamp', this must be a string in the format 'YYYY-MM-DD HH:MM:SS'. |
lwm_value | Any | The low watermark value to use for the extract. If extract_type is 'PE', this value will be ignored in favor of the value in the tracking table. If extract_type is 'FE', this value will be used to determine the start of the extract. If hwm_column_type is 'int', this must be an int. If hwm_column_type is 'timestamp', this must be a string in the format 'YYYY-MM-DD HH:MM:SS'. |
partition_column | str | The column to use for partitioning the data. |
db_secret | str | The name of the AWS Secrets Manager secret whose value returns { "db_user": "", "db_password": "", "db_host": "" }. Using a secret is recommended for added security. |
lower_bound | str | This specifies the minimum value for the column used to partition the data. This is used to determine the range of data that should be extracted from the JDBC source in each partition. This is not used during Partial Extracts. |
upper_bound | str | This specifies the maximum value for the column used to partition the data. This is used to determine the range of data that should be extracted from the JDBC source in each partition. This is not used during Partial Extracts. |
num_partitions | int | Specifies the number of partitions that the resulting DataFrame should be repartitioned into. This can be useful in cases where the data is skewed, or if the number of rows in the table is much larger than the target number of rows per partition. |
fetchsize | int | Determines the number of rows fetched at a time when the JDBC driver retrieves data from the database. This can be useful for optimizing the performance of the extract by allowing the driver to fetch data in smaller chunks, which reduces the amount of memory required to hold the results. However, setting the fetchsize too low can also decrease performance, as it requires more round trips to the database to fetch all the data. |
repartition_dataframe | bool | The repartition_dataframe argument is a boolean value indicating whether or not the DataFrame should be repartitioned. If this argument is set to True, the number of partitions in the DataFrame will be adjusted to match the value of the num_partitions argument. This can be useful for improving the performance of the extract process, particularly if the data is skewed or if the target data store has a limited number of connections. However, repartitioning can also introduce overhead, so it should be used with caution and only when necessary. |
extract_s3_uri | str | The extract_s3_uri argument is the location in S3 where the extracted data will be saved. It is a string in the format s3://<bucket_name>/<path_to_folder>. |
extract_s3_partitions | str | A comma-separated list of fields in the table being extracted, used to partition the data when it is written to S3. This can be useful for optimizing the performance of the extract process, as it allows the data to be distributed across multiple workers and written in parallel. It is generally recommended to choose partitions that are numerous enough to fully utilize the available resources, but not so many that they overwhelm the S3 infrastructure. |
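These arguments are typically passed as Glue job parameters and read with getResolvedOptions; a minimal sketch for a subset of them is shown below (all values arrive as strings and need to be cast).

```python
# Minimal sketch of reading a subset of the job arguments listed above.
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(
    sys.argv,
    [
        "extract_table", "db_engine", "db_secret", "hwm_col_name",
        "hwm_column_type", "extract_type", "tracking_table_name",
        "num_partitions", "repartition_dataframe", "extract_s3_uri",
    ],
)

# Glue passes everything as strings, so cast the non-string arguments.
num_partitions = int(args["num_partitions"])
repartition_dataframe = args["repartition_dataframe"].lower() == "true"
```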
The DynamoDB table can be used to store the High Water Mark (HWM) and Low Water Mark (LWM) values for each extract table that is being processed. These values can be used to determine the range of rows to extract from the JDBC source, for example by using the determine_extract_plan method in the codebase.etl.extract module. The HWM and LWM values can also be used to check whether a previous extract job has been successful and whether a full or partial extract needs to be done. Additionally, the DynamoDB table can be used to store metadata about the extract job, such as the number of rows extracted and whether the extract was successful, and can be used to update the status of the extract job using the update_extract_tracking_table method in the codebase.aws.ddb module.
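As an illustration of the tracking-table update, a boto3-based update_extract_tracking_table could look like the sketch below; the key and attribute names are assumptions based on this description, not the actual codebase.aws.ddb implementation.

```python
# Hypothetical sketch of the tracking table update; key and attribute names
# are inferred from this document, not from codebase.aws.ddb.
import boto3

def update_extract_tracking_table(tracking_table_name, extract_table,
                                  row_count, extract_successful):
    table = boto3.resource("dynamodb").Table(tracking_table_name)
    table.update_item(
        Key={"extract_table": extract_table},
        UpdateExpression="SET #rc = :rc, extract_successful = :ok",
        ExpressionAttributeNames={"#rc": "row_count"},
        ExpressionAttributeValues={":rc": row_count, ":ok": extract_successful},
    )
```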
The Glue job writes data to S3 by using the DataFrame.write method with the .parquet() format.
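The example the following paragraph refers to is not included in this document; a representative sketch, assuming add_url_safe_current_time is a codebase helper that returns a URL-safe timestamp string, would be:

```python
# Representative sketch of the S3 write described below; add_url_safe_current_time
# is assumed to return a URL-safe timestamp used to make the output path unique.
extracted_dataframe.write.mode("overwrite").parquet(
    f"{_extract_s3_uri}/{add_url_safe_current_time()}"
)
```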
In this example, the extracted_dataframe is created from the JDBC source, converted to a DataFrame, and then written to a specific location in S3 in Parquet format. The mode is set to overwrite, which means that if the files are already present, they will be overwritten with the new files. The _extract_s3_uri is the destination S3 bucket, and the add_url_safe_current_time() function is used to append a timestamp to the filename to make it unique.
Please note that the above code and associated documentation are provided as a reference and are not guaranteed to be error-free. This solution is a work in progress and may be subject to change. It is the responsibility of the user to thoroughly test and verify the correctness of the code before using it in production environments.