首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pycassa,threadpool,“Thread-3线程中的异常(很可能在解释器关闭期间引发):”

Pycassa,threadpool,“Thread-3线程中的异常(很可能在解释器关闭期间引发):”
EN

Stack Overflow用户
提问于 2012-12-12 03:09:34
回答 1查看 2K关注 0票数 1

我在试着用pycassa加速插入Cassandra。我听说使用多线程和打开多个连接可以大大提高速度。我插入了大量json格式的tweet。我的代码在这里工作了一段时间,然后线程开始抛出异常并停止,看起来线程越多,它停止工作的速度就越快……我猜问题出在到cassandra的连接上,与连接池有关。有什么想法吗?

编辑:所有线程抛出“线程中的异常-3(很可能在解释器关闭时引发):”

代码语言:javascript
复制
import time
import pycassa
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily
from datetime import datetime 
import json
import threadpool
pool = threadpool.ThreadPool(4)
kspool = ConnectionPool('TweetsKS',use_threadlocal = True)

def process_tasks(lines):

    #let threadpool format your requests into a list
    requests = threadpool.makeRequests(insert_into_cfs, lines)

    #insert the requests into the threadpool
    for req in requests:
        pool.putRequest(req) 


def read(file):
    bench = open("bench.txt", "w")
    bench.write(str(datetime.now())+"\n")
    """read data from json and insert into keyspace"""
    json_data=open(file)
    lines = []
    for line in json_data:
        lines.append(line)
    process_tasks(lines)


def insert_into_cfs(line):

    user_tweet_cf = pycassa.ColumnFamily(kspool, 'UserTweet')
    user_name_cf = pycassa.ColumnFamily(kspool, 'UserName')
    tweet_cf = pycassa.ColumnFamily(kspool, 'Tweet')
    user_follower_cf = pycassa.ColumnFamily(kspool, 'UserFollower')

    tweet_data = json.loads(line)
    """Format the tweet time as an epoch seconds int value"""
    tweet_time = time.strptime(tweet_data['created_at'],"%a, %d %b %Y %H:%M:%S +0000")
    tweet_time  = int(time.mktime(tweet_time))

    new_user_tweet(user_tweet_cf,tweet_data['from_user_id'],tweet_time,tweet_data['id'])
    new_user_name(user_name_cf,tweet_data['from_user_id'],tweet_data['from_user_name'])
    new_tweet(tweet_cf,tweet_data['id'],tweet_data['text'],tweet_data['to_user_id'])

    if tweet_data['to_user_id'] != 0:
        new_user_follower(user_follower_cf,tweet_data['from_user_id'],tweet_data['to_user_id'])


"""4 functions below carry out the inserts into specific column families"""     
def new_user_tweet(user_tweet_cf,from_user_id,tweet_time,id):
    user_tweet_cf.insert(from_user_id,{(tweet_time): id})

def new_user_name(user_name_cf,from_user_id,user_name):
    user_name_cf.insert(from_user_id,{'username': user_name})

def new_tweet(tweet_cf,id,text,to_user_id):
    tweet_cf.insert(id,{
    'text': text
    ,'to_user_id': to_user_id
    })  

def new_user_follower(user_follower_cf,from_user_id,to_user_id):
    user_follower_cf.insert(from_user_id,{to_user_id: 0})   

if __name__ == '__main__':
    read('tweets.json')
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2012-12-12 18:04:40

好吧,这里的问题是我对线程池的使用。我在pool.putRequest(req)之后需要pool.wait (在循环之外),我的主线程在其他线程之前完成,它们不是守护进程。

有了两个线程,我的Cassandra插件大约快了一倍…但是你猜怎么了!?它仍然比MySQL慢!!在6个线程中,它几乎是相同的…我想还需要更多的修修补补!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13826843

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档