Mining Temporal Patterns using an MPI cluster on AWS: Part 1


Mining temporal patterns is an important problem in enterprise data science.  At a recent summit at Stanford University, I heard a talk by Prof. Jure Leskovec, who mentioned that temporal patterns in customers' digital footprints yield the best accuracy in customer behavior prediction, cross-sell recommendation, etc., compared with static customer profiles.  Since temporal patterns are so useful, what holds them back from being widely applied in practical data science? I think there are two main challenges:

  • The raw data that contains temporal patterns is difficult to gather, especially data about social individuals (i.e., customers, agents, organizations).  Most existing methods are based on studies of sensor signals; collecting similar data from humans is undoubtedly more challenging because of privacy issues and potential costs.
  • Temporal patterns are difficult to analyze.  The additional time dimension complicates the problem and often results in a combinatorial explosion, which makes the analysis computationally intensive.

This article is based on an ongoing series of experiments in collaboration with our university partners.  We have developed a temporal pattern mining algorithm that uses the Message Passing Interface (MPI) to speed up pattern search in large-scale temporal claim notes data.  Since our firm has no MPI environment, we tested the algorithm on Amazon AWS.  This blog will focus on the process of setting up an MPI cluster on the AWS platform using StarCluster (http://star.mit.edu/cluster/), a great open-source tool developed at MIT.

1. Installing StarCluster

Getting StarCluster to work was not smooth: I was stuck for a day trying to install it on a Mac, struggling with an OpenSSL package issue.  I eventually gave up, switched to Ubuntu Linux as my local environment, and installed StarCluster successfully.  However, it seems the StarCluster git repository hasn't been updated to reflect some recently reported bugs related to NFS processes (http://star.mit.edu/cluster/mlarchives/2612.html), so I ended up changing some of the code myself and reinstalling the package.

I put the changed version of StarCluster on my git:

https://github.com/cyberyu/starcluster_journeymap

where the changes I made are in

https://github.com/cyberyu/starcluster_journeymap/blob/master/starcluster/node.py

lines 705 to 706 and lines 720 to 721.

The installation of StarCluster can then follow the official instructions (http://star.mit.edu/cluster/docs/latest/installation.html).
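For reference, installing my patched fork from source might look like the following sketch (assuming git and pip are available locally; only the repository URL above is taken from this post):

```shell
# clone the patched fork (contains the node.py NFS fixes) and install it
git clone https://github.com/cyberyu/starcluster_journeymap
cd starcluster_journeymap
pip install .

# confirm the starcluster command is on the PATH
starcluster --version
```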

2. StarCluster config file and AMI image

I preinstalled everything in a running instance, loaded the data, and then transformed it into an AMI image.  This blog (http://cs.smith.edu/dftwiki/index.php/Tutorial:_Create_an_MPI_Cluster_on_the_Amazon_Elastic_Cloud_(EC2)) gives a clear explanation of the process.
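As a sketch, once the instance is prepared, the AMI can also be created from the AWS CLI rather than the web console (the instance ID, image name, and description below are hypothetical placeholders, not my actual values):

```shell
# snapshot the prepared instance into a reusable AMI (hypothetical instance ID)
aws ec2 create-image \
  --instance-id i-0123456789abcdef0 \
  --name "starcluster-mpi-base" \
  --description "Ubuntu with MPI4py and preloaded data"
```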

Below is the config file I used to create the MPI cluster.  I set up an EBS data volume and an AMI image in advance and uncommented the relevant settings in the config file.

####################################
## StarCluster Configuration File ##
####################################
[global]
# Configure the default cluster template to use when starting a cluster
# defaults to 'smallcluster' defined below. This template should be usable
# out-of-the-box provided you've configured your keypair correctly
DEFAULT_TEMPLATE=smallcluster
# enable experimental features for this release
#ENABLE_EXPERIMENTAL=True
# number of seconds to wait when polling instances (default: 30s)
#REFRESH_INTERVAL=15
# specify a web browser to launch when viewing spot history plots
#WEB_BROWSER=chromium
# split the config into multiple files
#INCLUDE=~/.starcluster/aws, ~/.starcluster/keys, ~/.starcluster/vols

#############################################
## AWS Credentials and Connection Settings ##
#############################################
[aws info]
# This is the AWS credentials section (required).
# These settings apply to all clusters
# replace these with your AWS keys
AWS_ACCESS_KEY_ID = ****
AWS_SECRET_ACCESS_KEY = ****
# replace this with your account number
AWS_USER_ID= ****
# Uncomment to specify a different Amazon AWS region  (OPTIONAL)
# (defaults to us-east-1 if not specified)
# NOTE: AMIs have to be migrated!
AWS_REGION_NAME = eu-west-1
#AWS_REGION_HOST = ec2.eu-west-1.amazonaws.com
# Uncomment these settings when creating an instance-store (S3) AMI (OPTIONAL)
#EC2_CERT = /path/to/your/cert-asdf0as9df092039asdfi02089.pem
#EC2_PRIVATE_KEY = /path/to/your/pk-asdfasd890f200909.pem
# Uncomment these settings to use a proxy host when connecting to AWS
#AWS_PROXY = your.proxyhost.com
#AWS_PROXY_PORT = 8080
#AWS_PROXY_USER = yourproxyuser
#AWS_PROXY_PASS = yourproxypass

###########################
## Defining EC2 Keypairs ##
###########################
# Sections starting with "key" define your keypairs. See "starcluster createkey
# --help" for instructions on how to create a new keypair. Section name should
# match your key name e.g.:
[key mykey]
KEY_LOCATION=~/awskeys/mykey.pem

# You can of course have multiple keypair sections
# [key myotherkey]
# KEY_LOCATION=~/.ssh/myotherkey.rsa

################################
## Defining Cluster Templates ##
################################
# Sections starting with "cluster" represent a cluster template. These
# "templates" are a collection of settings that define a single cluster
# configuration and are used when creating and configuring a cluster. You can
# change which template to use when creating your cluster using the -c option
# to the start command:
#
#     $ starcluster start -c mediumcluster mycluster
#
# If a template is not specified then the template defined by DEFAULT_TEMPLATE
# in the [global] section above is used. Below is the "default" template named
# "smallcluster". You can rename it but dont forget to update the
# DEFAULT_TEMPLATE setting in the [global] section above. See the next section
# on defining multiple templates.

#[plugin mpich2]
#setup_class = starcluster.plugins.mpich2.MPICH2Setup

[cluster smallcluster]
#plugins = mpich2
# change this to the name of one of the keypair sections defined above
KEYNAME = mykey
# number of ec2 instances to launch
CLUSTER_SIZE = 10
# create the following user on the cluster
CLUSTER_USER = ***
# optionally specify shell (defaults to bash)
# (options: tcsh, zsh, csh, bash, ksh)
CLUSTER_SHELL = bash
# Uncomment to prepend the cluster tag to the dns name of all nodes created
# using this cluster config.  ie: mycluster-master and mycluster-node001
# If you choose to enable this option, it's recommended that you enable it in
# the DEFAULT_TEMPLATE so all nodes will automatically have the prefix
# DNS_PREFIX = True
# AMI to use for cluster nodes. These AMIs are for the us-east-1 region.
# Use the 'listpublic' command to list StarCluster AMIs in other regions
# The base i386 StarCluster AMI is ami-9bf9c9f2
# The base x86_64 StarCluster AMI is ami-3393a45a
# The base HVM StarCluster AMI is ami-6b211202
NODE_IMAGE_ID = ****
# instance type for all cluster nodes
# (options: m3.large, c3.8xlarge, i2.8xlarge, t2.micro, hs1.8xlarge, c1.xlarge, r3.4xlarge, g2.2xlarge, m1.small, c1.medium, m3.2xlarge, c3.2xlarge, m2.xlarge, m2.2xlarge, t2.small, r3.2xlarge, t1.micro, cr1.8xlarge, r3.8xlarge, cc1.4xlarge, m1.medium, r3.large, c3.xlarge, i2.xlarge, m3.medium, cc2.8xlarge, m1.large, cg1.4xlarge, i2.2xlarge, c3.large, i2.4xlarge, c3.4xlarge, r3.xlarge, t2.medium, hi1.4xlarge, m2.4xlarge, m1.xlarge, m3.xlarge)
NODE_INSTANCE_TYPE = m3.large
# Launch cluster in a VPC subnet (OPTIONAL)
SUBNET_ID=subnet-86f506ac
# Uncomment to assign public IPs to cluster nodes (VPC-ONLY) (OPTIONAL)
# WARNING: Using public IPs with a VPC requires:
# 1. An internet gateway attached to the VPC
# 2. A route table entry linked to the VPC's internet gateway and associated
#    with the VPC subnet with a destination CIDR block of 0.0.0.0/0
# WARNING: Public IPs allow direct access to your VPC nodes from the internet
#PUBLIC_IPS=True
# Uncomment to disable installing/configuring a queueing system on the
# cluster (SGE)
#DISABLE_QUEUE=True
# Uncomment to specify a different instance type for the master node (OPTIONAL)
# (defaults to NODE_INSTANCE_TYPE if not specified)
#MASTER_INSTANCE_TYPE = m1.small
# Uncomment to specify a separate AMI to use for the master node. (OPTIONAL)
# (defaults to NODE_IMAGE_ID if not specified)
#MASTER_IMAGE_ID = ami-3393a45a (OPTIONAL)
# availability zone to launch the cluster in (OPTIONAL)
# (automatically determined based on volumes (if any) or
# selected by Amazon if not specified)
#AVAILABILITY_ZONE = us-east-1c
# list of volumes to attach to the master node (OPTIONAL)
# these volumes, if any, will be NFS shared to the worker nodes
# see "Configuring EBS Volumes" below on how to define volume sections
#VOLUMES = oceandata, biodata
# list of plugins to load after StarCluster's default setup routines (OPTIONAL)
# see "Configuring StarCluster Plugins" below on how to define plugin sections
#PLUGINS = myplugin, myplugin2
# list of permissions (or firewall rules) to apply to the cluster's security
# group (OPTIONAL).
#PERMISSIONS = ssh, http
# Uncomment to always create a spot cluster when creating a new cluster from
# this template. The following example will place a $0.50 bid for each spot
# request.
#SPOT_BID = 0.50
# Uncomment to specify one or more userdata scripts to use when launching
# cluster instances. Supports cloudinit. All scripts combined must be less than
# 16KB
#USERDATA_SCRIPTS = /path/to/script1, /path/to/script2

###########################################
## Defining Additional Cluster Templates ##
###########################################
# You can also define multiple cluster templates. You can either supply all
# configuration options as with smallcluster above, or create an
# EXTENDS=<cluster_name> variable in the new cluster section to use all
# settings from <cluster_name> as defaults. Below are example templates that
# use the EXTENDS feature:

# [cluster mediumcluster]
# Declares that this cluster uses smallcluster as defaults
# EXTENDS=smallcluster
# This section is the same as smallcluster except for the following settings:
# KEYNAME=myotherkey
# NODE_INSTANCE_TYPE = c1.xlarge
# CLUSTER_SIZE=8
VOLUMES = dataABC

# [cluster largecluster]
# Declares that this cluster uses mediumcluster as defaults
# EXTENDS=mediumcluster
# This section is the same as mediumcluster except for the following variables:
# CLUSTER_SIZE=16

#############################
## Configuring EBS Volumes ##
#############################
# StarCluster can attach one or more EBS volumes to the master and then
# NFS-share these volumes to all of the worker nodes. A new [volume] section
# must be created for each EBS volume you wish to use with StarCluster. The
# section name is a tag for your volume. This tag is used in the VOLUMES
# setting of a cluster template to declare that an EBS volume is to be mounted
# and nfs shared on the cluster. (see the commented VOLUMES setting in the
# example 'smallcluster' template above) Below are some examples of defining
# and configuring EBS volumes to be used with StarCluster:

# Sections starting with "volume" define your EBS volumes
[volume dataABC]
VOLUME_ID = vol-0cb816a370ecc4f13
MOUNT_PATH = /data

# Same volume as above, but mounts to different location
# [volume biodata2]
# VOLUME_ID = vol-c999999
# MOUNT_PATH = /opt/

# Another volume example
# [volume oceandata]
# VOLUME_ID = vol-d7777777
# MOUNT_PATH = /mydata

# By default StarCluster will attempt first to mount the entire volume device,
# failing that it will try the first partition. If you have more than one
# partition you will need to set the PARTITION number, e.g.:
# [volume oceandata]
# VOLUME_ID = vol-d7777777
# MOUNT_PATH = /mydata
# PARTITION = 2

############################################
## Configuring Security Group Permissions ##
############################################
# Sections starting with "permission" define security group rules to
# automatically apply to newly created clusters. IP_PROTOCOL in the following
# examples can be: tcp, udp, or icmp. CIDR_IP defaults to 0.0.0.0/0 or
# "open to the world"

# open port 80 on the cluster to the world
# [permission http]
# IP_PROTOCOL = tcp
# FROM_PORT = 80
# TO_PORT = 80

# open https on the cluster to the world
# [permission https]
# IP_PROTOCOL = tcp
# FROM_PORT = 443
# TO_PORT = 443

# open port 80 on the cluster to an ip range using CIDR_IP
# [permission http]
# IP_PROTOCOL = tcp
# FROM_PORT = 80
# TO_PORT = 80
# CIDR_IP = 18.0.0.0/8

# restrict ssh access to a single ip address (<your_ip>)
# [permission ssh]
# IP_PROTOCOL = tcp
# FROM_PORT = 22
# TO_PORT = 22
# CIDR_IP = <your_ip>/32

#####################################
## Configuring StarCluster Plugins ##
#####################################
# Sections starting with "plugin" define a custom python class which perform
# additional configurations to StarCluster's default routines. These plugins
# can be assigned to a cluster template to customize the setup procedure when
# starting a cluster from this template (see the commented PLUGINS setting in
# the 'smallcluster' template above). Below is an example of defining a user
# plugin called 'myplugin':

# [plugin myplugin]
# NOTE: myplugin module must either live in ~/.starcluster/plugins or be
# on your PYTHONPATH
# SETUP_CLASS = myplugin.SetupClass
# extra settings are passed as __init__ arguments to your plugin:
# SOME_PARAM_FOR_MY_PLUGIN = 1
# SOME_OTHER_PARAM = 2

######################
## Built-in Plugins ##
######################
# The following plugins ship with StarCluster and should work out-of-the-box.
# Uncomment as needed. Don't forget to update your PLUGINS list!
# See http://star.mit.edu/cluster/docs/latest/plugins for plugin details.
#
# Use this plugin to install one or more packages on all nodes
[plugin pkginstaller]
SETUP_CLASS = starcluster.plugins.pkginstaller.PackageInstaller
# # list of apt-get installable packages
# PACKAGES = mongodb, python-pymongo
#
# Use this plugin to create one or more cluster users and download all user ssh
# keys to $HOME/.starcluster/user_keys/<cluster>-<region>.tar.gz
# [plugin createusers]
# SETUP_CLASS = starcluster.plugins.users.CreateUsers
# NUM_USERS = 30
# # you can also comment out NUM_USERS and specify exact usernames, e.g.
# # usernames = linus, tux, larry
# DOWNLOAD_KEYS = True
#
# Use this plugin to configure the Condor queueing system
# [plugin condor]
# SETUP_CLASS = starcluster.plugins.condor.CondorPlugin
#
# The SGE plugin is enabled by default and not strictly required. Only use this
# if you want to tweak advanced settings in which case you should also set
# DISABLE_QUEUE=TRUE in your cluster template. See the plugin doc for more
# details.
[plugin sge]
SETUP_CLASS = starcluster.plugins.sge.SGEPlugin
MASTER_IS_EXEC_HOST = False
#
# The IPCluster plugin configures a parallel IPython cluster with optional
# web notebook support. This allows you to run Python code in parallel with low
# latency message passing via ZeroMQ.
# [plugin ipcluster]
# SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster
# # Enable the IPython notebook server (optional)
# ENABLE_NOTEBOOK = True
# # Set a password for the notebook for increased security
# # This is optional but *highly* recommended
# NOTEBOOK_PASSWD = a-secret-password
# # Set a custom directory for storing/loading notebooks (optional)
# NOTEBOOK_DIRECTORY = /path/to/notebook/dir
# # Set a custom packer. Must be one of 'json', 'pickle', or 'msgpack'
# # This is optional.
# PACKER = pickle
#
# Use this plugin to create a cluster SSH "dashboard" using tmux. The plugin
# creates a tmux session on the master node that automatically connects to all
# the worker nodes over SSH. Attaching to the session shows a separate window
# for each node and each window is logged into the node via SSH.
# [plugin tmux]
# SETUP_CLASS = starcluster.plugins.tmux.TmuxControlCenter
#
# Use this plugin to change the default MPI implementation on the
# cluster from OpenMPI to MPICH2.
# [plugin mpich2]
# SETUP_CLASS = starcluster.plugins.mpich2.MPICH2Setup
#
# Configure a hadoop cluster. (includes dumbo setup)
# [plugin hadoop]
# SETUP_CLASS = starcluster.plugins.hadoop.Hadoop
#
# Configure a distributed MySQL Cluster
# [plugin mysqlcluster]
# SETUP_CLASS = starcluster.plugins.mysql.MysqlCluster
# NUM_REPLICAS = 2
# DATA_MEMORY = 80M
# INDEX_MEMORY = 18M
# DUMP_FILE = test.sql
# DUMP_INTERVAL = 60
# DEDICATED_QUERY = True
# NUM_DATA_NODES = 2
#
# Install and setup an Xvfb server on each cluster node
# [plugin xvfb]
# SETUP_CLASS = starcluster.plugins.xvfb.XvfbSetup

The /etc/exports file in my AMI image is set as follows:

[screenshot: /etc/exports contents]
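As an illustration only (the subnet, paths, and export options below are assumptions, not copied from my actual file), an exports file that NFS-shares /home and the /data EBS mount to the cluster subnet might look like this:

```
/home  10.0.0.0/24(rw,sync,no_root_squash,no_subtree_check)
/data  10.0.0.0/24(rw,sync,no_root_squash,no_subtree_check)
```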

3. Start the cluster

Once correctly set up, starting a new MPI cluster is very easy:

starcluster start myclusterABC

Note that this command creates a new cluster; if a cluster with the same name was created before, it will raise an error.  To get past the error, delete the security group @sc-myclusterABC in the AWS console before recreating the cluster.

To restart an already created but powered-off cluster, use the following command:

starcluster start -x myclusterABC

To ssh/shutdown/terminate a running cluster, use the commands described on the manual webpage (http://star.mit.edu/cluster/docs/0.95.2/manual/launch.html).
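For convenience, the day-to-day lifecycle commands from that manual page look roughly like the following (using the cluster name from above):

```shell
# log into the master node over SSH
starcluster sshmaster myclusterABC

# power off the nodes without destroying the cluster
starcluster stop myclusterABC

# destroy the cluster, including its @sc-myclusterABC security group
starcluster terminate myclusterABC
```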

After the cluster has been successfully set up, my AWS console looks like the following.

[screenshot: AWS console showing the running cluster instances]

4. Running MPI code for Temporal Pattern mining

Our temporal pattern mining algorithm is written with MPI4py, so it can run in the default StarCluster setting.  I also preinstalled MPI4py and other relevant Python packages in the AMI image so I can directly fire up the cluster and run.

Before running the MPI program, I needed to create a host file for the mpirun command.  I simply copied the content from /etc/hosts on any node of the MPI cluster and saved it in the shared NFS folder.

The /etc/hosts file looks like the following:

[screenshot: /etc/hosts contents]

I then converted its content into an mpihosts.txt file, with one hostname per line:

[screenshot: mpihosts.txt contents]
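Copying /etc/hosts by hand works, but the transformation is mechanical: the node hostnames just need to be pulled out, one per line.  A small helper could do it; the master/nodeNNN naming is StarCluster's default convention, and this function is my own illustration, not part of our pipeline:

```python
import re

def hosts_to_mpihosts(etc_hosts_text):
    """Pull StarCluster node names (master, node001, ...) out of /etc/hosts
    text, one hostname per line, as expected by mpirun --hostfile."""
    names = []
    for line in etc_hosts_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line:
            continue
        for alias in line.split()[1:]:  # skip the IP address field
            if alias == "master" or re.fullmatch(r"node\d+", alias):
                names.append(alias)
    return "\n".join(names) + "\n"

sample = """127.0.0.1 localhost
10.0.0.10 master
10.0.0.11 node001
10.0.0.12 node002
"""
print(hosts_to_mpihosts(sample), end="")  # master / node001 / node002
```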

4.1  Supervised Pattern Analysis: MPI is 9 times faster than non-MPI

Next, I executed the supervised learning version of our pattern mining algorithm using

time /home/ubuntu/anaconda2/bin/mpirun --hostfile mpihosts.txt -np 10 /home/ubuntu/anaconda2/bin/python findPatterns.py

where findPatterns.py was our data mining code extracting the significant temporal patterns from insurance claim data. The details of the algorithm will be explained in future articles.
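While the algorithm itself is deferred to a later article, the parallelization follows a standard MPI data-parallel pattern: each of the -np 10 ranks evaluates a disjoint slice of the candidate pattern space, and results are gathered on rank 0.  Stripped of mpi4py, the slicing can be sketched as follows (the names here are illustrative, not our actual code):

```python
def rank_slice(candidates, size, rank):
    """Cyclic partition: rank r of `size` processes handles candidates[r::size],
    so the slices are disjoint and together cover every candidate."""
    return candidates[rank::size]

# with mpirun -np 10, rank 3 would evaluate candidates 3, 13, 23, ...
candidates = list(range(100))
print(rank_slice(candidates, size=10, rank=3)[:3])  # [3, 13, 23]
```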

The MPI process took 26 seconds to finish and found 15 significant patterns.

[screenshot: timing output of the MPI run]

To compare the performance, I executed the algorithm on a single node (the master node), and it took 3 minutes 40 seconds to finish.  The results were identical.

time /home/ubuntu/anaconda2/bin/python findPatterns.py

[screenshot: timing output of the single-node run]

4.2  Unsupervised Pattern Analysis: MPI is >200 times faster than non-MPI

Our algorithm can also extract temporal patterns through unsupervised learning, which is a more computationally intensive process than supervised analysis.  I reran the algorithm on the same data set after turning off the supervised control flag (to ignore the given labels).  I had to increase the instance type to m3.xlarge because unsupervised learning demands more memory than supervised learning.  Without an MPI cluster, I waited for an hour and it still hadn't finished.  So I terminated that process, ran the MPI version instead, and found 223 temporal patterns in 17 seconds.

[screenshots: output of the unsupervised MPI run]

5. Conclusion

Unlike academia and research labs, commercial organizations nowadays rarely deploy MPI clusters as a general computational platform.  However, for a specific type of problem that does not necessarily involve big data, but does involve huge search spaces resulting from combinatorial explosion, MPI still has significant advantages.  This article recommended an open source tool, StarCluster, and demonstrated how to quickly set up a customizable MPI cluster on AWS to improve the efficiency of temporal pattern extraction.

In the next article, I will introduce the mathematical problem of Temporal Pattern Analysis and several interesting models.
