Runtime Environment
Cluster: CDH 5.3.0
Component JAR versions:
Spark: 1.2.0-cdh5.3.0
Hive: 0.13.1-cdh5.3.0
Hadoop: 2.5.0-cdh5.3.0
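The listing below builds against these versions. A minimal sketch of the Maven dependencies, assuming Cloudera's Maven repository is configured and a MySQL-backed Hive metastore (the driver artifact and version are assumptions; swap in the PostgreSQL driver if that backs your metastore):

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.2.0-cdh5.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.2.0-cdh5.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>0.13.1-cdh5.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>0.13.1-cdh5.3.0</version>
  </dependency>
  <!-- JDBC driver for the Hive metastore (assumed MySQL) -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
  </dependency>
</dependencies>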
A simple Java example of Spark SQL, demonstrating:
Querying JSON-formatted data directly with Spark SQL
User-defined functions (UDFs) in Spark SQL
Querying Hive tables with Spark SQL
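The testQueryJson method in the listing reads a file containing one JSON object per line, which is the format jsonFile expects. A plausible json.txt with fields matching what the query selects (the values here are made up for illustration):

{"id": 1, "name": "lily", "email": "lily@163.com", "address": "gz tianhe"}
{"id": 2, "name": "lucy", "email": "lucy@163.com", "address": "gz yuexiu"}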
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;

/**
 * Note: to use JavaHiveContext you must:
 * 1. Put three config files on the classpath: hive-site.xml, core-site.xml, hdfs-site.xml
 * 2. Add the PostgreSQL or MySQL JDBC driver dependency (for the Hive metastore)
 * 3. Add the hive-jdbc and hive-exec dependencies
 */
public class SimpleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);
        // Uncomment to run the other two demos:
        // testQueryJson(sqlCtx);
        // testUDF(sc, sqlCtx);
        testHive(hiveCtx);
        sc.stop();
        sc.close();
    }

    // Query JSON-formatted data directly with Spark SQL
    public static void testQueryJson(JavaSQLContext sqlCtx) {
        JavaSchemaRDD rdd = sqlCtx.jsonFile("file:///D:/tmp/tmp/json.txt");
        rdd.printSchema();

        // Register the inferred-schema RDD as a temporary table
        rdd.registerTempTable("account");
        JavaSchemaRDD accs = sqlCtx.sql("SELECT address, email, id, name FROM account ORDER BY id LIMIT 10");
        List<Row> result = accs.collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + ","
                    + row.getInt(2) + "," + row.getString(3));
        }

        // Query results are also ordinary RDDs, so they can be transformed further
        JavaRDD<String> names = accs.map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getString(3);
            }
        });
        System.out.println(names.collect());
    }

    // User-defined functions in Spark SQL
    public static void testUDF(JavaSparkContext sc, JavaSQLContext sqlCtx) {
        // Create an account and turn it into a SchemaRDD
        ArrayList<AccountBean> accList = new ArrayList<AccountBean>();
        accList.add(new AccountBean(1, "lily", "lily@163.com", "gz tianhe"));
        JavaRDD<AccountBean> accRDD = sc.parallelize(accList);
        JavaSchemaRDD rdd = sqlCtx.applySchema(accRDD, AccountBean.class);
        rdd.registerTempTable("acc");

        // Register the UDF
        sqlCtx.registerFunction("strlength", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataType.IntegerType);

        // Query using the UDF (note: strlength(name) applies it to the column,
        // whereas strlength('name') would measure the literal string 'name')
        List<Row> result = sqlCtx.sql("SELECT strlength(name), name, address FROM acc LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

    // Query a Hive table with Spark SQL
    public static void testHive(JavaHiveContext hiveCtx) {
        List<Row> result = hiveCtx.sql("SELECT foo, bar, name FROM pokes2 LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }
}
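The listing refers to an AccountBean class that is not shown. applySchema infers the table schema from a JavaBean's getters, so each column (id, name, email, address) needs a standard getter. A minimal sketch, reconstructed from the call site above rather than taken from the original code:

import java.io.Serializable;

// Hypothetical reconstruction of AccountBean; the constructor's
// parameter order follows the call new AccountBean(1, "lily", ...).
public class AccountBean implements Serializable {
    private int id;
    private String name;
    private String email;
    private String address;

    public AccountBean() {}

    public AccountBean(int id, String name, String email, String address) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.address = address;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
}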
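testHive assumes a table named pokes2 already exists in Hive with columns foo, bar, and name; the getString calls imply all three are strings. If you need a test table, one way is to issue the DDL through the same JavaHiveContext (a sketch; the column types are an assumption):

// One-off setup, run once before testHive:
hiveCtx.sql("CREATE TABLE IF NOT EXISTS pokes2 (foo STRING, bar STRING, name STRING)");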