Tuesday, December 23, 2014
Learning Spark MLLib
Machine learning (ML) refers to learning patterns in data. In other words, ML can infer a pattern or nontrivial relationship between a set of observations and a desired response.
Random Forest
A simple way to learn such a pattern is to construct a decision-tree representation of the data. Internal nodes of the decision tree test attributes, branches correspond to possible attribute values, and the leaf nodes hold the classifications.
A generalization of this idea is the random forest (RF): an ensemble of many decision trees. Because the forest combines multiple trees, the individual trees can be grown without pruning and still generalize well. The final outcome of the RF algorithm is the mean of the trees' predictions for regression, or the class with the most votes for classification. RF also tends to be more accurate across domains than a single decision tree.
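MLlib (as of Spark 1.2) includes a random forest; below is a minimal training sketch, assuming trainData is an RDD[LabeledPoint] with binary labels and using illustrative parameter values:

import org.apache.spark.mllib.tree.RandomForest

// Sketch only: trainData (RDD[LabeledPoint]) is assumed to exist.
val rfModel = RandomForest.trainClassifier(
  trainData,        // RDD[LabeledPoint]
  2,                // numClasses
  Map[Int, Int](),  // categoricalFeaturesInfo: all features treated as continuous
  20,               // numTrees
  "auto",           // featureSubsetStrategy: let MLlib choose features per node
  "gini",           // impurity
  4,                // maxDepth
  32,               // maxBins
  42)               // random seed
// rfModel.predict(features) then returns the majority-vote class for a feature Vector.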
Classic examples of unsupervised learning are clustering and dimensionality reduction. Common clustering algorithms include k-means (based on centroid models), mixtures of Gaussians, hierarchical clustering (based on connectivity models), and the expectation-maximization algorithm (which uses a multivariate normal distribution model). Dimensionality reduction techniques include factor analysis, principal component analysis (PCA), independent component analysis (ICA), and so forth. Hidden Markov models (HMMs) are a useful approach to unsupervised learning on time-series data.
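Several of these are available in MLlib as well; for example, a minimal k-means sketch (the data points are made up):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val vectors = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.2),
  Vectors.dense(9.0, 8.0), Vectors.dense(9.2, 8.1)))
val kmeansModel = KMeans.train(vectors, 2, 20)   // k = 2 clusters, 20 iterations
kmeansModel.clusterCenters.foreach(println)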
Logistic Regression Algorithm in Spark
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
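The snippet above is Java; a minimal Scala sketch of training a logistic regression model with MLlib looks like this (trainingData, an RDD[LabeledPoint], and testPoint, a LabeledPoint, are assumed to exist):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val lrModel = LogisticRegressionWithSGD.train(trainingData, 100)  // 100 SGD iterations
val prediction = lrModel.predict(testPoint.features)              // 0.0 or 1.0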
spark-shell --master yarn-client
spark-shell --master local[2]
where the number in brackets (N, here 2) is the number of local worker threads to run.
You should run the :help command to list the available commands in the shell. :history and :h? can help you recall what you typed earlier in the session, and :paste can help you correctly insert code from the clipboard.
sc.[Tab]  (typing sc. and then pressing Tab lists the methods available on the SparkContext object)
One of the simplest ways to create an RDD is to use the parallelize method on SparkContext with a local collection of objects:
val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
Variables that are prefaced with val are immutable, and may not be changed to refer to another value once they are assigned, while variables that are prefaced with var may be changed to refer to different objects of the same type.
In the Spark shell (REPL), however, we are allowed to re-declare the same immutable variable.
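For example:

val xs = Array(1, 2, 3)
// xs = Array(4, 5, 6)   // does not compile: xs is a val
var ys = Array(1, 2, 3)
ys = Array(4, 5, 6)      // fine: ys is a var and the types match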
rawblocks.first
we can use the collect method to return all of the contents of an RDD to the client as an array.
val head = rawblocks.take(10)
rdd.saveAsTextFile("hdfs:///user/ds/mynumbers")
head.foreach(println)
def isHeader(line: String): Boolean = {
line.contains("id_1")
}
head.filter(isHeader).foreach(println)
head.filterNot(isHeader).length
head.filter(x => !isHeader(x)).length
Scala will allow us to use an underscore, _, to represent the argument to the anonymous function, so that we can save four characters:
head.filter(!isHeader(_)).length
val conf = new SparkConf().setAppName("MyApp")
conf.registerKryoClasses(Array(classOf[MyCustomClass1], classOf[MyCustomClass2]))
spark-defaults.conf:
spark.kryo.classesToRegister=org.myorg.MyCustomClass1,org.myorg.MyCustomClass2
spark.serializer=org.apache.spark.serializer.KryoSerializer
Implicit type conversion works like this: if you call a method on a Scala object, and the Scala compiler does not see a definition for that method in the class definition for that object, the compiler tries to convert your object to an instance of a class that does have that method defined.
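A small sketch of the mechanism in a shell session (RichCount and countWords are made-up names for illustration, not part of Spark or the standard library):

implicit class RichCount(s: String) {
  def countWords: Int = s.split("\\s+").count(_.nonEmpty)
}
"learning spark with scala".countWords  // the compiler inserts the RichCount conversion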
val grouped = mds.groupBy(md => md.matched)
grouped.mapValues(x => x.size).foreach(println)
Creating Histograms
RDDs provide a countByValue action that performs this kind of computation very efficiently and returns the results to the client as a Map[T,Long].
val matchCounts = parsed.map(md => md.matched).countByValue()
Because Scala’s Map class does not have methods for sorting its contents on the keys or the values, we can convert the Map into a Scala Seq type, which does provide support for sorting.
val matchCountsSeq = matchCounts.toSeq
matchCountsSeq.sortBy(_._1).foreach(println)
matchCountsSeq.sortBy(_._2).reverse.foreach(println)
Summary Statistics For Continuous Variables
import java.lang.Double.isNaN
parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
val stats = (0 until 9).map(i => {
parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
})
statsm.zip(statsn).map { case(m, n) =>
(m.missing + n.missing, m.stats.mean - n.stats.mean)
}.foreach(println)
Alternating Least Squares (ALS)
These algorithms are types of latent-factor models. They try to explain observed interactions between large numbers of users and products through a relatively small number of unobserved, underlying reasons.
These algorithms are sometimes called matrix completion algorithms, because the original matrix A may be quite sparse, but the approximate product is completely dense. This is called a matrix factorization model, and is a model in the sense that it is predicting all of those missing values in sparse A.
As a matter of implementation, the algorithm repeatedly refines the contents of one matrix by solving a least-squares minimization problem that holds the other one fixed. Then it alternates, to refine the other matrix, and repeats iteratively. This is the source of its name. To begin the process, one matrix is filled with random feature vectors.
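To make the alternation concrete, here is a toy rank-1 version in plain Scala; it is purely illustrative (the ratings matrix is made up, and MLlib's implementation is far more sophisticated):

// A is a small user x item matrix; 0.0 marks a missing (unobserved) entry.
val A = Array(
  Array(5.0, 3.0, 0.0),
  Array(4.0, 0.0, 1.0),
  Array(0.0, 1.0, 5.0))
val numUsers = A.length
val numItems = A(0).length
val lambda = 0.1

var userF = Array.fill(numUsers)(scala.util.Random.nextDouble())  // random start
var itemF = Array.fill(numItems)(scala.util.Random.nextDouble())

for (_ <- 1 to 10) {
  // Hold item factors fixed; each user factor has a closed-form least-squares solution.
  userF = Array.tabulate(numUsers) { u =>
    val obs = (0 until numItems).filter(i => A(u)(i) != 0.0)
    obs.map(i => itemF(i) * A(u)(i)).sum / (obs.map(i => itemF(i) * itemF(i)).sum + lambda)
  }
  // Now hold user factors fixed and solve for the item factors.
  itemF = Array.tabulate(numItems) { i =>
    val obs = (0 until numUsers).filter(u => A(u)(i) != 0.0)
    obs.map(u => userF(u) * A(u)(i)).sum / (obs.map(u => userF(u) * userF(u)).sum + lambda)
  }
}
// The predicted "rating" for user u and item i is userF(u) * itemF(i).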
bin\spark-shell --driver-memory 2g --executor-memory 8g
The first step in building a model is to understand the data that is available, and parse or transform it into forms that are useful for analysis in Spark.
You can supply a second argument to this method to specify a different and larger number of partitions; this might be set to match the number of cores in your cluster, for example.
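A quick illustration (the HDFS path is just a placeholder):

// The second argument asks Spark for at least that many partitions when reading the file.
val rawUserArtistData = sc.textFile("hdfs:///user/ds/user_artist_data.txt", 4)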
rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
flatMap() is appropriate when each input element maps to zero, one, or more results. Here it lets the parsing code use Scala's Some and None to return an (Int, String) tuple when a line parses cleanly, and nothing when it does not:
val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case e: NumberFormatException => None
    }
  }
}
Instead, a broadcast variable called artistAliasBC is created for artistAlias. This makes Spark send, and hold in memory, just one copy of the data for each machine in the cluster. When there are hundreds of executors, and many tasks executing in parallel on each machine, this can save significant network traffic and memory.
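A sketch of how that broadcast variable is created and then read inside a task, assuming artistAlias is a local Map[Int,Int] from misspelled-artist ID to canonical ID:

import org.apache.spark.mllib.recommendation.Rating

val artistAliasBC = sc.broadcast(artistAlias)   // ship the map to each machine once
val trainData = rawUserArtistData.map { line =>
  val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
  val finalArtistID = artistAliasBC.value.getOrElse(artistID, artistID)  // rewrite aliased IDs
  Rating(userID, finalArtistID, count)
}.cache()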
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
model.userFeatures.mapValues(java.util.Arrays.toString).
take(1).foreach(println)
val existingProductIDs = rawUserArtistData.map(_.split(' ')).
filter(_(0).toInt == 2093760).map(_(1).toInt).collect().toSet
artistByID.filter(idName =>
existingProductIDs.contains(idName._1)
).values.collect().sorted.foreach(println)
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)
val recommendedProductIDs = recommendations.map(_.product).toSet
artistByID.filter(idName =>
recommendedProductIDs.contains(idName._1)
).values.collect().sorted.foreach(println)
Typically, data is divided into three subsets: training, cross-validation (CV), and test sets.
Regression refers to predicting a numeric quantity like size, income, or temperature, while classification refers to predicting a label or category, like "spam" or "picture of a cat".
The features taken together, in order, are known as a feature vector.
The Spark MLlib abstraction for a labeled feature vector is the LabeledPoint, which pairs a vector of numeric features with a target label. Because it holds only numeric features, categorical features must be given an appropriate numeric encoding before they can be used.
One such encoding is one-hot or 1-of-n encoding, in which one categorical feature that takes on N distinct values becomes N numeric features, each taking on the value 0 or 1.
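For instance (the feature values here are made up), a weather feature taking the values clear, cloudy, or rainy becomes three 0/1 features, which can then sit inside a LabeledPoint next to the numeric features:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val weatherValues = Seq("clear", "cloudy", "rainy")
def oneHot(value: String): Array[Double] =
  weatherValues.map(v => if (v == value) 1.0 else 0.0).toArray   // "cloudy" -> (0, 1, 0)

// Two numeric features followed by the encoded categorical feature, with label 1.0.
val example = LabeledPoint(1.0, Vectors.dense(Array(23.5, 0.72) ++ oneHot("cloudy")))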
val trainAndCVAndTestData = data.randomSplit(Array(0.8, 0.1, 0.1))
val trainData = trainAndCVAndTestData(0).cache()
val cvData = trainAndCVAndTestData(1).cache()
val testData = trainAndCVAndTestData(2).cache()
// arguments: training data, 7 classes, no categorical features, gini impurity, maxDepth 4, maxBins 100
val model = DecisionTree.trainClassifier(
  trainData, 7, Map[Int,Int](), "gini", 4, 100)
val predictionsAndLabels = cvData.map(example =>
(model.predict(example.features), example.label)
)
val metrics = new MulticlassMetrics(predictionsAndLabels)
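From the metrics object you can then read off summary numbers, for example (two of the calls MulticlassMetrics exposes in MLlib 1.x):

println(metrics.confusionMatrix)   // 7 x 7 matrix of predicted vs. actual classes
println(metrics.precision)         // overall precision across all classes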
Tuesday, November 18, 2014
Learning Spark
At its core, Spark is a “computational engine” that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines, or a computing cluster.
To adjust the logging verbosity, you can create a file in the conf directory called log4j.properties. The Spark developers already include a template for this file called log4j.properties.template. To show only warnings and above, set:
log4j.rootCategory=WARN, console
In Spark we express our computation through operations on distributed collections that are automatically parallelized across the cluster. These collections are called Resilient Distributed Datasets, or RDDs.
val lines = sc.textFile("README.md")
lines.count()
lines.first()
Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. In the shell, a SparkContext is automatically created for you as the variable called sc. Once you have a SparkContext, you can use it to build resilient distributed datasets, or RDDs.
val pythonLines = lines.filter(line => line.contains("Python"))
JavaRDD<String> pythonLines = lines.filter(
  new Function<String, Boolean>() {
    Boolean call(String line) { return line.contains("Python"); }
  }
);
JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
)
val words = input.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
counts.saveAsTextFile(outputFile)
JavaRDD<String> input = sc.textFile(inputFile);
// Split up into words.
JavaRDD<String> words = input.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }});
// Transform into word and count.
JavaPairRDD<String, Integer> counts = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String x) {
      return new Tuple2(x, 1);
    }}).reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) {
      return x + y;
    }});
Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.
An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java or Scala objects, including user-defined classes.
Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).
Although you can define new RDDs any time, Spark only computes them in a lazy fashion, the first time they are used in an action.
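You can see this laziness directly in the local shell (a small sketch):

val nums = sc.parallelize(1 to 3)
val doubled = nums.map { x => println("computing " + x); x * 2 }  // nothing is computed yet
doubled.count()  // the map (and its println) only runs now, when an action forces evaluation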
Spark’s RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist().
In practice, you will often use persist to load a subset of your data into memory and query it repeatedly:
pythonLines.persist(StorageLevel.MEMORY_ONLY_SER)
cache() is the same as calling persist() with the default storage level.
val lines = sc.parallelize(List("pandas", "i like pandas"))
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
If you are ever confused whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs whereas actions return some other data type.
As you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.
Another useful action is take(), which collects a number of elements from the RDD:
badLinesRDD.take(10).foreach(println)
RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you’d like to deal with it locally. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
  // Safe: extract just the field we need into a local variable
  val query_ = this.query
  rdd.map(x => x.split(query_))
}
JavaRDD<String> errors = lines.filter(s -> s.contains("error"));
val result = input.map(x => x * x)
println(result.collect().mkString(","))