首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >没有webservice的大查询cron作业

没有webservice的大查询cron作业
EN

Stack Overflow用户
提问于 2017-10-13 22:08:58
回答 2查看 219关注 0票数 2

是否可以运行脚本处理用户数据而不运行google应用程序引擎webservice?

对于较小的脚本,它运行得很好,但是当我的脚本持续大约40分钟时,我得到了错误:DeadlineExceededError

我的临时修正是在windows VM和命令行上使用windows调度程序和python脚本。

编辑:添加的代码

代码语言:javascript
复制
jobs = []
jobs_status = []
jobs_error = []
# The project id whose datasets you'd like to list
PROJECT_NUMBER = 'project'
scope = ('https://www.googleapis.com/auth/bigquery',
         'https://www.googleapis.com/auth/cloud-platform',
         'https://www.googleapis.com/auth/drive',
         'https://spreadsheets.google.com/feeds')

credentials = ServiceAccountCredentials.from_json_keyfile_name('client_secrets.json', scope)

# Create the bigquery api client
service = googleapiclient.discovery.build('bigquery', 'v2', credentials=credentials)

def load_logs(source):
    body = {"rows": [
        {"json": source}
    ]}

    response = service.tabledata().insertAll(
        projectId=PROJECT_NUMBER,
        datasetId='test',
        tableId='test_log',
        body=body).execute()
    return response

def job_status():
    for job in jobs:
        _jobId = job['jobReference']['jobId']
        status = service.jobs().get(projectId=PROJECT_NUMBER, jobId=_jobId).execute()
        jobs_status.append(status['status']['state'])
        if 'errors' in status['status'].keys():
            query = str(status['configuration']['query']['query'])
            message = str(status['status']['errorResult']['message'])
            jobs_error.append({"query": query, "message": message})
    return jobs_status


def check_statues():
    while True:
        if all('DONE' in job for job in job_status()):
            return


def insert(query, tableid, disposition):
    job_body = {
     "configuration": {
      "query": {
       "query": query,
       "useLegacySql": True,
       "destinationTable": {
        "datasetId": "test",
        "projectId": "project",
        "tableId": tableid
       },
       "writeDisposition": disposition
      }
     }
    }

    r = service.jobs().insert(
        projectId=PROJECT_NUMBER,
        body=job_body).execute()
    jobs.append(r)
    return r



class MainPage(webapp2.RequestHandler):
    def get(self):
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p1', "WRITE_TRUNCATE")
        check_statues()
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p2', "WRITE_TRUNCATE")
        query = "SELECT * FROM [gdocs_users.user_%s]"
        for i in range(1, 1000):
            if i <= 600:
                insert(query % str(i).zfill(4), 'users_data_p1', "WRITE_APPEND")
            else:
                insert(query % str(i).zfill(4), 'user_data_p2', "WRITE_APPEND")
        for error in jobs_error:
            load_logs(error)


app = webapp2.WSGIApplication([
    ('/', MainPage),
], debug=True)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-10-14 14:58:36

默认情况下,App服务使用自动缩放,它对HTTP请求有60秒的限制,对任务队列请求有10分钟的限制。如果将服务更改为使用基本或手动缩放,则任务队列请求最多可以运行24小时。

听起来,这项工作可能只需要一个实例,所以除了默认服务之外,还可能创建第二个服务。在子文件夹中,创建具有以下bqservice设置的app.yaml文件夹,这些设置使用基本缩放,最大为一个实例:

代码语言:javascript
复制
# bqsservice/app.yaml
# Possibly use a separate service for your BQ code than
# the rest of your app:
service: bqservice
runtime: python27
api_version: 1
# Keep low memory/cost B1 class?
instance_class: B1
# Limit max services to 1 to keep costs down. There is an
# 8 instance hour limit to the free tier. This option still
# scales to 0 when not in use.
basic_scaling:
  max_instances: 1

# Handlers:
handlers:
- url: /.*
  script: main.app

然后在同一个服务中创建一个cron.yaml来安排您的脚本运行。使用上面的示例配置,您可以将BigQuery逻辑放入一个main.py文件中,其中定义了一个WSGI应用程序:

代码语言:javascript
复制
# bqservice/main.py
import webapp2

class CronHandler(webapp2.RequestHandler):

    def post(self):
       # Handle your cron work
       # ....

app = webapp2.WSGIApplication([
    #('/', MainPage),  # If you needed other handlers
    ('/mycron', CronHandler),
], debug=True)

如果您不打算将App应用程序用于其他任何事情,则可以将所有这些工作到默认服务中。如果在默认服务之外执行此操作,则需要首先将某些内容部署到默认服务,即使它只是一个带有静态文件的简单app.yaml

票数 2
EN

Stack Overflow用户

发布于 2017-10-13 22:28:59

大多数BigQuery操作都可以异步运行。你能给我们看看你的密码吗?

例如,从Python文档:

代码语言:javascript
复制
def query(query):
    client = bigquery.Client()
    query_job = client.run_async_query(str(uuid.uuid4()), query)

    query_job.begin()
    query_job.result()  # Wait for job to complete

这是一个异步作业,代码选择等待查询完成。而不是等待,而是在begin()之后获得作业id。您可以为一个任务排队,以便稍后使用任务队列运行,以检查该作业的结果。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46738644

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档