首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >石墨/碳/ Ceres节点重叠

Graphite / Carbon / Ceres 节点重叠
EN

Stack Overflow用户
提问于 2012-11-13 02:53:59
回答 1查看 369关注 0票数 1

我正在使用 Graphite 进行监控,并使用 Carbon 和 Ceres 作为存储方式。我在纠正错误数据时遇到了一些问题。似乎(由于各种问题)我最终得到了时间范围重叠的文件。也就是说,由于 Carbon / Ceres 将数据存储为 timestamp@interval.slice 形式的文件,因此可能出现两个或更多时间范围相互重叠的文件。

有两种重叠:

代码语言:javascript
复制
File A:  +------------+        orig file
File B:      +-----+           subset
File C:          +---------+   overlap

这造成了问题,因为现有的工具(ceres-maintenance、defrag 和 rollup)无法处理这些重叠。它们不会去修复,而是直接跳过该目录继续处理后面的目录。显然,这是一个问题。

EN

回答 1

Stack Overflow用户

发布于 2012-11-13 02:53:59

我已经创建了一个脚本来解决这个问题,如下所示:

  • 对于子集,只需删除子集文件。

  • 对于重叠,请在下一个文件的起始时间点处,对原始文件执行文件系统的“truncate”(截断)操作。虽然也可以切掉重叠文件的开头并将其重命名,但我认为这样做充满风险。

我发现有两种方法可以做到这一点:

  1. 遍历目录并遍历文件,边走边修复,查找文件子集,删除它们;
  2. 遍历目录并修复目录中的所有问题,然后再继续。到目前为止,这是更快的方法,因为目录遍历非常耗时。

代码:

代码语言:python
复制
#!/usr/bin/env python2.6
################################################################################

import io
import os
import time
import sys
import string
import logging
import unittest
import datetime
import random
import zmq
import json
import socket
import traceback
import signal
import select
import simplejson
import cPickle as pickle
import re
import shutil
import collections
from pymongo import Connection
from optparse import OptionParser
from pprint import pprint, pformat

################################################################################

class SliceFile(object):
    """Description of one slice file named ``<timeStart>@<freq>[.ext]``.

    The start timestamp and the step are parsed from the file name by the
    constructor.  The size-derived attributes (``size``, ``numPoints``,
    ``timeEnd``) stay ``None`` until :meth:`setVars` stats the file.
    """

    def __init__(self, fname):
        """Parse the last path component of ``fname``.

        Raises ``ValueError`` or ``IndexError`` when that component is not of
        the form ``<int>@<int>[.something]``.
        """
        self.name = fname
        self.deleted = False
        # Filled in by setVars() once the file has been stat'ed.
        self.size = None
        self.numPoints = None
        self.timeEnd = None

        # Only the part after the final '/' carries the encoded metadata.
        leaf = fname.rpartition('/')[2]
        pieces = leaf.split('@')
        self.timeStart = int(pieces[0])
        # Drop everything from the first '.' on (e.g. the ".slice" suffix).
        self.freq = int(pieces[1].partition('.')[0])

    def __repr__(self):
        """Return a one-line, human-readable summary of all fields."""
        fields = (self.name, self.timeStart, self.timeEnd,
                  self.freq, self.size, self.numPoints)
        return "Name: %s, tstart=%s tEnd=%s, freq=%s, size=%s, npoints=%s." % fields

    def setVars(self):
        """Stat the file and derive ``size``, ``numPoints`` and ``timeEnd``.

        The point count is the byte size divided by 8; ``timeEnd`` is the
        first timestamp after the last stored point.
        """
        byteCount = os.path.getsize(self.name)
        pointCount = int(byteCount / 8)
        self.size = byteCount
        self.numPoints = pointCount
        self.timeEnd = self.timeStart + (pointCount * self.freq)

################################################################################

