站点图标 起风网

Spark第一个应用程序

首先要对源码进行编译,生成对应hadoop版本的spark开发程序jar包,上篇已经写了具体的过程,这里不再赘述。

在安装spark的机器上,下载eclipse-java-x86_64版本,将spark-assembly.jar和spark/lib下全部加进路径,建立普通java project

WordCount代码

package sparktest.util.test;import java.util.Arrays;import java.util.regex.Pattern;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public final class JavaWordCount {  private static final Pattern SPACE = Pattern.compile(" ");  public static void main(String[] args) throws Exception {    if (args.length < 2) {      System.err.println("Usage: JavaWordCount <master> <file>");      System.exit(1);    }    JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));    ctx.addJar("/opt/eclipse/JavaSparkT.jar");    JavaRDD<String> lines = ctx.textFile(args[1], 1);        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {      @Override      public Iterable<String> call(String s) {        return Arrays.asList(SPACE.split(s));      }    });        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {      @Override      public Tuple2<String, Integer> call(String s) {        return new Tuple2<String, Integer>(s, 1);      }    });        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {      @Override      public Integer call(Integer i1, Integer i2) {        return i1 + i2;      }    });    counts.saveAsTextFile(args[2]); //    counts.s    /*List<Tuple2<String, Integer>> output = counts.collect();    for (Tuple2<?,?> tuple : output) {      System.out.println(tuple._1() + ": " + tuple._2());    }*/    System.exit(0);  }}

 

输入的三个参数:

spark://master:7077 /usr/local/hadoop/hadoop-2.4.0/test.txt /usr/local/hadoop/hadoop-2.4.0/result.txt

分别spark入口 本地输入文件 本地输出文件

输出:

[root@localhost result.txt]# cat part-00000 (xing,2)(xiao,1)(ya,2)(shi,1)(,2)(wo,1)(yi,4)(zhi,1)

 

注意: ctx.addJar(“/opt/eclipse/JavaSparkT.jar”);  这一句很关键,要不然会报错

问题:

14/07/07 10:26:11 WARN TaskSetManager: Lost TID 0 (task 1.0:0)
14/07/07 10:26:11 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: JavaWordCount$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

解决办法:

spark在运行的时候需要将类分发到各个节点上去,然后在各个节点调用这个类去完成任务,在java程序里边需要将这个类本身提交到各个节点上去

可以通过java普通的方法调用的方式实现或者 在eclipse里边通过右键run as java application的方式运行。不过需要在程序中用addJar()方法或者在初始化SparkContext的时候将本身的WordCount 的Jar包增加进去。

 

Spark类型:

Supported types for Avro -> SparkSQL conversion

avro types to SparkSQL types:

boolean -> BooleanTypeint -> IntegerTypelong -> LongTypefloat -> FloatTypedouble -> DoubleTypebytes -> BinaryTypestring -> StringTyperecord -> StructTypeenum -> StringTypearray -> ArrayTypemap -> MapTypefixed -> BinaryType

Spark Load/Save 方式

 1.3.0统一了load/save API,让用户按需自由选择外部数据源。这套API包括:

  1.SQLContext.table

  从SQL表中加载DataFrame。

  2.SQLContext.load

  从指定的外部数据源加载DataFrame。

  3.SQLContext.createExternalTable

  将指定位置的数据保存为外部SQL表,元信息存入Hive metastore,并返回包含相应数据的DataFrame。

  4.DataFrame.save

  将DataFrame写入指定的外部数据源。

  5.DataFrame.saveAsTable

  将DataFrame保存为SQL表,元信息存入Hive metastore,同时将数据写入指定位置。

 

Spark与Hive的兼容特性

 

文章转载于:https://www.cnblogs.com/kxdblog/p/4503525.html

原著是一个有趣的人,若有侵权,请通知删除

退出移动版