SparkSQL Program Flow and Operations

SparkSQL Program Execution Flow

DataSource Operations

Spark SQL provides rich data source support and can conveniently load a wide variety of data, including: json, text, orc, parquet, csv, jdbc, hive, and libsvm.

// Build the SparkSession

SparkConf sconf = new SparkConf().setAppName("sparktest").setMaster("local[*]");
SparkSession spark = SparkSession.builder().config(sconf).getOrCreate();

1. Default data source: parquet. It can be loaded directly with load(), with no explicit format setting required.

Dataset<Row> parquet_users = spark.read().load("data/resources/users.parquet");

2. Loading and saving with an explicit format (json, orc, text, parquet, csv):

Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame, via spark.read().json() on either a JSON file or an RDD of JSON strings. Note that this is not a conventional JSON file: each line must be a separate, self-contained, valid JSON object (the JSON Lines format). Regular multi-line JSON files will therefore usually fail to load.

You can name the imported columns with toDF("col1", "col2", "col3", ...); otherwise default column names are used. The number of names passed must match the number of columns.
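The one-object-per-line requirement can be illustrated without Spark. The sketch below uses a rough brace-balancing check (a stand-in for a real JSON parser, not Spark's actual validation) to show why a JSON Lines file loads while a pretty-printed file does not:

```java
import java.util.List;

public class JsonLinesCheck {
    // Rough heuristic: a line is a candidate JSON Lines record if it starts with '{',
    // ends with '}', and its braces balance. A real parser would be stricter.
    static boolean looksLikeJsonObject(String line) {
        String t = line.trim();
        if (!t.startsWith("{") || !t.endsWith("}")) return false;
        int depth = 0;
        for (char c : t.toCharArray()) {
            if (c == '{') depth++;
            if (c == '}') depth--;
            if (depth < 0) return false;
        }
        return depth == 0;
    }

    public static void main(String[] args) {
        // JSON Lines: every line is independently valid, so spark.read().json() can split by line
        List<String> jsonLines = List.of(
            "{\"name\":\"Michael\"}",
            "{\"name\":\"Andy\", \"age\":30}");
        // Pretty-printed JSON: individual lines are NOT valid objects, so a line-based reader fails
        List<String> pretty = List.of("{", "  \"name\": \"Michael\"", "}");

        System.out.println(jsonLines.stream().allMatch(JsonLinesCheck::looksLikeJsonObject)); // true
        System.out.println(pretty.stream().allMatch(JsonLinesCheck::looksLikeJsonObject));    // false
    }
}
```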

//json
spark.read().json("data/resources/people.json") // load
        .write().json("path");                  // save

//csv
spark.read().csv("")
        .write().csv("path");

//parquet
spark.read().parquet("")
        .write().parquet("path");

//orc
spark.read().orc("")
        .write().orc("path");

//text
spark.read().text("data/resources/people.txt")
        .write().text("path");

3. JDBC load and save

//jdbc_1: first way to load and save
Dataset<Row> jdbcDF = spark.read()
        .format("jdbc")
        .option("url", "jdbc:mysql://ip:3306/testDB")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", "tablename")
        .option("user", "username")
        .option("password", "password")
        .load();
jdbcDF.select("column")
        .as(Encoders.STRING())
        .write()
        .mode(SaveMode.Overwrite)
        .format("jdbc")
        .option("url", "jdbc:mysql://ip:3306/testDB")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", "tablename")
        .option("user", "username")
        .option("password", "password")
        .save();
//jdbc_2: second way to load and save
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");

Dataset<Row> jdbcDF2 = spark.read()
        .jdbc("jdbc:mysql://ip:3306/testDB", "tablename", connectionProperties);
jdbcDF2.select("column")
        .sort("column") // sort() requires at least one column name
        .as(Encoders.STRING())
        .write()
        .jdbc("jdbc:mysql://ip:3306/testDB", "tablename", connectionProperties);
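The Properties object for the second form can be built once and passed to both spark.read().jdbc() and df.write().jdbc(). A minimal sketch of the helper (host, database, and credentials above are placeholders; the driver key is an alternative to option("driver", ...)):

```java
import java.util.Properties;

public class JdbcProps {
    // Assemble the connection Properties handed to DataFrameReader.jdbc()
    // and DataFrameWriter.jdbc(). Values here are illustrative placeholders.
    static Properties mysqlProps(String user, String password) {
        Properties p = new Properties();
        p.put("user", user);
        p.put("password", password);
        // Setting the JDBC driver class here replaces option("driver", ...)
        p.put("driver", "com.mysql.jdbc.Driver");
        return p;
    }

    public static void main(String[] args) {
        Properties p = mysqlProps("username", "password");
        System.out.println(p.getProperty("driver")); // com.mysql.jdbc.Driver
    }
}
```

Reusing one Properties object keeps the read and write sides of a round trip from drifting apart when credentials change.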

4. Loading Hive table data:

Setting .config("spark.sql.warehouse.dir", warehouseLocation) and calling .enableHiveSupport() enables operations on Hive tables. (Prerequisite: the spark-sql command line must already be able to access Hive tables, which requires copying Hive's hive-site.xml into Spark's conf directory.)

String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark Hive Example")
        .config("spark.sql.warehouse.dir", warehouseLocation)
        .enableHiveSupport()
        .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'resources/kv1.txt' INTO TABLE src");
// You can query, filter, and aggregate directly with SQL aggregate functions,
// or turn the query result into a DS/DF and use the DSL.

SparkSQL Data Operations

SparkSQL offers two ways to operate on data: SQL and the DSL. SQL is familiar and convenient; the DSL exists to make up for gaps in SQL's functionality or performance and provides more powerful operator combinations. Underneath, both compile down to the same execution.

I. Operating on data with the DSL

//people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

1. Loading the data

SparkConf conf =  new SparkConf().setMaster("local[*]").setAppName("sqlVsDsl");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//Load the data and build a DataFrame
Dataset<Row> peoplesDF = spark.read().format("json").load("data/resources/people.json");

2. Printing the schema

peoplesDF.printSchema();
//The schema of the JSON data was inferred automatically
root
 |-- age: long (nullable = true) 
 |-- name: string (nullable = true) 

3. Selecting columns: select

peoplesDF.select("name","age").show();
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

4. Filtering rows: filter

Using col() requires the static import: import static org.apache.spark.sql.functions.col;

peoplesDF.filter("age is NULL OR age>20").show();
/*
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|null|Michael|
|  30|   Andy|
+----+-------+
*/
peoplesDF.filter("age < 20").show(); // equivalent: peoplesDF.filter(peoplesDF.col("age").$less(20))
+---+------+
|age|  name|
+---+------+
| 19|Justin|
| 19|Justin|
+---+------+
//Note that null values do not participate in the comparison
peoplesDF.filter(peoplesDF.col("age").$eq$eq$eq(30)).show();
+---+----+
|age|name|
+---+----+
| 30|Andy|
| 30|Andy|
+---+----+
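The null-skipping behavior above (rows with a null age silently drop out of age < 20) can be mimicked in plain Java without Spark. The sketch below uses hypothetical name/age lists mirroring people.json:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullFilter {
    // Like peoplesDF.filter("age < 20"): a null age never satisfies the predicate,
    // so those rows drop out, mirroring SQL's three-valued comparison logic.
    static List<String> namesUnder(List<String> names, List<Integer> ages, int limit) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            Integer age = ages.get(i);          // Integer, not int: the column is nullable
            if (age != null && age < limit) out.add(names.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Andy", "Justin");
        List<Integer> ages = Arrays.asList(null, 30, 19);
        System.out.println(namesUnder(names, ages, 20)); // [Justin]
    }
}
```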

5. Removing duplicates: distinct

peoplesDF.distinct().show();
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

6. Sorting by column: orderBy/sort

peoplesDF.orderBy("age").show();
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|null|Michael|
|  19| Justin|
|  19| Justin|
|  30|   Andy|
|  30|   Andy|
+----+-------+
peoplesDF.sort("name","age").show();
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  30|   Andy|
|  19| Justin|
|  19| Justin|
|null|Michael|
|null|Michael|
+----+-------+
//Descending: col("age").desc(); ascending: col("age").asc(). Ascending is the default.
peoplesDF.sort(col("age").desc()).show();

7. Grouped aggregation: groupBy().sum/max/min/avg/count...

//Group by name and sum the age values within each group
peoplesDF.groupBy("name").sum("age").show();
+-------+--------+
|   name|sum(age)|
+-------+--------+
|Michael|    null|
|   Andy|      60|
| Justin|      38|
+-------+--------+
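The null handling shown above (Michael's group contains only nulls, so its sum(age) is null; in a mixed group the nulls would simply be skipped) can be sketched in plain Java over the same hypothetical rows:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupSum {
    // SQL-style SUM semantics: nulls are skipped; a group whose values are
    // all null yields a null sum rather than zero.
    static Map<String, Long> sumAgeByName(List<String[]> rows) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (String[] row : rows) {
            String name = row[0];
            sums.putIfAbsent(name, null);   // the group exists even if every value is null
            if (row[1] != null) {
                Long cur = sums.get(name);
                sums.put(name, (cur == null ? 0L : cur) + Long.parseLong(row[1]));
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[]{"Michael", null}, new String[]{"Andy", "30"},
            new String[]{"Justin", "19"}, new String[]{"Michael", null},
            new String[]{"Andy", "30"}, new String[]{"Justin", "19"});
        System.out.println(sumAgeByName(rows)); // {Michael=null, Andy=60, Justin=38}
    }
}
```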

8. Grouped aggregation: groupBy().agg()

//max() and sum() are also static imports from org.apache.spark.sql.functions
peoplesDF.groupBy("name").agg(col("name"), max("age"), sum("age")).show();
+-------+-------+--------+--------+
|   name|   name|max(age)|sum(age)|
+-------+-------+--------+--------+
|Michael|Michael|    null|    null|
|   Andy|   Andy|      30|      60|
| Justin| Justin|      19|      38|
+-------+-------+--------+--------+

9. Operating on a column: col()

peoplesDF.select(peoplesDF.col("name"),peoplesDF.col("age").$plus(1)).show();
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
peoplesDF.select(peoplesDF.col("name"),peoplesDF.col("age").$less(20)).show();
+-------+----------+
|   name|(age < 20)|
+-------+----------+
|Michael|      null|
|   Andy|     false|
| Justin|      true|
|Michael|      null|
|   Andy|     false|
| Justin|      true|
+-------+----------+

10. Joins: join()


The join type defaults to inner and must be one of:
* inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.

peoplesDF.join(peoplesDF, "age").show();
+---+------+------+
|age|  name|  name|
+---+------+------+
| 30|  Andy|  Andy|
| 30|  Andy|  Andy|
| 19|Justin|Justin|
| 19|Justin|Justin|
| 30|  Andy|  Andy|
| 30|  Andy|  Andy|
| 19|Justin|Justin|
| 19|Justin|Justin|
+---+------+------+
peoplesDF.join(peoplesDF.limit(2), "age").show();
+---+----+----+
|age|name|name|
+---+----+----+
| 30|Andy|Andy|
| 30|Andy|Andy|
+---+----+----+

Dataset<Row> df2 = peoplesDF.limit(2);
peoplesDF.join(df2,peoplesDF.col("age").equalTo(df2.col("age")),"left_outer").show();
+----+-------+----+----+
| age|   name| age|name|
+----+-------+----+----+
|null|Michael|null|null|
|  30|   Andy|  30|Andy|
|  19| Justin|null|null|
|null|Michael|null|null|
|  30|   Andy|  30|Andy|
|  19| Justin|null|null|
+----+-------+----+----+
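The left_outer result above can be mimicked in plain Java: every left row is kept, unmatched rows get nulls on the right, and a null join key matches nothing. The data below is hypothetical, mirroring the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class LeftOuterJoin {
    // Naive nested-loop left outer join on the first field (the 'age' key).
    // A null key matches nothing, which is why the Michael rows above
    // come back with nulls on the right side.
    static List<String> leftOuter(List<Object[]> left, List<Object[]> right) {
        List<String> out = new ArrayList<>();
        for (Object[] l : left) {
            boolean matched = false;
            for (Object[] r : right) {
                if (l[0] != null && Objects.equals(l[0], r[0])) {
                    out.add(l[0] + "," + l[1] + "," + r[0] + "," + r[1]);
                    matched = true;
                }
            }
            if (!matched) out.add(l[0] + "," + l[1] + ",null,null");
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> left = Arrays.asList(
            new Object[]{null, "Michael"}, new Object[]{30, "Andy"}, new Object[]{19, "Justin"});
        // like peoplesDF.limit(2): only the Michael and Andy rows
        List<Object[]> right = Arrays.asList(
            new Object[]{null, "Michael"}, new Object[]{30, "Andy"});
        leftOuter(left, right).forEach(System.out::println);
        // null,Michael,null,null
        // 30,Andy,30,Andy
        // 19,Justin,null,null
    }
}
```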

11. Dropping null values: na.drop()

//Drop rows that contain null values
peoplesDF.na().drop().show();
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
| 30|  Andy|
| 19|Justin|

II. Operating on data with SQL

1. Temporary views

//Temporary view
peoplesDF.createOrReplaceTempView("peopleView");
spark.sql("SELECT * from peopleView t WHERE t.age>=19 ").show();

2. Global temporary views

//Global temporary view
peoplesDF.createGlobalTempView("peopleGlobalView");
spark.sql("SELECT * from global_temp.peopleGlobalView t WHERE t.age>=19 ").show();

3. Temporary tables

//Temporary table (registerTempTable is deprecated; createOrReplaceTempView is preferred)
peoplesDF.registerTempTable("tables");
spark.sql("SELECT * from tables t WHERE t.age>=19 ").show();
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
| 30|  Andy|
| 19|Justin|
+---+------+

Hive data operations

Earlier APIs provided the HiveContext interface for working with Hive databases; the unified SparkSession interface now enables the same Hive operations.

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...

Saving data to Hive

  • partitionBy(column) partitioned table, partitioned by the given column
  • bucketBy(number, column) bucketed table, hashed into the given number of buckets by column
spark.sql("SELECT * from tables t WHERE t.age>=19 ")
        .write()
        .sortBy("age")
        .mode(SaveMode.Overwrite)
        .partitionBy("age")
        .bucketBy(10,"name")
        .saveAsTable("hivetable");

saveAsTable writes the data into a Hive table. Because spark.sql.warehouse.dir was not configured on the SparkSession here, a spark-warehouse directory is created locally by default. The saved data is partitioned by age, compressed with snappy by default, and stored in parquet format.


SparkSQL parameter configuration

  • spark.sql.shuffle.partitions # number of partitions (reduce tasks) used when shuffling, default 200
  • spark.sql.files.maxPartitionBytes # maximum bytes packed into a single partition when reading files, default 128MB
  • spark.sql.files.openCostInBytes # estimated cost of opening a file, used when merging small files into one partition, default 4MB
  • spark.sql.autoBroadcastJoinThreshold # maximum size of a table that will be broadcast in a join, default 10MB

Tuning some of these parameters can improve performance.

Enabled by default:

  • spark.sql.inMemoryColumnarStorage.compressed defaults to true; automatically selects a compression codec for each column
  • spark.sql.inMemoryColumnarStorage.batchSize defaults to 10000; batch size for the in-memory columnar cache. Larger batches improve memory utilization and compression ratios, but carry OOM risk
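These knobs can be set when building the SparkSession. A config sketch (the values are illustrative, not recommendations; tune them per workload):

```java
SparkSession spark = SparkSession.builder()
        .appName("tuned")
        // more shuffle partitions for large shuffles (default 200)
        .config("spark.sql.shuffle.partitions", 400)
        // raise the broadcast threshold to 50MB (value is in bytes, default 10MB)
        .config("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)
        .getOrCreate();
```

The same settings can also be changed at runtime via spark.conf().set(...) or passed with --conf on spark-submit.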
