首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >google数据流作业成本优化

google数据流作业成本优化
EN

Stack Overflow用户
提问于 2021-01-09 12:39:05
回答 2查看 1.1K关注 0票数 25

我已经为大小为100 GB的522个gzip文件运行了下面的代码,在解压缩后,它将是320 GB左右的数据和数据的protobuf格式,并将输出到GCS。我使用了n1标准机器和区域作为输入,输出都很小心,工作花费了我大约17美元,这是半小时的数据,所以我真的非常需要在这里做一些成本优化。

从以下查询中获得的成本

代码语言:javascript
复制
SELECT l.value AS JobID,  ROUND(SUM(cost),3) AS JobCost 
FROM `PROJECT.gcp_billing_data.gcp_billing_export_v1_{}` bill, 
UNNEST(bill.labels) l
WHERE service.description = 'Cloud Dataflow' and l.key = 'goog-dataflow-job-id' and 
extract(date from _PARTITIONTIME) > "2020-12-31"
GROUP BY 1

完整代码

代码语言:javascript
复制
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import csv
import base64
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
import logging
from io import StringIO
from google.cloud import storage
import json
###PROTOBUF CLASS
from otherfiles import processor_pb2

class ConvertToJson(beam.DoFn):
    def process(self, message, *args, **kwargs):
        import base64
        from otherfiles import processor_pb2
        from google.protobuf.json_format import MessageToDict
        from google.protobuf.json_format import MessageToJson
        import json
        if (len(message) >= 4):
            b64ProtoData = message[2]
            totalProcessorBids = int(message[3] if message[3] and message[3] is not None else 0);
            b64ProtoData = b64ProtoData.replace('_', '/')
            b64ProtoData = b64ProtoData.replace('*', '=')
            b64ProtoData = b64ProtoData.replace('-', '+')
            finalbunary = base64.b64decode(b64ProtoData)
            log = processor_pb2.ProcessorLogProto()
            log.ParseFromString(finalbunary)
            #print(log)
            jsonObj = MessageToDict(log,preserving_proto_field_name=True)
            jsonObj["totalProcessorBids"] = totalProcessorBids
            #wjdata = json.dumps(jsonObj)
            print(jsonObj)
            return [jsonObj]
        else:
            pass


class ParseFile(beam.DoFn):
    def process(self, element, *args, **kwargs):
        import csv
        for line in csv.reader([element], quotechar='"', delimiter='\t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
            #print (line)
            return [line]

def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    parser.add_argument("--bucket", dest="bucket", required=True)
    parser.add_argument("--bfilename", dest="bfilename", required=True)
    app_args, pipeline_args = parser.parse_known_args()
    #pipeline_args.extend(['--runner=DirectRunner'])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    bucket_input=app_args.bucket
    bfilename=app_args.bfilename

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_input)
    blob = bucket.blob(bfilename)
    blob = blob.download_as_string()
    blob = blob.decode('utf-8')
    blob = StringIO(blob)
    pqueue = []
    names = csv.reader(blob)
    for i,filename in enumerate(names):
        if filename and filename[0]:
            pqueue.append(filename[0])

    with beam.Pipeline(options=pipeline_options) as p:
        if(len(pqueue)>0):        
            input_list=app_args.input
            output_list=app_args.output
            events = ( p | "create PCol from list" >> beam.Create(pqueue)
                     | "read files" >> beam.io.textio.ReadAllFromText()
                     | "Transform" >> beam.ParDo(ParseFile())
                     | "Convert To JSON" >> beam.ParDo(ConvertToJson())
                     | "Write to BQ" >> beam.io.WriteToBigQuery(
        table='TABLE',
        dataset='DATASET',
        project='PROJECT',
        schema="dataevent:STRING",
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
        custom_gcs_temp_location='gs://BUCKET/gcs-temp-to-bq/',
        method='FILE_LOADS'))

        ##bigquery failed rows  NOT WORKING so commented
        #(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS] | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
        ##WRITING TO GCS            
        #printFileConetent | "Write TExt" >> beam.io.WriteToText(output_list+"file_",file_name_suffix=".json",num_shards=1, append_trailing_newlines = True)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

这项工作花了大约49分钟。

