这个问题的背景本质上是一个名为Sachini Jayase卡拉撰写的文章 @ WSO2的,它使用了不同的报告框架和WSO2业务活动监视器。我的做法大致相同,但使用REST来定义数据流并调用REST将数据推送到BAM中。然后使用单元查询来获取数据。但是,我似乎遗漏了一些东西,因为没有显示属性数据。因此产生了查询。
目前使用REST,它通过基于Perl的守护进程调用。这使用以下流定义和有效负载调用REST:
{
"name":"currentcostRealtime2.stream",
"version": "1.0.6",
"nickName": "Currentcost Realtime",
"description": "This is the Currentcost realtime stream",
"payloadData":[
{
"name":"sensor",
"type":"INT"
},
{
"name":"temp",
"type":"FLOAT"
},
{
"name":"timestamp",
"type":"STRING"
},
{
"name":"watt",
"type":"INT"
}
]
}。。和有效载荷定义..。
[
{
"payloadData" : [SENSOR, TEMP, "TIMESTAMP", WATT] ,
}
]我应该注意到,有效载荷是在提交之前替换的;例如,提交的实际有效载荷看起来如下:
[
{
"payloadData" : [1, 18.7, "2014-06-15 16:15:56", 1] ,
}
]查询执行时没有明显的问题,但我现在对BAM中的HIVE查询有问题,它为我提供条目输出,而不是值。例如,现在尝试执行以下单元查询:
CREATE TABLE IF NOT EXISTS CurrentCostDataTemp ( sensor INT, temp FLOAT, ts TIMESTAMP, watt INT )
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
"cassandra.port" = "9160",
"cassandra.ks.name" = "EVENT_KS",
"cassandra.ks.username" = "admin",
"cassandra.ks.password" = "admin",
"cassandra.cf.name" = "currentcostRealtime2_stream",
"cassandra.columns.mapping" = "payload_sensor, payload_temp, payload_timestamp, payload_watt" );
select * from CurrentCostDataTemp; 。。但是这只给出了以下内容(见下面的具体图片)--例如,没有显示属性级别的数据。但是,显然存在EVENT_KS条目,因为它输出了4行。因此,问题是我如何引用数据来提取值,或者这里还有什么我不知道的事情?:
key sensor temp ts watt
1402816273765::192.168.1.106::9443::52
1402815283659::192.168.1.106::9443::51
1402815238323::192.168.1.106::9443::49
1402815280532::192.168.1.106::9443::50 通过向Cqlsh查询,已经验证了数据在Cassandra中-请参见以下内容:
cqlsh:EVENT_KS> select * from "currentcostRealtime_stream";
key | Description | Name | Nick_Name | StreamId | Timestamp | Version | meta_ipAdd | payload_sensor | payload_temp | payload_timestamp | payload_watt
----------------------------------------+-----------------------------------------+----------------------------+----------------------+----------------------------------+---------------+---------+------------+----------------+--------------+---------------------+--------------
1402815283659::192.168.1.106::9443::51 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815283659 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 14:54:43 | 1
1402815238323::192.168.1.106::9443::49 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815238323 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 14:53:58 | 1
1402815280532::192.168.1.106::9443::50 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815280532 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 14:54:40 | 1
1402816273765::192.168.1.106::9443::52 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402816273765 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 15:11:13 | 1
(4 rows)
cqlsh:EVENT_KS>很可能只是我监督过的一个小问题,但如果其他人看到了这件事,也能做出回应,那就太好了。
当向MySQL DB外部添加远程表定义时,表和所有表都是创建的,但问题似乎是如何访问EVENT_KS表本身中的属性数据,并通过EVENT_KS脚本创建和访问该属性数据。
提前感谢!
/Jorgen
更新--周四19日--解决了的问题,并给出了一些提示。下面的代码现在运行良好,非常好。非常感谢你们的回应。
drop table CurrentCostDataTemp10;
drop table CurrentCostDataTemp_Summary10;
CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT )
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
"cassandra.port" = "9160",
"cassandra.ks.name" = "EVENT_KS",
"cassandra.ks.username" = "<USER>",
"cassandra.ks.password" = "<PASSWORD>",
"cassandra.cf.name" = "currentcostsimple5_stream",
"cassandra.columns.mapping" = ":key, payload_sensor, Timestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt" );
CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp_Summary10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT )
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
'mapred.jdbc.url' = 'jdbc:mysql://127.0.0.1:8889/currentcost' ,
'mapred.jdbc.username' = '<USER>',
'mapred.jdbc.password' = '<PASSWORD>',
'hive.jdbc.update.on.duplicate'= 'true',
'hive.jdbc.primary.key.fields' = 'messageRowID',
'hive.jdbc.table.create.query' = 'CREATE TABLE CurrentCostDataTemp1 ( messageRowID VARCHAR(100) NOT NULL PRIMARY KEY, payload_sensor TINYINT(4), messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql DATETIME, payload_watt INT ) ');
insert overwrite table CurrentCostDataTemp_Summary10 select messageRowID, payload_sensor, messageTimestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt FROM CurrentCostDataTemp10;在WSO2业务活动监视器中使用不同的报告框架。萨奇尼·贾亚塞卡拉( Sachini Jayase卡拉)
发布于 2014-06-19 06:23:06
我已将你的询问修改如下。请试试看。
CREATE external TABLE IF NOT EXISTS CurrentCostDataTemp ( key string, sensor INT, temp FLOAT, ts TIMESTAMP, watt INT )
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
"cassandra.port" = "9160",
"cassandra.ks.name" = "EVENT_KS",
"cassandra.ks.username" = "admin",
"cassandra.ks.password" = "admin",
"cassandra.cf.name" = "currentcostRealtime2_stream",
"cassandra.columns.mapping" = ":key,payload_sensor, payload_temp, payload_timestamp, payload_watt" );
select * from CurrentCostDataTemp; 发布于 2014-06-17 23:13:16
尝试更改脚本的第一行,如下所示。
如果不存在EXTERNAL表,则创建CurrentCostDataTemp (key STRING、传感器INT、临时浮动、ts时间戳、瓦特INT)
(如果key STRING部件出现错误,请删除它。)
注意:您可能必须在上面运行DROP TABLE CurrentCostDataTemp之前运行它,如果已经创建了它,则在您之前运行它。
https://stackoverflow.com/questions/24262771
复制相似问题