我现在正在试验Spark和Mongodb,它们使用mongodb连接器连接火花和mongodb通信。下面是一个https://github.com/plaa/mongo-spark的例子,这个例子对我来说很好。
然后在这个例子的基础上,我使用了来自https://github.com/10gen-interns/big-data-exploration的一个更大的数据集,该数据集有600万个飞行数据记录。我想做的是查询mongodb数据集,然后再做一些进一步的处理。
航班数据的架构在https://gist.github.com/sweetieSong/6016700中。
见数据示例:
{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ),
"age" : 27,
"airTime" : 316,
"airlineId" : 19805,
"arrDelay" : -37,
"arrTime" : Date( 1336304580000 ),
"carrier" : "AA",
"carrierId" : "AA",
"crsArrTime" : Date( 1336306800000 ),
"crsDepTime" : Date( 1336294800000 ),
"crsElapsedTime" : 380,
"date" : Date( 1336262400000 ),
"dayOfMonth" : 6,
"dayOfWeek" : 7,
"depDelay" : -5,
"depTime" : Date( 1336294500000 ),
"destAirport" : "LAX",
"destAirportId" : 12892,
"destCity" : "Los Angeles, CA",
"destCityId" : 32575,
"destState" : "California",
"destStateId" : "CA",
"destWAC" : 91,
"distance" : 2475,
"diverted" : true,
"elapsedTime" : 348,
"flightNum" : 1,
"month" : 5,
"numDivAirportLandings" : 0,
"numFlights" : 1,
"origAirport" : "JFK",
"origAirportId" : 12478,
"origCity" : "New York, NY",
"origCityId" : 31703,
"origState" : "New York",
"origStateId" : "NY",
"origWAC" : 22,
"quarter" : 2,
"tailNum" : "N323AA",
"taxiIn" : 19,
"taxiOut" : 13,
"wheelsOff" : Date( 1336295280000 ),
"wheelsOn" : Date( 1336303440000 ),
"year" : 2012 }我的scala代码是
val sc = new SparkContext("local", "Scala Word Count")
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query","{destAirport: 'LAX'}");
//config.set("mongo.input.query","{_id.destAirport: 'LAX'}");
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])println(“我们正在运行scala..count ",mongoRDD.count())
出于测试目的,我只想首先从destAirport 'LAX‘获取所有记录,我不知道查询是什么样子的,所以我尝试了两种不同格式的查询,"{destAirport:'LAX'}“和"{_id.destAirport:'LAX'}”
在运行应用程序时,控制台输出以下信息
INFO MongoCollectionSplitter: Created MongoCollectionSplitter: min={ "_id“:{ "$oid”:“51bf29d8ca69141e42097d7f”},max= { "_id“:{ "$oid”:"51bf29dfca69141e420991ad"}}
14/08/05 10:30:51 INFO Executor: Running task ID 751
14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
14/08/05 10:30:51 INFO Executor: Finished task ID 751不管查询是什么(甚至不设置查询),spark总是执行1191个任务。每项任务都会输出类似的单词。mongoRDD.count()总是输出0。
我的第一个问题是什么是正确的查询?
而且,以前我想的是mongodb所做的事情,即mongodb首先查询所有集合,然后将结果发回进行处理。但是现在在我看来,mongodb会将集合分割成很多,然后查询集合的那一小部分,然后发送该部分的结果。是吗?
发布于 2014-08-14 07:49:56
我的第一个问题是什么是正确的查询?
我不认为有“正确”的查询--您需要根据要处理的数据进行查询
而且,以前我想的是mongodb所做的事情,即mongodb首先查询所有集合,然后将结果发回进行处理。但是现在在我看来,mongodb会将集合分割成很多,然后查询集合的那一小部分,然后发送该部分的结果。是吗?
我也遇到过同样的问题。
我相信,在给定newAPIHadoopRDD的情况下,MongoInputSplit.class在计算拆分时不考虑查询。它只在计算了分裂后才应用。这意味着,无论查询有多细,拆分的数量都将保持不变,并且将与集合的大小成正比。
newAPIHadoopRDD正在使用StandaloneMongoSplitter。请注意,该类没有使用查询来计算拆分边界。它只是使用mongo的内部"splitVector“命令;从这里的文档- http://api.mongodb.org/internal/current/commands.html看,它也不考虑查询。
但我没有很好的解决办法。一种更好的方法是只在计算查询之后分割mongo集合,但这需要拆分器的另一个实现。下面是一些关于这个问题的好文章:http://www.ikanow.com/how-well-does-mongodb-integrate-with-hadoop/
https://stackoverflow.com/questions/25203325
复制相似问题