首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将海量数据从一个红移表卸载到另一个表的策略?

将海量数据从一个红移表卸载到另一个表的策略?
EN

Stack Overflow用户
提问于 2016-03-09 23:46:17
回答 2查看 2.4K关注 0票数 0

我的公司每月收集大量关于我们的服务器使用情况的数据(大约有100亿行)。我的任务是将数据从这个初始表卸载到S3,然后将其复制到另一个集群中的一个表中。然后,这些数据将用于Tableau中的仪表板报告。

我遇到了一些问题,卸载(以及某种程度上的复制)步骤由于Unexpected error: The server is already closed.之类的错误而间歇性地失败,这使我认为这实际上是在超时。还有一些奇怪的行为,它在卸载步骤上搅动和挂起,在它失败后,我可以看到它将所有数据和清单文件卸载到桶中。

面对这些不确定性,我不得不寻找其他策略来分配任务。我对Spark非常感兴趣,目前正在使用pyspark学习它,我想知道我是否可以以某种方式减轻分布式处理的问题。是否可以将数据存储在ec2中并让Tableau从中提取数据?有什么办法分配卸载过程吗?

我将在下面的过程中包含代码,以便如果有什么瓶颈,我可以纠正它:

代码语言:javascript
复制
from datetime import datetime
import logging

import boto3
import psycopg2 as ppg2

from inst_utils import aws, misc_utils
from inst_config import config3

if __name__ == '__main__':
    logger = misc_utils.initialize_logger(config3.REQUESTS_USAGE_LOGFILE)

    # Unload step
    timestamp = datetime.now()
    month = timestamp.month
    year = timestamp.year

    s3_sesh = boto3.session.Session(**config3.S3_INFO)
    s3 = s3_sesh.resource('s3')
    fname = 'load_{}_{:02d}'.format(year, month)
    bucket_url = ('canvas_logs/agg_canvas_logs_user_agent_types/'
                  '{}/'.format(fname))
    unload_url = ('s3://{}/{}'.format(config3.S3_BUCKET, bucket_url))
    s3.Bucket(config3.S3_BUCKET).put_object(Key=bucket_url)
    table_name = 'requests_{}_{:02d}'.format(year, month - 1)
    logger.info('Starting unload.')
    try:
        with ppg2.connect(**config3.REQUESTS_POSTGRES_INFO) as conn:
            cur = conn.cursor()
            # TODO add sql the sql folder to clean up this program.
            unload = r'''
            unload ('select
                        user_id
                        ,course_id
                        ,request_month
                        ,user_agent_type
                        ,count(session_id)
                        ,\'DEV\' etl_requests_usage
                        ,CONVERT_TIMEZONE(\'MST\', getdate()) etl_datetime_local
                        ,\'agg_canvas_logs_user_agent_types\' etl_transformation_name
                        ,\'N/A\' etl_pdi_version
                        ,\'N/A\' etl_pdi_build_version
                        ,null etl_pdi_hostname
                        ,null etl_pdi_ipaddress
                        ,null etl_checksum_md5
                     from
                          (select distinct
                              user_id
                              ,context_id as course_id
                              ,date_trunc(\'month\', request_timestamp) request_month
                              ,session_id
                              ,case
                              when user_agent like \'%CanvasAPI%\' then \'api\'
                              when user_agent like \'%candroid%\' then \'mobile_app_android\'
                              when user_agent like \'%iCanvas%\' then \'mobile_app_ios\'
                              when user_agent like \'%CanvasKit%\' then \'mobile_app_ios\'
                              when user_agent like \'%Windows NT%\' then \'desktop\'
                              when user_agent like \'%MacBook%\' then \'desktop\'
                              when user_agent like \'%iPhone%\' then \'mobile\'
                              when user_agent like \'%iPod Touch%\' then \'mobile\'
                              when user_agent like \'%iPad%\' then \'mobile\'
                              when user_agent like \'%iOS%\' then \'mobile\'
                              when user_agent like \'%CrOS%\' then \'desktop\'
                              when user_agent like \'%Android%\' then \'mobile\'
                              when user_agent like \'%Linux%\' then \'desktop\'
                              when user_agent like \'%Mac OS%\' then \'desktop\'
                              when user_agent like \'%Macintosh%\' then \'desktop\'
                              else \'other_unknown\'
                              end as user_agent_type
                            from {}
                            where context_type = \'Course\')
                            group by
                              user_id
                              ,course_id
                              ,request_month
                              ,user_agent_type')
            to '{}'
            credentials 'aws_access_key_id={};aws_secret_access_key={}'
            manifest
            gzip
            delimiter '|'
            '''.format(
                table_name, unload_url, config3.S3_ACCESS, config3.S3_SECRET)
            cur.execute(unload)
            conn.commit()

    except ppg2.Error as e:
        logger.critical('Error occurred during transaction: {}'.format(e))
        raise Exception('{}'.format(e))

    logger.info('Starting copy process.')
    schema_name = 'ods_canvas_logs'
    table_name = 'agg_canvas_logs_user_agent_types'

    manifest_url = unload_url + 'manifest'
    logger.info('Manifest url: {}'.format(manifest_url))
    load = aws.RedshiftLoad(schema_name,
                            table_name,
                            manifest_url,
                            config3.S3_INFO,
                            config3.REDSHIFT_POSTGRES_INFO_PROD,
                            config3.REDSHIFT_POSTGRES_INFO,
                            safe_load=True,
                            truncate=True
                            )
    load.execute()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-03-10 14:45:32

FWIW,我认为您可能应该发布另一个问题,详细介绍您尝试过的UNLOAD

在卸载整个表时,我发现UNLOAD工作得更好,例如,不使用查询。

尝试使用要卸载的数据子集创建一个临时表,然后对整个表进行UNLOAD,然后删除临时表。

代码语言:javascript
复制
CREATE TEMP TABLE a AS SELECT b FROM c WHERE d = e;
UNLOAD (SELECT * FROM a) TO 's3://bucket' CREDENTIALS … ;
DROP TABLE a;

关于你上述的实际问题,我认为你在这方面不会取得太大的成功。瓶颈将不是Spark或Python,而仅仅是Redshift根本不是为返回大量行而设计的。

票数 3
EN

Stack Overflow用户

发布于 2016-03-14 05:52:12

我同意@是多余的,也是最有可能引发麻烦的原因,因为它们迫使Redshift在复制之前在单个领导人节点上执行整个数据集的排序。

Redshift的COPY命令的最大好处是,如果查询允许,每个节点都可以与其他节点并行卸载自己的数据。因此,如果您有10个节点,那么所有10个节点都可以创建S3连接(多个节点)并开始输出数据。

在您的情况下,通过具有这种不同,您基本上禁用了这一点,因为需要首先重新计算所有数据。

因此,我和其他人一起说,最好还是按原样转储整个表(对集群的负担会更快、更少),或者根据日期范围进行简单的增量上传,可能会有其他一些简单的条件(比如context_type = \'Course\') )。只要没有/DISTINCT/ORDER BY组,就应该并行运行,而且运行速度非常快。

使用Spark不会有什么区别,因为它只需要先通过SQL连接来提取数据。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35904944

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档