Implementing pre-commit and post-checkout hooks for zip/tar files in Git

Stack Overflow user
Asked 2020-12-22 04:05:39
1 answer · 355 views · 0 followers · 4 votes

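I frequently use a tool (Amesim) that packs its files into an uncompressed tar file. For version control, I usually name the files like file1_Rev01.ame and iterate with each change. This works when I am the only user, but lately I have been sharing files/models more often. Sharing these models is painful: they often include fairly large results (GBs of data), and tracking changes between versions is difficult, if not impossible, unless text is rigorously added to the model for every change. (Amesim is a tool like Simulink.)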

I have been reading about git hooks and git filters, but I am not sure how to better manage version control of a tarball.

Suppose I have the file "my_file.tar", which consists of a.txt, b.model, c.data, and d.results.

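From the application side, I would stage "my_file.tar" and commit it with the message "model update". Without any changes to git, this tracks changes to a binary file. That is unreadable and takes up a lot of space. If results are included, the file is quite large. Cloning the repo will be a challenge if results are continually stored.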

In my first attempt, I tried using pre-commit and post-checkout hooks.

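On commit, my pre-commit hook extracts "my_file.tar" into the directory "my_file_tar". It removes the *.results files that are produced when the model is run. These do not need to be tracked, and dropping them saves a lot of space (GBs).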

When I pull the model, the post-checkout hook searches for any folder whose name ends in _tar, tars it, and names the result my_file.tar.

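In general, this works. However, how should I handle both my_file.tar and the extracted folder? If I automatically delete the extracted folder after checkout, git will report that I have significant changes to track. Do I need to add the folder to .gitignore and remove it again every time? In addition, the tar file will never show up as tracked, because I remove it in the pre-commit code. What can I do to clean up this process? How should I approach this differently?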

References:

For this code, .ame is a tar file.

pre-commit

#!/usr/bin/env python

import argparse
import os
import tarfile
import zipfile
import subprocess

def parse_args():
    pass

def log_file(log_item):
    # Append a line to the log; the with-block closes the file each time.
    with open("MyFile.txt", "a") as file:
        file.write(log_item + '\n')
    return 1
    
def get_staged_ame_files():
    '''Request a list of staged files from git and return a list of *.ame files

    This function opens a subprocess with git, requests a list of names in the git staged list. It will return a list of strings.
    '''
    out = subprocess.Popen(['git', 'diff', '--staged', '--name-only'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, stderr = out.communicate()
    # Separate output by newlines
    # staged_files = stdout.split(b'\n') # split as bytes
    
    # filter for files with .ame 
    staged_files = stdout.decode('utf-8').split('\n') # split as strings
    # Create list of *just* amesim files
    staged_ame_files = []
    for entry in staged_files:
        if entry.endswith(".ame"):
            staged_ame_files.append(entry)
    
    if not staged_ame_files:
        return None
    else:
        return staged_ame_files

def extract_ame_files(file_list):
    folder_list = []
    for list_item in file_list:
        # If file exists, extract it. Else continue.
        if os.path.isfile(list_item):
            tar = tarfile.open(list_item, "r:")
            folder_name = list_item[0:-4] + "_ame"
            folder_list.append(folder_name)
            tar.extractall(path = folder_name)
            tar.close()
            log_file(folder_name)
        else:
            print("File {} does not exist.".format(list_item))
            
    return folder_list
    

def cleanup_ame_ignored_files(folder_list):
    '''Removes unnecessary files from each extracted folder.'''
    for folder in folder_list:
        for file in os.listdir(folder):
            # os.listdir returns bare names, so join with the folder first.
            if file.endswith((".results", ".exe")):
                os.remove(os.path.join(folder, file))
    return 1


def git_add_ame_folders(folders):
    # Add *_ame folders to git stage
    for folder in folders:
        out = subprocess.Popen(['git', 'add', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        # The -u will capture removed files?
        out = subprocess.Popen(['git', 'add', '-u', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        
        log_file(stdout.decode('utf-8'))
    return 1
    
def remove_ame_from_staging(file_list):
    # Loop through any staged ame files.
    for file in file_list:
        out = subprocess.Popen(['git', 'rm', '--cached', file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
    return 1

def main(args=None):
    # if file name is *.ame
    # extract *.ame as a tar of the same name into a folder of the same name + _ame
    # delete .results file
    # don't commit .ame file 
    
    # Search for files we want to process in the staged list
    # These will only be *.ame files.
    staged_ame_files = get_staged_ame_files()
    if not staged_ame_files:
        # If its empty, there's nothing to do. End the function.
        return 0
    
    # We're not empty, lets extract each one.
    folder_list = extract_ame_files(staged_ame_files)
    
    # Delete all .results files in each extracted folder path
    cleanup_ame_ignored_files(folder_list)
    
    # Stage all files in each folder path 
    git_add_ame_folders(folder_list)
    
    # Unstage the .ame file
    remove_ame_from_staging(staged_ame_files)
    return 1

if __name__ == "__main__":
    args = parse_args()
    main(args)

and post-checkout

#!/usr/bin/env python

import argparse
import os
import tarfile
import zipfile
import subprocess
import shutil
#from shutil import rmtree # Delete directory trees

def parse_args():
    pass

def log_file(log_item):
    # Append a line to the log; the with-block closes the file each time.
    with open("MyFile2.txt", "a") as file:
        file.write(log_item + '\n')
    return 1
    
def compress_ame_files(folder_list):
    for list_item in folder_list:
        log_file("We're on item {}".format(list_item))
        file_name = list_item[0:-4] + ".ame"
        log_file("Tar file name {}".format(file_name))
        # Delete the file if it exists first.
        if os.path.exists(file_name):
            os.remove(file_name)
        with tarfile.open(file_name, "w:") as tar:
            tar.add(list_item, arcname=os.path.basename('../'))
    return 1
    

def cleanup_ame_ignored_files(folder_list):
    '''Removes unnecessary files from each extracted folder.'''
    for folder in folder_list:
        for file in os.listdir(folder):
            # os.listdir returns bare names, so join with the folder first.
            if file.endswith((".results", ".exe")):
                os.remove(os.path.join(folder, file))
    return 1


def git_add_ame_folders(folders):
    # Add *_ame folders to git stage
    for folder in folders:
        out = subprocess.Popen(['git', 'add', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        # The -u will capture removed files?
        out = subprocess.Popen(['git', 'add', '-u', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        
        #log_file(stdout.decode('utf-8'))
    return 1
    
def remove_ame_from_staging(file_list):
    # Loop through any staged ame files.
    for file in file_list:
        out = subprocess.Popen(['git', 'rm', '--cached', file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
    return 1

def fast_scandir(dirname):
    # https://stackoverflow.com/questions/973473/getting-a-list-of-all-subdirectories-in-the-current-directory?rq=1
    subfolders= [f.path for f in os.scandir(dirname) if f.is_dir()]
    for dirname in list(subfolders):
        subfolders.extend(fast_scandir(dirname))
    return subfolders

def delete_ame_folders(folders):
    for folder in folders:
        try:
            shutil.rmtree(folder)
        except OSError as e:
            print("Error: %s : %s" % (folder, e.strerror))
    return 1
    
#def main(args=None):
def main(lines):
    print("Post checkout running.")
    # find folders with the name _ame
    #log_file("We're running.")
    folder_list = []
    for folder in fast_scandir(os.getcwd()):
        if folder.endswith("_ame"):
            #log_file("Found folder {}.".format(folder))
            folder_list.append(os.path.join(os.getcwd(), folder))
    # tar each folder up and rename with .ame
    compress_ame_files(folder_list)
    
    # Delete the folders
    #delete_ame_folders(folder_list)

    return 1

if __name__ == "__main__":
    args = parse_args()
    main(args)

1 Answer

Stack Overflow user

Answered 2020-12-30 17:46:53

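The code in this answer implements git filters rather than the pre-commit and post-checkout hooks from the question. The advantage of a filter is that it operates on a single file only. There is no need to track and commit/pull additional files separately. Instead, like Zippey, it creates an uncompressed stream of the data and drops unnecessary files along the way.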

Note: do not use print statements, because they corrupt the stdout stream that the git filter depends on. This was a painful lesson.
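A minimal sketch of how diagnostics can be kept out of the filtered data, assuming the filter is an ordinary Python script that reads the file content from stdin and writes the result to stdout. The names `log` and `run_filter` are hypothetical and are not part of the scripts in this answer:

```python
import io
import sys


def log(msg, stream=None):
    """Write a diagnostic line to stderr so that stdout stays clean."""
    stream = sys.stderr if stream is None else stream
    stream.write("ame_filter: {}\n".format(msg))


def run_filter(inp, out, log_stream=None):
    """Copy bytes from inp to out unchanged; report only via the log stream."""
    data = inp.read()
    log("read {} bytes".format(len(data)), log_stream)
    out.write(data)
    return len(data)


# In a real filter the call would be (assumption):
#   run_filter(sys.stdin.buffer, sys.stdout.buffer)
```

Anything written with print would land in `out` and end up inside the stored file, which is why the message goes to a separate stream here.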

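Note: CRLF and LF line endings are a problem. When decoding after the first git pull, I had to clean up the line endings because Sourcetree/Git had converted them to Windows format.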

Discussion of the solution:

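Because the files I work with are uncompressed tar archives, the Zippey solution did not apply directly. Zippey only works on zip-compressed files. I implemented Zippey's technique for tar files.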

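On commit, a clean filter "encodes" the tar file. The encode function takes each file and records the length of the data, the raw length of the data (relevant for binary files), the storage mode (ASCII or binary), and the file name.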

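The encode script streams all files, uncompressed, into a single file of the same name. Binary files are base64-encoded onto a single line, which makes diffs easier to read.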

During encoding, files with certain extensions (such as results files) are skipped.
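The record layout described above can be sketched as follows. This is a simplified illustration, not the full script: it assumes UTF-8 decodability alone decides between text and binary, and the helper names `is_excluded` and `encode_record` are hypothetical.

```python
import base64
import os

EXCLUDE = {".results"}  # assumed set of extensions to drop


def is_excluded(name):
    """True for names such as run.results or run.results.1."""
    root, ext = os.path.splitext(name.lower())
    if ext[1:].isdigit():  # numeric suffix: look one extension further left
        ext = os.path.splitext(root)[1]
    return ext in EXCLUDE


def encode_record(name, data):
    """Return one record: meta line 'len|raw_len|mode|name', payload, newline."""
    try:
        data.decode("utf-8")
        mode, payload = "A", data  # text is stored as-is
    except UnicodeDecodeError:
        mode, payload = "B", base64.b64encode(data)  # binary on one line
    meta = "{}|{}|{}|{}\n".format(len(payload), len(data), mode, name)
    return meta.encode("utf-8") + payload + b"\n"
```

For a text member the two lengths are equal; for a binary member the first is the base64 length and the second is the original size, which is what the decoder needs to rebuild the tar entry.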

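On pull, a smudge filter unpacks the file by reading the four metadata fields. Each file is processed and added to a tar file object, which is finally written out as a tar file.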
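A minimal sketch of the reverse direction, assuming records of the form `len|raw_len|mode|name`, followed by the payload and a single separating newline. The function name `decode_to_tar` is hypothetical, and the sketch works on in-memory bytes rather than on stdin/stdout:

```python
import base64
import io
import tarfile


def decode_to_tar(encoded):
    """Rebuild an uncompressed tar (returned as bytes) from encoded records."""
    src = io.BytesIO(encoded)
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:") as tar:
        while True:
            meta = src.readline().decode("utf-8")
            if not meta:
                break  # end of stream
            data_len, raw_len, mode, name = meta.rstrip("\n").split("|", 3)
            payload = src.read(int(data_len))
            src.read(1)  # skip the newline that separates records
            if mode == "B":
                payload = base64.b64decode(payload)
            info = tarfile.TarInfo(name=name)
            info.size = int(raw_len)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()
```

Reading exactly `data_len` bytes, rather than scanning for a delimiter, is what allows text payloads to contain newlines of their own.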

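As with Zippey, on a fresh clone of the repository the encoded file is what gets checked out, and my tool cannot read it. So the clone setup script looks for encoded *.ame files, decodes them, and sets up the appropriate git filters.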
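One possible way to tell whether a checked-out *.ame file is still in its encoded form is to ask `tarfile.is_tarfile`. This is an alternative sketch, not what the setup script in this answer does (it attempts the decode and treats a failure as "already decoded"); the name `looks_encoded` is hypothetical:

```python
import tarfile


def looks_encoded(path):
    """Guess whether path holds the encoded text form rather than a real tar."""
    # is_tarfile returns False when the file cannot be opened as a tar archive.
    return not tarfile.is_tarfile(path)
```

A check like this only says the file is not a tar archive; it does not prove the content is a valid encoded stream.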

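Since I work on both Linux and Windows machines, git tends to add CRLF on checkout. The script makes sure CRLF is removed before encoding, and removes CRLF from the encoded file before decoding.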
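The line-ending cleanup amounts to a byte-level replacement, sketched here with the hypothetical helper name `normalize_eol`. Applying it to the encoded stream is safe because binary members are base64 text at that point, whereas raw binary data could legitimately contain the byte pair `\r\n`:

```python
W_EOL = b"\r\n"  # Windows line ending
U_EOL = b"\n"    # Unix line ending


def normalize_eol(data):
    """Replace every CRLF with LF; data that is already LF-only is unchanged."""
    return data.replace(W_EOL, U_EOL)
```

Because the recorded lengths in the meta line count LF-terminated bytes, decoding a stream that still contains CRLF would read the wrong number of bytes for each member.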

ame_filter.py

import tarfile
import sys
import io
import base64
import string
import tempfile
import os.path

DEBUG_AME_FILTER = False
NAME = 'Amesim_Git'
ENCODING = 'UTF-8'

W_EOL = b'\r\n'
U_EOL = b'\n'

# decompress these defined files
AME_EXTENSIONS = ['.amegp', '.cir', '.sad', '.units', '.views', '.xml']
ASCII_EXTENSIONS = ['.txt', '.py']
# Do not include these files in tracking. 
EXCLUDE = ['.results']

def debug(msg):
    '''Print debug message'''
    if DEBUG_AME_FILTER:
        sys.stderr.write('{0}: debug: {1}\n'.format(NAME, msg))

def error(msg):
    '''Print error message'''
    sys.stderr.write('{0}: error: {1}\n'.format(NAME, msg))

def init():
    '''Initialize writing; set binary mode for windows'''
    debug("Running on {}".format(sys.platform))
    if sys.platform.startswith('win'):
        import msvcrt
        debug("Enable Windows binary workaround")
        msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)

def encode(input, output):
    '''Encode into special VCS friendly format from input to output'''
    debug("ENCODE was called")
    # Create a temporary file based off of the input AME file
    # This lets tarfile access a binary file object
    tfp = tempfile.TemporaryFile(mode='w+b')
    # Write contents into temporary file
    tfp.write(input.read())
    tfp.seek(0)  # Make sure tarfile reads from the start, otherwise object is empty
    tar = tarfile.open(fileobj=tfp, mode='r:')
    # Loop through objects within tar file
    for name in tar.getnames():
        # Get the file name of each object.
        tarinfo = tar.getmember(name)
        if tarinfo.isdir():
            continue # Skip folders, not sure how to handle encode/decode yet.
        data = tar.extractfile(name).read()
        
        # List of ASCII files to decode and version control
        text_extensions = list(set(AME_EXTENSIONS).union(set(ASCII_EXTENSIONS)))
        
        # Isolate the extension, keeping the leading dot so it matches the lists above.
        extension = os.path.splitext(name)[1].strip().lower()
        # Amesim may store batched simulations as *.results.1, *.results.2; strip numeric endings and identify the real ending.
        if extension[1:].isnumeric():
            root_name = os.path.splitext(name)[0]
            real_extension = os.path.splitext(root_name)[1].strip().lower()
            if real_extension in EXCLUDE:
                continue  # Skip excluded extensions
            
        if extension in EXCLUDE:
            continue  # Skip excluded extensions.
            
        # Encode the defined extensions in UTF-8
        try:
            # Check if text data
            data.decode(ENCODING)
            data = data.replace(W_EOL, U_EOL)  # Fix line endings
            try:
                strdata = map(chr, data)
            except TypeError:
                strdata = data
            if extension not in text_extensions and not all(c in string.printable for c in strdata):
                # File is not ascii, append binary file.
                raise UnicodeDecodeError(ENCODING, "".encode(ENCODING), 0, 1, "Artificial exception")

            # Encode
            debug("Appending text file '{}'".format(name))
            mode = 'A'  # ASCII Mode
            output.write("{}|{}|{}|{}\n".format(len(data), len(data), mode, name).encode(ENCODING))
            output.write(data)
            output.write("\n".encode(ENCODING)) # Separation from next meta line
        except UnicodeDecodeError:
            # Binary data
            debug("Appending binary file '{}'".format(name))
            mode = 'B'  # Binary Mode
            raw_len = len(data)
            data = base64.b64encode(data)
            output.write("{}|{}|{}|{}\n".format(len(data), raw_len, mode, name).encode(ENCODING))
            output.write(data)
            output.write("\n".encode(ENCODING))  # Separation from next meta line
    tar.close()

def decode(input, output):
    '''Decode from special VCS friendly format from input to output'''
    debug("DECODE was called")
    tfp = tempfile.TemporaryFile(mode='w+b')
    tar = tarfile.open(fileobj=tfp, mode='w:')
    #input = io.open(input, 'rb')
    while True:
        meta = input.readline().decode(ENCODING)
        if not meta:
            break
        #print(meta)
        (data_len, raw_len, mode, name) = [t(s) for (t, s) in zip((int, int, str, str), meta.split('|'))]
        #print('Data length:{}'.format(data_len))
        #print('Mode: {}'.format(mode))
        #print('Name: {}'.format(name))
        if mode == 'A':
            #print('Appending ascii data')
            debug("Appending text file '{}'".format(name))
            #https://stackoverflow.com/questions/740820/python-write-string-directly-to-tarfile
            info = tarfile.TarInfo(name=name.rstrip())
            info.size = raw_len
            raw_data = input.read(data_len)
            binary_data = io.BytesIO(raw_data)
            # Add each file object to our tarball
            tar.addfile(tarinfo=info, fileobj=binary_data)
            input.read(1) # Skip last '\n'
        elif mode == 'B':
            #print('Appending binary data')
            debug("Appending binary file '{}'".format(name.rstrip()))

            info = tarfile.TarInfo(name=name.rstrip())
            info.size = raw_len
            raw_data = input.read(data_len)
            decoded_data = base64.b64decode(raw_data)
            binary_data = io.BytesIO(decoded_data)
            tar.addfile(tarinfo=info, fileobj=binary_data)
            input.read(1) # Skip last '\n'
        else:
            # Should never reach here
            tar.close()
            tfp.close()
            error('Illegal mode "{}"'.format(mode))
            sys.exit(1)

    # Flush all writes
    tar.close()

    # Write output
    tfp.seek(0) # Go to the start of our temporary file
    output.write(tfp.read())
    tfp.close()

def main():
    '''Main program'''
    #import codecs
    #sys.stdout = codecs.getwriter('utf8')(sys.stdout)
    init()
    input = io.open(sys.stdin.fileno(), 'rb')
    output = io.open(sys.stdout.fileno(), 'wb')
    if len(sys.argv) < 2 or sys.argv[1] == '-' or sys.argv[1] == '--help':
        # This is wrong
        sys.stdout.write("{}\nTo encode: 'python ame_filter.py e'\nTo decode: 'python ame_filter.py d'\nAll files read from stdin and printed to stdout\n".format(NAME))
    elif sys.argv[1] == 'e':
        encode(input, output)
    elif sys.argv[1] == 'd':
        decode(input, output)
    else:
        error("Illegal argument '{}'. Try --help for more information".format(sys.argv[1]))
        sys.exit(1)

        
if __name__ == '__main__':
    main()

Clone_Setup.py

#!/usr/bin/env python
'''
Clone_Setup.py initializes the git environment. 
Each time a new instance of the repository is generated, these commands must 
be run.

'''
import os
import sys
import io
import subprocess
import ame_filter as amef
import tempfile
import shutil

# replacement strings
W_EOL = b'\r\n'
U_EOL = b'\n'

def setup_git():
    os.system("git config filter.ame_filter.smudge \"./ame_filter.py d\"")
    os.system("git config filter.ame_filter.clean \"./ame_filter.py e\"")
    
    '''
        Create .gitattributes programmatically. 
        Add these lines if they do not exist
    '''
    items = ["*.ame filter=ame_filter", "*.ame diff"]
    try:
        with open(".gitattributes", "x") as f:
            for item in items:
                f.write(item + "\n")
    except FileExistsError:
        with open(".gitattributes", "r+") as f:
            for item in items:
                f.seek(0)
                line_found = any(item in line for line in f)
                if not line_found:
                    f.seek(0, os.SEEK_END)
                    f.write("\n" + item)
    
    '''
        Create .gitignore programmatically. 
        Add these lines if they do not exist.
    '''
    items = ["*.gra",
             "*.res",
             "*.req",
             "*.pyc",
             "*.results",
             "*.results.*"
             ]
    
    try:
        with open(".gitignore", "x") as f:
            for item in items:
                f.write(item + "\n")
    except FileExistsError:
        with open(".gitignore", "r+") as f:
            for item in items:
                f.seek(0)
                line_found = any(item in line for line in f)
                if not line_found:
                    f.seek(0, os.SEEK_END)
                    f.write("\n" + item)
        

''' Search for AME files '''
def find_ame_files():
    out = subprocess.Popen(['git', 'ls-files'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, stderr = out.communicate()
    # Separate output by newlines
    # filter for files with .ame 
    git_files = stdout.decode('utf-8').split('\n') # split as strings
    # Create list of *just* amesim files
    ame_files = [entry for entry in git_files if entry.endswith(".ame")]
    ''' #  Equivalent code block
    ame_files = []
    for entry in git_files:
        if entry.endswith(".ame"):
            ame_files.append(entry)
    '''
    return ame_files

def decode_ame_files(ame_files):
    for file in ame_files:
        input = io.open(file, 'rb')
        tfp = tempfile.TemporaryFile(mode='w+b')
        # Write contents into temporary file
        tfp.write(input.read().replace(W_EOL, U_EOL))
        tfp.seek(0)
        input.close()
        output = io.open(file+'~', 'wb')
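        try:
            amef.decode(tfp, output)
            output.close()
            shutil.move(file+'~', file)
        except (Exception, SystemExit):
            # decode() raises or exits when the file is not in encoded form.
            print("File is already decoded. Returning to normal.")
            output.close()
        finally:
            tfp.close()
            # After a successful move the temporary file no longer exists.
            if os.path.exists(file+'~'):
                os.remove(file+'~')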
            
            

def main():
    '''Main program'''
    print("Setting up git.")
    setup_git()
    print("Finding ame files.")
    ame_files = find_ame_files()
    print(ame_files)
    print("Decoding ame files.")
    decode_ame_files(ame_files)
    
        
if __name__ == '__main__':
    main()
    # Keep console open to view messages on windows machines.
    if os.name == 'nt':
        input("Press enter to exit")
3 votes

Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/65403132
