Precompiled Redshift Python DB adapter

Recently I built an application that uses AWS lambda to load data from datalake to Redshift at regular intervals.  The steps to compile the adapter suitable for AWS Lambda environment is given here.  I also uploaded it to github here and one can use it without having to go through compilation steps.




Compiling Python Redshift DB adapter for AWS Lambda env.

AWS lambda has gained huge momentum in the last couple of years and enabled software architects/ developers to build FaaS (Function as a Service).  As much as Lambda helps in scaling applications, it has some limitations like execution duration or memory space availability, etc.   For long running jobs, typically in the backend or batch processing, 5 minute duration can be a deal breaker.  But with appropriate data partitions and architecture it is still an excellent option for enterprises to scale their applications and be cost effective.

In the recent project, I architected data be loaded from a datalake into Redshift.  The data is produced by an engine in batches and pushed to s3.  The data partitioned on time scale and a consumer Python application will load this data at regular intervals into Redshift staging environment.  For scalable solution datalake can be populated from multiple producers and similarly one or more consumers can drain the datalake queue to load to Redshift.  The data from multiple staging tables are then loaded to final table after deduping and data augmentation.

Read More »

Amazon CloudTrail parsing for specific API call using Spark

Recently I needed to parse Amazon Web Service CloudTrail log files to check for some specific API call.  The call was being deprecated by Amazon and we needed to upgrade our code to reflect the latest CLI (aws).  The logs I got from IT folks were json files and each file contained only one single line each with length anywhere from 0.5MB to 1MB! And this one line had array of json objects like below.

Problem:  Identify an application that is making some specific API calls to a service.   Not much other info was available and all we had was bunch of CloudTrail logs and AWS deprecated API call (DescribeJobFlows).
See – which basically suggested “DescribeJobFlows: This API is deprecated and will eventually be removed. We recommend you use…”

