I want to read all the records in Cassandra. Currently, I am streaming the data with akka-persistence-cassandra:
val querier =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

val selectDistinctPersistenceIds = new SimpleStatement(
  "SELECT DISTINCT persistence_id, partition_nr FROM messages")
  .setFetchSize(100000)

querier.session.select(selectDistinctPersistenceIds).map { row =>
  val id = row.getString(0)
  id
}

This works fine while the number of records is around 1.5 million. However, once the number of records exceeds roughly 1.5 million, I get a read timeout error.
I am using:

"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"

Edit: the error log:
com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:25)
...

Posted on 2021-10-27 10:56:27
I was able to solve this by setting cassandra-journal.socket.read-timeout-millis to a value higher than the default of 12000 ms:
cassandra-journal {
  ...
  socket {
    # The per-host read timeout in milliseconds. Should be higher than the timeout settings
    # used on the Cassandra side.
    read-timeout-millis = 30000
  }
}

Posted on 2021-10-18 03:20:40
The problem is with your driver session: configure it according to your needs.
It is probably related to the read timeout, or to increasing the number of retries and the timeout settings.
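As a rough illustration of the driver-level knobs mentioned above, this is how a read timeout and retry policy can be set when building a standalone DataStax Java driver 3.x Cluster (akka-persistence-cassandra manages its own session, so in practice you would tune the equivalent settings in its configuration; the contact point and values here are placeholders, not taken from the question):

```scala
import com.datastax.driver.core.{Cluster, SocketOptions}
import com.datastax.driver.core.policies.DefaultRetryPolicy

// Sketch only: raise the per-request read timeout and set an explicit
// retry policy on a driver 3.x Cluster. Host and timeout are assumptions.
val cluster = Cluster.builder()
  .addContactPoint("127.0.0.1")
  .withSocketOptions(
    // Per-host read timeout; should exceed the server-side timeouts.
    new SocketOptions().setReadTimeoutMillis(30000))
  .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
  .build()

val session = cluster.connect()
```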
https://stackoverflow.com/questions/69603571