首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spark-streaming和连接池实现

spark-streaming和连接池实现
EN

Stack Overflow用户
提问于 2015-05-26 14:14:20
回答 2查看 10.2K关注 0票数 9

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams上的spark流媒体网站提到了以下代码:

代码语言:javascript
复制
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

我曾尝试使用org.apache.commons.pool2实现此功能,但使用预期的java.io.NotSerializableException运行应用程序失败:

代码语言:javascript
复制
15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 ...

我想知道实现一个可序列化的连接池有多现实。有没有人成功地做到了这一点?

谢谢。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-06-02 18:17:18

为了解决这个“本地资源”问题,我们需要一个单例对象--即保证在JVM中实例化一次且只实例化一次的对象。幸运的是,Scala object提供了开箱即用的功能。

第二件要考虑的事情是,这个单例将为在托管它的同一JVM上运行的所有任务提供一个服务,因此,它必须负责并发性和资源管理。

让我们试着勾画(*)这样的服务:

代码语言:javascript
复制
class ManagedSocket(private val pool: ObjectPool, val socket:Socket) {
   def release() = pool.returnObject(socket)
}

// singleton object 
object SocketPool {
    var hostPortPool:Map[(String, Int),ObjectPool] = Map()
    sys.addShutdownHook{
        hostPortPool.values.foreach{ // terminate each pool } 
    }

    // factory method
    def apply(host:String, port:String): ManagedSocket = {
        val pool = hostPortPool.getOrElse{(host,port), {
            val p = ??? // create new pool for (host, port)
            hostPortPool += (host,port) -> p
            p
        }
        new ManagedSocket(pool, pool.borrowObject)
    }
}

然后用法变成:

代码语言:javascript
复制
val host = ???
val port = ???
stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition => 
        val mSocket = SocketPool(host, port)
        partition.foreach{elem => 
            val os = mSocket.socket.getOutputStream()
            // do stuff with os + elem
        }
        mSocket.release()
    }
}

我假设问题中使用的GenericObjectPool负责并发性。否则,需要使用某种形式的同步来保护对每个pool实例的访问。

(*)为说明如何设计此类对象的想法而提供的代码-需要额外的工作才能转换为工作版本。

票数 14
EN

Stack Overflow用户

发布于 2015-05-26 15:15:46

下面的答案是错误的!我把答案留在这里供参考,但由于以下原因,答案是错误的。socketPool被声明为lazy val,因此它将在每次第一次访问请求时被实例化。由于SocketPool case类不是Serializable,这意味着它将在每个分区中被实例化。这使得连接池变得无用,因为我们希望跨分区和RDDs保持连接。无论这是作为伴生对象实现还是作为case类实现,都没有区别。底线是:连接池必须是Serializable,而apache commons池不是。

代码语言:javascript
复制
import java.io.PrintStream
import java.net.Socket

import org.apache.commons.pool2.{PooledObject, BasePooledObjectFactory}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.spark.streaming.dstream.DStream

/**
 * Publish a Spark stream to a socket.
 */
class PooledSocketStreamPublisher[T](host: String, port: Int)
  extends Serializable {

    lazy val socketPool = SocketPool(host, port)

    /**
     * Publish the stream to a socket.
     */
    def publishStream(stream: DStream[T], callback: (T) => String) = {
        stream.foreachRDD { rdd =>

            rdd.foreachPartition { partition =>

                val socket = socketPool.getSocket
                val out = new PrintStream(socket.getOutputStream)

                partition.foreach { event =>
                    val text : String = callback(event)
                    out.println(text)
                    out.flush()
                }

                out.close()
                socketPool.returnSocket(socket)

            }
        }
    }

}

class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {

    def create(): Socket = {
        new Socket(host, port)
    }

    def wrap(socket: Socket): PooledObject[Socket] = {
        new DefaultPooledObject[Socket](socket)
    }

}

case class SocketPool(host: String, port: Int) {

    val socketPool = new GenericObjectPool[Socket](new SocketFactory(host, port))

    def getSocket: Socket = {
        socketPool.borrowObject
    }

    def returnSocket(socket: Socket) = {
        socketPool.returnObject(socket)
    }

}

您可以按如下方式调用它:

代码语言:javascript
复制
val socketStreamPublisher = new PooledSocketStreamPublisher[MyEvent](host = "10.10.30.101", port = 29009)
socketStreamPublisher.publishStream(myEventStream, (e: MyEvent) => Json.stringify(Json.toJson(e)))
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30450763

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档