Precompiled Redshift Python DB adapter

Recently I built an application that uses AWS Lambda to load data from a data lake into Redshift at regular intervals.  The steps to compile the adapter for the AWS Lambda environment are given here.  I also uploaded it to GitHub here, so you can use it without going through the compilation steps.


Ref: https://hiregion.org/2017/12/02/precompiled-python-redshift-db-adapter-for-aws-lambda-env/

https://github.com/shivamy/psycopg2

Compiling Python Redshift DB adapter for AWS Lambda env.

AWS Lambda has gained huge momentum in the last couple of years and has enabled software architects and developers to build on FaaS (Function as a Service).  As much as Lambda helps in scaling applications, it has limitations such as execution duration and available memory.  For long-running jobs, typically backend or batch processing, the 5-minute execution limit can be a deal breaker.  But with appropriate data partitioning and architecture, it is still an excellent option for enterprises to scale their applications cost-effectively.

In a recent project, I architected a pipeline that loads data from a data lake into Redshift.  The data is produced by an engine in batches and pushed to S3.  It is partitioned by time, and a consumer Python application loads it at regular intervals into a Redshift staging environment.  For a scalable solution, the data lake can be populated by multiple producers, and similarly one or more consumers can drain the data lake queue to load into Redshift.  The data from multiple staging tables is then loaded into the final table after deduplication and data augmentation.
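As a rough illustration of the consumer side, the sketch below picks the last completed hourly partition and builds a COPY statement for it.  The year/month/day/hour key layout, the bucket and table names, and the use of IAM_ROLE for authorization are all assumptions made for this example, not details of the actual project.

```python
from datetime import datetime, timedelta


def previous_hour_prefix(base: str, now: datetime) -> str:
    """S3 prefix of the last complete hour before `now` (assumed key layout)."""
    hour = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    return f"{base.rstrip('/')}/{hour:%Y/%m/%d/%H}/"


def build_copy_sql(table: str, prefix: str, iam_role: str) -> str:
    """Build a COPY statement that loads one partition into a staging table."""
    return (
        f"COPY {table} FROM '{prefix}' "
        f"IAM_ROLE '{iam_role}' "
        "DELIMITER '|' GZIP;"
    )


def load_partition(conn, sql: str) -> None:
    """Run the COPY on an already open psycopg2 connection and commit."""
    with conn.cursor() as cur:
        cur.execute(sql)
    conn.commit()


# Hypothetical values, for illustration only.
prefix = previous_hour_prefix("s3://example-bucket/lake", datetime(2017, 12, 2, 0, 15))
copy_sql = build_copy_sql(
    "staging.events", prefix, "arn:aws:iam::123456789012:role/example-role"
)
```

A scheduled Lambda function could call load_partition with a connection opened through psycopg2; keeping each invocation to one partition is one way to stay within the execution time limit.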


Invalid data dump – Amazon Redshift, Data Pipeline and S3

Amazon Data Pipeline (DPL) is a late entrant to the ETL market, but it provides many features that are well integrated with the AWS cloud.  Any data extraction process encounters invalid or incorrect data, and that data may be either logged or ignored depending on the business requirements or the severity of the rejected data.

When your data flows through S3 to other AWS platforms, be it Redshift, RDS, DynamoDB, etc., you can use S3 to dump the rejected data.  For example, in a DPL similar to the one below, one of the steps can filter the rejected rows and dump them to S3 for later analysis.

By standardizing the rejections from different DPLs, another DPL can regularly load them back into Redshift for quick real-time analysis or for a deeper dive downstream.  This also greatly helps in recovery and reruns when needed.

The following are simple, high-level steps in which rejected data is directed to S3.  The parameters are provided through the environment setup.  For example: #{myDPL_schema_name} = ‘prod_public’ and #{myDPL_ErrorLogPath} = ‘s3://emr_cluster/ad/clicks/…’

 

-- PreProcess
-- Load the staging table and, where possible, set the data_error column at the same time.
INSERT INTO #{myDPL_schema_name}.#{myDPL_staging_table}
SELECT col1,
       col2,
       col3,
       -- etc. (remaining columns)
       CASE WHEN ... END AS data_error
FROM #{myDPL_schema_name}.#{myDPL_source_table}
LEFT OUTER JOIN #{myDPL_schema_name}.table_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...;


-- OR, if the data_error column is updated separately...
-- (The target table is not repeated in FROM; relate it to the dimensions in WHERE.)
UPDATE #{myDPL_schema_name}.#{myDPL_staging_table}
SET data_error = ...
FROM #{myDPL_schema_name}.dim_1
JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...;

-- Temporary table
CREATE TEMP TABLE this_subject_dpl_rejections AS (
SELECT
...
FROM #{myDPL_schema_name}.#{myDPL_staging_table}
WHERE data_error IS NOT NULL
);

-- Dump to S3
UNLOAD ('SELECT * FROM this_subject_dpl_rejections')
TO '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/hh=#{format(@scheduledStartTime,'HH')}/ran_on_#{@actualStartTime}_file_'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
GZIP
PARALLEL OFF
ALLOWOVERWRITE;
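
For a quick look at a dumped file outside Redshift, a minimal Python sketch along these lines could parse one of the gzip-compressed, pipe-delimited parts written by the UNLOAD above.  It assumes the file has already been downloaded into memory and that the UNLOAD did not use ADDQUOTES or ESCAPE; the function name and the sample rows are hypothetical.

```python
import csv
import gzip
import io


def read_rejections(gz_bytes: bytes, delimiter: str = "|"):
    """Parse one gzip-compressed, delimited UNLOAD part into a list of rows."""
    with gzip.open(io.BytesIO(gz_bytes), mode="rt", encoding="utf-8", newline="") as fh:
        # QUOTE_NONE: a plain UNLOAD does not wrap fields in quotes.
        return list(csv.reader(fh, delimiter=delimiter, quoting=csv.QUOTE_NONE))


# Made-up sample standing in for a downloaded rejection file.
sample = gzip.compress(b"1|2017-12-02|bad_date\n2|2017-12-02|missing_dim\n")
rows = read_rejections(sample)
```

Each row comes back as a list of strings, so the last column here would hold the data_error value.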

Now load the errors back to Redshift…

COPY #{myDPL_schema_name}.#{myDPL_error_table}
FROM '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/hh=#{format(@scheduledStartTime,'HH')}/'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
DELIMITER '|'
GZIP
TRIMBLANKS
TRUNCATECOLUMNS
IGNOREBLANKLINES;
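
Because the rejections land under hourly yyyy=/mm=/dd=/hh= prefixes, a rerun over a time window only needs the list of prefixes to reload.  The sketch below enumerates them in Python; the function name and the bucket are hypothetical, and it assumes the window is expressed in the same time zone as the pipeline's scheduled start time.

```python
from datetime import datetime, timedelta


def hourly_prefixes(base: str, start: datetime, end: datetime):
    """Yield one yyyy=/mm=/dd=/hh= prefix per hour in [start, end)."""
    current = start.replace(minute=0, second=0, microsecond=0)
    while current < end:
        yield (
            f"{base.rstrip('/')}/yyyy={current:%Y}/mm={current:%m}/"
            f"dd={current:%d}/hh={current:%H}/"
        )
        current += timedelta(hours=1)


# Hypothetical rerun window crossing midnight.
prefixes = list(
    hourly_prefixes(
        "s3://example-bucket/errors",
        datetime(2017, 12, 2, 22),
        datetime(2017, 12, 3, 1),
    )
)
```

Each prefix can then be substituted into the FROM clause of a COPY like the one above.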