Log Parsing through Hadoop, Hive & Python

A common analysis on web access logs is cohort analysis, which requires pulling each user's access date and time along with other dimensions such as user, IP address, and geo data. Here I use Hadoop, Hive, and Python to pull the date and IP address from an access log into Hadoop and run some queries. The example illustrates Hadoop (version 0.20.1) streaming, a SerDe, and Hive's (version 0.4.0) pluggable custom mapper (get_access_log_ip).

The steps below load a few thousand rows from the source table access_log_2010_01_25 into a target table (dw_log_ip_test, a data warehouse access log table). Along the way, a Python streaming script extracts the date ('DD/Mon/YYYY') from a timestamp in the format DD/Mon/YYYY:HH:MM:SS -0800, together with the remote IP address.

Step 1: First create a table for the access log (access_log_2010_01_25) and then load data into it.

CREATE TABLE access_log_2010_01_25 (
  request_date STRING,
  remote_ip STRING,
  method STRING,
  request STRING,
  protocol STRING,
  user STRING,
  status STRING,
  size STRING,
  time STRING,
  remote_host STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "\\[([^]]+)\\] ([^ ]*) \"([^ ]*) ([^ ]*) ([^ \"]*)\" user=([^ ]*) status=([^ ]*) size=([^ ]*) time=([^ ]*) host=([^ ]*) timestamp=([^ ]*) perf=([^ ]*)",
  "output.format.string" = "%1$s %2$s \"%3$s %4$s %5$s\" user=%6$s status=%7$s size=%8$s time=%9$s host=%10$s timestamp=%11$s perf=%12$s"
)
STORED AS TEXTFILE;

hive> LOAD DATA LOCAL INPATH '/mnt/web_ser101/weblog_server101_20100125_1'
> OVERWRITE INTO TABLE access_log_2010_01_25;
#- After the load, a record in the table looks like:
#- 25/Jan/2010:13:14:05 -0800 GET /xmls/public/thumbnail.xml HTTP/1.1 - 302 250 0 abcd.com 1264454045 -
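
To see how the regular expression in the SerDe splits a raw log line into columns, here is a minimal Python sketch. The sample line is made up to fit the pattern (the IP address 10.0.0.1 in particular is a placeholder), and Python's re module stands in for the Java regex engine that Hive uses, so treat it as an illustration rather than a description of what Hive does internally.

```python
import re

# Same pattern as "input.regex" above, with the extra Hive string escaping removed.
LOG_PATTERN = re.compile(
    r'\[([^]]+)\] ([^ ]*) "([^ ]*) ([^ ]*) ([^ "]*)" '
    r'user=([^ ]*) status=([^ ]*) size=([^ ]*) time=([^ ]*) '
    r'host=([^ ]*) timestamp=([^ ]*) perf=([^ ]*)'
)

# Hypothetical raw log line; the IP address is a placeholder.
SAMPLE_LINE = (
    '[25/Jan/2010:13:14:05 -0800] 10.0.0.1 '
    '"GET /xmls/public/thumbnail.xml HTTP/1.1" '
    'user=- status=302 size=250 time=0 host=abcd.com '
    'timestamp=1264454045 perf=-'
)


def parse_log_line(line):
    """Return the captured groups as a list, or None if the line does not match."""
    match = LOG_PATTERN.fullmatch(line)
    return list(match.groups()) if match else None


if __name__ == "__main__":
    # Print each captured group with its 1-based position.
    for position, value in enumerate(parse_log_line(SAMPLE_LINE), start=1):
        print(position, value)
```

The first two groups are the ones the rest of this walkthrough uses: the date/time string and the remote IP address.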

Step 2: Create a target test table

hive> CREATE TABLE dw_log_ip_test (dt STRING, remote_ip STRING);

Step 3: In an editor of your choice, write a simple Python script (get_access_log_ip.py) that extracts the date string from the date/time string and passes the remote IP address through, as shown below.

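#!/usr/bin/env python
import sys

# Hive streams each row to stdin as tab-separated columns.
for line in sys.stdin:
    fields = line.strip().split('\t')
    dt = fields[0].split(':')[0]  #-- Get date 25/Jan/2010
    ip = fields[1]                #-- Get remote IP
    # Write tab-separated output with no extra spaces around the tab.
    sys.stdout.write(dt + '\t' + ip + '\n')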
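
Before wiring the script into Hive, it can help to exercise the same logic locally. Hive passes the selected columns to a streaming script as tab-separated text, one row per line, so the sketch below feeds a couple of made-up rows through a function that follows the same steps as the script. The function name extract_date_ip and the sample IP addresses are assumptions for illustration only.

```python
def extract_date_ip(line):
    """Turn 'DD/Mon/YYYY:HH:MM:SS -0800<TAB>ip' into 'DD/Mon/YYYY<TAB>ip'."""
    fields = line.strip().split('\t')
    dt = fields[0].split(':')[0]   # keep only the part before the first colon
    ip = fields[1]
    return dt + '\t' + ip


if __name__ == "__main__":
    # Made-up rows in the shape Hive would stream to the script.
    sample_rows = [
        '25/Jan/2010:13:11:41 -0800\t10.0.0.1\n',
        '25/Jan/2010:13:11:47 -0800\t10.0.0.2\n',
    ]
    for row in sample_rows:
        print(extract_date_ip(row))
```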

Step 4: Now extract data into the dw_log_ip_test table, loading only a limited slice of the data (10 seconds' worth).

hive> FROM access_log_2010_01_25 L
> INSERT OVERWRITE TABLE dw_log_ip_test MAP L.request_date, L.remote_ip
> USING '/home/hadoop/hive/etl/scripts/get_access_log_ip.py' AS dt, remote_ip
> WHERE L.request_date > '25/Jan/2010:13:11:40'
> and L.request_date < '25/Jan/2010:13:11:50';

# Hive outputs some information like:
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201001251623_0094, Tracking URL = http://hadoop_main:50030/jobdetails.jsp?jobid=job_201001251623_0094
Kill Command = /usr/local/hadoop/bin/hadoop job -Dmapred.job.tracker=hdfs://hadoop_main:9001 -kill job_201001251623_0094
2010-02-03 18:42:40,793 Stage-1 map = 0%, reduce = 0%
2010-02-03 18:42:53,874 Stage-1 map = 50%, reduce = 0%
2010-02-03 18:43:05,981 Stage-1 map = 100%, reduce = 0%
2010-02-03 18:43:09,012 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201001251623_0094
Ended Job = -416940662, job is filtered out (removed at runtime).
Launching Job 2 out of 2

Loading data to table dw_log_ip_test
11110 Rows loaded to dw_log_ip_test
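
A note on the WHERE clause in Step 4: request_date is a STRING column, so the filter compares text, not timestamps. That works here because both bounds fall on the same day, where the time portion sorts the same way as text, but it would not hold across days or months. The Python sketch below illustrates the difference with made-up timestamps. Parsing with datetime.strptime is shown only as one possible alternative done outside Hive (and it assumes an English locale for the month abbreviation); it is not what the query above does.

```python
from datetime import datetime

LOG_TIME_FORMAT = '%d/%b/%Y:%H:%M:%S %z'   # e.g. 25/Jan/2010:13:11:45 -0800


def in_window_as_text(request_date, lower, upper):
    """Mimic the WHERE clause: plain string comparison on both bounds."""
    return lower < request_date < upper


def parse_log_time(request_date):
    """Parse the log timestamp into a timezone-aware datetime."""
    return datetime.strptime(request_date, LOG_TIME_FORMAT)


if __name__ == "__main__":
    lower, upper = '25/Jan/2010:13:11:40', '25/Jan/2010:13:11:50'
    # Same-day comparison behaves as intended.
    print(in_window_as_text('25/Jan/2010:13:11:45 -0800', lower, upper))
    # As text, 3 February sorts before 25 January even though it is later.
    print('03/Feb/2010:00:00:00 -0800' < '25/Jan/2010:13:11:40 -0800')
    # Parsed timestamps compare in true chronological order.
    print(parse_log_time('03/Feb/2010:00:00:00 -0800')
          > parse_log_time('25/Jan/2010:13:11:40 -0800'))
```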

#-- Now check the results...
hive> select dt, remote_ip from dw_log_ip_test;
hive> select dt, remote_ip, count(1)
> from dw_log_ip_test
> group by dt, remote_ip;
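
For a quick sanity check on a small sample, the same grouping can be reproduced outside Hive. The sketch below counts (dt, remote_ip) pairs from tab-separated lines of the kind the mapper script emits. The sample rows and the function name count_by_date_ip are made up for illustration.

```python
from collections import Counter


def count_by_date_ip(lines):
    """Count rows per (dt, remote_ip), like GROUP BY dt, remote_ip with count(1)."""
    counts = Counter()
    for line in lines:
        dt, ip = line.rstrip('\n').split('\t')
        counts[(dt, ip)] += 1
    return counts


if __name__ == "__main__":
    # Made-up mapper output: date and IP separated by a tab.
    rows = [
        '25/Jan/2010\t10.0.0.1',
        '25/Jan/2010\t10.0.0.2',
        '25/Jan/2010\t10.0.0.1',
    ]
    for (dt, ip), n in sorted(count_by_date_ip(rows).items()):
        print(dt, ip, n)
```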