首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >AWS动态分析-按流分组

AWS动态分析-按流分组
EN

Stack Overflow用户
提问于 2017-11-20 10:23:11
回答 1查看 1.9K关注 0票数 1

我需要一些关于我的AWS运动分析功能的帮助。

我有一个包含以下数据的流:

代码语言:javascript
复制
hubId (Integer)
datetime (timestamp)
fid (varchar)
path (varchar)

我想将这些数据聚合到另一个流中,计算每小时的行数(页面浏览量)和每小时不同的fid数(访问者),按hubId分组。

目的地流:

代码语言:javascript
复制
profilesite_id(Integer) = hubId from source stream
datetime (timestamp)
visitors (Integer)
pageviews (Integer)

所以在MySQL中,我的函数是这样的:

代码语言:javascript
复制
SELECT hubId, CONCAT_WS(':', SUBSTR(datetime, 1, 13), '00:00') datetime, COUNT(*) pageviews, COUNT(DISTINCT(fid)) visitors
FROM tableStream
WHERE timestamp >= CURDATE()
GROUP BY hubId, CONCAT_WS(':', SUBSTR(datetime, 1, 13), '00:00');

我试着把这个请求转换成动态分析,但这很难(第一次对我来说.(对不起:)。

  • CURDATE()函数在Kinesis中不工作
  • CONCAT_WS也

所以目前我有这个动态分析功能:

代码语言:javascript
复制
CREATE OR REPLACE STREAM "bore_agg" (profilsite_id SMALLINT, datetime TIMESTAMP, visitors INT, pageviews INT);

-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "bore_agg"

-- Select all columns from source stream
SELECT 
    SOURCE_SQL_STREAM_001."hubId" profilsite_id, 
    CHAR_TO_TIMESTAMP('yyyy-MM-DD hh:mm:ss', TIMESTAMP_TO_CHAR('YYYY-MM-DD HH:00:00', SOURCE_SQL_STREAM_001."datetime")) datetime, 
    COUNT(DISTINCT(SOURCE_SQL_STREAM_001."fid")) visitors, 
    COUNT(*) pageviews
FROM SOURCE_SQL_STREAM_001
WHERE SOURCE_SQL_STREAM_001."datetime" >= CHAR_TO_TIMESTAMP('yyyy-MM-DD hh:mm:ss', TIME_TO_CHAR('YYYY-MM-DD HH:00:00', CURRENT_TIME))
GROUP BY 
    SOURCE_SQL_STREAM_001."hubId", 
    CHAR_TO_TIMESTAMP('yyyy-MM-DD hh:mm:ss', TIMESTAMP_TO_CHAR('YYYY-MM-DD HH:00:00', SOURCE_SQL_STREAM_001."datetime"));

但我犯了这个错误,我真的不明白该怎么做:

您的SQL代码中有一个错误,在更新应用程序时出现了一个问题。错误消息:创建或替换"STREAM_PUMP“以插入"bore_agg”选择SOURCE_SQL_STREAM_001."hubId“profilsite_id,CHAR_TO_TIMESTAMP(‘YYYY DD hh:mm:ss',TIMESTAMP_TO_CHAR('YYYY-MM-DD :00:00’,SOURCE_SQL_STREAM_001.”CREATE“)日期时间,COUNT(SOURCE_SQL_STREAM_001.”fid“)访问者,统计(*) SOURCE_SQL_STREAM_001 WHERE SOURCE_SQL_STREAM_001."datetime“>= CHAR_TO_TIMESTAMP(‘YYYY DD hh:mm:ss',TIME_TO_CHAR(YYYY-MM-DD HH:00:00',CURRENT_TIME)组,”hubId“,CHAR_TO_TIMESTAMP(’YYYY DD HH:MM:ss‘),TIMESTAMP_TO_CHAR(’YYYY DD HH:00:00‘),(SOURCE_SQL_STREAM_001.“日期时间”)。SQL错误消息:从第9行第1列到第11行,第120列:不能聚合无限流:未指定GROUP子句或不包含任何单调表达式。

有人能让我朝正确的方向走吗?

(预先谢谢:)

托马斯

EN

回答 1

Stack Overflow用户

发布于 2019-11-20 09:10:03

我知道这已经有一段时间了,自从亚当(唯一的)回应在这里。所以,以防万一,就像Adam指出的那样,数据分析流可以是一个“无限”的输入。因此,您需要告诉在哪里停止;即“聚合数据的最后一分钟或小时的我的流”。因此,在这个示例中(下面的代码),它将聚合流的传入数据,直到指定的分钟或小时。

注意:请记住,首先需要创建一个具有相同结构的流(返回的列数和数据类型),然后在新流中运行"INSERT-SELECT“,方法是创建一个泵,它将是扫描传入数据并返回结果(将在初始步骤中插入到流中)的过程。

示例:

代码语言:javascript
复制
-- ** Aggregate (COUNT, AVG, etc.) + Tumbling Time Window **
-- Performs function on the aggregate rows over a 10 second tumbling window for a specified column. 
--          .----------.   .----------.   .----------.              
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (ingest_time TIMESTAMP, vendorid int, count_vs_time int);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

-- Query 1):
-- Group by VendorID over the last 60 seconds of the stream.
SELECT STREAM STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS ingest_time, "vendorid", COUNT(*)
FROM "SOURCE_SQL_STREAM_001" 
GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), "vendorid";


--Query 2)
-- Group by VendorID and count, over the last hour of the stream.
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (hour_range TIMESTAMP, vendorid int, count_last_hr int);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO HOUR) AS hour_range, "vendorid", COUNT(*) as count_last_hr
FROM "SOURCE_SQL_STREAM_001" 
GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO HOUR), "vendorid";

HTH。

卡洛斯。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47389775

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档