我正在跟踪文档,以尝试如何通过直接从IMap中查找来丰富无界流。我有两张地图:
Map<String, Product> (ProductId as key)Map<String, Seller> (SellerId as key)Product和Seller都是非常简单的类:
public class Product implements DataSerializable {
String productId;
String sellerId;
int price;
...
public class Seller implements DataSerializable {
String sellerId;
int revenue;
...我有两个数据生成器不断将数据推送到这两张地图上。这两个映射都启用了事件日志。我已经证实了这一事件-日志运行良好。
我想用Product映射丰富Seller映射的流事件。下面是我的代码片段:
IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
.withoutTimestamps()
.groupingKey(Product::getSellerId)
.mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
.drainTo(getSink());
try {
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
jobConfig.setName(Constants.BASIC_TASK);
Job job = jetClient.newJob(p, jobConfig);
} finally {
jetClient.shutdown();
}提交作业时,我得到以下错误:
com.hazelcast.spi.impl.operationservice.impl.Invocation - 172.31.33.212:80 jet异步执行执行回调失败: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService',identityHash=1939050026,partitionId=70,replicaIndex=0,callId=-37944,invocationTime=1570410704479 (2019-10-07年:11:44.479),等待时间=-1,callTimeout=60000,name=sellerMap},tryCount=250,tryPauseMillis=500,invokeCount=1,callTimeoutMillis=60000,firstInvocationTimeMs=1570410704479,firstInvocationTime=‘2019-10-01:11:44.479’,lastHeartbeatMillis=0,lastHeartbeatTime='1970-01-01 : 00:00:00.000',target=172.31.33.212:80,pendingResponse={VOID},backupsAcksExpected=0,backupsAcksReceived=0,connection=null}
我试图在集群中放置一个和两个实例,并得到相同的错误消息。我不知道根本原因是什么。
发布于 2019-10-14 12:47:55
您的问题似乎是一个ClassNotFoundException,即使您向作业添加了适当的类。存储在IMap中的对象独立于Jet作业,当事件日志源请求它们时,Jet的IMap代码试图反序列化它们,但失败了,因为Jet的类路径中没有您的域模型类。
要继续前进,请将IMap中使用的类添加到Jet的类路径中。我们正在寻找一个解决方案,将取消这一要求。
日志输出中没有异常堆栈跟踪的原因是,在没有显式添加更灵活的日志记录模块(如java.util.logging )时,您最终会使用默认的Log4j设置。
下一个版本的喷气机的包装将改善这方面。在此之前,您可以执行以下步骤:
lib目录并将Log4j下载到其中:
$ cd lib $ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jarbin/common.sh将模块添加到类路径。在文件的末尾有一行
CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH“
您可以复制这一行,并将hazelcast-jet-3.1替换为log4j-1.2.17。commons.sh的末尾,有一个多行命令来构造JAVA_OPTS变量。将"-Dhazelcast.logging.type=log4j"和"-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties"添加到列表中。log4j.properties目录中创建一个文件config:log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n
# Change this level to debug to diagnose failed cluster formation:
log4j.logger.com.hazelcast.internal.cluster=info
log4j.logger.com.hazelcast.jet=info
log4j.rootLogger=info, stdouthttps://stackoverflow.com/questions/58262754
复制相似问题