我尝试过:( 1)对于avro,生成的模式需要在JSON中作为proto文件,并在下面的代码中尝试将字典转换为avro msg,但由于字典的大小更大,这需要时间。schema_separated=是一个avro模式,运行良好

代码语言:javascript
复制
      with beam.Pipeline(options=pipeline_options) as p:
          if(len(pqueue)>0):        
        input_list=app_args.input
        output_list=app_args.output
        p1 = p | "create PCol from list" >> beam.Create(pqueue)
        readListofFiles=p1 | "read files" >> beam.io.textio.ReadAllFromText()
        parsingProtoFile = readListofFiles | "Transform" >> beam.ParDo(ParseFile())
        printFileConetent = parsingProtoFile | "Convert To JSON" >> beam.ParDo(ConvertToJson())
      
        compressIdc=True
        use_fastavro=True 
        printFileConetent | 'write_fastavro' >> WriteToAvro(
        output_list+"file_",
        # '/tmp/dataflow/{}/{}'.format(
        #     'demo', 'output'),
        # parse_schema(json.loads(SCHEMA_STRING)),
        parse_schema(schema_separated),
        use_fastavro=use_fastavro,
        file_name_suffix='.avro',
        codec=('deflate' if compressIdc else 'null'),
    )
  1. 在主要代码中,我尝试将JSON记录作为字符串插入到bigquery表中,这样我就可以在bigquery中使用JSON函数来提取数据,这也不是很好,并且得到了下面的错误。 消息:“读取数据时出错,错误消息: JSON表遇到太多错误,放弃。行: 1;错误: 1。请查看errors[]集合以获得更多详细信息。”原因:“运行时无效”写入BQ/BigQueryBatchFileLoads/WaitForDestinationLoadJobs‘
  2. 尝试将上述JSON字典插入到bigquery,为表提供JSON模式,并且运行良好

现在的挑战是,在将proto反序列化为JSON之后的大小将增加一倍,成本将以数据流的形式通过处理数据的数量来计算。

我正在努力和阅读大量的工作,如果它有效,那么我可以使它稳定的生产。

样本JSON记录。

