Sunday, March 6, 2011

Mahout and Python Integration Using JPype

I recently used Mahout to cluster some data using KMeans. After running Mahout, I found myself wondering how to get the output out of the sequence files Mahout created and into Python which I use for data analysis and plotting.

I decided to see if I could use JPype to read the sequence files directly from Python. This turned out to be quite easy so I decided to post some helpful instruction on how to do it.

JPython wasn't an option for me, because (to the best of my knowledge) JPython doesn't work with Python extensions numpy, matplotlib, or h5py which I rely on heavily.

The instructions below explain how to setup a python script to read and write the output of Mahout clustering.

You will first need to download and install the JPype package for python.

The first step to setting up JPype is determining the path to the dynamic library for the jvm ; on linux this will be a .so file on and on windows it will be a .dll.

In your python script, create a global variable with the path to this dll
jvmlib="/usr/java/jdk1.6.0_23/jre/lib/amd64/server/libjvm.so"

Next we need to figure out how we need to set the classpath for mahout. The easiest way to do this is to edit the script in "bin/mahout" to print out the classpath. Add the line "echo $CLASSPATH" to the script somewhere after the comment "run it" (this is line 195 or so). Execute the script to print out the classpath. Copy this output and paste it into a variable in your python script. The result for me looks like the following

classpath="/usr/local/programs/svn_mahout/conf::/usr/java/jdk1.6.0_23/lib/tools.jar:/usr/local/programs/svn_mahout/mahout-*.jar:/usr/local/programs/svn_mahout/core/target/mahout-core-0.5-SNAPSHOT-job.jar:/usr/local/programs/svn_mahout/examples/target/mahout-examples-0.5-SNAPSHOT-job.jar:/usr/local/programs/svn_mahout/lib/*.jar:/usr/local/programs/svn_mahout/examples/target/dependency/cglib-nodep-2.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-beanutils-1.7.0.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-cli-1.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-cli-2.0-mahout.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-codec-1.3.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-collections-3.2.1.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-dbcp-1.2.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-digester-1.7.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-httpclient-3.1.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-lang-2.4.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-logging-1.1.1.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-math-1.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/commons-pool-1.4.jar:/usr/local/programs/svn_mahout/examples/target/dependency/easymock-2.5.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/easymockclassextension-2.5.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/google-collections-1.0-rc2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/gson-1.3.jar:/usr/local/programs/svn_mahout/examples/target/dependency/guava-r03.jar:/usr/local/programs/svn_mahout/examples/target/dependency/hadoop-core-0.20.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/hbase-0.20.0.jar:/usr/local/programs/svn_mahout/examples/target/dependency/jets3t-0.7.1.jar:/usr/local/programs/svn_mahout/examples/target/dependency/junit-4.7.jar:/usr/local/programs/svn_mahout/examples/target/dependency/lucene-analyzers-3.0.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/lucene-benchmark-3.0.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/lucene-core-3.0.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/lucene-demos-3.0.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/lucene-highlighter-3.0.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/lucene-memory-3.0.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/lucene-wikipedia-3.0.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/mahout-collections-1.0.jar:/usr/local/programs/svn_mahout/examples/target/dependency/mahout-core-0.5-SNAPSHOT.jar:/usr/local/programs/svn_mahout/examples/target/dependency/mahout-core-0.5-SNAPSHOT-tests.jar:/usr/local/programs/svn_mahout/examples/target/dependency/mahout-math-0.5-SNAPSHOT.jar:/usr/local/programs/svn_mahout/examples/target/dependency/mahout-math-0.5-SNAPSHOT-tests.jar:/usr/local/programs/svn_mahout/examples/target/dependency/mahout-utils-0.5-SNAPSHOT.jar:/usr/local/programs/svn_mahout/examples/target/dependency/objenesis-1.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/slf4j-api-1.6.0.jar:/usr/local/programs/svn_mahout/examples/target/dependency/slf4j-jcl-1.6.0.jar:/usr/local/programs/svn_mahout/examples/target/dependency/uncommons-maths-1.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/watchmaker-framework-0.6.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/watchmaker-swing-0.6.2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/xml-apis-1.0.b2.jar:/usr/local/programs/svn_mahout/examples/target/dependency/xpp3_min-1.1.4c.jar:/usr/local/programs/svn_mahout/examples/target/dependency/xstream-1.3.1.jar"

Now we can create a function to start the jvm in python using jype

from jpype import *
jvm=None
def start_jpype():
 global jvm
 if (jvm is None):  
  cpopt="-Djava.class.path={cp}".format(cp=classpath)
  startJVM(jvmlib,"-ea",cpopt)
  jvm="started"


We can now use JPype to create sequence files which will contain vectors to be used by Mahout for kmeans. The example below is a function which creates vectors from two Gaussian distributions with unit variance.

