首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >整理一系列的尝试..。除了陈述?

整理一系列的尝试..。除了陈述?
EN

Stack Overflow用户
提问于 2018-10-26 09:54:33
回答 3查看 103关注 0票数 2

我已经编写了一个脚本来将数据从Kustomer传输到我们的数据库,虽然它工作得很好,但它有点混乱,我想知道是否有一个更优雅的解决方案。我把结果行定义为字典,然后推入到MySQL,但当这些值中的一些值在JSON中始终不可用时,麻烦的部分就出现了。

这导致了针对每个数据点的try / not语句,这些语句可能丢失也可能没有丢失。

有更好的方法吗?下面的代码。

代码语言:javascript
复制
    try:
        record_data = {
            'id': record['id'],
            'created_at': str(datetime.strptime(record['attributes']['createdAt'], '%Y-%m-%dT%H:%M:%S.%fZ'))[:-7],
            'last_activity_at': str(datetime.strptime(record['attributes']['lastActivityAt'], '%Y-%m-%dT%H:%M:%S.%fZ'))[:-7],
            'first_marked_done': None,
            'last_marked_done': None,
            'assigned_team': record['attributes']['assignedTeams'][0] if record['attributes']['assignedTeams'] != [] else None,
            'conversation_type': None,
            'conversation_category': None,
            'conversation_subcategory': None,
            'message_count': record['attributes']['messageCount'],
            'note_count': record['attributes']['noteCount'],
            'satisfaction': record['attributes']['satisfaction'],
            'status': None,
            'email': 1 if len(list(filter(lambda x: x == 'email', record['attributes']['channels']))) > 0 else 0,
            'chat': 1 if len(list(filter(lambda x: x == 'chat', record['attributes']['channels']))) > 0 else 0,
            'priority': record['attributes']['priority'],
            'direction': 'outbound' if record['attributes']['direction'] == 'out' else 'in',
            'nlp_score': None,
            'nlp_sentiment': None,
            'waiting_for': None,
            'sla_breach': None,
            'sla_status': None,
            'breached_sla': None,
            'breached_at': None
        }
        try:
            record_data['status'] = record['attributes']['status']
        except KeyError:
            pass
        try:
            record_data['conversation_type'] = record['attributes']['custom']['typeStr']
            record_data['conversation_category'] = str(record['attributes']['custom']['categoryTree']).split('.')[0]
            record_data['conversation_subcategory'] = str(record['attributes']['custom']['categoryTree']).split('.')[1] if len(str(record['attributes']['custom']['categoryTree']).split('.')) > 1 else None
        except KeyError:
            pass
        try:
            record_data['waiting_for'] = record['attributes']['custom']['typeStr']
        except KeyError:
            pass
        try:
            record_data['first_marked_done'] = str(datetime.strptime(record['attributes']['firstDone']['createdAt'], '%Y-%m-%dT%H:%M:%S.%fZ'))[:-7]
            record_data['last_marked_done'] = str(datetime.strptime(record['attributes']['lastDone']['createdAt'], '%Y-%m-%dT%H:%M:%S.%fZ'))[:-7]

        except KeyError:
            pass
        try:
            record_data['sla_breach'] = 0 if record['attributes']['sla']['breached'] is False else 1
            record_data['sla_status'] = record['attributes']['sla']['status']
            if record_data['sla_breach'] == 1:
                try:
                    record_data['breached_sla'] = record['attributes']['sla']['breach']['metric']
                    record_data['breached_at'] = record['attributes']['sla']['breach']['at']
                except KeyError:
                    for m in record['attributes']['sla']['metrics']:
                        try:
                            if record['attributes']['sla']['metrics'][m]['breachAt'] == record['attributes']['sla']['summary']['firstBreachAt']:
                                record_data['breached_sla'] = m
                                record_data['breached_at'] = str(datetime.strptime(record['attributes']['sla']['summary']['firstBreachAt'], '%Y-%m-%dT%H:%M:%S.%fZ'))[:-7]
                        except KeyError:
                            pass
        except KeyError:
            record_data['sla_breach'] = 0
        print(record_data)
        self.db.insert_update(KustomerConversations(**record_data))

    except KeyError:
        pass
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-10-26 10:10:19

首先,在可能的情况下,您应该尝试使用具有指定默认值的dict.get。接下来,您可以考虑使用contextmanager使代码更加简洁。考虑到这一点:

代码语言:javascript
复制
try:
    record_data['status'] = record['attributes']['status']
except KeyError:
    pass
try:
    record_data['conversation_type'] = record['attributes']['custom']['typeStr']
except KeyError:
    pass
try:
    record_data['waiting_for'] = record['attributes']['custom']['typeStr']
except KeyError:
    pass
try:
    record_data['first_marked_done'] = record['attributes']['firstDone']['createdAt']
except KeyError:
    pass

现在重写,您可以确保一致的错误处理而不重复逻辑:

代码语言:javascript
复制
from contextlib import contextmanager

