spark 算子操作 cogroup java版

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class CogroupOps {

    public static void main(String[] args) {
        /*
          * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序
          * 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差
          */
        SparkConf conf = new SparkConf().setAppName("java CogroupOps").setMaster("local");


         /*
         * 创建SparkContext对象
         * SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等
         * 同时还会负则Spark程序往Master注册程序等
         * SparkContext是整个Spark应用程序中最为至关重要的一个对象
         */
        JavaSparkContext sc = new JavaSparkContext(conf);



        List<Tuple2<Integer,String>> nameList = Arrays.asList(
                new Tuple2<Integer, String>(1,"a"),
                new Tuple2<Integer, String>(2,"b")
        );

        List<Tuple2<Integer,Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1,100),
                new Tuple2<Integer, Integer>(2,200),
                new Tuple2<Integer, Integer>(3,300),
                new Tuple2<Integer, Integer>(2,400),
                new Tuple2<Integer, Integer>(1,500)
        );

        JavaPairRDD<Integer,String> names = sc.parallelizePairs(nameList);
        JavaPairRDD<Integer,Integer> scores = sc.parallelizePairs(scoreList);

        JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>  nameScores = names.cogroup(scores);
        nameScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2);
                System.out.println(integerTuple2Tuple2._1() +":"+ integerTuple2Tuple2._2());
                System.out.println("========================");

            }
        });


       // sc.parallelizePairs(nameList).cogroup(sco);


        sc.close();
    }

}