Google App Engine: Using BigQuery on the Datastore?
Stack Overflow user
Asked on 2012-06-10 14:42:19
6 answers · 7.3K views · 0 followers · 15 votes

I have a GAE datastore kind with several hundred thousand objects in it. I want to run several involved queries (including counting queries). BigQuery seems a good fit for doing this.

Is there currently an easy way to query a live AppEngine Datastore using BigQuery?


6 Answers

Stack Overflow user

Accepted answer

Answered on 2012-06-10 23:19:47

You can't run BigQuery directly on DataStore entities, but you can write a Mapper pipeline that reads entities out of DataStore, writes them to CSV in Google Cloud Storage, and then ingests those into BigQuery - you can even automate the process. Here's an example of using the Mapper API classes for just the DataStore-to-CSV step:

import re
import time
from datetime import datetime
import urllib
import httplib2
import pickle
import logging

from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp

from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template

from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op

from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials


#Number of shards to use in the Mapper pipeline
SHARDS = 20

# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'

# DataStore Model
class YourEntity(db.Expando):
  field1 = db.StringProperty() # etc, etc

ENTITY_KIND = 'main.YourEntity'


class MapReduceStart(webapp.RequestHandler):
  """Handler that provides link for user to start MapReduce pipeline.
  """
  def get(self):
    pipeline = IteratorPipeline(ENTITY_KIND)
    pipeline.start()
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
    logging.info('Redirecting to: %s' % path)
    self.redirect(path)


class IteratorPipeline(base_handler.PipelineBase):
  """ A pipeline that iterates through datastore
  """
  def run(self, entity_type):
    output = yield mapreduce_pipeline.MapperPipeline(
      "DataStore_to_Google_Storage_Pipeline",
      "main.datastore_map",
      "mapreduce.input_readers.DatastoreInputReader",
      output_writer_spec="mapreduce.output_writers.FileOutputWriter",
      params={
          "input_reader":{
              "entity_kind": entity_type,
              },
          "output_writer":{
              "filesystem": "gs",
              "gs_bucket_name": GS_BUCKET,
              "output_sharding":"none",
              }
          },
          shards=SHARDS)


def datastore_map(entity_type):
  props = GetPropsFor(entity_type)
  data = db.to_dict(entity_type)
  result = ','.join(['"%s"' % str(data.get(k)) for k in props])
  yield('%s\n' % result)


def GetPropsFor(entity_or_kind):
  if (isinstance(entity_or_kind, basestring)):
    kind = entity_or_kind
  else:
    kind = entity_or_kind.kind()
  cls = globals().get(kind)
  return cls.properties()


application = webapp.WSGIApplication(
                                     [('/start', MapReduceStart)],
                                     debug=True)

def main():
  run_wsgi_app(application)

if __name__ == "__main__":
  main()

If you append `yield CloudStorageToBigQuery(output)` to the end of the IteratorPipeline class's `run` method, you can pipe the resulting CSV file handles into a BigQuery ingestion pipeline... like this:

# BigQuery API Settings
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'

# Create a new API service for interacting with BigQuery
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)


class CloudStorageToBigQuery(base_handler.PipelineBase):
  """A Pipeline that kicks off a BigQuery ingestion job.
  """
  def run(self, output):
    jobs = bigquery_service.jobs()
    table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
        '%m%d%Y_%H%M%S')
    files = [str(f.replace('/gs/', 'gs://')) for f in output]
    result = jobs.insert(projectId=PROJECT_ID,
                         body=build_job_data(table_name, files)).execute()
    logging.info(result)

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      # put your schema here
                      "fields": fields
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": DATASET_ID,
                      "tableId": table_name,
                      },
                  }
              }
          }
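The `fields` value above is left as a placeholder by the answer. As a sketch only, here is what `build_job_data` might look like with the schema filled in for the `YourEntity` kind from the first snippet; the parameterised signature and the single STRING field are my assumptions, not the original code:

```python
# Hypothetical schema for the YourEntity kind above; adjust the field
# names, types, and modes to match your own model's properties.
fields = [
    {"name": "field1", "type": "STRING", "mode": "NULLABLE"},
]

def build_job_data(table_name, files, project_id, dataset_id):
    # Same load-job structure as the answer's build_job_data,
    # with the schema placeholder filled in.
    return {
        "projectId": project_id,
        "configuration": {
            "load": {
                "sourceUris": files,
                "schema": {"fields": fields},
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": dataset_id,
                    "tableId": table_name,
                },
            }
        },
    }

job = build_job_data("datastore_dump_sample",
                     ["gs://your-bucket/file-0"],
                     "Some_ProjectXXXX", "Some_DATASET")
```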
Votes: 17

Stack Overflow user

Answered on 2013-12-03 18:15:16

With the new (as of September 2013) streaming inserts API, you can import records from your app into BigQuery.

The data is available in BigQuery immediately, so this should satisfy your live requirement.

Whilst this question is now a bit old, this may be an easier solution for anyone stumbling across it.

At the moment, though, getting this to work from the local dev server is patchy at best.
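As a minimal sketch of this approach: streaming inserts go through the `tabledata().insertAll` method of the BigQuery API client, and each row can carry an `insertId` so retried requests are de-duplicated. The helper name and field names here are my own illustration, not from the answer:

```python
import uuid

def build_insert_all_body(records):
    # Build the request body for BigQuery's tabledata.insertAll method.
    # Each row gets a unique insertId so BigQuery can de-duplicate retries.
    return {
        "kind": "bigquery#tableDataInsertAllRequest",
        "rows": [{"insertId": str(uuid.uuid4()), "json": rec}
                 for rec in records],
    }

body = build_insert_all_body([{"field1": "hello"}, {"field1": "world"}])

# The actual call (requires credentials and a bigquery_service as built in
# the accepted answer) would then look roughly like:
#   bigquery_service.tabledata().insertAll(
#       projectId=PROJECT_ID, datasetId=DATASET_ID,
#       tableId="your_table", body=body).execute()
```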

Votes: 7

Stack Overflow user

Answered on 2012-10-20 02:34:36

We're doing a Trusted Tester program for migrating from Datastore to BigQuery in two simple operations:

  1. Back up the datastore using the Datastore Admin's backup functionality
  2. Import the backup directly into BigQuery

It automatically takes care of the schema for you.

More info (to apply): https://docs.google.com/a/google.com/spreadsheet/viewform?formkey=dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ
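This import path is also exposed through the `bq` command-line tool's `DATASTORE_BACKUP` source format. A sketch assuming a completed Datastore Admin backup of the `YourEntity` kind in a hypothetical bucket (bucket, dataset, and table names are placeholders):

```shell
# Load a Datastore Admin backup straight into a BigQuery table;
# the source URI must point at the kind's .backup_info file.
bq load --source_format=DATASTORE_BACKUP \
  Some_DATASET.yourentity_import \
  gs://your-bucket/path/to/YourEntity.backup_info
```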

Votes: 5
Content on this page provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/10966841