Machine learning (ML) refers to learning patterns in data. In other words, ML can infer a pattern, or nontrivial relationship, between a set of observations and a desired response.
Random Forest
A simple way to learn such a pattern is to construct a decision-tree representation of the data. The nodes of the decision tree are attributes, the branches are possible attribute values, and the leaf nodes hold the classifications.
A generalized form is the random forest (RF), an ensemble, or forest, of decision trees. A forest is built as a set of multiple decision trees, typically without pruning, which helps it generalize. The final output of the RF algorithm is the mean prediction in the case of regression, or the class with the most votes in the case of classification. The RF algorithm is also more accurate across domains than a single decision tree.
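As a rough illustration, a random forest classifier can be trained with Spark MLlib's RandomForest API; the sketch below assumes the training data has already been prepared as an RDD[LabeledPoint] named trainData, and the parameter values are only examples.

import org.apache.spark.mllib.tree.RandomForest

val forestModel = RandomForest.trainClassifier(
  trainData,         // RDD[LabeledPoint]
  2,                 // numClasses
  Map[Int, Int](),   // categoricalFeaturesInfo: empty means all features are numeric
  20,                // numTrees in the forest
  "auto",            // featureSubsetStrategy: features considered per split
  "gini",            // impurity measure
  5,                 // maxDepth of each tree
  32)                // maxBins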
Classic examples of unsupervised learning are clustering and dimensionality reduction. Common clustering algorithms include k-means (based on centroid models), mixtures of Gaussians, hierarchical clustering (based on connectivity models), and the expectation-maximization algorithm (which uses a multivariate normal distribution model). Dimensionality reduction techniques include factor analysis, principal component analysis (PCA), independent component analysis (ICA), and so forth. Hidden Markov Models (HMMs) have been a useful approach to unsupervised learning for time-series data.
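For instance, k-means clustering is available in Spark MLlib; the following is a minimal sketch on a toy in-memory dataset (the data and parameter values are made up for illustration).

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Four unlabeled points that form two obvious groups.
val featureVectors = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0), Vectors.dense(1.2, 0.8),
  Vectors.dense(8.0, 9.0), Vectors.dense(9.1, 8.7)))

val clusters = KMeans.train(featureVectors, 2, 20)   // k = 2, maxIterations = 20
clusters.clusterCenters.foreach(println)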
Logistic Regression Algorithm in Spark
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
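The fragment above (from Spark's Java API) only parses and caches the input. A minimal Scala sketch of actually training a logistic regression model with MLlib might look like the following; the file path and the assumption that each line holds a label followed by two numeric features are hypothetical.

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Parse "label,feature1,feature2" lines into labeled points and cache them.
val points = sc.textFile("lr_data.txt").map { line =>
  val parts = line.split(',').map(_.toDouble)
  LabeledPoint(parts(0), Vectors.dense(parts.tail))
}.cache()

// Train a binary logistic regression model with the L-BFGS optimizer.
val lrModel = new LogisticRegressionWithLBFGS().setNumClasses(2).run(points)

// Score a new observation; the output is the predicted class (0.0 or 1.0).
lrModel.predict(Vectors.dense(0.5, 1.2))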
spark-shell --master yarn-client
spark-shell --master local[2]
where the number in brackets is the number of local threads to run; local[2] runs the shell with two threads.
You should run the :help command to list the available commands in the shell. :history shows the shell's command history, :h? searches it, and :paste can help you correctly insert code from the clipboard.
sc.[\t]
(Typing sc. and then pressing the Tab key lists the methods available on the SparkContext object.)
One of the simplest ways to create an RDD is to use the parallelize method on SparkContext with a local collection of objects:
val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
Variables that are prefaced with val are immutable, and may not be changed to refer to another value once they are assigned, while variables that are prefaced with var may be changed to refer to different objects of the same type.
Note that in the Spark shell we are allowed to re-declare the same immutable variable.
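A quick illustration of the difference, using the rawblocks RDD from the surrounding example:

val head = rawblocks.take(10)   // immutable: head = rawblocks.take(20) would not compile
var count = 0
count = count + 1               // a var may be reassigned to another Int

val head = rawblocks.take(20)   // re-declaring the same val is allowed in the shell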
rawblocks.first
We can use the collect method to return all of the contents of an RDD to the client as an array.
val head = rawblocks.take(10)
rdd.saveAsTextFile("hdfs:///user/ds/mynumbers")
head.foreach(println)
def isHeader(line: String): Boolean = {
line.contains("id_1")
}
head.filter(isHeader).foreach(println)
head.filterNot(isHeader).length
head.filter(x => !isHeader(x)).length
Scala will allow us to use an underscore, _, to represent the argument to the anonymous function, so that we can save four characters:
head.filter(!isHeader(_)).length
val conf = new SparkConf().setAppName("MyApp")
conf.registerKryoClasses(Array(classOf[MyCustomClass1], classOf[MyCustomClass2]))
spark-defaults.conf:
spark.kryo.classesToRegister=org.myorg.MyCustomClass1,org.myorg.MyCustomClass2
spark.serializer=org.apache.spark.serializer.KryoSerializer
Implicit type conversion works like this: if you call a method on a Scala object and the Scala compiler does not see a definition for that method in the class definition for that object, the compiler will try to convert your object to an instance of a class that does have that method defined.
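A toy illustration of this mechanism (not from the text): Int has no isEven method, so the compiler looks for an implicit conversion to a type that defines one.

import scala.language.implicitConversions

class RichCheck(n: Int) { def isEven: Boolean = n % 2 == 0 }
implicit def intToRichCheck(n: Int): RichCheck = new RichCheck(n)

42.isEven   // compiled as intToRichCheck(42).isEven, which returns true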
val grouped = mds.groupBy(md => md.matched)
grouped.mapValues(x => x.size).foreach(println)
Creating Histograms
RDDs provide a countByValue action that performs this kind of computation very efficiently and returns the results to the client as a Map[T,Long].
val matchCounts = parsed.map(md => md.matched).countByValue()
Because Scala’s Map class does not have methods for sorting its contents on the keys or the values, we can convert a Map into a Scala Seq type, which does provide support for sorting.
val matchCountsSeq = matchCounts.toSeq
matchCountsSeq.sortBy(_._1).foreach(println)
matchCountsSeq.sortBy(_._2).reverse.foreach(println)
Summary Statistics For Continuous Variables
import java.lang.Double.isNaN
parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
val stats = (0 until 9).map(i => {
  parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
})
statsm.zip(statsn).map { case(m, n) =>
  (m.missing + n.missing, m.stats.mean - n.stats.mean)
}.foreach(println)
Alternating Least Squares (ALS)
These algorithms are types of latent-factor models. They try to explain observed interactions between large numbers of users and products through a relatively small number of unobserved, underlying reasons.
These algorithms are sometimes called matrix completion algorithms, because the original matrix A may be quite sparse, but the approximate product is completely dense. This is called a matrix factorization model, and is a model in the sense that it is predicting all of those missing values in sparse A.
As a matter of implementation, the algorithm repeatedly refines the contents of one matrix by solving a least-squares minimization problem that holds the other one fixed. Then it alternates, to refine the other matrix, and repeats iteratively. This is the source of its name. To begin the process, one matrix is filled with random feature vectors.
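The following is a tiny in-memory sketch of that alternation using Breeze (which ships with Spark MLlib); it treats every cell of a toy ratings matrix as observed and skips the confidence weighting used for implicit feedback, so it only illustrates the shape of the computation.

import breeze.linalg.{DenseMatrix, inv}

// Toy ratings matrix R (users x items); zeros stand in for unobserved cells,
// which this sketch simply treats as observed -- a simplification.
val R = DenseMatrix((5.0, 3.0, 0.0), (4.0, 0.0, 1.0))
val k = 2          // number of latent factors
val lambda = 0.1   // regularization

// Start with random factor matrices: X is users x k, Y is items x k.
var X = DenseMatrix.rand(R.rows, k)
var Y = DenseMatrix.rand(R.cols, k)

// Regularized least-squares solve: holding `fixed` constant, find the factor
// matrix that best reproduces `target` (normal-equations form).
def solveFor(fixed: DenseMatrix[Double], target: DenseMatrix[Double]): DenseMatrix[Double] = {
  val gram = fixed.t * fixed + DenseMatrix.eye[Double](k) * lambda
  (inv(gram) * fixed.t * target.t).t
}

// One alternation: update user factors with item factors fixed, then the reverse.
X = solveFor(Y, R)
Y = solveFor(X, R.t)
// Repeating these two steps until convergence gives R ~= X * Y.t
// (the sparse matrix the text calls A).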
bin/spark-shell --driver-memory 2g --executor-memory 8g
The first step in building a model is to understand the data that is available, and parse or transform it into forms that are useful for analysis in Spark.
You can supply a second argument to this method to specify a different and larger number of partitions. This might be set to match the number of cores in your cluster, for example.
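For example, assuming the listening data was loaded with sc.textFile (the path here is hypothetical), the second argument requests a minimum number of partitions:

val rawUserArtistData = sc.textFile("hdfs:///path/to/user_artist_data.txt", 4)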
rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
flatMap() is appropriate when each element maps to zero, one, or more results. It allows the parsing code to use Scala’s None and Some classes to return a tuple for lines that parse and nothing for lines that don’t:
val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case e: NumberFormatException => None
    }
  }
}
Instead, a broadcast variable called artistAliasBC is created for artistAlias. This makes Spark send and hold in memory just one copy for each machine in the cluster. When there are hundreds of executors, and many execute in parallel on each machine, this can save significant network traffic and memory.
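As a sketch of how such a broadcast variable might be created and then used to build the training set (artistAlias, a Map from misspelled artist IDs to canonical IDs, and rawUserArtistData are assumed to have been defined earlier, as in the surrounding text):

import org.apache.spark.mllib.recommendation._

val artistAliasBC = sc.broadcast(artistAlias)

val trainData = rawUserArtistData.map { line =>
  val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
  // Substitute the canonical artist ID when an alias exists.
  val finalArtistID = artistAliasBC.value.getOrElse(artistID, artistID)
  Rating(userID, finalArtistID, count)
}.cache()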
// arguments: training data, rank = 10, iterations = 5, lambda = 0.01, alpha = 1.0
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
model.userFeatures.mapValues(java.util.Arrays.toString).
  take(1).foreach(println)
val existingProductIDs = rawUserArtistData.map(_.split(' ')).
  filter(_(0).toInt == 2093760).map(_(1).toInt).collect().toSet
artistByID.filter(idName =>
existingProductIDs.contains(idName._1)
).values.collect().sorted.foreach(println)
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)
val recommendedProductIDs = recommendations.map(_.product).toSet
artistByID.filter(idName =>
recommendedProductIDs.contains(idName._1)
).values.collect().sorted.foreach(println)
Typically, data is divided into three subsets: training, cross-validation (CV), and test sets.
Regression refers to predicting a numeric quantity like size, income, or temperature, while classification refers to predicting a label or category, like “spam” or “picture of a cat”.
These five features together, in order, are known as a feature vector.
The Spark MLlib abstraction for a labeled feature vector is the LabeledPoint, which pairs a Vector of numeric features with a target label. LabeledPoint is only for numeric features, but it can be used with categorical features, given an appropriate encoding.
One such encoding is one-hot or 1-of-n encoding, in which one categorical feature that takes on N distinct values becomes N numeric features, each taking on the value 0 or 1.
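A minimal sketch of such an encoding (the helper below is hypothetical, not part of MLlib):

// One-hot encode a categorical value given the ordered list of its N possible values.
def oneHot(value: String, categories: Seq[String]): Array[Double] = {
  val vec = Array.fill(categories.length)(0.0)
  val idx = categories.indexOf(value)
  if (idx >= 0) vec(idx) = 1.0   // exactly one position is set to 1
  vec
}

oneHot("cloudy", Seq("clear", "cloudy", "rain"))   // Array(0.0, 1.0, 0.0)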
val trainAndCVAndTestData = data.randomSplit(Array(0.8, 0.1, 0.1))
val trainData = trainAndCVAndTestData(0).cache()
val cvData = trainAndCVAndTestData(1).cache()
val testData = trainAndCVAndTestData(2).cache()
// arguments: training data, numClasses = 7, categoricalFeaturesInfo (empty: all
// features treated as numeric), impurity = "gini", maxDepth = 4, maxBins = 100
val model = DecisionTree.trainClassifier(
  trainData, 7, Map[Int,Int](), "gini", 4, 100)
val predictionsAndLabels = cvData.map(example =>
  (model.predict(example.features), example.label)
)
val metrics = new MulticlassMetrics(predictionsAndLabels)
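To inspect the result, the metrics object (from org.apache.spark.mllib.evaluation) can print a confusion matrix; a small usage sketch:

// Rows are actual categories, columns are predicted categories;
// counts on the diagonal are correct predictions.
println(metrics.confusionMatrix)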