首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >解析> 30 GB文件的最少时间

解析> 30 GB文件的最少时间
EN

Code Review用户
提问于 2015-03-04 20:11:20
回答 1查看 808关注 0票数 7

我的目标是:有效地将数据从亚马逊的S3转移到Amazon。

基本上,我使用下面的代码将我的S3上的所有CSV文件移动到Redshift。我解析了部分文件,构建了一个表结构,然后使用copy命令将数据加载到redshift中。

代码语言:javascript
复制
'''
Created on Feb 25, 2015
@author: Siddartha.Reddy
'''

import sys
from boto.s3 import connect_to_region
from boto.s3.connection import Location
import csv
import itertools
import psycopg2

''' ARGUMENTS TO PASS '''
AWS_KEY = sys.argv[1]
AWS_SECRET_KEY = sys.argv[2]
S3_DOWNLOAD_PATH = sys.argv[3]
REDSHIFT_SCHEMA = sys.argv[4]
TABLE_NAME = sys.argv[5]

UTILS = S3_DOWNLOAD_PATH.split('/')

class UTIL():

    global UTILS

    def bucket_name(self):
        self.BUCKET_NAME = UTILS[0]
        return self.BUCKET_NAME

    def path(self):
        self.PATH = ''
        offset = 0
        for value in UTILS:
            if offset == 0:
                offset += 1
            else:
                self.PATH = self.PATH + value + '/'
        return self.PATH[:-1]

def GETDATAINMEMORY():
    conn = connect_to_region(Location.USWest2,aws_access_key_id = AWS_KEY,
        aws_secret_access_key = AWS_SECRET_KEY,
        is_secure=False,host='s3-us-west-2.amazonaws.com'
        )
    ut = util()
    BUCKET_NAME = ut.bucket_name()
    PATH = ut.path()
    filelist = conn.lookup(BUCKET_NAME)

    ''' Fecth part of the data from S3 '''
    for path in filelist:
        if PATH in path.name:
            DATA = path.get_contents_as_string(headers={'Range': 'bytes=%s-%s' % (0,100000000)}) 

    return DATA

def TRAVERSEDATA():
    DATA = getdatainmemory()
    CREATE_TABLE_QUERY = 'CREATE TABLE ' + REDSHIFT_SCHEMA + '.' + TABLE_NAME + '( '
    JUNKED_OUT = DATA[3:]
    PROCESSED_DATA = JUNKED_OUT.split('\n')
    CSV_DATA = csv.reader(PROCESSED_DATA,delimiter=',')
    COUNTER,STRING,NUMBER = 0,0,0
    COLUMN_TYPE = []

    ''' GET COLUMN NAMES AND COUNT '''
    for line in CSV_DATA:
        NUMBER_OF_COLUMNS = len(line)
        COLUMN_NAMES = line
        break;

    ''' PROCESS COLUMN NAMES '''
    a = 0
    for REMOVESPACE in COLUMN_NAMES:
        TEMPHOLDER = REMOVESPACE.split(' ')
        temp1 = ''
        for x in TEMPHOLDER:
            temp1 = temp1 + x 
        COLUMN_NAMES[a] = temp1
        a = a + 1

    ''' GET COLUMN DATA TYPES '''
    # print(NUMBER_OF_COLUMNS,COLUMN_NAMES,COUNTER)
    # print(NUMBER_OF_COLUMNS)
    i,j,a= 0,500,0 
    while COUNTER < NUMBER_OF_COLUMNS:
        for COLUMN in itertools.islice(CSV_DATA,i,j+1):
            if COLUMN[COUNTER].isdigit():
                NUMBER = NUMBER + 1
            else:
                STRING = STRING + 1
        if NUMBER == 501:
            COLUMN_TYPE.append('INTEGER')
            # print('I CAME IN')
            NUMBER = 0
        else:
            COLUMN_TYPE.append('VARCHAR(2500)')
            STRING = 0
        COUNTER = COUNTER + 1
        # print(COUNTER)

    COUNTER = 0
    ''' BUILD SCHEMA '''
    while COUNTER < NUMBER_OF_COLUMNS:
        if COUNTER == 0:
            CREATE_TABLE_QUERY = CREATE_TABLE_QUERY + COLUMN_NAMES[COUNTER] + ' ' + COLUMN_TYPE[COUNTER] + ' NOT NULL,'
        else:
            CREATE_TABLE_QUERY = CREATE_TABLE_QUERY + COLUMN_NAMES[COUNTER] + ' ' + COLUMN_TYPE[COUNTER] + ' ,'
        COUNTER += 1
    CREATE_TABLE_QUERY = CREATE_TABLE_QUERY[:-2]+ ')'

    return CREATE_TABLE_QUERY

def COPY_COMMAND():
    S3_PATH = 's3://' + S3_DOWNLOAD_PATH
    COPY_COMMAND = "COPY "+REDSHIFT_SCHEMA+"."+TABLE_NAME+" from '"+S3_PATH+"' credentials 'aws_access_key_id="+AWS_KEY+";aws_secret_access_key="+AWS_SECRET_KEY+"' REGION 'us-west-2' csv delimiter ',' ignoreheader as 1 TRIMBLANKS maxerror as 500"
    return COPY_COMMAND

def S3TOREDSHIFT():
    conn = psycopg2.connect("dbname='xxx' port='5439' user='xxx' host='xxxxxx' password='xxxxx'")
    cursor = conn.cursor()
    cursor.execute('DROP TABLE IF EXISTS '+ REDSHIFT_SCHEMA + "." + TABLE_NAME)
    SCHEMA = TRAVERSEDATA()
    print(SCHEMA)
    cursor.execute(SCHEMA)
    COPY = COPY_COMMAND()
    print(COPY)
    cursor.execute(COPY)
    conn.commit()

S3TOREDSHIFT()

当前的挑战:

创建表格结构面临的挑战:

  1. 字段长度:现在我只是硬编码VARCHAR字段到2500。我的所有文件都>30 of,通过解析整个文件来计算字段的长度需要花费大量的处理时间。
  2. 确定列是否为null:我只是使用计数器变量硬编码第一列为NULL。(我的所有文件都有ID作为第一列)。我想知道是否有更好的办法。

我可以使用任何数据结构吗?我总是对学习新的方法来提高成绩感兴趣。

EN

回答 1

Code Review用户

发布于 2015-04-27 03:41:41

您的大写惯例让我觉得很奇怪,不只是对于Python,而且对于我曾经使用过的任何语言。它不仅难以阅读,而且大小写的LIKE_THIS通常都是Python中的常量,因此这种用法会使Python程序员感到困惑。关于代码格式的许多其他事情也是令人困惑或非常规的。请查看PEP-8。

您的函数traverse_data相当长。我建议尝试将''' One of these '''下的所有内容分解为自己的函数。还请注意,Python中的注释标记是#。语法''' like this '''用于字符串。Python允许您将其中之一作为函数或类中的第一件事,作为文档字符串。然后,您可以在命令行上交互地阅读文档。如果三重引号字符串不是函数或类中的第一件事,它只是一个字符串文字,它永远不会被分配给任何东西,然后消失。作为我个人的要求,请不要像这样记录你的代码。演出的成功可能微不足道,但这是奇怪和错误的。在函数或类的开头坚持文档字符串,并对不属于代码的部分进行定期注释。

更常见的情况是,如果您想要从命令行运行一个脚本,以便像这样编写它:

代码语言:javascript
复制
if __name__ == "__main__":
    # code currently inside s3_to_redshift
票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/83221

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档