@contextmanager
def error_handling():
    try:
        yield
    except KeyError:
        pass

with error_handling():
    record_data['status'] = record['attributes']['status']
with error_handling():
    record_data['conversation_type'] = record['attributes']['custom']['typeStr']
with error_handling():
    record_data['waiting_for'] = record['attributes']['custom']['typeStr']
with error_handling():
    record_data['first_marked_done'] = record['attributes']['firstDone']['createdAt']

您可以为希望应用的各种规则定义任意数量的函数(如error_handling )。

票数 2
EN

Stack Overflow用户

发布于 2018-10-26 10:03:12

您可以使用函数,这会给您提供嵌套dicts中的元素,如果它“不存在”,则不会引发异常。

就像这个速写稿:

代码语言:javascript
复制
def get_nested_dict_value(src_dict, *nested_keys, **kwargs):
    """
    Get value of some nested dict by series of keys with default value.
    Example:
    instead of: 
        x = data['a']['b']['c']['d']
    use
        x = get_nested_dict_value(data, 'a', 'b', 'c', 'd')
    or, if you need some non-None default value, add default=xxx kwarg:
        x = get_nested_dict_value(data, 'a', 'b', 'c', 'd', default=0)
    """
    default = kwargs.get("default", None)
    pointer = src_dict
    i = 0
    for key in nested_keys:
        i += 1
        if key in pointer:
            pointer = pointer[key]
            if i == len(nested_keys):
                return pointer
        else:
            return default

因此,与其:

代码语言:javascript
复制
try:    
    record_data['conversation_type'] = record['attributes']['custom']['typeStr']
except Exception:
    pass

你只需键入:

代码语言:javascript
复制
record_data['conversation_type'] = get_nested_dict_value(record, 'attributes', 'custom', 'typeStr')
票数 0
EN

Stack Overflow用户

发布于 2018-10-27 14:24:18

输入和输出端的不同命名约定使得很难克服显式赋值的清晰性。保留您的版本的精确语义(例如,在没有typeStr的情况下不分配conversation_category,即使typeStr可用)排除了某些选择(比如在每次访问时设置一个try/except循环的数据结构);如果对输入数据进行更多的假设,您可能会做得更好。

尽管如此,除了dict.get already mentioned之外,您还可以使用内置程序(anyordict),并引入一个助手函数和一些临时变量,以使代码更加可读性:

代码语言:javascript
复制
# this gives one digit of the hour for me...?
def ptime(s): return str(datetime.strptime(s,'%Y-%m-%dT%H:%M:%S.%fZ'))[:-7]

try:
    attr=record['attributes']
    cust=attr.get('custom',{})  # defer KeyErrors into the below
    record_data = dict(
        id = record['id'],
        created_at = ptime(attr['createdAt']),
        last_activity_at = ptime(attr['lastActivityAt']),
        first_marked_done = None,
        last_marked_done = None,
        assigned_team = attr['assignedTeams'][0] or None,
        conversation_type = None,
        conversation_category = None,
        conversation_subcategory = None,
        message_count = attr['messageCount'],
        note_count = attr['noteCount'],
        satisfaction = attr['satisfaction'],
        status = attr.get('status'),
        email = int(any(x == 'email' for x in attr['channels'])),
        chat = int(any(x == 'chat' for x in attr['channels'])),
        priority = attr['priority'],
        direction = 'outbound' if attr['direction'] == 'out' else 'in',
        nlp_score = None,
        nlp_sentiment = None,
        waiting_for = cust.get('typeStr'),
        sla_breach = 0,
        sla_status = None,
        breached_sla = None,
        breached_at = None
    )
    try:
        record_data['conversation_type'] = cust['typeStr']
        cat=str(cust['categoryTree']).split('.')
        record_data['conversation_category'] = cat[0]
        record_data['conversation_subcategory'] = cat[1] if len(cat) > 1 else None
    except KeyError: pass
    try:
        record_data['first_marked_done'] = ptime(attr['firstDone']['createdAt'])
        record_data['last_marked_done'] = ptime(attr['lastDone']['createdAt'])
    except KeyError: pass
    try:
        sla=attr['sla']
        record_data['sla_breach'] = 0 if sla['breached'] is False else 1
        record_data['sla_status'] = sla['status']
        if record_data['sla_breach'] == 1:
            try:
                record_data['breached_sla'] = sla['breach']['metric']
                record_data['breached_at'] = sla['breach']['at']
            except KeyError:
                for m,v in sla['metrics'].items():
                    try:
                        v=v['breachAt']
                        if v == sla['summary']['firstBreachAt']:
                            record_data['breached_sla'] = m
                            record_data['breached_at'] = ptime(v)
                    except KeyError: pass
    except KeyError: pass
    print(record_data)
    self.db.insert_update(KustomerConversations(**record_data))

except KeyError: pass

虽然您可能有一个针对它的策略,但在本例中,我建议将其余的except KeyError: pass子句分别写在一行上:它有助于对暂定代码进行可视化划分。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53006095

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档