Tuesday, November 18, 2014

Learning Spark

At its core, Spark is a “computational engine” that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines, or a computing cluster.

To control logging verbosity, you can create a file in the conf directory called log4j.properties. The Spark developers already include a template for this file called log4j.properties.template; copy it and change the root logging level, for example:
log4j.rootCategory=WARN, console


In Spark we express our computation through operations on distributed collections that are automatically parallelized across the cluster. These collections are called Resilient Distributed Datasets, or RDDs.

val lines = sc.textFile("README.md")
lines.count()
lines.first()

Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. In the shell, a SparkContext is automatically created for you, as the variable called sc.

Once you have a SparkContext, you can use it to build resilient distributed datasets, or RDDs.
// Scala: filtering with an inline function
val pythonLines = lines.filter(line => line.contains("Python"))

// Java: the same filter with an anonymous inner class
JavaRDD<String> pythonLines = lines.filter(
  new Function<String, Boolean>() {
    public Boolean call(String line) { return line.contains("Python"); }
  }
);

// Java 8: the same filter as a lambda
JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));

// Java: initializing Spark in a standalone application
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);

// Scala: initializing Spark in a standalone application
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
)
// Scala word count
val input = sc.textFile(inputFile)
val words = input.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
counts.saveAsTextFile(outputFile)
// Java word count
JavaRDD<String> input = sc.textFile(inputFile);
// Split up into words.
JavaRDD<String> words = input.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });
// Transform into pairs of (word, count).
JavaPairRDD<String, Integer> counts = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String x) {
      return new Tuple2<String, Integer>(x, 1);
    }
  }).reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) { return x + y; }
  });

Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java or Scala objects, including user-defined classes.
Operations on RDDs fall into two categories. Transformations construct a new RDD from a previous one (for example, filter()). Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

Although you can define new RDDs any time, Spark only computes them in a lazy fashion, the first time they are used in an action.
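To see this laziness concretely, here is a small sketch (assuming an active shell with sc, and a README.md in the working directory): nothing is read from disk until an action runs.

```scala
// textFile() and filter() are transformations: Spark only records
// the lineage here, without touching the file.
val lines = sc.textFile("README.md")
val errorLines = lines.filter(line => line.contains("error"))

// The action count() forces Spark to actually read the file
// and apply the filter.
errorLines.count()
```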

Spark’s RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist()

In practice, you will often use persist to load a subset of your data into memory and query it repeatedly. 
import org.apache.spark.storage.StorageLevel
pythonLines.persist(StorageLevel.MEMORY_ONLY_SER)

cache() is the same as calling persist() with the default storage level.
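A sketch of the reuse pattern this enables (assuming the lines RDD from the earlier examples): without caching, each action would recompute the filter from scratch.

```scala
val pythonLines = lines.filter(line => line.contains("Python"))

// cache() == persist() with the default MEMORY_ONLY storage level.
pythonLines.cache()

pythonLines.count()   // first action: computes the filter and caches the result
pythonLines.first()   // subsequent actions reuse the cached partitions
```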
// Scala: creating an RDD from an in-memory collection
val lines = sc.parallelize(List("pandas", "i like pandas"))

// Java: the same, plus loading an RDD from a file
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
JavaRDD<String> lines = sc.textFile("/path/to/README.md");

If you are ever unsure whether a given function is a transformation or an action, look at its return type: transformations return RDDs, whereas actions return some other data type.
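For example (a sketch, assuming an active sc), note the types on each side:

```scala
import org.apache.spark.rdd.RDD

val lines = sc.textFile("README.md")

// Transformation: the result is another RDD.
val filtered: RDD[String] = lines.filter(line => line.contains("Spark"))

// Actions: the results are ordinary values returned to the driver.
val total: Long = lines.count()
val first: String = lines.first()
```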
As you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.

take(n), which collects n elements from the RDD:

badLinesRDD.take(10).foreach(println)

RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you’d like to deal with it locally. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

When passing functions to Spark, be careful not to reference fields or methods of the enclosing object inside the closure, or Spark will ship the whole object to the workers. Copying the field into a local variable avoids this:

  def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
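For contrast, a sketch of the pattern the method above avoids (the surrounding class is assumed to look roughly like this): referencing this.query directly inside the closure forces Spark to serialize the entire enclosing object along with the task.

```scala
import org.apache.spark.rdd.RDD

class SearchFunctions(val query: String) {
  // Problematic: "this.query" inside the closure means the whole
  // SearchFunctions instance must be serialized and shipped to workers.
  def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
    rdd.map(x => x.split(this.query))
  }
}
```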
// Java 8: filtering error lines with a lambda
JavaRDD<String> errors = lines.filter(s -> s.contains("error"));
// Scala: squaring each element, then collecting the result to the driver
val result = input.map(x => x * x)
println(result.collect().mkString(","))
