首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >WSO2BAM REST流输入到BAM/Cassandra;不能使用单元查询访问EVENT_KS数据?

WSO2BAM REST流输入到BAM/Cassandra;不能使用单元查询访问EVENT_KS数据?
EN

Stack Overflow用户
提问于 2014-06-17 11:37:29
回答 2查看 314关注 0票数 0

这个问题的背景本质上是一个名为Sachini Jayase卡拉撰写的文章 @ WSO2的,它使用了不同的报告框架和WSO2业务活动监视器。我的做法大致相同,但使用REST来定义数据流并调用REST将数据推送到BAM中。然后使用单元查询来获取数据。但是,我似乎遗漏了一些东西,因为没有显示属性数据。因此产生了查询。

目前使用REST,它通过基于Perl的守护进程调用。这使用以下流定义和有效负载调用REST:

代码语言:javascript
复制
{
  "name":"currentcostRealtime2.stream",
  "version": "1.0.6",
  "nickName": "Currentcost Realtime",
  "description": "This is the Currentcost realtime stream",
  "payloadData":[
    {
      "name":"sensor",
      "type":"INT"
    },
    {
      "name":"temp",
      "type":"FLOAT"
    },
    {
      "name":"timestamp",
      "type":"STRING"
    },
    {
      "name":"watt",
      "type":"INT"
    }
  ]
}

。。和有效载荷定义..。

代码语言:javascript
复制
[
 {
   "payloadData" : [SENSOR, TEMP, "TIMESTAMP", WATT] ,
 }
]

我应该注意到,有效载荷是在提交之前替换的;例如,提交的实际有效载荷看起来如下:

代码语言:javascript
复制
[
 {
   "payloadData" : [1, 18.7, "2014-06-15 16:15:56", 1] ,
 }
]

查询执行时没有明显的问题,但我现在对BAM中的HIVE查询有问题,它为我提供条目输出,而不是值。例如,现在尝试执行以下单元查询:

代码语言:javascript
复制
CREATE TABLE IF NOT EXISTS CurrentCostDataTemp ( sensor INT, temp FLOAT, ts TIMESTAMP, watt INT ) 
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "currentcostRealtime2_stream",
    "cassandra.columns.mapping" = "payload_sensor, payload_temp, payload_timestamp, payload_watt" );

select * from CurrentCostDataTemp;                                  

。。但是这只给出了以下内容(见下面的具体图片)--例如,没有显示属性级别的数据。但是,显然存在EVENT_KS条目,因为它输出了4行。因此,问题是我如何引用数据来提取值,或者这里还有什么我不知道的事情?:

代码语言:javascript
复制
key sensor  temp    ts  watt
1402816273765::192.168.1.106::9443::52              
1402815283659::192.168.1.106::9443::51              
1402815238323::192.168.1.106::9443::49              
1402815280532::192.168.1.106::9443::50              

通过向Cqlsh查询,已经验证了数据在Cassandra中-请参见以下内容:

代码语言:javascript
复制
cqlsh:EVENT_KS> select * from "currentcostRealtime_stream";

 key                                    | Description                             | Name                       | Nick_Name            | StreamId                         | Timestamp     | Version | meta_ipAdd | payload_sensor | payload_temp | payload_timestamp   | payload_watt
----------------------------------------+-----------------------------------------+----------------------------+----------------------+----------------------------------+---------------+---------+------------+----------------+--------------+---------------------+--------------
 1402815283659::192.168.1.106::9443::51 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815283659 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 14:54:43 |            1
 1402815238323::192.168.1.106::9443::49 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815238323 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 14:53:58 |            1
 1402815280532::192.168.1.106::9443::50 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815280532 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 14:54:40 |            1
 1402816273765::192.168.1.106::9443::52 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402816273765 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 15:11:13 |            1

(4 rows)

cqlsh:EVENT_KS>

很可能只是我监督过的一个小问题,但如果其他人看到了这件事,也能做出回应,那就太好了。

当向MySQL DB外部添加远程表定义时,表和所有表都是创建的,但问题似乎是如何访问EVENT_KS表本身中的属性数据,并通过EVENT_KS脚本创建和访问该属性数据。

提前感谢!

/Jorgen

更新--周四19日--解决了的问题,并给出了一些提示。下面的代码现在运行良好,非常好。非常感谢你们的回应。

代码语言:javascript
复制
drop table CurrentCostDataTemp10;
drop table CurrentCostDataTemp_Summary10;

CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT ) 
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
  "cassandra.port" = "9160",
  "cassandra.ks.name" = "EVENT_KS",
  "cassandra.ks.username" = "<USER>",
  "cassandra.ks.password" = "<PASSWORD>",
  "cassandra.cf.name" = "currentcostsimple5_stream",
  "cassandra.columns.mapping" = ":key, payload_sensor, Timestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt" );

CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp_Summary10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT ) 
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
  'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
  'mapred.jdbc.url' = 'jdbc:mysql://127.0.0.1:8889/currentcost' ,
  'mapred.jdbc.username' = '<USER>',
  'mapred.jdbc.password' = '<PASSWORD>',
  'hive.jdbc.update.on.duplicate'= 'true',
  'hive.jdbc.primary.key.fields' = 'messageRowID',
  'hive.jdbc.table.create.query' = 'CREATE TABLE CurrentCostDataTemp1 ( messageRowID VARCHAR(100) NOT NULL PRIMARY KEY, payload_sensor TINYINT(4), messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql DATETIME, payload_watt INT ) ');

insert overwrite table CurrentCostDataTemp_Summary10 select messageRowID, payload_sensor, messageTimestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt FROM CurrentCostDataTemp10;

在WSO2业务活动监视器中使用不同的报告框架。萨奇尼·贾亚塞卡拉( Sachini Jayase卡拉)

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-06-19 06:23:06

我已将你的询问修改如下。请试试看。

代码语言:javascript
复制
CREATE external TABLE IF NOT EXISTS CurrentCostDataTemp ( key string, sensor INT, temp FLOAT, ts TIMESTAMP, watt INT ) 
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "currentcostRealtime2_stream",
    "cassandra.columns.mapping" = ":key,payload_sensor, payload_temp, payload_timestamp, payload_watt" );

select * from CurrentCostDataTemp;  
票数 0
EN

Stack Overflow用户

发布于 2014-06-17 23:13:16

尝试更改脚本的第一行,如下所示。

如果不存在EXTERNAL表,则创建CurrentCostDataTemp (key STRING、传感器INT、临时浮动、ts时间戳、瓦特INT)

(如果key STRING部件出现错误,请删除它。)

注意:您可能必须在上面运行DROP TABLE CurrentCostDataTemp之前运行它,如果已经创建了它,则在您之前运行它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/24262771

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档