class CeresOverlapFixup(object):
    """Walk a directory tree and repair slice files whose time ranges overlap.

    In every directory that contains a ``.ceres-node`` entry, the
    ``<timeStart>@<freq>.slice`` files are ordered by start time.  Then, for
    each file in turn:

    * a later file that ends no later than it does is deleted (subset);
    * if the next surviving file starts before it ends, it is truncated so
      that it stops where that next file begins (overlap).

    Everything passed to :meth:`writeLog` goes to stdout and is appended to
    ``ceresOverlapFixup.log`` in the current working directory.

    Counters (``subsets``, ``truncated``, ``dirsExamined``) are each
    incremented in exactly one place so the status line reports real totals.
    """

    def __del__(self):
        # __init__ may have failed before LOGFILE existed, or the log may
        # already be closed; in both cases there is nothing left to do.
        logfile = getattr(self, "LOGFILE", None)
        if logfile is None or getattr(logfile, "closed", False):
            return
        # Local import: this can run during interpreter shutdown.
        import datetime
        try:
            self.writeLog("Ending at %s" % (str(datetime.datetime.today())))
        finally:
            logfile.close()

    def __init__(self):
        self.verbose            = False
        self.debug              = False
        self.basedir            = None  # set by getOptions()
        self.LOGFILE            = open("ceresOverlapFixup.log", "a")
        self.badFilesList       = set()  # paths whose name/size could not be read
        self.truncated          = 0      # files successfully truncated
        self.subsets            = 0      # subset files removed
        self.dirsExamined       = 0      # directories visited by the walk
        self.lastStatusTime     = 0

    def getOptionParser(self):
        """Return a fresh ``OptionParser``; override to pre-populate options."""
        return OptionParser()

    def getOptions(self):
        """Parse the command line into ``debug``, ``verbose`` and ``basedir``.

        Exits through ``parser.error`` (usage message, ``SystemExit``) when no
        base directory is given, so the check also holds under ``python -O``.
        """
        parser = self.getOptionParser()
        parser.add_option("-d", "--debug",      action="store_true",                 dest="debug",   default=False, help="debug mode for this program, writes debug messages to logfile." )
        parser.add_option("-v", "--verbose",    action="store_true",                 dest="verbose", default=False, help="verbose mode for this program, prints a lot to stdout." )
        parser.add_option("-b", "--basedir",    action="store",      type="string",  dest="basedir", default=None,  help="base directory location to start converting." )
        (options, args)     = parser.parse_args()
        self.debug          = options.debug
        self.verbose        = options.verbose
        self.basedir        = options.basedir
        if not self.basedir:
            parser.error("must provide base directory.")

    # Examples:
    # ./updateOperations/1346805360@60.slice
    # ./updateOperations/1349556660@60.slice
    # ./updateOperations/1346798040@60.slice

    def getFileData(self, inFilename):
        """Build a ``SliceFile`` for ``inFilename`` with its size fields set.

        Propagates whatever ``SliceFile`` raises for an unparsable name or an
        unreadable file.
        """
        ret = SliceFile(inFilename)
        ret.setVars()
        return ret

    def removeFile(self, inFilename):
        """Delete ``inFilename`` and count it as a removed subset file."""
        os.remove(inFilename)
        self.subsets += 1

    def truncateFile(self, fname, newSize):
        """Shrink the existing file ``fname`` to ``newSize`` bytes.

        Best effort: any failure is logged and swallowed so the walk goes on.
        ``truncated`` is incremented only when the truncation succeeded.
        """
        if self.verbose:
            self.writeLog("Truncating file, name=%s, newsize=%s" % (pformat(fname), pformat(newSize)))
        try:
            # No O_CREAT: a missing file must be reported, not created and
            # zero-extended into a bogus slice.
            fd = os.open(fname, os.O_RDWR)
            try:
                os.ftruncate(fd, newSize)
            finally:
                os.close(fd)
            self.truncated += 1
        except Exception:
            self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))

    def printStatus(self, force=False):
        """Log the progress counters, at most once every 10 seconds.

        ``force=True`` bypasses the rate limit (used for the final summary).
        """
        now = self.getNowTime()
        if force or ((now - self.lastStatusTime) > 10):
            self.writeLog("Status: time=%d, Walked %s dirs, subsetFilesRemoved=%s, truncated %s files." % (now, self.dirsExamined, self.subsets, self.truncated))
            self.lastStatusTime = now

    def fixupThisDir(self, inPath, inFiles):
        """Remove subset slices and truncate overlapping slices in one dir.

        ``inPath`` is the directory, ``inFiles`` the file names inside it (as
        produced by ``os.walk``).  Directories without a ``.ceres-node`` entry
        are ignored.  Files that cannot be parsed or stat'ed are recorded in
        ``badFilesList``, logged, and skipped.
        """
        if '.ceres-node' not in inFiles:
            return

        fileObjList = []
        # Only names containing '@' can be slice files.
        for thisFile in sorted(inFiles):
            if '@' not in thisFile:
                continue
            wholeFilename = os.path.join(inPath, thisFile)
            try:
                fileObjList.append(self.getFileData(wholeFilename))
            except Exception:
                self.badFilesList.add(wholeFilename)
                self.writeLog("ERROR: file %s, %s" % (wholeFilename, traceback.format_exc()))

        # Order by numeric start time; the name only breaks ties.  Sorting on
        # the path string would misorder timestamps of different lengths.
        fileObjList.sort(key=lambda thisObj: (thisObj.timeStart, thisObj.name))

        while fileObjList:

            self.printStatus()

            firstFile = fileObjList[0]
            survivors = []
            for curFile in fileObjList[1:]:
                # Later files start at or after firstFile, so ending no later
                # than it means they are fully contained in it.
                if curFile.timeEnd <= firstFile.timeEnd:
                    self.removeFile(curFile.name)
                    if self.verbose:
                        self.writeLog("Subset file situation.  First=%s, overlap=%s" % (firstFile, curFile))
                else:
                    survivors.append(curFile)

            # LT is right.  firstFile's timeEnd is the first open time after it
            # is done: start@100, len=2 -> end=102, positions used=100,101, so
            # a second file starting at 102 is OK.
            if survivors and survivors[0].timeStart < firstFile.timeEnd:
                secondFile = survivors[0]
                # file_A (first):   +---------+
                # file_B (second):       +----------+
                # Solve by truncating file_A at the start point of file_B.
                keepSeconds    = secondFile.timeStart - firstFile.timeStart
                keepDatapoints = keepSeconds // firstFile.freq
                keepBytes      = keepDatapoints * 8
                # A zero-byte result means file_B starts less than one step
                # after file_A; file_A is left untouched in that case.
                if keepBytes:
                    self.truncateFile(firstFile.name, keepBytes)
                    if self.verbose:
                        self.writeLog("Truncate situation.  First=%s, overlap=%s" % (firstFile, secondFile))

            # firstFile is done; continue with the files that still exist.
            fileObjList = survivors

    def getNowTime(self):
        """Return the current time in seconds since the epoch."""
        return time.time()

    def walkDirStructure(self):
        """Walk ``self.basedir``, fix every directory, then log a summary."""
        startTime           = self.getNowTime()
        self.lastStatusTime = startTime
        self.okayFiles      = 0

        for (thisPath, theseDirs, theseFiles) in os.walk(self.basedir):
            self.printStatus()
            self.fixupThisDir(thisPath, theseFiles)
            self.dirsExamined += 1

        endTime = self.getNowTime()
        # Always emit the final totals, even for runs shorter than the
        # status interval.
        self.printStatus(force=True)
        self.writeLog( "now = %s, started at %s, elapsed time = %s seconds." % (endTime, startTime, endTime - startTime))
        self.writeLog( "Done.")

    def writeLog(self, instring):
        """Write ``instring`` plus a newline to stdout and to the log file."""
        # Local import: writeLog is also reached from __del__, which can run
        # after module globals have been torn down at interpreter exit.
        import sys
        line = "%s\n" % (instring,)
        sys.stdout.write(line)
        self.LOGFILE.write(line)
        self.LOGFILE.flush()

    def main(self):
        """Command-line entry point: parse options, then walk the tree."""
        self.getOptions()
        self.walkDirStructure()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13349708

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档