代码语言:javascript
复制
{'timestamp': '1609286400', 'bidResponseId': '5febc300000115cd054b9fd6840a5af1', 'aggregatorId': '1', 'userId': '7567d74e-2e43-45f4-a42a-8224798bb0dd', 'uniqueResponseId': '', 'adserverId': '1002418', 'dataVersion': '1609285802', 'geoInfo': {'country': '101', 'region': '122', 'city': '11605', 'timezone': '420'}, 'clientInfo': {'os': '4', 'browser': '1', 'remoteIp': '36.70.64.0'}, 'adRequestInfo': {'requestingPage': 'com.opera.mini.native', 'siteId': '557243954', 'foldPosition': '2', 'adSlotId': '1', 'isTest': False, 'opType': 'TYPE_LEARNING', 'mediaType': 'BANNER'}, 'userSegments': [{'id': '2029660', 'weight': -1.0, 'recency': '1052208'}, {'id': '2034588', 'weight': -1.0, 'recency': '-18101'}, {'id': '2029658', 'weight': -1.0, 'recency': '744251'}, {'id': '2031067', 'weight': -1.0, 'recency': '1162398'}, {'id': '2029659', 'weight': -1.0, 'recency': '862833'}, {'id': '2033498', 'weight': -1.0, 'recency': '802749'}, {'id': '2016729', 'weight': -1.0, 'recency': '1620540'}, {'id': '2034584', 'weight': -1.0, 'recency': '111571'}, {'id': '2028182', 'weight': -1.0, 'recency': '744251'}, {'id': '2016726', 'weight': -1.0, 'recency': '1620540'}, {'id': '2028183', 'weight': -1.0, 'recency': '744251'}, {'id': '2028178', 'weight': -1.0, 'recency': '862833'}, {'id': '2016722', 'weight': -1.0, 'recency': '1675814'}, {'id': '2029587', 'weight': -1.0, 'recency': '38160'}, {'id': '2028177', 'weight': -1.0, 'recency': '862833'}, {'id': '2016719', 'weight': -1.0, 'recency': '1675814'}, {'id': '2027404', 'weight': -1.0, 'recency': '139031'}, {'id': '2028172', 'weight': -1.0, 'recency': '1052208'}, {'id': '2028173', 'weight': -1.0, 'recency': '1052208'}, {'id': '2034058', 'weight': -1.0, 'recency': '1191459'}, {'id': '2016712', 'weight': -1.0, 'recency': '1809526'}, {'id': '2030025', 'weight': -1.0, 'recency': '1162401'}, {'id': '2015235', 'weight': -1.0, 'recency': '139031'}, {'id': '2027712', 'weight': -1.0, 'recency': '139031'}, {'id': '2032447', 'weight': -1.0, 'recency': '7313670'}, {'id': '2034815', 'weight': -1.0, 'recency': '586825'}, {'id': '2034811', 'weight': -1.0, 'recency': '659366'}, {'id': '2030004', 'weight': -1.0, 'recency': '139031'}, {'id': '2027316', 'weight': -1.0, 'recency': '1620540'}, {'id': '2033141', 'weight': -1.0, 'recency': '7313670'}, {'id': '2034736', 'weight': -1.0, 'recency': '308252'}, {'id': '2029804', 'weight': -1.0, 'recency': '307938'}, {'id': '2030188', 'weight': -1.0, 'recency': '3591519'}, {'id': '2033449', 'weight': -1.0, 'recency': '1620540'}, {'id': '2029672', 'weight': -1.0, 'recency': '1441083'}, {'id': '2029664', 'weight': -1.0, 'recency': '636630'}], 'perfInfo': {'timeTotal': '2171', 'timeBidInitialize': '0', 'timeProcessDatastore': '0', 'timeGetCandidates': '0', 'timeAdFiltering': '0', 'timeEcpmComputation': '0', 'timeBidComputation': '0', 'timeAdSelection': '0', 'timeBidSubmit': '0', 'timeTFQuery': '0', 'timeVWQuery': '8'}, 'learningPercent': 0.10000000149011612, 'pageLanguageId': '0', 'sspUserId': 'CAESECHFlNeuUm16IYThguoQ8ck_1', 'minEcpm': 0.12999999523162842, 'adSpotId': '1', 'creativeSizes': [{'width': '7', 'height': '7'}], 'pageTypeId': '0', 'numSlots': '0', 'eligibleLIs': [{'type': 'TYPE_OPTIMIZED', 'liIds': [{'id': 44005, 'reason': '12', 'creative_id': 121574, 'bid_amount': 8.403361132251052e-08}, {'id': 46938, 'reason': '12', 'creative_id': 124916, 'bid_amount': 8.403361132251052e-06}, {'id': 54450, 'reason': '12', 'creative_id': 124916, 'bid_amount': 2.0117618771650174e-05}, {'id': 54450, 'reason': '12', 'creative_id': 135726, 'bid_amount': 2.4237295484638312e-05}]}, {'type': 'TYPE_LEARNING'}], 'bidType': 4, 'isSecureRequest': True, 'sourceType': 3, 'deviceBrand': 82, 'deviceModel': 1, 'sellerNetworkId': 12814, 'interstitialRequest': False, 'nativeAdRequest': True, 'native': {'mainImg': [{'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}, {'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}, {'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}, {'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}], 'iconImg': [{'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}, {'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}, {'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}, {'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}], 'logoImg': [{'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}, {'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}, {'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}, {'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}]}, 'throttleWeight': 1, 'isSegmentReceived': False, 'viewability': 46, 'bannerAdRequest': False, 'videoAdRequest': False, 'mraidAdRequest': True, 'jsonModelCallCount': 0, 'totalProcessorBids': 1}

有人能帮我吗?

PFA截图也可供参考

EN

回答 2

Stack Overflow用户

发布于 2021-04-06 21:50:14

这里我的建议是使用Java来执行转换。

在Java中,您可以像这样将Protobuf转换为Avro:用apache beam在地板上编写原型对象

一旦完成了这些操作,就可以使用AvroIO将数据写入文件。

Java比Python具有更高的性能,将为您节省计算资源。由于这项工作非常简单,并且不需要任何特殊的Python库,所以我强烈建议您尝试使用Java。

票数 1
EN

Stack Overflow用户

发布于 2021-11-21 13:31:53

只是想让你注意到"FlexRS“,如果你还没有看过这个。这使用了可抢占的虚拟机(VM)实例,这样就可以降低成本。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65642725

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档