首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Cassandra Pycassa连接池,如何正确使用?

Cassandra Pycassa连接池,如何正确使用?
EN

Stack Overflow用户
提问于 2012-12-12 18:51:20
回答 1查看 1.4K关注 0票数 1

为了让Cassandra插件运行得更快,我使用了多线程,它工作得很好,但是如果我添加更多的线程,没有任何区别,我认为我没有生成更多的连接,我认为也许我应该使用pool.execute(f,*args,**kwargs),但我不知道如何使用它,文档相当少。到目前为止,这是我的代码。

代码语言:javascript
复制
import connect_to_ks_bp
from connect_to_ks_bp import ks_refs
import time
import pycassa
from datetime import datetime 
import json
import threadpool
pool = threadpool.ThreadPool(20)
count = 1
bench = open("benchCassp20_100000.txt", "w")

def process_tasks(lines):

    #let threadpool format your requests into a list
    requests = threadpool.makeRequests(insert_into_cfs, lines)

    #insert the requests into the threadpool
    for req in requests:
        pool.putRequest(req) 

    pool.wait()

def read(file):
    """read data from json and insert into keyspace"""
    json_data=open(file)
    lines = []
    for line in json_data:
        lines.append(line)
    print len(lines)
    process_tasks(lines)


def insert_into_cfs(line):
    global count
    count +=1
    if count > 5000:
            bench.write(str(datetime.now())+"\n")
            count = 1
    #print count
    #print kspool.checkedout()
    """
    user_tweet_cf = pycassa.ColumnFamily(kspool, 'UserTweet')
    user_name_cf = pycassa.ColumnFamily(kspool, 'UserName')
    tweet_cf = pycassa.ColumnFamily(kspool, 'Tweet')
    user_follower_cf = pycassa.ColumnFamily(kspool, 'UserFollower')
    """
    tweet_data = json.loads(line)
    """Format the tweet time as an epoch seconds int value"""
    tweet_time = time.strptime(tweet_data['created_at'],"%a, %d %b %Y %H:%M:%S +0000")
    tweet_time  = int(time.mktime(tweet_time))

    new_user_tweet(tweet_data['from_user_id'],tweet_time,tweet_data['id'])
    new_user_name(tweet_data['from_user_id'],tweet_data['from_user_name'])
    new_tweet(tweet_data['id'],tweet_data['text'],tweet_data['to_user_id'])

    if tweet_data['to_user_id'] != 0:
        new_user_follower(tweet_data['from_user_id'],tweet_data['to_user_id'])


""""4 functions below carry out the inserts into specific column families"""        
def new_user_tweet(from_user_id,tweet_time,id):
    ks_refs.user_tweet_cf.insert(from_user_id,{(tweet_time): id})

def new_user_name(from_user_id,user_name):
    ks_refs.user_name_cf.insert(from_user_id,{'username': user_name})

def new_tweet(id,text,to_user_id):
    ks_refs.tweet_cf.insert(id,{
    'text': text
    ,'to_user_id': to_user_id
    })  

def new_user_follower(from_user_id,to_user_id):
    ks_refs.user_follower_cf.insert(from_user_id,{to_user_id: 0})   

    read('tweets.json')
if __name__ == '__main__':

这只是另一个文件..

代码语言:javascript
复制
import pycassa
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

"""This is a static class I set up to hold the global database connection stuff,
I only want to connect once and then the various insert functions will use these fields a lot"""
class ks_refs():
    pool = ConnectionPool('TweetsKS',use_threadlocal = True,max_overflow = -1)

    @classmethod
    def cf_connect(cls, column_family):
        cf = pycassa.ColumnFamily(cls.pool, column_family)
        return cf

ks_refs.user_name_cfo = ks_refs.cf_connect('UserName')
ks_refs.user_tweet_cfo = ks_refs.cf_connect('UserTweet')
ks_refs.tweet_cfo = ks_refs.cf_connect('Tweet')
ks_refs.user_follower_cfo = ks_refs.cf_connect('UserFollower')

#trying out a batch mutator whihc is supposed to increase performance
ks_refs.user_name_cf = ks_refs.user_name_cfo.batch(queue_size=10000)
ks_refs.user_tweet_cf = ks_refs.user_tweet_cfo.batch(queue_size=10000)
ks_refs.tweet_cf = ks_refs.tweet_cfo.batch(queue_size=10000)
ks_refs.user_follower_cf = ks_refs.user_follower_cfo.batch(queue_size=10000)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-01-01 05:09:52

以下是一些想法:

10,000的

  • 批处理大小太大了。尝试100。
  • 使用pool_size参数使您的ConnectionPool大小至少与线程数相同。默认值为5。仅当活动线程数随时间变化时,才应使用池溢出,而不是在线程数固定时使用。原因是这将导致大量不必要的新连接的打开和关闭,这是一个相当昂贵的过程。

在你解决了这些问题之后,看看这些:

  • 我对你使用的线程池库不太熟悉。确保如果您将插入到Cassandra的代码从图片中去掉,那么当您增加threads
  • Python的数量时,您会看到性能的提高,因为GIL本身限制了多少线程可能是有用的。通常情况下,它不应该达到最大值20,但如果您正在执行CPU密集型任务或需要大量Python解释的任务,则可能会出现这种情况。我在前面的观点中描述的测试也将涵盖这一点。可能会出现这种情况,您应该考虑使用multiprocessing模块,但需要对代码进行一些更改来处理这种情况(即,不在进程之间共享ConnectionPools、CFs或几乎任何其他内容)。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13838115

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档