What is Customer Lifetime Value?

In a contractual insurance business model, Customer Lifetime Value (CLV) is the single most important metric for understanding our customers. In insurance, CLV plays a role similar to that of the credit score in banking and credit card companies. CLV helps us make important business decisions about sales, marketing, product development, and customer support. For instance:

  • Marketing: How much should I spend to acquire a customer?
  • Product: How can I offer products and services tailored for my best customers?
  • Customer Support: How much should I spend to service and retain a customer?
  • Sales: Which types of customers should sales reps spend the most effort targeting?

The heart of CLV is the notion of the retention rate, or retention probability. An important managerial task is to take a series of past retention numbers for a given group of customers and project them into the future to make more accurate predictions about customer tenure, lifetime value, and so on. A defining characteristic of the contractual business setting is that the departure of a customer is observed, which is called customer attrition. To understand how customer behavior differs between attrition and retention, we created machine learning models by analyzing data about millions of customers and their past retention and attrition numbers. The data contains information about auto, property and life policies (C360), customer demographics (Acxiom), customer segmentation (LifeScape), the customer billing process and the customer journey map. Once the model is trained and optimized, it can predict the retention probability of a customer at any future tenure with the firm, from which we can aggregate and predict the expected tenure of a customer over a period of time.

The expected tenure, predicted by machine learning models trained on billions of rows of data, is then multiplied by the expected customer value to calculate the CLV of our customers. The CLV can inform a wide range of decision-making processes and can have a dramatic impact throughout our business.
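
To make the calculation concrete, below is a minimal sketch (not the production model) of how the two pieces fit together, assuming the trained model has already produced a per-period retention probability for a customer and that the expected value per period is constant; the numbers are made up for illustration.

def expected_tenure(retention_probs):
    """Expected number of periods retained, given per-period retention probabilities."""
    survival = 1.0
    tenure = 0.0
    for p in retention_probs:
        survival *= p      # probability the customer is still with us in this period
        tenure += survival
    return tenure

def customer_lifetime_value(retention_probs, expected_value_per_period):
    return expected_tenure(retention_probs) * expected_value_per_period

# toy example: five periods of slowly decaying retention and $500 expected value per period
print customer_lifetime_value([0.95, 0.93, 0.90, 0.88, 0.85], 500.0)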

 

Chatbots: A new frontline to empower convenient intelligence for the insurance industry

 

We human beings have been shackled to keyboards and monitors for more than forty years. Now it is time for a change. We need a new, intelligent and universal means of interaction that can hold voice or text conversations to link humans and machines, and to connect customers and companies. In this new wave of technology, conversation is the new interface, and chatbots are the new applications. The power of natural language processing, software that processes and parses human language, enables chatbots to create a simple and universal means of interacting with technology.

Comprehending human conversation is no easy task for a computer, and companies all over the world have sunk hundreds of millions of dollars into the effort. In our lab, we have successfully built a series of product assistant chatbots to help our agents query and search product knowledge extracted from the product reference manual website. Our chatbots are based on Google's chatbot platform api.ai, and integrate text/voice input from Slack, Skype, Facebook Messenger, mobile applications and Amazon Alexa. Our knowledge base was constructed by scraping the SharePoint site and transforming the scraped unstructured text into a knowledge base. Our knowledge query APIs are based on Elasticsearch.
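
As a hedged sketch of the kind of knowledge query API mentioned above (not our production code), the snippet below uses the elasticsearch Python client to run a full-text query against a scraped knowledge index; the index name and field names are hypothetical.

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])

def query_product_knowledge(question, top_k=3):
    # full-text match against the scraped manual text; "product_manual" and
    # "content" are illustrative names, not our actual index schema
    body = {"query": {"match": {"content": question}}, "size": top_k}
    hits = es.search(index="product_manual", body=body)['hits']['hits']
    return [h['_source']['content'] for h in hits]

print query_product_knowledge("What riders are available on the term life product?")

A chatbot intent handler (for example, a webhook called by api.ai) would simply call such a function and render the top hit back to the agent.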

2017-05-15_2336.png

Build a service that never sleeps

Building chatbots is an easy way to streamline customer interaction: a bot can provide information to a customer faster and more efficiently than a customer service representative, around the clock.

Never miss a meeting again

Even for processes not dependent upon customer service, chatbots can still be useful for any number of reasons. Just consider an administrative bot that schedules meetings for you. While it may seem like a mundane task that doesn’t need its own bot, this kind of practicality is incredibly useful and is a big time saver.

Access information in a more productive and consistent way

Have you thought about how much time we waste each day finding information in complicated computer systems or searching through unorganized web content? Even when we know where the information is located, we have to check again and again to make sure we have the correct version and haven't missed an important update. A well-designed chatbot can save all of these headaches. By asking the right questions, we can have chatbots retrieve information efficiently from consistent repositories.

Empower the convenient intelligence

Chatbots are the frontline of our enterprise-level intelligent applications. Let's imagine an intriguing scene: one day our chatbots integrate services such as a recommender system, customer lifetime value scoring, an insurance quoting API, telematics data analysis and real-time claim processing, and deliver that convenience to our customers anytime and anywhere. We all know that day is not far away, and we are excited to see it arrive as a new dawn for a revolutionized insurance industry.

 

Mining Temporal Patterns using an MPI cluster on AWS: Part 1

selection_005

Mining temporal patterns is an important problem in enterprise data science. At a recent summit at Stanford University, I heard a talk by Prof. Jure Leskovec in which he mentioned that temporal patterns in customer digital footprints yield better accuracy in customer behavior prediction, cross-sell recommendation and similar tasks than static customer profiles. Since temporal patterns are so useful, what holds them back from being widely applied in practical data science? I think there are two main challenges:

  • The raw data that contains temporal patterns is difficult to gather, especially data about social individuals (i.e., customers, agents, organizations). Most existing methods are based on studies of sensor signals, and collecting similar data about humans is undoubtedly more challenging because of privacy issues and potential costs.
  • Temporal patterns are difficult to analyze. Adding the time dimension complicates the problem and usually results in a combinatorial explosion, which makes the analysis computationally intensive.

This article is based on an ongoing series of experiments in collaboration with our university partners. We have developed a temporal pattern mining algorithm using the Message Passing Interface (MPI) to speed up pattern search in large-scale temporal claim notes data. Since our firm has no MPI environment, we tested the algorithm on Amazon AWS. This post focuses on the process of setting up an MPI cluster on AWS using StarCluster (http://star.mit.edu/cluster/), a great open-source tool developed at MIT.

1. Installing StarCluster

Getting StarCluster to work was not smooth. I was stuck for a day trying to install it on a Mac, struggling with an openssl package issue, so I gave up and used Ubuntu Linux as my local environment, where StarCluster installed successfully. However, it seems the StarCluster git repository hasn't been updated to reflect some recently reported bugs in the NFS setup (http://star.mit.edu/cluster/mlarchives/2612.html), so I ended up changing some of the code myself and recompiling/reinstalling the package.

I put the changed version of StarCluster on my git:

https://github.com/cyberyu/starcluster_journeymap

where the changes I made are in

https://github.com/cyberyu/starcluster_journeymap/blob/master/starcluster/node.py

lines 705-706 and lines 720-721.

The installation of StarCluster itself follows the official instructions (http://star.mit.edu/cluster/docs/latest/installation.html).

2. StarCluster config file and AMI image

I preinstalled everything on a running instance, loaded the data, and then turned the instance into an AMI image. This blog post (http://cs.smith.edu/dftwiki/index.php/Tutorial:_Create_an_MPI_Cluster_on_the_Amazon_Elastic_Cloud_(EC2)) gives a clear explanation of the process.

Below is the config file I used to create the MPI cluster. I pre-created an EBS data volume and an AMI image, and uncommented the relevant settings in the config file.

####################################
## StarCluster Configuration File ##
####################################
[global]
# Configure the default cluster template to use when starting a cluster
# defaults to 'smallcluster' defined below. This template should be usable
# out-of-the-box provided you've configured your keypair correctly
DEFAULT_TEMPLATE=smallcluster
# enable experimental features for this release
#ENABLE_EXPERIMENTAL=True
# number of seconds to wait when polling instances (default: 30s)
#REFRESH_INTERVAL=15
# specify a web browser to launch when viewing spot history plots
#WEB_BROWSER=chromium
# split the config into multiple files
#INCLUDE=~/.starcluster/aws, ~/.starcluster/keys, ~/.starcluster/vols

#############################################
## AWS Credentials and Connection Settings ##
#############################################
[aws info]
# This is the AWS credentials section (required).
# These settings apply to all clusters
# replace these with your AWS keys
AWS_ACCESS_KEY_ID = ****
AWS_SECRET_ACCESS_KEY = ****
# replace this with your account number
AWS_USER_ID= ****
# Uncomment to specify a different Amazon AWS region  (OPTIONAL)
# (defaults to us-east-1 if not specified)
# NOTE: AMIs have to be migrated!
AWS_REGION_NAME = eu-east-1
#AWS_REGION_HOST = ec2.eu-west-1.amazonaws.com
# Uncomment these settings when creating an instance-store (S3) AMI (OPTIONAL)
#EC2_CERT = /path/to/your/cert-asdf0as9df092039asdfi02089.pem
#EC2_PRIVATE_KEY = /path/to/your/pk-asdfasd890f200909.pem
# Uncomment these settings to use a proxy host when connecting to AWS
#AWS_PROXY = your.proxyhost.com
#AWS_PROXY_PORT = 8080
#AWS_PROXY_USER = yourproxyuser
#AWS_PROXY_PASS = yourproxypass

###########################
## Defining EC2 Keypairs ##
###########################
# Sections starting with "key" define your keypairs. See "starcluster createkey
# --help" for instructions on how to create a new keypair. Section name should
# match your key name e.g.:
[key mykey]
KEY_LOCATION=~/awskeys/mykey.pem

# You can of course have multiple keypair sections
# [key myotherkey]
# KEY_LOCATION=~/.ssh/myotherkey.rsa

################################
## Defining Cluster Templates ##
################################
# Sections starting with "cluster" represent a cluster template. These
# "templates" are a collection of settings that define a single cluster
# configuration and are used when creating and configuring a cluster. You can
# change which template to use when creating your cluster using the -c option
# to the start command:
#
#     $ starcluster start -c mediumcluster mycluster
#
# If a template is not specified then the template defined by DEFAULT_TEMPLATE
# in the [global] section above is used. Below is the "default" template named
# "smallcluster". You can rename it but dont forget to update the
# DEFAULT_TEMPLATE setting in the [global] section above. See the next section
# on defining multiple templates.

#[plugin mpich2]
#setup_class = starcluster.plugins.mpich2.MPICH2Setup

[cluster smallcluster]
#plugins = mpich2
# change this to the name of one of the keypair sections defined above
KEYNAME = mykey
# number of ec2 instances to launch
CLUSTER_SIZE = 10
# create the following user on the cluster
CLUSTER_USER = ***
# optionally specify shell (defaults to bash)
# (options: tcsh, zsh, csh, bash, ksh)
CLUSTER_SHELL = bash
# Uncomment to prepend the cluster tag to the dns name of all nodes created
# using this cluster config.  ie: mycluster-master and mycluster-node001
# If you choose to enable this option, it's recommended that you enable it in
# the DEFAULT_TEMPLATE so all nodes will automatically have the prefix
# DNS_PREFIX = True
# AMI to use for cluster nodes. These AMIs are for the us-east-1 region.
# Use the 'listpublic' command to list StarCluster AMIs in other regions
# The base i386 StarCluster AMI is ami-9bf9c9f2
# The base x86_64 StarCluster AMI is ami-3393a45a
# The base HVM StarCluster AMI is ami-6b211202
NODE_IMAGE_ID = ****
# instance type for all cluster nodes
# (options: m3.large, c3.8xlarge, i2.8xlarge, t2.micro, hs1.8xlarge, c1.xlarge, r3.4xlarge, g2.2xlarge, m1.small, c1.medium, m3.2xlarge, c3.2xlarge, m2.xlarge, m2.2xlarge, t2.small, r3.2xlarge, t1.micro, cr1.8xlarge, r3.8xlarge, cc1.4xlarge, m1.medium, r3.large, c3.xlarge, i2.xlarge, m3.medium, cc2.8xlarge, m1.large, cg1.4xlarge, i2.2xlarge, c3.large, i2.4xlarge, c3.4xlarge, r3.xlarge, t2.medium, hi1.4xlarge, m2.4xlarge, m1.xlarge, m3.xlarge)
NODE_INSTANCE_TYPE = m3.large
# Launch cluster in a VPC subnet (OPTIONAL)
SUBNET_ID=subnet-86f506ac
# Uncomment to assign public IPs to cluster nodes (VPC-ONLY) (OPTIONAL)
# WARNING: Using public IPs with a VPC requires:
# 1. An internet gateway attached to the VPC
# 2. A route table entry linked to the VPC's internet gateway and associated
#    with the VPC subnet with a destination CIDR block of 0.0.0.0/0
# WARNING: Public IPs allow direct access to your VPC nodes from the internet
#PUBLIC_IPS=True
# Uncomment to disable installing/configuring a queueing system on the
# cluster (SGE)
#DISABLE_QUEUE=True
# Uncomment to specify a different instance type for the master node (OPTIONAL)
# (defaults to NODE_INSTANCE_TYPE if not specified)
#MASTER_INSTANCE_TYPE = m1.small
# Uncomment to specify a separate AMI to use for the master node. (OPTIONAL)
# (defaults to NODE_IMAGE_ID if not specified)
#MASTER_IMAGE_ID = ami-3393a45a (OPTIONAL)
# availability zone to launch the cluster in (OPTIONAL)
# (automatically determined based on volumes (if any) or
# selected by Amazon if not specified)
#AVAILABILITY_ZONE = us-east-1c
# list of volumes to attach to the master node (OPTIONAL)
# these volumes, if any, will be NFS shared to the worker nodes
# see "Configuring EBS Volumes" below on how to define volume sections
#VOLUMES = oceandata, biodata
# list of plugins to load after StarCluster's default setup routines (OPTIONAL)
# see "Configuring StarCluster Plugins" below on how to define plugin sections
#PLUGINS = myplugin, myplugin2
# list of permissions (or firewall rules) to apply to the cluster's security
# group (OPTIONAL).
#PERMISSIONS = ssh, http
# Uncomment to always create a spot cluster when creating a new cluster from
# this template. The following example will place a $0.50 bid for each spot
# request.
#SPOT_BID = 0.50
# Uncomment to specify one or more userdata scripts to use when launching
# cluster instances. Supports cloudinit. All scripts combined must be less than
# 16KB
#USERDATA_SCRIPTS = /path/to/script1, /path/to/script2

###########################################
## Defining Additional Cluster Templates ##
###########################################
# You can also define multiple cluster templates. You can either supply all
# configuration options as with smallcluster above, or create an
# EXTENDS=<cluster_name> variable in the new cluster section to use all
# settings from <cluster_name> as defaults. Below are example templates that
# use the EXTENDS feature:

# [cluster mediumcluster]
# Declares that this cluster uses smallcluster as defaults
# EXTENDS=smallcluster
# This section is the same as smallcluster except for the following settings:
# KEYNAME=myotherkey
# NODE_INSTANCE_TYPE = c1.xlarge
# CLUSTER_SIZE=8
VOLUMES = dataABC

# [cluster largecluster]
# Declares that this cluster uses mediumcluster as defaults
# EXTENDS=mediumcluster
# This section is the same as mediumcluster except for the following variables:
# CLUSTER_SIZE=16

#############################
## Configuring EBS Volumes ##
#############################
# StarCluster can attach one or more EBS volumes to the master and then
# NFS_share these volumes to all of the worker nodes. A new [volume] section
# must be created for each EBS volume you wish to use with StarCluser. The
# section name is a tag for your volume. This tag is used in the VOLUMES
# setting of a cluster template to declare that an EBS volume is to be mounted
# and nfs shared on the cluster. (see the commented VOLUMES setting in the
# example 'smallcluster' template above) Below are some examples of defining
# and configuring EBS volumes to be used with StarCluster:

# Sections starting with "volume" define your EBS volumes
[volume dataABC]
VOLUME_ID = vol-0cb816a370ecc4f13
MOUNT_PATH = /data

# Same volume as above, but mounts to different location
# [volume biodata2]
# VOLUME_ID = vol-c999999
# MOUNT_PATH = /opt/

# Another volume example
# [volume oceandata]
# VOLUME_ID = vol-d7777777
# MOUNT_PATH = /mydata

# By default StarCluster will attempt first to mount the entire volume device,
# failing that it will try the first partition. If you have more than one
# partition you will need to set the PARTITION number, e.g.:
# [volume oceandata]
# VOLUME_ID = vol-d7777777
# MOUNT_PATH = /mydata
# PARTITION = 2

############################################
## Configuring Security Group Permissions ##
############################################
# Sections starting with "permission" define security group rules to
# automatically apply to newly created clusters. IP_PROTOCOL in the following
# examples can be can be: tcp, udp, or icmp. CIDR_IP defaults to 0.0.0.0/0 or
# "open to the # world"

# open port 80 on the cluster to the world
# [permission http]
# IP_PROTOCOL = tcp
# FROM_PORT = 80
# TO_PORT = 80

# open https on the cluster to the world
# [permission https]
# IP_PROTOCOL = tcp
# FROM_PORT = 443
# TO_PORT = 443

# open port 80 on the cluster to an ip range using CIDR_IP
# [permission http]
# IP_PROTOCOL = tcp
# FROM_PORT = 80
# TO_PORT = 80
# CIDR_IP = 18.0.0.0/8

# restrict ssh access to a single ip address (<your_ip>)
# [permission ssh]
# IP_PROTOCOL = tcp
# FROM_PORT = 22
# TO_PORT = 22
# CIDR_IP = <your_ip>/32

#####################################
## Configuring StarCluster Plugins ##
#####################################
# Sections starting with "plugin" define a custom python class which perform
# additional configurations to StarCluster's default routines. These plugins
# can be assigned to a cluster template to customize the setup procedure when
# starting a cluster from this template (see the commented PLUGINS setting in
# the 'smallcluster' template above). Below is an example of defining a user
# plugin called 'myplugin':

# [plugin myplugin]
# NOTE: myplugin module must either live in ~/.starcluster/plugins or be
# on your PYTHONPATH
# SETUP_CLASS = myplugin.SetupClass
# extra settings are passed as __init__ arguments to your plugin:
# SOME_PARAM_FOR_MY_PLUGIN = 1
# SOME_OTHER_PARAM = 2

######################
## Built-in Plugins ##
######################
# The following plugins ship with StarCluster and should work out-of-the-box.
# Uncomment as needed. Don't forget to update your PLUGINS list!
# See http://star.mit.edu/cluster/docs/latest/plugins for plugin details.
#
# Use this plugin to install one or more packages on all nodes
[plugin pkginstaller]
SETUP_CLASS = starcluster.plugins.pkginstaller.PackageInstaller
# # list of apt-get installable packages
# PACKAGES = mongodb, python-pymongo
#
# Use this plugin to create one or more cluster users and download all user ssh
# keys to $HOME/.starcluster/user_keys/<cluster>-<region>.tar.gz
# [plugin createusers]
# SETUP_CLASS = starcluster.plugins.users.CreateUsers
# NUM_USERS = 30
# # you can also comment out NUM_USERS and specify exact usernames, e.g.
# # usernames = linus, tux, larry
# DOWNLOAD_KEYS = True
#
# Use this plugin to configure the Condor queueing system
# [plugin condor]
# SETUP_CLASS = starcluster.plugins.condor.CondorPlugin
#
# The SGE plugin is enabled by default and not strictly required. Only use this
# if you want to tweak advanced settings in which case you should also set
# DISABLE_QUEUE=TRUE in your cluster template. See the plugin doc for more
# details.
[plugin sge]
SETUP_CLASS = starcluster.plugins.sge.SGEPlugin
MASTER_IS_EXEC_HOST = False
#
# The IPCluster plugin configures a parallel IPython cluster with optional
# web notebook support. This allows you to run Python code in parallel with low
# latency message passing via ZeroMQ.
# [plugin ipcluster]
# SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster
# # Enable the IPython notebook server (optional)
# ENABLE_NOTEBOOK = True
# # Set a password for the notebook for increased security
# # This is optional but *highly* recommended
# NOTEBOOK_PASSWD = a-secret-password
# # Set a custom directory for storing/loading notebooks (optional)
# NOTEBOOK_DIRECTORY = /path/to/notebook/dir
# # Set a custom packer. Must be one of 'json', 'pickle', or 'msgpack'
# # This is optional.
# PACKER = pickle
#
# Use this plugin to create a cluster SSH "dashboard" using tmux. The plugin
# creates a tmux session on the master node that automatically connects to all
# the worker nodes over SSH. Attaching to the session shows a separate window
# for each node and each window is logged into the node via SSH.
# [plugin tmux]
# SETUP_CLASS = starcluster.plugins.tmux.TmuxControlCenter
#
# Use this plugin to change the default MPI implementation on the
# cluster from OpenMPI to MPICH2.
# [plugin mpich2]
# SETUP_CLASS = starcluster.plugins.mpich2.MPICH2Setup
#
# Configure a hadoop cluster. (includes dumbo setup)
# [plugin hadoop]
# SETUP_CLASS = starcluster.plugins.hadoop.Hadoop
#
# Configure a distributed MySQL Cluster
# [plugin mysqlcluster]
# SETUP_CLASS = starcluster.plugins.mysql.MysqlCluster
# NUM_REPLICAS = 2
# DATA_MEMORY = 80M
# INDEX_MEMORY = 18M
# DUMP_FILE = test.sql
# DUMP_INTERVAL = 60
# DEDICATED_QUERY = True
# NUM_DATA_NODES = 2
#
# Install and setup an Xvfb server on each cluster node
# [plugin xvfb]
# SETUP_CLASS = starcluster.plugins.xvfb.XvfbSetup

The /etc/exports file in my AMI image is set as follows:

Selection_002
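
For reference, NFS exports on the master typically share /home and the attached EBS volume with the cluster subnet; the entries below are only illustrative (the paths, network range and options in the screenshot above may differ):

/home 10.0.0.0/16(async,no_root_squash,no_subtree_check,rw)
/data 10.0.0.0/16(async,no_root_squash,no_subtree_check,rw)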

3. Start the cluster

Once everything is correctly set up, starting a new MPI cluster is very easy:

starcluster start myclusterABC

Notice that this command creates a new cluster; if a cluster with the same name has been created before, it will raise an error. To get around this, go to the security groups in the AWS console and delete the group @sc-myclusterABC before recreating the cluster.

To restart an already created, but powered-off, cluster, use the following command:

starcluster start -x myclusterABC

To ssh/shutdown/terminate a running cluster, use the commands described on the manual webpage (http://star.mit.edu/cluster/docs/0.95.2/manual/launch.html).

After the cluster has been successfully set up, my AWS console looks like the following.

Selection_001

4. Running MPI code for Temporal Pattern mining

Our temporal pattern mining algorithm is written with MPI4py, so it can run under StarCluster's default setup. I also preinstalled MPI4py and the other relevant Python packages in the AMI image, so I can fire up the cluster and run directly.
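
For readers unfamiliar with MPI4py, the snippet below is a minimal, self-contained sketch of the scatter/gather structure such a pattern-search script typically has; the candidate patterns and the scoring function are placeholders, not the actual findPatterns.py implementation.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

def is_significant(pattern):
    # placeholder scoring test; the real statistical test lives in findPatterns.py
    return len(pattern) > 2

if rank == 0:
    # placeholder candidates; the real ones are generated from the claim notes data
    candidates = [('A', 'B'), ('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D')]
    chunks = [candidates[i::size] for i in range(size)]
else:
    chunks = None

local = comm.scatter(chunks, root=0)             # each rank receives its share of candidates
hits = [p for p in local if is_significant(p)]   # evaluate candidates locally
all_hits = comm.gather(hits, root=0)             # collect the local results on the master

if rank == 0:
    print "Found %d significant patterns" % sum(len(part) for part in all_hits)

Saved as, say, mpi_sketch.py, it would be launched the same way as findPatterns.py below, with mpirun and the host file.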

Before running the MPI program, I needed to create a host file for the mpirun command. I simply copied the content of /etc/hosts from any node of the MPI cluster, edited it, and saved it in the shared NFS folder.

The /etc/hosts file looks like the following:

Selection_003

I then turned its content into an mpihosts.txt file, as follows:

Selection_004
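
The screenshots show the actual node list; as an illustration only, a host file for the 10-node cluster defined in the config above would simply list the StarCluster hostnames, one per line:

master
node001
node002
node003
node004
node005
node006
node007
node008
node009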

4.1  Supervised Pattern Analysis: MPI is 9 times faster than non-MPI

Next, I executed the supervised learning version of our pattern mining algorithm using

time /home/ubuntu/anaconda2/bin/mpirun --hostfile mpihosts.txt -np 10 /home/ubuntu/anaconda2/bin/python findPatterns.py

where findPatterns.py is our data mining code, which extracts the significant temporal patterns from insurance claim data. The details of the algorithm will be explained in future articles.

The MPI process took 26 seconds to finish and found 15 significant patterns.

Selection_006

To compare the performance, I executed the algorithm on a single node (master node) and it took 3 min 40 seconds to finish.  The results were identical.

time /home/ubuntu/anaconda2/bin/python findPatterns.py

Selection_007

4.2  Unsupervised Pattern Analysis: MPI is >200 times faster than non-MPI

Our algorithm can also extract temporal patterns through unsupervised learning, which is a more computationally intensive process than the supervised analysis. I reran the algorithm on the same data set with the supervised control flag turned off (so the given labels are ignored). I had to increase the instance type to m3.xlarge because it demands more memory than supervised learning. Without an MPI cluster, I waited for an hour and it still hadn't finished, so I terminated that process and ran the MPI version instead, which found 223 temporal patterns in 17 seconds.

Selection_009Selection_010

5. Conclusion

Unlike academia and research labs, commercial organizations nowadays rarely deploy MPI clusters as a general computational platform. Yet for a specific class of problems that do not necessarily involve big data, but do involve huge search spaces resulting from combinatorial explosion, MPI still has significant advantages. This article recommended an open-source tool, StarCluster, and demonstrated how to quickly set up a customizable MPI cluster on AWS to improve the efficiency of temporal pattern extraction.

In the next article, I will introduce the mathematical problem of Temporal Pattern Analysis and several interesting models.

Text Summarization using Sequence-to-Sequence model in Tensorflow and GPU computing: Part 2 – AWS P2 Instance Installation

Merry Christmas!

I finally got my toys for Christmas: AWS P2 instances. Yay! It snowed again in Chicago this weekend, so it was perfect timing to try the cloud. This article is a footnote on my first experience with a multi-GPU platform. I discuss the advantages of multi-GPU platforms for the deep learning package Tensorflow, and try the Seq2Seq attention model and a convolutional neural network, with their applications to text summarization and image classification.

This article is mainly about the platform, and a follow-up to my previous post:

Text Summarization using Sequence-to-Sequence model in Tensorflow and GPU computing: Part I – How to get things running

Operating System: Ubuntu Linux 16.04 64-bit
Tensorflow r0.11 (at the time this blog was written, TF r0.12 had been released, but I downgraded to 0.11 because of some compatibility issues)
Anaconda Python 2.7 64-bit
NVIDIA CUDA 8.0
NVIDIA cuDNN 5.1

AWS Platform

p2.xlarge / 1 GPU / 4 vCPU / 61 GB RAM

p2.8xlarge / 8 GPU / 32 vCPU / 488 GB RAM

p2.16xlarge / 16 GPU / 64 vCPU / 732 GB RAM

1. Install CUDA on P2 instance

aws_p2.png

I followed my earlier post to set up CUDA and Tensorflow. Though I never expected a home run, since I knew there would always be some hurdles, I still ended up spending half a day figuring out all the problems. Here are the major ones and their solutions:

1.1 Disable Nouveau

To disable Nouveau on AWS, I added the following lines to the /etc/modprobe.d/blacklist-nouveau.conf  file:

blacklist nouveau 
blacklist lbm-nouveau 
options nouveau modeset=0 
alias nouveau off 
alias lbm-nouveau off

1.2 Update the kernel

$ apt-get install linux-source 
$ apt-get install linux-headers-3.13.0-37-generic

1.3 Test CUDA

Once CUDA was installed correctly, I used ./deviceQuery to confirm that the GPU model in the P2 instance is a Tesla K80. I had expected a Titan Pascal :O.

p2_gpu

2. Install Tensorflow r 0.11.0

2.1  Some issues in compilation and installation

While installing r 0.12.0 for the first time, ./configure finished successfully, but

"bazel build -c opt //tensorflow/tools/pip_package:build_pip_package"

it reported an error:

 ......./tensorflow/tensorflow/python/BUILD:1806:1: in cc_library rule //tensorflow/python:tf_session_helper: non-test target '//tensorflow/python:tf_session_helper' depends on testonly target '//tensorflow/python:construction_fails_op' and doesn't have testonly attribute set

The workaround was to comment out these two lines (1811 and 1813) of “…./python/BUILD”:

 ":construction_fails_op",
 ":test_ops_kernels",

after which the build went through.

2.2  CXXABI_1.3.8 not found error

When testing the TF installation in Python, I ran into an error like the following:

CXXABI_error.png

which I resolved as follows:

strings /usr/lib/x86_64-linux-gnu/libstdc++.so.6 | grep CXXABI_1.3.8
cp /usr/lib/x86_64-linux-gnu/libstdc++.so.6 /home/ubuntu/anaconda2/lib/

2.3  Why I still use r 0.11.0

At the time I tried this, Tensorflow r0.12 had been released, but it popped up too many warnings and error messages when running the text summarization models. For example:

scalar_summary.png

So I decided to stick to r 0.11.0.

3.  Tensorflow on multi-GPU instance

3.1 Tensorflow supports no more than 8 GPUs

When running the text summarization model on a p2.16xlarge instance, I ran into the following error:

CUDA_peer_error.png

A dig through GitHub (https://github.com/tensorflow/tensorflow/issues/5789) indicates that current releases of Tensorflow do not fully support more than 8 GPU devices. To avoid the error on a platform with more than 8 GPUs, I restricted the visible GPUs in the Python code:

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"

Of course, the remaining GPUs are wasted, so there seems to be no advantage in using a p2.16xlarge instance for Tensorflow.

3.2 Text summarization performance comparison

My original goal was to optimize the platform for large-scale text summarization training. With that in mind, I reran the experiment using the toy data mentioned in my earlier post. The model was trained on 20 articles, and the number of epochs was set to 5000.

To my surprise, the multi-GPU architecture did not bring a significant improvement to the text summarization model. I compared 7 settings:

  • P1 GPU: AWS p2.xlarge instance equipped with a single Tesla K80 GPU
  • P8 n_gpu=8: AWS p2.8xlarge instance equipped with eight Tesla K80 GPUs, setting --num_gpus=8 explicitly
  • P8 n_gpu=2: AWS p2.8xlarge instance equipped with eight Tesla K80 GPUs, setting --num_gpus=2 explicitly
  • P8 n_gpu=0: AWS p2.8xlarge instance equipped with eight Tesla K80 GPUs, setting --num_gpus=0 explicitly
  • P16 GPU: AWS p2.16xlarge instance equipped with sixteen Tesla K80 GPUs, setting --num_gpus=0 explicitly. Because Tensorflow has difficulty handling more than 8 GPUs, I set os.environ["CUDA_VISIBLE_DEVICES"]="0,1,2,3,4,5,6,7"
  • P16 CPU: AWS p2.16xlarge instance equipped with sixteen Tesla K80 GPUs, with the train function of the seq2seq_attention model using tf.device('/cpu:0'). All other settings use tf.device('/gpu:0')
  • GTX 970: my own desktop machine with a single GTX 970 GPU and a 6-core AMD CPU

textsum_compare.png

As the picture shows, the multi-GPU architecture does not bring a significant improvement in efficiency. A closer look at the model code gives me the feeling that the text summarization model does not exploit GPU parallelization: the "_Train" function seems to use only the first GPU/CPU device. As I understand it, Tensorflow was originally deployed on Google's own cloud platform and was not optimized for this setting, so maybe the text summarization model simply hasn't exploited the advantages of multiple GPUs yet. It would have to be rewritten in a GPU-aware divide-and-conquer manner, exploiting parallelism at all stages across multiple GPUs. That sounds like a lot of work, and I am not an expert in low-level programming. Maybe, for the same effort, it would be easier for me to create a similar model structure in Caffe, given its better multi-GPU support?

def _Train(model, data_batcher):
  """Runs model training."""
  with tf.device('/gpu:0'):  # was ('/cpu:0')
    model.build_graph()
    saver = tf.train.Saver()
    summary_writer = tf.train.SummaryWriter(FLAGS.train_dir)
    sv = tf.train.Supervisor(logdir=FLAGS.log_root, is_chief=True, saver=saver,
                             summary_op=None, save_summaries_secs=60,
                             save_model_secs=FLAGS.checkpoint_secs,
                             global_step=model.global_step)
    sess = sv.prepare_or_wait_for_session(config=tf.ConfigProto(allow_soft_placement=True))
    running_avg_loss = 0
    step = 0
  ...

With a bit of disappointment, I continued to explore other models. In the next section, I will show another (better) example in Tensorflow that can actually benefit from a high-end multi-GPU architecture.

3.3 CNN Image classification benchmark CIFAR-10

CIFAR-10 is a CNN (convolutional neural network) demo included in Tensorflow as an image classification benchmark. The problem is to classify RGB 32×32 pixel images across 10 categories: airplane, automobile, bird, cat, deer, dog, frog, horse, ship, and truck.

cafir10.jpg

Tensorflow provides a multi-GPU version algorithm for model training. According to Tensorflow documentation:

“In a workstation with multiple GPU cards, each GPU will have similar speed and contain enough memory to run an entire CIFAR-10 model. Thus, we opt to design our training system in the following manner:

  • Place an individual model replica on each GPU.
  • Update model parameters synchronously by waiting for all GPUs to finish processing a batch of data.

Note that each GPU computes inference as well as the gradients for a unique batch of data. This setup effectively permits dividing up a larger batch of data across the GPUs.”

Parallelism_CNN.png

Therefore, the advantage of the multi-GPU architecture here is the ability to train the model on larger datasets in approximately the same amount of time. More training data can lead to more accurate model predictions. In other words, for the same amount of data, more GPUs reduce the training time while keeping model performance more or less the same. Due to time limitations, I only trained the models for 10K steps.
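
To make the device-placement idea concrete, here is a minimal, self-contained Tensorflow snippet (not taken from the CIFAR-10 code) that places work on two GPUs and combines the results on the CPU; allow_soft_placement lets it fall back gracefully if fewer GPUs are available.

import tensorflow as tf

partial_sums = []
for i in range(2):
    with tf.device('/gpu:%d' % i):
        a = tf.constant([1.0, 2.0, 3.0], name='a%d' % i)
        partial_sums.append(a * 2.0)       # per-GPU piece of work

with tf.device('/cpu:0'):
    total = tf.add_n(partial_sums)         # combine the per-GPU results on the CPU

sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
                                        log_device_placement=True))
print sess.run(total)

The real cifar10_multi_gpu_train.py follows the same pattern at a larger scale: one model replica ("tower") per GPU, with gradients averaged before a synchronous update.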

The CIFAR-10 CNN model is located at

~/tensorflow/tensorflow/models/image/cifar10

and

cifar10_multi_gpu_train.py

is the multi-GPU training script, and

cifar10_eval.py

evaluates the trained model (auto-saved as a checkpoint), reporting the precision of 10-class image classification on the 10K held-out CIFAR test images. To specify the number of GPUs used in training, I ran:

python cifar10_multi_gpu_train.py --num_gpus=2

Tensorflow r0.12.0 changed these scripts relative to r0.11.0, so if the Tensorflow package was compiled and installed as r0.11.0 there will be compatibility issues. I therefore extracted the scripts from the archived r0.11.0 release.

             1 GPU (20K images)  2 GPU (40K images)  3 GPU (60K images)  4 GPU (80K images)  6 GPU (120K images)  8 GPU (160K images)
Time (min)   20                  23                  30                  32                  42                    52
Precision    0.815               0.826               0.826               0.832               0.832                 0.836
Images/sec   1022                2064                2422                2719                3000                  3347
Sec/batch    0.125               0.068               0.047               0.045               0.04                  0.035

CNN_CIFAR_image_size.png

As shown in the figure above, with more images used in training, the model precision increased from 0.815 (20K training images) to 0.836 (160K training images). While the training data increased 8 times, thanks to multi-GPU parallelization the training time only increased about 2.6 times. The authors of the CIFAR-10 model report a best precision of about 0.86 when trained for 1M steps.

GPU_ratio.png

The second figure compares the data processing speed (images/sec and sec/batch) as the number of GPUs increases. It is very difficult to obtain a linear speed-up on a multi-GPU platform: as shown, the 8-GPU platform only achieved about a 3.5x speed-up compared to the single-GPU system.

4. Conclusion

AWS P2 instances are powerful platforms for exploiting GPU computation. Unfortunately, the software seems to be lagging behind and cannot yet benefit from the high-end multi-GPU architecture provided by p2.16xlarge instances.

The current text summarization model in Tensorflow has not exploited the full potential of a multi-GPU system; hopefully smart contributors will restructure the code in future releases.

One side note: with AWS P2 instances, each time I stopped and restarted an instance, or created a new one from a saved AMI image with CUDA/Tensorflow preinstalled, the environment broke again and again. I ended up recompiling and reinstalling the environment so many times that I can now install everything with my eyes closed. It was great fun, and it will be even more fun when the Amazon AWS bill arrives :).

Is my data big enough for GPU computing? A PyCuda experiment

PyCUDA,  Python,  C/C++, Nvidia CUDA,  GPU Computing

GPU: Geforce GTX 970 4G
CPU: AMD FX 6300
Memory: 8G

Operating System: Ubuntu Linux 16.04 64-bit

We often hear about GPU computing being faster than CPU computing. There is no doubt that for most deep learning algorithms the GPU outperforms the CPU in efficiency. But what about other everyday computations? Exploiting the advantages of the GPU is becoming easier and easier thanks to improved development platforms and API access interfaces, and a more intriguing paradigm is to combine the power of a CPU cluster with GPU-powered nodes for very complex computations. When data is parallelized and assigned to each node, the algorithm should be able to determine the most suitable computational device. The answer depends on many factors, but one of them is surely empirical: how big is the data? Is it big enough to gain an advantage from the GPU? In this blog post, I set up a PyCuda experiment to explore the best CPU-GPU switching point on my old desktop for the matrix multiplication problem.

0.  Install NVIDIA CUDA

I discussed the steps for installing NVIDIA CUDA in my previous post (https://wordpress.com/post/eilianyu.wordpress.com/177).

1.  Compile and Install Numpy and OpenBLAS

To fairly compare performance between the GPU and the CPU, we have to ensure Numpy is linked to OpenBLAS. Otherwise, the comparison would be "flawed" because we would not have fully exploited the opportunities provided by the underlying hardware, as suggested by this interesting article (https://people.eecs.berkeley.edu/~sangjin/2013/02/12/CPU-GPU-comparison.html).

Please follow  https://hunseblog.wordpress.com/2014/09/15/installing-numpy-and-openblas/  to install Numpy and OpenBLAS.

(I had an error "Program received signal SIGILL: Illegal instruction." when upgrading from an AMD Phenom to an FX because of the architectural change. Please make sure to download the latest version of OpenBLAS and set the TARGET correctly, as suggested in this post: https://github.com/xianyi/OpenBLAS/issues/382 .)

After installation, please double-check that Numpy is correctly linked to OpenBLAS:

$python
>>import numpy as np
>>np.__config__.show()
lapack_opt_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/openblas/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
blas_opt_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/openblas/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
openblas_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/openblas/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
openblas_lapack_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/openblas/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
blas_mkl_info:
  NOT AVAILABLE

And

$locate lapack_lite.so
/home/cyberyu/openblas/numpy/build/lib.linux-x86_64-2.7/numpy/linalg/lapack_lite.so
/usr/local/lib/python2.7/dist-packages/numpy-1.11.2-py2.7-linux-x86_64.egg/numpy/linalg/lapack_lite.so

$ldd /usr/local/lib/python2.7/dist-packages/numpy-1.11.2-py2.7-linux-x86_64.egg/numpy/linalg/lapack_lite.so
cyberyu@cyberyugpu:~$ ldd /usr/local/lib/python2.7/dist-packages/numpy-1.11.2-py2.7-linux-x86_64.egg/numpy/linalg/lapack_lite.so
    linux-vdso.so.1 =>  (0x00007ffceef67000)
    libopenblas.so.0 => /opt/openblas/lib/libopenblas.so.0 (0x00007f8ea1b45000)
    libpthread.so.0 => /lib/x86_64-linux-gnu/libpthread.so.0 (0x00007f8ea18fd000)
    libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007f8ea152d000)
    libm.so.6 => /lib/x86_64-linux-gnu/libm.so.6 (0x00007f8ea121d000)
    libgfortran.so.3 => /usr/lib/x86_64-linux-gnu/libgfortran.so.3 (0x00007f8ea0eed000)
    /lib64/ld-linux-x86-64.so.2 (0x00005578bbd00000)
    libquadmath.so.0 => /usr/lib/x86_64-linux-gnu/libquadmath.so.0 (0x00007f8ea0cad000)
    libgcc_s.so.1 => /lib/x86_64-linux-gnu/libgcc_s.so.1 (0x00007f8ea0a95000)

2.  Install PyCuda

Obtain and install PyCuda from Andreas Klöckner’s web page.

PyCUDA provides an interface to access Nvidia's CUDA parallel computation API from Python. Note that it is just an interface, so the core algorithm should still be written in CUDA's programming model based on C and C++.

PyCUDA has a series of dependencies, so please make sure to install them all.

3.  Install OpenCV and OpenMP

To fully exploit the platform, I installed OpenMP to enhance OpenCV with multithreading. OpenCV is available at http://docs.opencv.org/2.4/doc/tutorials/introduction/linux_install/linux_install.html. OpenMP is available at http://bisqwit.iki.fi/story/howto/openmp/.

To keep the discussion concise, I skip all the troubleshooting for the OpenCV and OpenMP setup. Once they are correctly installed, I use the following command to compile the C++ code:

$g++ -fpic rbf_kernel_simple.cpp -o rbf_kernel_simple.o

 

4.  A simple test comparing CPU and GPU

My experiment started with a very simple (naive) function. The goal is to take a large one-dimensional float array x[i], where i is the index, update each element to the new value x[i]*x[i] - 0.25, and repeat this admittedly pointless loop 1000 times. The overall Python code is as follows:

import os
os.environ['LD_LIBRARY_PATH'] = '/opt/openblas/lib/'
import numpy
import time
from pycuda.compiler import SourceModule
import pycuda.autoinit
from numpy import linalg as LA

mod = SourceModule("""
__global__ void cudakernel(float * buf)
{
   int i = threadIdx.x + blockIdx.x * blockDim.x;
   buf[i] = 1.0f * i / 10000;
   for (int j = 0; j < 1000; j++)
      buf[i] = buf[i] * buf[i] - 0.25f;
}
""")

a = numpy.ones(1000)  # construct an array of 1000 elements
a = a.astype(numpy.float32)

import pycuda.driver as cuda
t1 = time.time()
a_gpu = cuda.mem_alloc(a.nbytes)
cuda.memcpy_htod(a_gpu, a)

func = mod.get_function("cudakernel")
func(a_gpu, block = (250, 1, 1), grid=(4,1))
a_doubled = numpy.empty_like(a)
cuda.memcpy_dtoh(a_doubled, a_gpu)

t2 = time.time()-t1
print "GPU time used:", t2*1000, " ms "

t3 = time.time()

import ctypes as C
my_test_lib = C.CDLL('/home/cyberyu/PycharmProjects/mpi_pycuda_test/cudakernelmp.o')
outdata = numpy.zeros(a.size, dtype=numpy.double)
a_cpued = my_test_lib.serial_add(C.c_void_p(a.ctypes.data), C.c_int(1000), C.c_int(2000), C.c_void_p(outdata.ctypes.data))

t4 = time.time()-t3
print "CPU time needed:", t4*1000, " ms "
print "Error is " + str(LA.norm(a_doubled - outdata))

The code above uses PyCUDA's SourceModule mechanism to pass the CUDA C code to the cubin compiler and load the resulting cubin file into the current CUDA context. The CUDA C code is wrapped in mod = SourceModule(""" ... """). Notice that the first line of the kernel computes the array index from the threadIdx, blockIdx, and blockDim variables, which are controlled by CUDA's block and grid parallelism mechanism. Understanding the concepts of threads, blocks, and grids, and more importantly designing heuristics that automatically scale with the data, is critical to (and the beauty of) CUDA development. Some nice online references:

http://users.wfu.edu/choss/CUDA/docs/Lecture%205.pdf

http://cuda-programming.blogspot.com/2013/01/thread-and-block-heuristics-in-cuda.html

The relationship between blocks and threads in a one-dimensional assignment is illustrated in the first reference, by Samuel S. Cho:

figure3.png

In my code, I choose a block size of (250, 1, 1) and a grid size of (4, 1) because we are processing a one-dimensional array, so in the first dimension the product of block size and grid size should equal the array length, 1000. Otherwise, the parallel computation will not produce the correct result (which can be checked via the error variable). Other settings are also possible, such as

func(a_gpu, block = (100, 1, 1), grid=(10,1))
or
func(a_gpu, block = (500, 1, 1), grid=(2,1))
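
A small helper (not part of the original script) makes the usual ceiling-division heuristic explicit: pick enough blocks to cover the whole array, and add a bounds check in the kernel whenever the coverage exceeds the array length.

def grid_for(n, block_size=250):
    # ceiling division: the smallest grid that covers all n elements
    return ((n + block_size - 1) // block_size, 1)

print grid_for(1000, 250)   # (4, 1), the launch configuration used above
print grid_for(1000, 256)   # also (4, 1), but 4*256 > 1000, so the kernel would then need an "if (i < n)" guard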

The first part of the code measures the GPU processing time. In the second part, I measure the CPU processing time using a separate C file, cudakernelmp.c:

void serial_add(void * indatav, int n_count, int m_count, void * outdatav){
    /* the input buffer is not actually used; the CPU version recomputes the values */
    const double * indata = (const double *) indatav;
    double * outdata = (double *) outdatav;

    (void) indata;

    #pragma omp parallel for
    for(int i = 0; i < n_count; i++)
    {
        outdata[i] = 1.0f * i / n_count;

        /* this inner loop carries a dependency (each step uses the previous value),
           so it must stay sequential */
        for(int j = 0; j < m_count; j++)
        {
            outdata[i] = outdata[i] * outdata[i] - 0.25f;
        }
    }
}

Then I compile it into a shared library (with -fPIC, and -fopenmp so the OpenMP pragma is honored):

gcc -shared -fPIC -fopenmp cudakernelmp.c -o cudakernelmp.o

which is loaded and called from the Python code above via ctypes.

The output of the python code is:

GPU time used: 1.57809257507  ms 
CPU time needed: 21.1148262024  ms 
Error is 8.85293575516e-08

It shows the GPU is roughly 13 times faster than the CPU on this naive task (about 1.6 ms vs 21.1 ms), and the difference (L2 norm) between the two results is negligible.

5.  Comparing CPU and GPU for Matrix Multiplication

The real advantage of GPU computing lies in the complicated matrix computation. In the next experiment, I extract the matrix multiplication code from Andreas Klöckner’s PyCuda library. The original algorithm is listed at:

https://wiki.tiker.net/PyCuda/Examples/MatrixmulTiled

Unlike the previous one-dimensional experiment, we now need to manage the assignment of threads to blocks and grids in 2D space. The relationship is illustrated as follows (taken from Samuel S. Cho's lecture slides):

figure2.png

 

I made small changes to the dynamic assignment of the block and grid sizes. To keep things simple, I consider square matrix multiplication only; I haven't tested it on non-square matrices.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

"""
Multiples two square matrices together using multiple blocks and shared memory.
Each thread block is assigned a "tile" of the resulting matrix and is responsible
for generating the elements in that tile. Each thread in a block computes one element
of the tile.
"""

import numpy as np
from numpy import linalg as la
import time
import matplotlib.pylab as plt

from pycuda import driver, compiler, gpuarray, tools
# -- initialize the device
import pycuda.autoinit

kernel_code_template = """
__global__ void MatrixMulKernel(float *A, float *B, float *C)
{
 const uint wA = %(MATRIX_SIZE)s;
 const uint wB = %(MATRIX_SIZE)s;

 // Block index
 const uint bx = blockIdx.x;
 const uint by = blockIdx.y;

 // Thread index
 const uint tx = threadIdx.x;
 const uint ty = threadIdx.y;

 // Index of the first sub-matrix of A processed by the block
 const uint aBegin = wA * %(BLOCK_SIZE)s * by;
 // Index of the last sub-matrix of A processed by the block
 const uint aEnd = aBegin + wA - 1;
 // Step size used to iterate through the sub-matrices of A
 const uint aStep = %(BLOCK_SIZE)s;

 // Index of the first sub-matrix of B processed by the block
 const uint bBegin = %(BLOCK_SIZE)s * bx;
 // Step size used to iterate through the sub-matrices of B
 const uint bStep = %(BLOCK_SIZE)s * wB;

 // The element of the block sub-matrix that is computed
 // by the thread
 float Csub = 0;
 // Loop over all the sub-matrices of A and B required to
 // compute the block sub-matrix
 for (int a = aBegin, b = bBegin;
 a <= aEnd;
 a += aStep, b += bStep)
 {
 // Shared memory for the sub-matrix of A
 __shared__ float As[%(BLOCK_SIZE)s][%(BLOCK_SIZE)s];
 // Shared memory for the sub-matrix of B
 __shared__ float Bs[%(BLOCK_SIZE)s][%(BLOCK_SIZE)s];

 // Load the matrices from global memory to shared memory
 // each thread loads one element of each matrix
 As[ty][tx] = A[a + wA * ty + tx];
 Bs[ty][tx] = B[b + wB * ty + tx];
 // Synchronize to make sure the matrices are loaded
 __syncthreads();

 // Multiply the two matrices together;
 // each thread computes one element
 // of the block sub-matrix
 for (int k = 0; k < %(BLOCK_SIZE)s; ++k)
 Csub += As[ty][k] * Bs[k][tx];

 // Synchronize to make sure that the preceding
 // computation is done before loading two new
 // sub-matrices of A and B in the next iteration
 __syncthreads();
 }

 // Write the block sub-matrix to global memory;
 // each thread writes one element
 const uint c = wB * %(BLOCK_SIZE)s * by + %(BLOCK_SIZE)s * bx;
 C[c + wB * ty + tx] = Csub;
}
"""

def benchmarkCPU(scale):
  rsCPU = []

  print 'Start CPU processing'

  for scaleFactor in xrange(scale):

       # load the matrices
       a_cpu = np.load('testmat_{}.npz'.format(scaleFactor))['arr_0']
       b_cpu = np.load('testmat_{}.npz'.format(scaleFactor))['arr_1']

       MATRIX_SIZE = 2**(scaleFactor) * 16
       print "==" * 100
       print 'Loading matrix size of ' + str(MATRIX_SIZE)

       # compute reference on the CPU to verify GPU computation
       at1 = time.time()
       c_cpu = np.dot(a_cpu, b_cpu)
       at2 = time.time()
       dt12 = (at2 - at1)*1000
       print "CPU time used:", dt12, " ms "

       # save the results in npz
       np.savez('cpu_res_{}.npz'.format(scaleFactor), c_cpu)
       rsCPU.append(dt12)

  return rsCPU

def benchmarkGPU(scale):
   rsGPU = []
   rsCOPY= []
   print 'Start GPU processing'

   # define size of blocks and tiles sub-matrix
   # (we assume that the block size is same as tile size)
   TILE_SIZE = 16
   BLOCK_SIZE = TILE_SIZE

   for scaleFactor in xrange(scale):

       MATRIX_SIZE = 2 ** (scaleFactor) * 16

       print "==" * 100
       print 'Loading Matrix size of ' + str(MATRIX_SIZE)

       # load the matrices
       a_cpu = np.load('testmat_{}.npz'.format(scaleFactor))['arr_0']
       b_cpu = np.load('testmat_{}.npz'.format(scaleFactor))['arr_1']

       at1 = time.time()
       a_gpu = gpuarray.to_gpu(a_cpu)
       b_gpu = gpuarray.to_gpu(b_cpu)

       at2 = time.time()
       dt12= (at2-at1)*1000

       print "COPY time used:", dt12, " ms "

       # create empty gpu array for the result (C = A * B)
       c_gpu = gpuarray.empty((MATRIX_SIZE, MATRIX_SIZE), np.float32)

       # get the kernel code from the template
       # by specifying the constants MATRIX_SIZE and BLOCK_SIZE
       kernel_code = kernel_code_template % {
           'MATRIX_SIZE': MATRIX_SIZE,
           'BLOCK_SIZE': BLOCK_SIZE,
       }

       # compile the kernel code
       mod = compiler.SourceModule(kernel_code)

       # get the kernel function from the compiled module
       matrixmul = mod.get_function("MatrixMulKernel")

       # call the kernel on the card
       matrixmul(
           # inputs
           a_gpu, b_gpu,
           # output
           c_gpu,
           # grid of multiple blocks
           # Andreas' original code is: grid = (MATRIX_SIZE // TILE_SIZE, MATRIX_SIZE // TILE_SIZE),
           grid=((MATRIX_SIZE + TILE_SIZE - 1) // TILE_SIZE, (MATRIX_SIZE + TILE_SIZE - 1) // TILE_SIZE),
           # block of multiple threads
           block=(TILE_SIZE, TILE_SIZE, 1),
       )

       # copy result from GPU
       re = c_gpu.get()

       at3 = time.time()
       dt23 = (at3 - at2)*1000
       print "GPU time used:", dt23, " ms "

       np.savez('gpu_res_{}.npz'.format(scaleFactor), re)

       rsGPU.append(dt23)
       rsCOPY.append(dt12)

   return [rsGPU, rsCOPY]

def calErr(scale):

   rsErr=[]
   print 'Comparing Error'

   for scaleFactor in xrange(scale):
        res_cpu = np.load('cpu_res_{}.npz'.format(scaleFactor))['arr_0']
        res_gpu = np.load('gpu_res_{}.npz'.format(scaleFactor))['arr_0']

        err = la.norm(res_cpu - res_gpu)
        rsErr.append(err)

   return rsErr

def generate_mat(scale):
     # generate some large matrices and store them as npz files
     # I can only try scaleFactor = 9 because of the memory limit of my GPU card.

     print 'Generating Matrices'

     for scaleFactor in xrange(scale):
         MATRIX_SIZE = 2 ** (scaleFactor) * 16
         a_cpu = np.random.randn(MATRIX_SIZE, MATRIX_SIZE).astype(np.float32)
         b_cpu = np.random.randn(MATRIX_SIZE, MATRIX_SIZE).astype(np.float32)
         np.savez('testmat_{}.npz'.format(scaleFactor), a_cpu, b_cpu)

def main():

    GSCALE = 10

    generate_mat(GSCALE)
    rsCPU = benchmarkCPU(GSCALE)
    rs = benchmarkGPU(GSCALE)
    rsGPU = rs[0]
    rsCopy = rs[1]
    rsErr= calErr(GSCALE)

    labels = [2**(x)*16 for x in xrange(GSCALE)]
    plt.plot(xrange(GSCALE), rsCPU,'b-', label="CPU processing time")
    plt.plot(xrange(GSCALE), rsGPU,'r-', label="GPU processing time")
    plt.plot(xrange(GSCALE), rsCopy, 'y-', label="Copy processing time")
    plt.xticks(xrange(GSCALE), labels, rotation='vertical')

    plt.grid(True, which="major", linestyle="dotted")
    plt.yscale("log")

    plt.ylabel("Logrithm Response time (msec)", fontsize=9)
    plt.xlabel("Matrix Size ", fontsize=9)

    plt.xticks(fontsize=9)
    plt.yticks(fontsize=9)

    plt.legend(loc='upper left', fancybox=True, shadow=True, prop=dict(size=8))

    ax2 = plt.twinx()
    ax2.set_ylabel('Error', color='g')
    ax2.plot(xrange(GSCALE), rsErr, 'g-', label="Norm difference")

    ax2.legend(loc=0)
    plt.show()

if __name__ == "__main__":
    main()

The program generates 10 pairs of matrices, from 16x16 to 8192x8192, and computes their products using both the CUDA tiled matrix multiplication algorithm and numpy's dot product. The matrices are pregenerated, saved as numpy npz files, and loaded back for benchmarking. The CPU and GPU processing times are compared on matrices of the same sizes, and the copy time from CPU to GPU is tracked separately. Finally, the difference between the CPU and GPU results is evaluated as the L2 norm of the difference of the product matrices.

 

Matrix Size               16      32    64    128   256   512   1024   2048    4096     8192
GPU (ms)                  170.27  1.76  1.36  1.39  0.98  2.59  9.24   45.35   303.45   2134.40
Copy from CPU to GPU (ms) 0.38    0.71  0.51  0.53  0.52  1.23  3.23   8.64    31.01    116.77
CPU (ms)                  0.42    0.01  0.03  0.74  0.55  3.21  50.41  199.14  1675.36  11114.58
Error (L2 norm)           0.00    0.00  0.00  0.00  0.00  0.00  0.02   0.08    0.31     1.22

figure_1-1

As shown in the figure above (note that the timing is plotted on a log scale), GPU computing does not bring any advantage for multiplying smaller matrices, and the overhead of the copy time is significant. Once the matrix size grows beyond 512, GPU computing outperforms the CPU.
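
As a hedged sketch of how this empirical switching point could be used in practice (not part of the benchmark code above), a dispatch function can route small multiplications to Numpy/OpenBLAS and large ones to the GPU kernel; the threshold and the gpu_matmul callable are illustrative assumptions.

import numpy as np

GPU_THRESHOLD = 512   # empirical switching point observed on my GTX 970

def smart_matmul(a, b, gpu_matmul):
    """gpu_matmul is assumed to wrap the tiled CUDA kernel shown above."""
    if max(a.shape[0], b.shape[1]) <= GPU_THRESHOLD:
        return np.dot(a, b)      # small matrices: the CPU (OpenBLAS) wins
    return gpu_matmul(a, b)      # large matrices: the GPU wins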

Be aware of hidden data errors when using the spark-sas7bdat package to ingest SAS datasets into Spark

During my work today, I was surprised by several errors I found using the spark-sas7bdat package to ingest SAS datasets into Spark. These errors could have a huge impact on an enterprise data warehouse if you are using the current version, spark-sas7bdat 1.1.4-s_2.10 listed on Maven, to ingest data from SAS into a Spark environment.

I will briefly explain the three major problems I've found. Before I start, I want to state that I like the spark-sas7bdat package; it is no doubt a great free tool that saves users tons of money on official SAS licenses.

0.  Experimental Setup

I used the following code, based on SAS proc hadoop, to move the SAS dataset to HDFS:


%let hdpcfg = '/opt/sas94/bin/hadoop/conf/combined-site.xml';
proc hadoop cfg=&hdpcfg verbose ;
hdfs mkdir = '/hdfsuser/demo/';
 hdfs copyfromlocal='/newdata/birthday.sas7bdat'
 out='/hdfsuser/demo/birthday.sas7bdat' overwrite;
run;

And I used the following code in PySpark to save the SAS dataset to Spark/Hive:


PYSPARK_DRIVER_PYTHON=$PYTHON_HOME/ipython pyspark --master yarn-client --packages saurfang:spark-sas7bdat:1.1.4-s_2.10

from pyspark.sql import HiveContext
sqlContext=HiveContext(sc)
df=sqlContext.read.format('com.github.saurfang.sas.spark').load("hdfs:////user/username/data.sas7bdat")
df.printSchema()
df.write.format('orc').saveAsTable('db1.table_data')

SAS version: 9.4.1.0

Spark version: 1.6.1

Python version: 2.7.12

Hadoop 2.7.1.2.4.2.0-258  hortonworks -r 13debf893a605e8a88df18a7d8d214f571e05289

1.  SAS Date variables ingested in Hive are rolled back one day earlier

First create a simple data set in SAS:

 

data newdata.birthday;
 input @01 employee_id 6.
 @08 last_name $10.
 @19 birthday date9.;
 format employee_id 6.
 last_name $10.
 birthday date9.;
 datalines;
 1247 Garcia 04APR1954
 1078 Gibson 23APR1936
 1005 Knapp 06OCT1938
 1024 Mueller 17JUN1953
;

Then I used the scripts listed in Section 0 to ingest the data into Hive; the results are:

obs  employee_id  last_name  birthday
0    1247.0       Garcia     1954-04-03
1    1078.0       Gibson     1936-04-22
2    1005.0       Knapp      1938-10-05
3    1024.0       Mueller    1953-06-16

Notice that all the dates are rolled back one day earlier.
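
SAS stores dates as the number of days since 1960-01-01, so a correct conversion is straightforward. One plausible mechanism for the one-day shift (not confirmed against the package's source, just an illustration consistent with the symptom) is converting the date to a midnight timestamp and then rendering it in a timezone behind UTC:

from datetime import date, datetime, timedelta

sas_days = (date(1954, 4, 4) - date(1960, 1, 1)).days   # SAS representation of 04APR1954
print date(1960, 1, 1) + timedelta(days=sas_days)        # 1954-04-04, the expected value

# treating the date as midnight UTC and rendering it in US Central (UTC-6)
# lands on the previous calendar day, matching the behavior above
as_utc_midnight = datetime(1954, 4, 4, 0, 0, 0)
print (as_utc_midnight - timedelta(hours=6)).date()      # 1954-04-03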

2.  SAS Datetime variables ingested in Hive are rolled back some hours


data newdata.birthtime;
 input @01 employee_id 6.
 @08 last_name $10.
 @19 birthtime datetime21.2;
 format employee_id 6.
 last_name $10.
 birthtime datetime21.2;
 datalines;
 1247 Garcia 04APR1954:08:00:00.00
 1078 Gibson 23APR1936:16:00:00.00
 1005 Knapp 06OCT1938:23:59:59.00
 1024 Mueller 17JUN1953:00:00:00.01
;

In Hive, for obs 0 and 2 the datetime is rolled back 6 hours, while for obs 1 and 3 it is rolled back 5 hours. One guess is that a timezone reference error, probably combined with a daylight-saving-time adjustment, is causing this issue.

obs  employee_id  last_name  birthtime
0    1247.0       Garcia     1954-04-04 02:00:00.0
1    1078.0       Gibson     1936-04-23 11:00:00.0
2    1005.0       Knapp      1938-10-06 17:59:59.0
3    1024.0       Mueller    1953-06-16 19:00:00.01

3.  Missing value misrepresentation caused by SAS data set compression

I found that when the original SAS dataset is large and contains missing values, the default configuration of COMPRESS=YES may lead to misrepresented values in Hive.

Because the problem was found in a complicated data set at work that I cannot share publicly, I will only show the SAS script and screenshots here. The SAS data set has many missing values and is composed of date, character, numeric and datetime features; the total number of features is 636.

Basically, I extracted the top 1,000 records from the original huge data set using two different compression options:


data mydata.demo_compressed_1k(compress=Yes);
 set mydata.originaldata (obs=1000);
run;


data mydata.demo_uncompressed_1k(compress=NO);
 set mydata.originaldata (obs=1000);
run;

The results are shown in the next two figures. The top one is the correct data (uncompressed) and the bottom figure is the mistaken data (compressed).

correct

mistaken

As shown, due to compression the NULL values are replaced by 0, and sometimes by random, extremely large numbers. The risk here is that in SAS, COMPRESS=YES is a default setting, so unless users explicitly turn it off, all datasets will be compressed automatically and can be completely wrong in Hive after ingestion.

Final words

I would love to see a response from the author of the spark-sas7bdat package and hope these issues can be solved. It is no doubt a great open-source package, and I hope it becomes more reliable. However, the impact of the existing issues can be huge for all ingested data, and it may affect lots of business applications. Be aware of this, and run some sanity checks if you think your applications and data warehouse may have been affected!
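
As a hedged example of the kind of sanity check I mean (the table and column names are illustrative), comparing row counts, null counts and value ranges of the ingested Hive table against figures computed directly in SAS can help catch the issues above:

from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sqlContext = HiveContext(sc)
df = sqlContext.table('db1.table_data')

print 'row count:', df.count()
for c in ['employee_id', 'birthday']:
    print c, 'nulls:', df.filter(F.col(c).isNull()).count()
print df.select(F.min('birthday'), F.max('birthday')).collect()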

MEMO: Ingesting SAS datasets to Spark/Hive

In SAS (assuming integration with Hadoop), export the dataset to HDFS using proc hadoop:

%let hdpcfg = '/opt/sas94/bin/hadoop/conf/combined-site.xml';
proc hadoop cfg=&hdpcfg verbose ;
  hdfs mkdir = '/user/username/';
  hdfs copyfromlocal='/sasfolder/data.sas7bdat'
  out='/user/username/data.sas7bdat' overwrite;
  run;

Start Spark cluster with spark-sas7bdat package:

PYSPARK_DRIVER_PYTHON=$PYTHON_HOME/ipython pyspark \ 
--master yarn-client \ 
--num-executors 10 \ 
--driver-memory 10G \ 
--executor-memory 10G \ 
--executor-cores 10 \ 
--queue team --conf spark.driver.maxResultSize=10G \
--packages saurfang:spark-sas7bdat:1.1.4-s_2.10

In PySpark, execute the following command:

from pyspark.sql import HiveContext
sqlContext=HiveContext(sc)
df=sqlContext.read.format('com.github.saurfang.sas.spark').load("hdfs:////user/username/data.sas7bdat")
df.printSchema()
df.write.format('orc').saveAsTable('db1.table_data')