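I am using Spark Streaming 2.3.0, which uses Jackson 2.6.7. I am also using a library (the MaxMind GeoIP2 reader shown below) that uses Jackson 2.9.5.
I am trying to get geo details from an IP address using the MaxMind library. The code below works fine in the same project and package.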
package org.apache.spark.examples.streaming
import java.io.File
import java.net.InetAddress
import com.maxmind.db.CHMCache
import com.maxmind.geoip2.DatabaseReader
//import com.tvid.converter.IpParser.ip2geo
object GeoIP2Test {
  def getGeoFromIP( ip_address:String, reader:DatabaseReader) : String = {
    //val reader = new DatabaseReader.Builder(new File(db_file)).withCache(new CHMCache()).build()
    //DatabaseReader reader = new DatabaseReader.Builder(database).build();
    val ipAddress = InetAddress.getByName(ip_address)
    // Replace "city" with the appropriate method for your database, e.g.,
    // "country".
    val response = reader.city(ipAddress)
    val country = response.getCountry
    // String country_iso_code = country.getIsoCode();
    val country_name = country.getName
    val subdivision = response.getMostSpecificSubdivision
    val subdivision_name = subdivision.getName
    // String subdivision_iso_code = subdivision.getIsoCode();
    val city = response.getCity
    val city_name = city.getName
    val postal = response.getPostal
    val postal_code = postal.getCode
    val location = response.getLocation
    val latitude = location.getLatitude.toString
    val longitude = location.getLongitude.toString
    val res = Array(country_name, subdivision_name, city_name, postal_code, latitude, longitude)
    val geo_details = res(0) + "," + res(1) + "," + res(2) + "," + res(3) + "," + res(4) + "," + res(5)
    return geo_details
  }
  def main(args: Array[String]): Unit = {
    val db_file = "/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb"
    val ip_address = "123.123.123.123"
    val reader = new DatabaseReader.Builder(new File(db_file)).withCache(new CHMCache()).build()
    val geo_details = getGeoFromIP(ip_address,reader)
    print(geo_details)
    //try java method integration
    //val res = ip2geo(ip_address,db_file)
    //print(res)
  }
}
This works fine and gives me: China,Beijing,Beijing,null,39.9289,116.3883
But when I try to use this method in Spark Streaming, with this snippet:
var db_file = ssc.sparkContext.broadcast(new DatabaseReader.Builder(new File("/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb")).withCache(new CHMCache()).build())
val reader=db_file.value
//val db_file = "/Users/ajay/Documents/maxmind_databse/GeoIP2-City.mmdb"
val ip_address = "123.123.123.123"
val geo_details = getGeoFromIP(ip_address,reader)
print(geo_details)
it gives me the following error:
Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
This is because Spark's Jackson version is 2.6.7.
My build.sbt is:
name := "scala_spark_stream_metrices"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kinesis-asl
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
// https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.2.3"
libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0"
How can I make sure the getGeoFromIP method uses Jackson 2.9.5, overriding Spark's Jackson 2.6.7?
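For reference, the second resolution listed in the error message could be sketched roughly as below. This is only an illustration, not something the thread confirms as a fix: the object name `DisableCbor` is hypothetical, and the property name is copied from the error text. Note that it only sidesteps the CBOR version check in the AWS SDK; it does not change which Jackson version getGeoFromIP runs against.

```scala
// Minimal sketch (assumption: setting the property to "true" before any AWS
// client is created is enough for the SDK to skip the CBOR wire protocol).
object DisableCbor {
  // Property name as printed in the SdkClientException message.
  val CborProperty = "com.amazonaws.sdk.disableCbor"

  // Sets the JVM-wide system property; call this first thing in main().
  def apply(): Unit = {
    System.setProperty(CborProperty, "true")
  }
}
```

Calling `DisableCbor()` at the top of `main` affects only the JVM it runs in. On a cluster, the executors would presumably need the same flag, for example by passing `-Dcom.amazonaws.sdk.disableCbor=true` through `spark.executor.extraJavaOptions`.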
Posted on 2018-05-21 14:35:41
Changing build.sbt to the snippet below solved my problem.
name := "scala_spark_stream_metrices"
version := "1.0"
scalaVersion := "2.11.8"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.5"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-cbor
dependencyOverrides += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kinesis-asl
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
// https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.2.3"
// https://mvnrepository.com/artifact/com.github.seratch/awscala
libraryDependencies += "com.github.seratch" %% "awscala" % "0.6.3"
//geo
// https://mvnrepository.com/artifact/com.maxmind.geoip2/geoip2
//libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0" exclude ("com.fasterxml.jackson.core","jackson-annotations") exclude ("com.fasterxml.jackson.core","jackson-core") exclude ("com.fasterxml.jackson.core","jackson-databind")
libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0"
Posted on 2018-05-21 09:43:32
To use Jackson v2.9.5, you have to override the dependency, as shown below:
name := "scala_spark_stream_metrices"
version := "1.0"
scalaVersion := "2.11.8"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5"
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kinesis-asl
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
// https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.2.3"
libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.12.0"
This ensures that Jackson v2.9.5 is used instead of v2.6.7.
Hope this helps!
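To check whether an override like the ones above actually took effect, one option is to print where the Jackson classes were loaded from at runtime. The sketch below is an assumption-laden illustration: `JacksonCheck` and `loadedFrom` are hypothetical names, and it relies only on standard JVM reflection, so it also runs when Jackson is absent.

```scala
import scala.util.Try

object JacksonCheck {
  // Returns the location (usually a jar path) a class was loaded from, or
  // None if the class cannot be found or has no code source (JDK classes).
  def loadedFrom(className: String): Option[String] =
    Try(Class.forName(className)).toOption
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  def main(args: Array[String]): Unit = {
    // The three Jackson artifacts involved in the version clash.
    Seq(
      "com.fasterxml.jackson.core.JsonFactory",
      "com.fasterxml.jackson.databind.ObjectMapper",
      "com.fasterxml.jackson.dataformat.cbor.CBORFactory"
    ).foreach { name =>
      println(s"$name -> ${loadedFrom(name).getOrElse("not found")}")
    }
  }
}
```

If the printed jar names all carry the same version (2.9.5 here), the core and CBOR modules agree, which is the first resolution the error message asks for.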
https://stackoverflow.com/questions/50445330