Reading MongoDB data with Spark
1. Add the MongoDB Spark connector dependency (see the snippet below)
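The connector ships as a single artifact. A minimal Maven entry is sketched here; the Scala suffix (_2.11) and the version (2.4.1) are example values and must match the Spark/Scala build you are actually running.
[xml]
<!-- MongoDB Spark connector; _2.11 / 2.4.1 are example values, pick the ones matching your Spark and Scala versions -->
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.4.1</version>
</dependency>
[/xml]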
2. Read data through a new SparkSession
[java]
package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public final class GettingStarted {

    public static void main(final String[] args) throws InterruptedException {
        /* Create the SparkSession.
         * If config arguments are passed from the command line using --conf,
         * parse args for the values to set.
         */
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MongoSparkConnectorIntro")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
                .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
                .getOrCreate();

        // Create a JavaSparkContext using the SparkSession's SparkContext object
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // More application logic would go here...

        jsc.close();
    }
}
[/java]
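Once the session exists, the actual read goes through MongoSpark.load, which picks up the connection details from the spark.mongodb.input.uri set above. A minimal sketch of what could stand in for the "More application logic" placeholder:
[java]
// Continuation of main() above: load test.myCollection as a DataFrame,
// using the spark.mongodb.input.uri already configured on the SparkSession.
// Requires: import com.mongodb.spark.MongoSpark;
//           import org.apache.spark.sql.Dataset;
//           import org.apache.spark.sql.Row;
Dataset<Row> df = MongoSpark.load(jsc).toDF();
df.printSchema();
df.show();
[/java]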
3. Read data through an existing SparkSession
Note: MongoSpark reads the MongoDB settings from the conf of the SparkContext, so any extra options must be written into the SparkContext behind the existing SparkSession; otherwise the read throws an error. I only discovered this after reading the connector's source code.
[java]
@Override
public Tuple2<Dataset<Row>, Schema> process(Tuple2<Dataset<Row>, Schema> data) throws ApplicationException, SystemRuntimeException {
    SparkSession spark = this.getSparkSession();

    // Push the extra MongoDB options into the SparkContext's conf:
    // MongoSpark reads its configuration from there, not from the SparkSession's runtime options.
    getExtraOptions().forEach((k, v) -> spark.sparkContext().conf().set(k, v));

    JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
    Dataset<Row> readData = MongoSpark.load(sparkContext).toDF();
    return new Tuple2<>(readData, SchemaUtils.buildSchema(readData.schema()));
}
[/java]
If the MongoDB deployment requires SSL/TLS, that is another pitfall (see the sketch below).
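The connector goes through the MongoDB Java driver, so as I understand it TLS is switched on with ssl=true in the connection string, and the JVM then has to trust the server certificate. A rough sketch for local mode follows; the trust store path and password are placeholders, and on a real cluster the same -Djavax.net.ssl.* settings would have to reach the executors (for example via spark.executor.extraJavaOptions) instead of System.setProperty.
[java]
// Sketch only: placeholder trust store path/password, local-mode assumption.
// Point the JVM at a trust store that contains the MongoDB server certificate.
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore.jks");
System.setProperty("javax.net.ssl.trustStorePassword", "changeit");

SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkSslExample")
        // ssl=true tells the underlying MongoDB Java driver to open a TLS connection
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection?ssl=true")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection?ssl=true")
        .getOrCreate();
[/java]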