{“Records”: [ {“eventVersion”:”1.02″,”userIdentity”:{“type”:”IAMUser”,”principalId”:”AIDAIVWSMF………”,”arn”:”arn:aws:iam::144……….:user/staging-user”,”accountId”:”14    4702935796″,”accessKeyId”:”………….”,”userName”:”………….”},”eventTime”:”2016-…..T13:50:16Z”,”eventSource”:””,”eventName”:”Report    TaskProgress”,”awsRegion”:”us-east-1″,”sourceIPAddress”:”54.2…….”,”userAgent”:”aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23    .25-b01/1.6.0_33″,”requestParameters”:{“taskId”:”37kSdZbBpwRjSng–/1Tms+pYiDGRURfXkPWl90Azl893QqHry1pryPTkUzLed5u1O2KIK9qh8pM1fk8fPBHzGbHDWhk0VTWFru3wJnanhJSuYLkEZ0fSmC….”},”responseElements”:{“canceled”:false},”requestID”:”65ac3d1a-476e-11e6-a25c-f339ebc3012a”,”eventID”:”4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69″,”eventType”:”AwsApiC    all”,”recipientAccountId”:”144……..”},….

It is actually array of JSON objects and for more info on the log see

Even before developing some code I wanted to test couple of ideas.  Following is one of the way I was able to identify the offending application and host(s).  After the successful evaluation, I wrote Python script to upload hundreds of files to S3’s new bucket and performed minor transformation to clean the data so that data was suitable for loading to Spark.

For quick readable json you can do:

> vim 144………_CloudTrail_us-east-1_20160711T1355Z_n0m854FRPqpkaPd3.json
 # and then using python json module
(in vim) :%!python -m json.tool

to see
    1  {                                                                                                                                                                          
2 "Records": [
3 {
4 "awsRegion": "us-east-1",
5 "eventID": "4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69",
6 "eventName": "ReportTaskProgress",
7 "eventSource": "",
8 "eventTime": "2016-.....T13:50:16Z",
9 "eventType": "AwsApiCall",
10 "eventVersion": "1.02",
11 "recipientAccountId": "144...",
12 "requestID": "65ac3d1a-476e-11e6-a25c-f339ebc3012a",
13 "requestParameters": {
14 "taskId": "37kSdZbBpwRjSng/1Tms+p....
16 "responseElements": {
17 "canceled": false
18 },
19 "sourceIPAddress": "54........",
20 "userAgent": "aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23.25-b01/1.6.0_33",
21 "userIdentity": {
22 "accessKeyId": "AK...",
23 "accountId": "144...",
24 "arn": "arn:aws:iam::144....:user/staging-user",
25 "principalId": "...",
26 "type": "IAMUser",
27 "userName": "....-user"
28 }
29 },
30 {

will spit out readable output.   Next is performing some cleanup and have one json object per line.

(in vim) :1,$s#\{\”Records\”\:\ \[##     — vim removal of header
(in vim) :1,$s#\}\,#\}^V^M#g               — vim substitute each record onto it’s own line

I then loaded his file into Spark table and then run SparkSQL on it.   See screenshots below.

Note “DescribeJobFLows” calls above originating from an EMR instance on a specific IP.

Purging Vertica tables/ partitions at regular intervals

First thing first, especially, when there is potential to misunderstand due to definitions of specific words – purging vs truncating vs deleting data. In this article, my interest is to purge the tables of deleted rows.

Truncate => Removes all storage associated with a table, while preserving the table definitions.
Purge => Permanently removes deleted data from physical storage so that the disk space can be reused.  You can purge historical data up to and including the epoch in which the Ancient History Mark is contained.
Delete => It marks tuples as no longer valid in the current epoch. It does not delete data from disk storage for base tables.

Vertica purge_table or purge_partitions statement purges all projections of the specified table and can temporarily take significant disk space while performing the purge.

Following query returns list of queries that when executed purges data for each partitioned tables.

   — Look into only large tables (TABLE_SIZE_OF_INTEREST), say 1Billion row,
   —  that have good chunk deletes (>20%)
   SELECT ‘SELECT purge_partition(”’|| P.projection_schema ||’.’|| P.anchor_table_name || ”’,”’||PRT.partition_key||”’);’
    FROM projections P inner join
    (   SELECT projection_id, partition_key
        ,SUM(ros_row_count) as total_rows
        FROM partitions
        GROUP BY 1,2
        HAVING SUM(deleted_row_count)/SUM(ros_row_count) > 0.2
    ) PRT
    ON PRT.projection_id = P.projection_id
    GROUP BY PRT.partition_key, P.projection_schema, P.anchor_table_name
    HAVING max(total_rows) <= {TABLE_SIZE_OF_INTEREST}
    ORDER BY max(total_rows) DESC
    LIMIT 25 ;

Similarly for those tables that are non-partitioned use following query.

    SELECT ‘select purge_table(”’|| projection_schema ||’.’|| anchor_table_name || ”’);’
    FROM projections P inner join
    (   SELECT schema_name, projection_name ,
            sum(deleted_row_count) delete_rows ,
            sum(delete_vector_count) delete_vector_count ,
            sum(total_row_count) total_rows,
            sum(deleted_row_count)/sum(total_row_count) as delete_percent
        FROM storage_containers
        GROUP BY 1,2
        HAVING sum(deleted_row_count)/sum(total_row_count) > 0.2
    ) A
    ON A.schema_name = P.projection_schema
      AND A.projection_name = P.projection_name
    INNER JOIN tables T on P.anchor_table_id = T.table_id
    WHERE length(T.partition_expression) = 0
    GROUP BY 1
    HAVING max(total_rows) <= {TABLE_SIZE_OF_INTEREST}
    ORDER BY max(total_rows) DESC
    LIMIT 25 ;

With a wrapper around these queries in Python/ Perl/ Bash, etc one can easily go through the list executing each statement and cleaning up old deleted data for improved performance.