Reading MongoDB Data with Spark

1. Add the MongoDB Spark Connector dependency

[xml]
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
[/xml]

2. Read data through a new SparkSession
[java]
package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public final class GettingStarted {

    public static void main(final String[] args) throws InterruptedException {
        /* Create the SparkSession.
         * If config arguments are passed from the command line using --conf,
         * parse args for the values to set.
         */
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MongoSparkConnectorIntro")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
                .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
                .getOrCreate();

        // Create a JavaSparkContext using the SparkSession's SparkContext object
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // More application logic would go here...

        jsc.close();
    }
}
[/java]
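The example above only wires up the contexts. A minimal sketch of actually reading the collection, continuing from the `jsc` variable above and the `test.myCollection` URI configured in the builder (the variable names are illustrative, not from the original post):

[java]
// Requires: com.mongodb.spark.MongoSpark, com.mongodb.spark.rdd.api.java.JavaMongoRDD,
// org.bson.Document, org.apache.spark.sql.Dataset, org.apache.spark.sql.Row
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);  // reads spark.mongodb.input.uri
System.out.println("documents: " + rdd.count());
System.out.println(rdd.first().toJson());

// The same data as a DataFrame; the schema is inferred by sampling the collection
Dataset<Row> df = rdd.toDF();
df.printSchema();
df.show();
[/java]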

3. Read data through an existing SparkSession
Note: MongoSpark reads the MongoDB configuration from the conf of the SparkContext, so any new configuration must be applied to the SparkSession's SparkContext, otherwise an error is thrown. I only discovered this after reading the source code.
[java]
@Override
public Tuple2<Dataset<Row>, Schema> process(Tuple2<Dataset<Row>, Schema> data) throws ApplicationException, SystemRuntimeException {
    SparkSession spark = this.getSparkSession();

    // MongoSpark reads its configuration from the SparkContext's conf,
    // so the extra options must be written there, not only on the SparkSession.
    getExtraOptions().forEach((k, v) -> spark.sparkContext().conf().set(k, v));

    JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
    Dataset<Row> readData = MongoSpark.load(sparkContext).toDF();
    return new Tuple2<>(readData, SchemaUtils.buildSchema(readData.schema()));
}
[/java]
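When only a single read option (such as the collection) differs from what is already in the SparkContext conf, an alternative to mutating the conf is to pass a `ReadConfig` with overrides. A rough sketch, reusing the `sparkContext` variable from the snippet above; the collection name `otherCollection` is just a placeholder:

[java]
// Requires: java.util.HashMap, java.util.Map, com.mongodb.spark.config.ReadConfig
Map<String, String> readOverrides = new HashMap<>();
readOverrides.put("collection", "otherCollection");  // placeholder collection name

// Overrides are layered on top of the spark.mongodb.input.* values already in the conf
ReadConfig readConfig = ReadConfig.create(sparkContext).withOptions(readOverrides);
Dataset<Row> otherData = MongoSpark.load(sparkContext, readConfig).toDF();
[/java]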

If MongoDB has SSL enabled, that is another pitfall; a hedged sketch of the extra configuration follows below.
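The exact settings depend on how the server's certificate is issued; a minimal sketch (the host, paths, and truststore details are placeholders, not from the original post):

[java]
// Enable SSL in the connection string; the connector hands the option to the
// underlying MongoDB Java driver.
SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkSslExample")
        .config("spark.mongodb.input.uri",
                "mongodb://127.0.0.1/test.myCollection?ssl=true")
        .config("spark.mongodb.output.uri",
                "mongodb://127.0.0.1/test.myCollection?ssl=true")
        .getOrCreate();

// If the server certificate is not trusted by the default JVM truststore, the
// truststore normally has to reach every JVM (driver and executors), e.g. via
// spark-submit:
//   --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.jks"
//   --driver-java-options "-Djavax.net.ssl.trustStore=/path/to/truststore.jks"
[/java]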
