Spark operator: cogroup (Java version)
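cogroup groups two pair RDDs by key: for every key that appears in either RDD it emits a record (key, (Iterable<V>, Iterable<W>)), where each Iterable holds all of that key's values from the corresponding RDD and is empty when the key is absent on that side. The example below cogroups an id-to-name RDD with an id-to-score RDD.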
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class CogroupOps {

    public static void main(String[] args) {
        /*
         * Step 1: create the SparkConf object, which holds the runtime configuration
         * of the Spark application. For example, setMaster sets the URL of the Spark
         * cluster master to connect to; setting it to "local" runs the program
         * in-process, which is well suited to development on modest hardware.
         */
        SparkConf conf = new SparkConf().setAppName("java CogroupOps").setMaster("local");
        /*
         * Step 2: create the SparkContext.
         * SparkContext is the single entry point to all Spark functionality, no matter
         * whether the program is written in Scala, Java, Python, or R. It is also
         * responsible for registering the application with the master, which makes it
         * the most important object in a Spark application.
         */
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, String>> nameList = Arrays.asList(
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(2, "b")
        );
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 200),
                new Tuple2<Integer, Integer>(3, 300),
                new Tuple2<Integer, Integer>(2, 400),
                new Tuple2<Integer, Integer>(1, 500)
        );
        JavaPairRDD<Integer, String> names = sc.parallelizePairs(nameList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
        // cogroup: for each key in either RDD, collect that key's values from both sides.
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> nameScores = names.cogroup(scores);
        nameScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> keyed) throws Exception {
                System.out.println(keyed);
                System.out.println(keyed._1() + ":" + keyed._2());
                System.out.println("========================");
            }
        });
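        // For this sample input, cogroup produces three records (shown schematically;
        // print order may vary across partitions, and the Iterables print via Spark's
        // internal buffer type):
        //   1 : ([a], [100, 500])
        //   2 : ([b], [200, 400])
        //   3 : ([],  [300])   <- key 3 has no name, so the name side is empty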
        sc.close();
    }
}
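Since VoidFunction has a single abstract method, the same traversal can also be written with a Java 8 lambda. A minimal sketch, assuming Java 8+ and the nameScores RDD built above:

nameScores.foreach(keyed ->
        System.out.println(keyed._1() + ":" + keyed._2()));

Note that cogroup keeps keys that appear in only one of the two input RDDs (key 3 above ends up with an empty name Iterable), whereas a plain join would drop them.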