def create_inputs(ifile,*args,**param):
 """Create a sequence file containing some normally distributed
        ifile - path to the sequence file to create
 """
 
 #matrix of the cluster means
 cmeans=np.array([[1,1],[-1,-1]],np.int)
 
 nperc=30  #number of points per cluster
 
 vecs=[]
 
 vnames=[]
 for cind in range(cmeans.shape[0]):
  pts=np.random.randn(nperc,2)
  pts=pts+cmeans[cind,:].reshape([1,cmeans.shape[1]])
  vecs.append(pts)
 
  #names for the vectors
  #names are just the points with an index
  #we do this so we can validate by cross-refencing the name with the vector
  vn=np.empty(nperc,dtype=(np.str,30))
  for row in range(nperc):
   vn[row]="c"+str(cind)+"_"+pts[row,0].astype((np.str,4))+"_"+pts[row,1].astype((np.str,4))
  vnames.append(vn)
  
 vecs=np.vstack(vecs)
 vnames=np.hstack(vnames)
 

 #start the jvm
 start_jpype()
 
 #create the sequence file that we will write to
 io=JPackage("org").apache.hadoop.io 
 FileSystemCls=JPackage("org").apache.hadoop.fs.FileSystem
 
 PathCls=JPackage("org").apache.hadoop.fs.Path
 path=PathCls(ifile)

 ConfCls=JPackage("org").apache.hadoop.conf.Configuration 
 conf=ConfCls()
 
 fs=FileSystemCls.get(conf)
 
 #vector classes
 VectorWritableCls=JPackage("org").apache.mahout.math.VectorWritable
 DenseVectorCls=JPackage("org").apache.mahout.math.DenseVector
 NamedVectorCls=JPackage("org").apache.mahout.math.NamedVector
 writer=io.SequenceFile.createWriter(fs, conf, path, io.Text,VectorWritableCls)
 
 
 vecwritable=VectorWritableCls()
 for row in range(vecs.shape[0]):
  nvector=NamedVectorCls(DenseVectorCls(JArray(JDouble,1)(vecs[row,:])),vnames[row])
  #need to wrap key and value because of overloading
  wrapkey=JObject(io.Text("key "+str(row)),io.Writable)
  wrapval=JObject(vecwritable,io.Writable)
  
  vecwritable.set(nvector)
  writer.append(wrapkey,wrapval)
  
 writer.close()

Similarly we can use JPype to easily read the clustered points outputted by mahout.
def read_clustered_pts(ifile,*args,**param):
 """Read the clustered points
 ifile - path to the sequence file containing the clustered points
 """ 

 #start the jvm
 start_jpype()
 
 #create the sequence file that we will write to
 io=JPackage("org").apache.hadoop.io 
 FileSystemCls=JPackage("org").apache.hadoop.fs.FileSystem
 
 PathCls=JPackage("org").apache.hadoop.fs.Path
 path=PathCls(ifile)

 ConfCls=JPackage("org").apache.hadoop.conf.Configuration 
 conf=ConfCls()
 
 fs=FileSystemCls.get(conf)
 
 #vector classes
 VectorWritableCls=JPackage("org").apache.mahout.math.VectorWritable
 NamedVectorCls=JPackage("org").apache.mahout.math.NamedVector
 
 
 ReaderCls=io.__getattribute__("SequenceFile$Reader") 
 reader=ReaderCls(fs, path,conf)
 

 key=reader.getKeyClass()()
 

 valcls=reader.getValueClass()
 vecwritable=valcls()
 while (reader.next(key,vecwritable)):  
  weight=vecwritable.getWeight()
  nvec=vecwritable.getVector()
  
  cname=nvec.__class__.__name__
  if (cname.rsplit('.',1)[1]=="NamedVector"):  
   print "cluster={key} Name={name} x={x} y={y}".format(key=key.toString(),name=nvec.getName(),x=nvec.get(0),y=nvec.get(1))
  else:
   raise NotImplementedError("Vector isn't a NamedVector. Need to modify/test the code to handle this case.")


Finally we can create a function to print out the actual cluster centers found by mahout,
def getClusters(ifile,*args,**param):
 """Read the centroids from the clusters outputted by kmenas
           ifile - Path to the sequence file containing the centroids
 """ 

 #start the jvm
 start_jpype()
 
 #create the sequence file that we will write to
 io=JPackage("org").apache.hadoop.io 
 FileSystemCls=JPackage("org").apache.hadoop.fs.FileSystem
 
 PathCls=JPackage("org").apache.hadoop.fs.Path
 path=PathCls(ifile)

 ConfCls=JPackage("org").apache.hadoop.conf.Configuration 
 conf=ConfCls()
 
 fs=FileSystemCls.get(conf)
 
 #vector classes
 VectorWritableCls=JPackage("org").apache.mahout.math.VectorWritable
 NamedVectorCls=JPackage("org").apache.mahout.math.NamedVector
 ReaderCls=io.__getattribute__("SequenceFile$Reader")
 reader=ReaderCls(fs, path,conf)
 

 key=io.Text()
 

 valcls=reader.getValueClass()

 vecwritable=valcls()
 
 while (reader.next(key,vecwritable)):  
  center=vecwritable.getCenter()
  
  print "id={cid} center={center}".format(cid=vecwritable.getId(),center=center.values)
  pass

1 comment:

  1. AttributeError: 'org.apache.mahout.math.VectorWritable' object has no attribute 'getWeight'

    Can you repair this example? I don't understand, but want to use Python with Mahout

    ReplyDelete