Outlier Detection From Large Scale Categorical Breast Cancer Datasets Using Spark 2.0.0: Part II

Datetime: 2016-08-23 01:33:00          Topic: Spark

If you missed Part I, you can read it here!

Developing Large-Scale Outlier Application with Spark Core

As I have already mentioned, the WDBC dataset contains the following attributes: ID number, Diagnosis (M = malignant, B = benign), and ten real-valued features computed for each cell nucleus: Radius, Texture, Perimeter, Area, Smoothness, Compactness, Concavity, Concave points, Symmetry, and Fractal dimension [ 6 ]. For a detailed description of the dataset, please refer to the first part of this title here.

These features are computed from a digitized image of a fine needle aspirate (FNA) of a breast mass. They describe characteristics of the cell nuclei present in the corresponding images. I believe we now know enough about the dataset to move on.
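For reference, each line of wdbc.data is a single comma-separated record of the form <Patient-ID>,<diagnosis>,<feature_1>,...,<feature_30>. For instance, the record whose feature values appear in the Step-4 sample output below starts like this (the Patient-ID is elided here):

<Patient-ID>,M,17.99,10.38,122.8,1001,0.1184,0.2776, ... ,0.2654,0.4601,0.1189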

Now, I will show how to develop a breast cancer diagnosis outlier pipeline step by step. Please note that I have used Spark 2.0.0 with Java for the demonstration. Some conceptual ideas have been adapted from [ 7 ] and re-implemented with Spark 2.0.0. The complete source code, including the Maven-friendly pom.xml file and the dataset, can be downloaded from my GitHub repositories here.

Step-1: Import necessary packages/libraries/APIs

import java.io.Serializable; // needed by the comparator classes shown later
import java.util.ArrayList;
import java.util.Comparator; // needed by the comparator classes shown later
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import com.example.SparkSession.UtilityForSparkSession;
import scala.Tuple2;

Step-2: Create the Spark entry point using a Spark session

SparkSession spark = UtilityForSparkSession.mySession();

Here is the UtilityForSparkSession class that creates and returns a Spark session:

import org.apache.spark.sql.SparkSession;
public class UtilityForSparkSession {
    public static SparkSession mySession() {
        SparkSession spark = SparkSession.builder()
                .appName("UtilityForSparkSession")
                .master("local[*]") //You might want to set your driver IP:PORT acordingly
                .config("spark.sql.warehouse.dir", "E:/Exp/") //set your warehouse accordingly. 
                .getOrCreate(); 
        return spark;
    }
}

Step-3: Create the first RDD by reading the input

Please note that the Spark context initiated with the Spark session does not create a JavaRDD directly, but only an RDD of String. However, don't worry: I will convert that RDD to a JavaRDD when required.

Here the number of partitions is 1. You should increase or decrease this number based on your dataset size and your memory/disk capacity.

final String inputPath = "breastcancer/input/wdbc.data"; //Input path
RDD<String> lines_of_patient_records = spark.sparkContext().textFile(inputPath, 1);  
lines_of_patient_records.cache(); // Cache it if you have enough memory, since we will reuse this RDD in the next few steps

Step-4: Perform flatMapToPair() on the lines_of_patient_records RDD to create the patient pair RDD

JavaPairRDD<String,Integer> patient = lines_of_patient_records.toJavaRDD().flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String,Integer>> call(String row) { //For each line as variable 'row'     
                List<Tuple2<String,Integer>> results = new ArrayList<Tuple2<String,Integer>>();
                // As shown earlier, a record has the following format: <Patient-id><,><feature_point1><,><feature_point2><,><feature_point3><,>...
                String[] tokens = row.split(","); //i.e. features
                for (int i = 1; i < tokens.length; i++) { //Except the first entry i.e. Patient-ID
                    results.add(new Tuple2<String,Integer>(tokens[i], 1));
                }
                return results.iterator();
            }
        });
/* Sample output should be like this (against 1 record i.e. 1 line):
        (M,1)
        (17.99,1)
        (10.38,1)
        (122.8,1)
        (1001,1)
        (0.1184,1)
        (0.2776,1)
        (0.3001,1)
        (0.1471,1)
        (0.2419,1)
        (0.07871,1)
        (1.095,1)
        (0.9053,1)
        (8.589,1)
        (153.4,1)
        (0.006399,1)
        (0.04904,1)
        (0.05373,1)
        (0.01587,1)
        (0.03003,1)
        (0.006193,1)
        (25.38,1)
        (17.33,1)
        (184.6,1)
        (2019,1)
        (0.1622,1)
        (0.6656,1)
        (0.7119,1)
        (0.2654,1)
        (0.4601,1)
        (0.1189,1)
        */

Step-5: Count the frequency of the categorical data

Now we need to count the frequency of every feature value, including the diagnosis label of course. Since the records contain both numeric and character data, we keep all feature values as Strings, i.e. treat them as categorical data [ 7 ].

JavaPairRDD<String, Integer> count_frequency_by_reducebykey = patient.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
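By the way, if you are on Java 8, the same reduction can be written more compactly with a lambda expression; the following one-liner is functionally equivalent to the anonymous Function2 above:

JavaPairRDD<String, Integer> count_frequency_by_reducebykey = patient.reduceByKey((i1, i2) -> i1 + i2);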

Now you might be interested in saving the RDD on HDFS or your local file system using:

count_frequency_by_reducebykey.saveAsTextFile("output/ReducedRDD");

If you read the above text file, the RDD should contain the following values:

/*
         * This should produce output like the following:
        (M,212)
        (17.99,2)
        (10.38,1)
          .....
        (0.2776,2)
          ....
        (0.1189,2)
         *
         */

Step-6: Prepare a buffer Map for storing the Attribute Value Frequency (AVF) scores

Use a Map<String, Integer> so that it can hold the (key, value) pairs as (feature, frequency). Note that collectAsMap() assumes the keys are unique, so duplicated entries would otherwise be lost; this is safe here because reduceByKey() has already merged all duplicate feature values into single frequency counts.

final Map<String, Integer> AVFMap = count_frequency_by_reducebykey.collectAsMap();
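For example, given the reduced output shown in Step-5, AVFMap.get("M") now returns 212 (the frequency of the malignant label) and AVFMap.get("17.99") returns 2.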

Step-7: Compute the AVF scores using the buffer map (i.e. AVFMap)

Here, following [7], each input record has the format <Patient-ID><,><feature_point1><,><feature_point2><,><feature_point3><,>.... We use a PairFunction<T, K, V>, which maps T => Tuple2<K, V>, where T is the input record (i.e. the line), K is the Patient-ID, and V is the AVF score.
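In other words, the AVF score of a record is simply the mean of the frequencies (computed in Step-5) of its attribute values, i.e. the diagnosis label plus the 30 feature values:

AVF(record) = (1 / N) * [ freq(value_1) + freq(value_2) + ... + freq(value_N) ], where N = 31 here.

The rarer a record's attribute values are across the population, the lower its AVF score, which is why the records with the lowest scores are treated as outlier candidates.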

JavaPairRDD<String,Double> AVF = lines_of_patient_records.toJavaRDD().mapToPair(new PairFunction<String, String, Double>() {
            @Override
            public Tuple2<String,Double> call(String rec) {  
                String[] tokens = rec.split(",");
                String recordID = tokens[0];  //Patient ID who is getting diagnosed
                int sum = 0;
                for (int i = 1; i < tokens.length; i++) {
                    sum += AVFMap.get(tokens[i]);
                }
                double N = (double) (tokens.length -1); //We are excluding the first entry i.e. patientID
                double AVFScore = ((double) sum) / N;
                return new Tuple2<String,Double>(recordID, AVFScore);
            }
        });

Step-8: Compute the top K AVF scores in descending order

The built-in takeOrdered() method takes two parameters, K and a comparator instance, and returns the first K elements of this RDD as defined by the specified comparator while maintaining that order [7]. Here we use the TupleComparatorDescending class; a sketch of the corresponding takeOrdered() call follows the comparator below.

class TupleComparatorDescending implements Comparator<Tuple2<String,Double>>, Serializable {
final static TupleComparatorDescending INSTANCE = new TupleComparatorDescending();
   @Override
   public int compare(Tuple2<String,Double> t1, Tuple2<String,Double> t2) {
      return -(t1._2.compareTo(t2._2)); // sort based on AVF Score in descending order
   }
}
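The takeOrdered() call itself is not included in the listing above; a minimal sketch, mirroring the ascending version shown in Step-9 and assuming K = 30, would look like this:

int K = 30; // number of records to take; the same K is used in Step-9
List<Tuple2<String, Double>> topScores = AVF.takeOrdered(K, TupleComparatorDescending.INSTANCE);
System.out.println("Descending AVF Score");
System.out.println("Outliers: ");
for (Tuple2<String, Double> t : topScores) {
    System.out.println(t);
}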

The TupleComparatorAscending class will be described at the end of Step-10, in the Auxiliary Classes section. The code segment above should print the following values for K = 30:

Output: => (PatientID, AVF score)
            Descending AVF Score
            Outliers:
            (871642,28.225806451612904)
            (923748,28.096774193548388)
            (903483,28.032258064516128)
            (872113,28.0)
            (862722,27.967741935483872)
            (875099,27.967741935483872)
            (925236,27.903225806451612)
            (894047,27.903225806451612)
            (868999,27.870967741935484)
            (925311,27.870967741935484)
            (921092,27.838709677419356)
            (92751,27.806451612903224)
            (9113846,27.64516129032258)
            (91544002,14.096774193548388)
            (865432,13.935483870967742)
            (863031,13.870967741935484)
            (917062,13.838709677419354)
            (8912284,13.806451612903226)
            (897880,13.774193548387096)
            (905502,13.774193548387096)
            (9113239,13.774193548387096)
            (891716,13.774193548387096)
            (912519,13.774193548387096)
            (918192,13.774193548387096)
            (874839,13.741935483870968)
            (86973702,13.741935483870968)
            (9113455,13.741935483870968)
            (903011,13.741935483870968)
            (911384,13.709677419354838)
            (869931,13.709677419354838)

If you analyse the above output, you will see that the AVF scores fall into four clusters (around 28, 27, 14, and 13). It is also clearly seen that Patient-ID 91544002 falls into the cluster around 14, which is a minority cluster; therefore, it can be referred to as an outlier. This also suggests there might be some chance of breast cancer based on the data.

Step-9: Compute the lowest K AVF scores in ascending order

The code segment below prints the lowest K AVF scores in ascending order, for K = 30.

List<Tuple2<String, Double>> outliers = AVF.takeOrdered(K, TupleComparatorAscending.INSTANCE); // K = 30, as before
System.out.println("Ascending AVF Score");
System.out.println("Outliers: ");
for (Iterator<Tuple2<String, Double>> it = outliers.iterator(); it.hasNext();) {
    System.out.println(it.next());
}

The above code segment should print the AVF scores as follows:

Output: => (PatientID, AVF score)
Ascending AVF Score
Outliers:
(854002,8.129032258064516)
(926954,8.225806451612904)
(905680,8.258064516129032)
(916799,8.290322580645162)
(873592,8.32258064516129)
(877486,8.32258064516129)
(901088,8.32258064516129)
(852552,8.35483870967742)
(854253,8.35483870967742)
(877989,8.387096774193548)
(911296202,8.387096774193548)
(9113538,8.387096774193548)
(85382601,8.387096774193548)
(88299702,8.387096774193548)
(913535,8.387096774193548)
(879523,8.387096774193548)
(884948,8.387096774193548)
(889719,8.387096774193548)
(8911163,8.387096774193548)
(845636,8.419354838709678)
(888570,8.419354838709678)
(8610404,8.419354838709678)
(89742801,8.419354838709678)
(842517,8.419354838709678)
(85638502,8.419354838709678)
(927241,8.419354838709678)
(881046502,8.451612903225806)
(88119002,8.451612903225806)
(855625,8.451612903225806)
(8953902,8.451612903225806)

Step-10: Stop the Spark session

Stop the Spark session when all the computations have been completed by calling the stop() method as follows:

spark.stop();

Auxiliary Classes

As mentioned earlier, the following two classes, TupleComparatorAscending and TupleComparatorDescending, are shown below; they are re-used from [ 7 ]. I would like to thank Mr. Mahmoud Parsian for making the source code available from his excellent book "Data Algorithms: Recipes for Scaling Up with Hadoop and Spark", O'Reilly Media, 2015.

The TupleComparatorDescending Class

This class sorts based on the AVF Score in a descending order.

class TupleComparatorDescending implements Comparator<Tuple2<String,Double>>, Serializable {
final static TupleComparatorDescending INSTANCE = new TupleComparatorDescending();
   @Override
   public int compare(Tuple2<String,Double> t1, Tuple2<String,Double> t2) {
      return -(t1._2.compareTo(t2._2)); // sort based on AVF Score in a descending order
   }
}

The TupleComparatorAscending class

This class sorts based on the AVF Score in an ascending order.

class TupleComparatorAscending implements Comparator<Tuple2<String,Double>>, Serializable {
final static TupleComparatorAscending INSTANCE = new TupleComparatorAscending();
   @Override
   public int compare(Tuple2<String,Double> t1,Tuple2<String,Double> t2) {
      return (t1._2.compareTo(t2._2)); // sort based on AVF Score
   }
}

The complete source code, including the Maven-friendly pom.xml and the dataset, is available on my GitHub repositories here.

Conclusion

The outlier detection method that I have shown can be extended and applied to other kinds of applications such as credit card fraud detection, network anomaly detection, etc. However, handling unstructured and multidimensional datasets requires a non-trivial amount of effort, especially in the pre-processing and feature engineering steps, including feature extraction and feature selection.

Furthermore, the performance of the above algorithm can certainly be improved with proper machine learning model tuning. However, the current implementation of Spark provides hyperparameter tuning only through model selection via cross-validation (CrossValidator) and train-validation split (TrainValidationSplit). Interested readers should refer to the Spark website for regular updates.
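For readers who have not used Spark's model-selection API before, here is a minimal, purely illustrative sketch of hyperparameter tuning with CrossValidator. Note that this is not part of the AVF pipeline above; the LogisticRegression estimator, the parameter grid, and the trainingData Dataset<Row> are placeholder assumptions used only to show the shape of the API:

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;

// A placeholder estimator; any Spark ML Estimator can be tuned the same way
LogisticRegression lr = new LogisticRegression();

// Candidate hyperparameter values to search over
ParamMap[] paramGrid = new ParamGridBuilder()
        .addGrid(lr.regParam(), new double[] {0.01, 0.1, 1.0})
        .build();

CrossValidator cv = new CrossValidator()
        .setEstimator(lr)
        .setEvaluator(new BinaryClassificationEvaluator())
        .setEstimatorParamMaps(paramGrid)
        .setNumFolds(5); // 5-fold cross-validation

// trainingData is a hypothetical Dataset<Row> with "features" and "label" columns
CrossValidatorModel bestModel = cv.fit(trainingData);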

In my next post, I will show how to implement the same application for the TCGA dataset using Dataset<Row>, which is the unified abstraction of DataFrame and Dataset in Spark 2.0.0. The reason is, of course, that I have found the Dataset more convenient, more lightweight, and computationally faster compared to the RDD.




