AWS SageMaker and CNN for Dog Breed Classification

Motivation for this post came from a recent image classification project where, given a dog image, the application would identify its breed. It turned out to be more interesting and enjoyable than I expected when I started the work, even though I ended up spending substantial time learning new concepts, refreshing linear algebra and calculus, and reading related articles and research papers.

In this project I used AWS SageMaker, a Machine Learning (ML) platform, to build, train and deploy the models. It provides all the needed components, including prebuilt images suitable for ML projects, a Jupyter Notebook environment, and infrastructure to deploy with a single click from the notebook. It uses a ResNet CNN that can be trained from scratch, or trained with transfer learning when a large number of training images is not available.

If you want to jump straight to the notebook code, it is here.
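For a rough idea of the workflow, here is a minimal sketch (not the project notebook itself) of training SageMaker's built-in image-classification algorithm with transfer learning through the SageMaker Python SDK. The S3 paths, class count, sample count and instance types below are placeholders.

import sagemaker
from sagemaker import image_uris
from sagemaker.estimator import Estimator

session = sagemaker.Session()
role = sagemaker.get_execution_role()

# SageMaker's built-in image-classification algorithm (a ResNet implementation)
container = image_uris.retrieve("image-classification", session.boto_region_name)

estimator = Estimator(
    container,
    role=role,
    instance_count=1,
    instance_type="ml.p3.2xlarge",   # GPU training instance; placeholder choice
    sagemaker_session=session,
)

estimator.set_hyperparameters(
    num_layers=18,              # ResNet-18 variant
    use_pretrained_model=1,     # transfer learning instead of training from scratch
    num_classes=133,            # placeholder: number of dog breeds
    num_training_samples=6680,  # placeholder: training set size
    image_shape="3,224,224",
    epochs=10,
)

# Train on data staged in S3, then deploy an endpoint with a single call.
estimator.fit({
    "train": "s3://my-bucket/dog-breeds/train/",            # placeholder path
    "validation": "s3://my-bucket/dog-breeds/validation/",  # placeholder path
})
predictor = estimator.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")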

Neural Network (NN)

Neural networks draw inspiration from their biological counterparts. Many NN machine learning algorithms are based on them, including the perceptron, Hopfield networks, CNN, RNN, LSTM, etc. In this article I will briefly cover the perceptron and CNN.
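As a quick illustration (not code from the project), here is a minimal NumPy perceptron that learns the logical AND function; a weighted sum of inputs is passed through a step activation and the weights are nudged whenever a prediction is wrong.

import numpy as np

def train_perceptron(X, y, lr=0.1, epochs=20):
    # Start with zero weights and bias; update them on every misclassified sample.
    w = np.zeros(X.shape[1])
    b = 0.0
    for _ in range(epochs):
        for xi, target in zip(X, y):
            pred = 1 if np.dot(w, xi) + b > 0 else 0
            update = lr * (target - pred)
            w += update * xi
            b += update
    return w, b

# Learn the logical AND function
X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
y = np.array([0, 0, 0, 1])
w, b = train_perceptron(X, y)
print([1 if np.dot(w, xi) + b > 0 else 0 for xi in X])  # -> [0, 0, 0, 1]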


Meltdown and Spectre patch effects on Mac performance

With the recently identified major flaws in modern CPUs, all major software and (CPU) hardware companies rushed to patch their systems as quickly as possible. For more detailed information check out this site, and for a programmer's view or to test on a Linux system try this.

Since the OS (Operating System) kernel space is highly protected (as is any other process space) and isolated from interference by other processes, any breakdown of this isolation leads to major issues. Quote from the Meltdown paper – “The attack is independent of the operating system, and it does not rely on any software vulnerabilities. Meltdown breaks all security assumptions given by address space isolation as well as paravirtualized environments and, thus, every security mechanism building upon this foundation.”

The paper also details the scope of the issue, which affects virtually all modern computers and phones. Quote – “On affected systems, Meltdown enables an adversary to read memory of other processes or virtual machines in the cloud without any permissions or privileges, affecting millions of customers and virtually every user of a personal computer.”

(PS: Bold highlighting added by me)

Reading through the paper and looking at the example code snippet below took me back to the days when I did some assembly-level programming on the Intel 8086 series. It was fun, challenging and interesting.

; rcx = kernel address
; rbx = probe array
retry:
mov al, byte [rcx]          ; transient (out-of-order) read of one kernel byte
shl rax, 0xc                ; multiply by 4096 (page size) so each byte value maps to its own page
jz retry                    ; retry if the transient read returned zero
mov rbx, qword [rbx + rax]  ; touch the probe array; the page that ends up cached encodes the byte


Precompiled Redshift Python DB adapter

Recently I built an application that uses AWS Lambda to load data from a datalake into Redshift at regular intervals. The steps to compile the adapter for the AWS Lambda environment are given here. I have also uploaded it to GitHub here, so one can use it without having to go through the compilation steps.


Ref: https://hiregion.org/2017/12/02/precompiled-python-redshift-db-adapter-for-aws-lambda-env/

https://github.com/shivamy/psycopg2

Compiling Python Redshift DB adapter for AWS Lambda env.

AWS Lambda has gained huge momentum in the last couple of years and enabled software architects and developers to build FaaS (Function as a Service) applications. As much as Lambda helps in scaling applications, it has some limitations, such as execution duration and available memory. For long-running jobs, typically backend or batch processing, the 5-minute execution limit can be a deal breaker. But with appropriate data partitioning and architecture it is still an excellent, cost-effective option for enterprises to scale their applications.

In a recent project, I architected data to be loaded from a datalake into Redshift. The data is produced by an engine in batches and pushed to S3, partitioned on a time scale, and a consumer Python application loads it at regular intervals into a Redshift staging environment. For a scalable solution, the datalake can be populated by multiple producers, and similarly one or more consumers can drain the datalake queue and load into Redshift. The data from multiple staging tables is then loaded into the final table after deduping and data augmentation. A rough sketch of the consumer's load step is shown below.
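The consumer's load step might look roughly like this sketch, using psycopg2 to issue a Redshift COPY from one time-partition in S3. The cluster endpoint, credentials, table and S3 path are placeholders.

import psycopg2

# Placeholder connection details for the Redshift cluster
conn = psycopg2.connect(
    host="my-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439,
    dbname="prod",
    user="etl_user",
    password="...",
)
try:
    with conn.cursor() as cur:
        # COPY one time-partition of the datalake into a staging table
        cur.execute("""
            COPY staging.clicks_batch
            FROM 's3://my-datalake/ad/clicks/yyyy=2017/mm=12/dd=02/'
            CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/RedshiftCopyRole'
            DELIMITER '|' GZIP IGNOREBLANKLINES;
        """)
    conn.commit()
finally:
    conn.close()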


Invalid data dump – Amazon Redshift, Data Pipeline and S3

Amazon Data Pipeline (DPL) is a late entrant to the ETL market but provides many features that are well integrated with the AWS cloud. In any data extraction process one encounters invalid or incorrect data, and that data may either be logged or ignored depending on the business requirements and the severity of the rejected data.

When your data flows through S3 to other AWS platforms, be it Redshift, RDS, DynamoDB, etc., you can use S3 to dump the rejected data. For example, in a DPL similar to the one below, one of the steps could filter rejected rows and dump them to S3 for later analysis.

By standardizing the rejections from different DPLs, another DPL can regularly load them back into Redshift for quick real-time analysis or a deeper dive downstream. This also greatly helps with recovery and reruns when needed.

The following are simple, high-level steps in which rejected data is directed to S3. The parameters are provided through the environment setup, for example #{myDPL_schema_name} = 'prod_public' and #{myDPL_ErrorLogPath} = 's3://emr_cluster/ad/clicks/…'.


-- PreProcess
-- Load the staging table and, where possible, populate its data_error column at the same time.
INSERT INTO #{myDPL_schema_name}.#{myDPL_staging_table}
SELECT col1,
       col2,
       col3,
       ...,
       CASE WHEN ... THEN ... END AS data_error
FROM #{myDPL_schema_name}.#{myDPL_source_table}
LEFT OUTER JOIN #{myDPL_schema_name}.table_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...;


-- OR, if the data_error column is updated separately...
UPDATE #{myDPL_schema_name}.#{myDPL_staging_table}
SET data_error = ...
FROM #{myDPL_schema_name}.#{myDPL_staging_table}
JOIN #{myDPL_schema_name}.dim_1
  ON ...
JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...;

-- Temporary table
CREATE TEMP TABLE this_subject_dpl_rejections AS (
SELECT
...
FROM #{myDPL_schema_name}.#{myDPL_staging_table}
WHERE data_error IS NOT NULL
);

-- Dump to S3
UNLOAD ('SELECT * FROM this_subject_dpl_rejections')
TO '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/hh=#{format(@scheduledStartTime,'HH')}/ran_on_#{@actualStartTime}_file_'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
GZIP
PARALLEL OFF
ALLOWOVERWRITE;

Now load the errors back to Redshift…

COPY #{myDPL_schema_name}.#{myDPL_error_table}
FROM '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/hh=#{format(@scheduledStartTime,'HH')}/'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
DELIMITER '|'
GZIP
TRIMBLANKS
TRUNCATECOLUMNS
IGNOREBLANKLINES;

Amazon CloudTrail parsing for specific API call using Spark

Recently I needed to parse Amazon Web Services CloudTrail log files to check for a specific API call. The call was being deprecated by Amazon and we needed to upgrade our code to use the latest CLI (aws). The logs I got from the IT folks were JSON files, each containing only a single line anywhere from 0.5MB to 1MB long! And that one line held an array of JSON objects like the one below.

Problem: Identify an application that is making a specific API call to a service. Not much other info was available; all we had was a bunch of CloudTrail logs and the name of the deprecated AWS API call (DescribeJobFlows).
See http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeJobFlows.html, which basically says “DescribeJobFlows: This API is deprecated and will eventually be removed. We recommend you use…”

{"Records": [ {"eventVersion":"1.02","userIdentity":{"type":"IAMUser","principalId":"AIDAIVWSMF………","arn":"arn:aws:iam::144……….:user/staging-user","accountId":"144702935796","accessKeyId":"………….","userName":"…………."},"eventTime":"2016-…..T13:50:16Z","eventSource":"datapipeline.amazonaws.com","eventName":"ReportTaskProgress","awsRegion":"us-east-1","sourceIPAddress":"54.2…….","userAgent":"aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23.25-b01/1.6.0_33","requestParameters":{"taskId":"37kSdZbBpwRjSng–/1Tms+pYiDGRURfXkPWl90Azl893QqHry1pryPTkUzLed5u1O2KIK9qh8pM1fk8fPBHzGbHDWhk0VTWFru3wJnanhJSuYLkEZ0fSmC…."},"responseElements":{"canceled":false},"requestID":"65ac3d1a-476e-11e6-a25c-f339ebc3012a","eventID":"4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69","eventType":"AwsApiCall","recipientAccountId":"144…….."},….

It is actually an array of JSON objects; for more info on the log format see http://docs.aws.amazon.com/awscloudtrail/latest/userguide/view-cloudtrail-events-cli.html

Even before developing any code I wanted to test a couple of ideas. The following is one of the ways I was able to identify the offending application and host(s). After the successful evaluation, I wrote a Python script to upload hundreds of files to a new S3 bucket and performed a minor transformation to clean the data so that it was suitable for loading into Spark.

For a quick, readable view of the JSON you can do:

> vim 144………_CloudTrail_us-east-1_20160711T1355Z_n0m854FRPqpkaPd3.json
 # and then using python json module
(in vim) :%!python -m json.tool

This produces readable output:
{
    "Records": [
        {
            "awsRegion": "us-east-1",
            "eventID": "4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69",
            "eventName": "ReportTaskProgress",
            "eventSource": "datapipeline.amazonaws.com",
            "eventTime": "2016-.....T13:50:16Z",
            "eventType": "AwsApiCall",
            "eventVersion": "1.02",
            "recipientAccountId": "144...",
            "requestID": "65ac3d1a-476e-11e6-a25c-f339ebc3012a",
            "requestParameters": {
                "taskId": "37kSdZbBpwRjSng/1Tms+p...."
            },
            "responseElements": {
                "canceled": false
            },
            "sourceIPAddress": "54........",
            "userAgent": "aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23.25-b01/1.6.0_33",
            "userIdentity": {
                "accessKeyId": "AK...",
                "accountId": "144...",
                "arn": "arn:aws:iam::144....:user/staging-user",
                "principalId": "...",
                "type": "IAMUser",
                "userName": "....-user"
            }
        },
        {

Next, perform some cleanup so that there is one JSON object per line:

(in vim) :1,$s#\{\"Records\"\:\ \[##     — vim removal of the {"Records": [ header
(in vim) :1,$s#\}\,#\}^V^M#g             — vim substitute to put each record onto its own line
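The same cleanup can also be scripted in Python; here is a small sketch (file names are placeholders), assuming each raw file holds a single Records array.

import json

# Flatten each CloudTrail file's "Records" array into one JSON object per line
with open("cloudtrail_raw.json") as src, open("cloudtrail_clean.json", "w") as dst:
    for record in json.load(src)["Records"]:
        dst.write(json.dumps(record) + "\n")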

I then loaded this file into a Spark table and ran Spark SQL on it. See the screenshots below.
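The query was roughly along the lines of the sketch below (not the exact code behind the screenshots); it assumes the cleaned one-record-per-line JSON was uploaded to an S3 bucket, whose path here is a placeholder.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cloudtrail-describejobflows").getOrCreate()

# Read the cleaned, one-record-per-line CloudTrail JSON
events = spark.read.json("s3://my-bucket/cloudtrail-cleaned/")
events.createOrReplaceTempView("cloudtrail_events")

# Who is still calling the deprecated API, and from where?
spark.sql("""
    SELECT sourceIPAddress, userAgent, COUNT(*) AS calls
    FROM cloudtrail_events
    WHERE eventName = 'DescribeJobFlows'
    GROUP BY sourceIPAddress, userAgent
    ORDER BY calls DESC
""").show(truncate=False)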

Note the “DescribeJobFlows” calls above, originating from an EMR instance at a specific IP.