我正在尝试将数据库数据加载到hazelcast jet地图实例中
public void sourceStage(JetInstance jet, Pipeline pipeLine){
SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
dataSource.setDriver(new org.postgresql.Driver());
dataSource.setUrl("jdbc:postgresql://localhost/development");
dataSource.setUsername("postgres");
dataSource.setPassword("root");
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
String sql1 = "SELECT id1 , id2 FROM public.tbl_data where name='india'";
jdbcTemplate.query(sql1, new RowMapper<Object>() {
@Override
public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
jet.getMap("t1").put(rs.getString("id1"), rs.getString("id2"));
return null;
}
});
}并再次尝试使用与管道中的源相同的地图
pipeLine.readFrom(Sources.map("t1")).map(map -> {
System.err.println(map.getKey() + "---" + map.getValue());
return map;
}).writeTo(Sinks.logger());我发现我不能读取数据作为来源。但是如果检查jet.getMap("t1").size(),我得到的是大小值。
发布于 2021-09-17 07:26:36
要加载数据库数据,请使用具有JDBC source和IMap Sink的管道。将两者混合到一个阶段打破了Jet引擎的并行执行模型。
https://stackoverflow.com/questions/69216664
复制相似问题