At its core, Spark is a “computational engine” that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines, or a computing cluster.
To control the verbosity of logging, you can create a file in the conf directory called log4j.properties. The Spark developers already include a template for this file called log4j.properties.template. To make the logging less verbose, copy the template and lower the level of the root logger so only warnings are shown:

```
log4j.rootCategory=WARN, console
```
In Spark we express our computation through operations on distributed collections that are automatically parallelized across the cluster. These collections are called Resilient Distributed Datasets, or RDDs.
```scala
val lines = sc.textFile("README.md")
lines.count()
lines.first()
```

Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. In the shell, a SparkContext is automatically created for you as the variable called sc. Once you have a SparkContext, you can use it to build resilient distributed datasets, or RDDs:

```scala
val pythonLines = lines.filter(line => line.contains("Python"))
```

In Java, the same filter is written with an anonymous inner class, or more concisely with a Java 8 lambda:

```java
JavaRDD<String> pythonLines = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String line) { return line.contains("Python"); }
});

JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));
```

In a standalone application, you construct the context yourself from a SparkConf:

```java
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);
```

```scala
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
```

In sbt, add Spark as a provided dependency:

```scala
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.1.0" % "provided")
```

A word count in Scala:

```scala
val words = input.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
counts.saveAsTextFile(outputFile)
```

And the same word count in Java:

```java
JavaRDD<String> input = sc.textFile(inputFile);
// Split up into words.
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
// Transform into word and count.
JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer x, Integer y) { return x + y; }
});
```

Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them. An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.
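As a sketch of how partitioning surfaces in the API (run in the spark-shell; the minimum of 4 partitions is an arbitrary choice for illustration), you can request a minimum number of partitions when loading a file and then inspect how many Spark created:

```scala
// Hypothetical spark-shell session: ask for at least 4 partitions (arbitrary)
val chunked = sc.textFile("README.md", 4)
// Inspect how many partitions the RDD was actually split into
chunked.partitions.size
```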
RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes. Actions, on the other hand, compute a result based on an RDD and either return it to the driver program or save it to an external storage system (e.g., HDFS). Although you can define new RDDs any time, Spark only computes them in a lazy fashion: the first time they are used in an action.

Spark's RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly:

```scala
pythonLines.persist(StorageLevel.MEMORY_ONLY_SER)
```

cache() is the same as calling persist() with the default storage level.

The simplest way to create an RDD is to parallelize an existing collection in your driver program:

```scala
val lines = sc.parallelize(List("pandas", "i like pandas"))
```

```java
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
```

Or you can load data from external storage:

```java
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
```

If you are ever confused whether a given function is a transformation or an action, look at its return type: transformations return RDDs, whereas actions return some other data type. As you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.

take() collects a number of elements from the RDD:

```scala
badLinesRDD.take(10).foreach(println)
```

RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you'd like to deal with it locally.
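The lineage graph can also be inspected directly. As a sketch (run in the spark-shell, assuming the pythonLines RDD from earlier), toDebugString prints the chain of transformations an RDD was built from:

```scala
// Print this RDD's lineage: the textFile() and filter() steps behind it
println(pythonLines.toDebugString)
```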
Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn't be used on large datasets.

When you pass functions to Spark from inside a class, be careful not to reference fields through this, which would force Spark to serialize the whole containing object. Instead, copy the field into a local variable:

```scala
def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
  // Safe: extract just the field we need into a local variable
  val query_ = this.query
  rdd.map(x => x.split(query_))
}
```

In Java 8, a transformation can be passed as a lambda:

```java
JavaRDD<String> errors = lines.filter(s -> s.contains("error"));
```

A simple map() that squares each element:

```scala
val result = input.map(x => x * x)
println(result.collect().mkString(","))
```
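For contrast, here is a sketch of the unsafe version that the pattern above avoids (the SearchFunctions class name is an assumption for illustration): referencing query inside the closure implicitly means this.query, so Spark must serialize the entire enclosing object, which fails if that object is not serializable and ships more data than needed even when it is:

```scala
// Hypothetical enclosing class, for illustration only
class SearchFunctions(val query: String) {
  // Unsafe: "query" here really means "this.query", so the whole
  // SearchFunctions instance is pulled into the closure and serialized
  def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
    rdd.map(x => x.split(query))
  }
}
```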