首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >模拟AWSwrangler进行单元测试

模拟AWSwrangler进行单元测试
EN

Stack Overflow用户
提问于 2021-11-30 09:05:24
回答 1查看 678关注 0票数 3

由于moto不支持AWSwrangler,所以我被困在这里,不知道如何模仿。

我正在尝试统一我的lambda代码,它使用AWSwrangler运行雅典娜查询。

代码语言:javascript
复制
import awswrangler as wr
import boto3

def athena_query(dbtable, contact_id, athena_output, session):
    
    query = """
    SELECT
        *
    FROM
        :dbtable;
    WHERE 
    contactid=:contactid;
    """

    output = wr.athena.read_sql_query(
        query, 
        params = {
            "contactid": f"'{contact_id}'", 
            "dbtable": f"{dbtable}"
        }, 
        s3_output = athena_output,
        boto3_session = session
    )
    results = output.head().loc[0]
    
    return results

response = athena_query("table_name", "123", "s3://bucket", boto3.session.Session())

我引用了AWSwrangler github问题,在尝试链接中提供的一些测试时,它使用的是AWS服务,而不是本地运行的。

EN

回答 1

Stack Overflow用户

发布于 2022-08-02 13:21:17

下面是使用moto和pytest实现此函数的示例。

首先,我将根据当前版本(2.16.1)中的awswrangler必需参数来更正您的函数。

代码语言:javascript
复制
import awswrangler as wr
import boto3

def athena_query(database, dbtable, contact_id, athena_output, session):
    
    query = """
    SELECT
        *
    FROM
        :dbtable;
    WHERE 
    contactid=:contactid;
    """

    output = wr.athena.read_sql_query(
        query, 
        database,
        params = {
            "contactid": f"'{contact_id}'", 
            "dbtable": f"{dbtable}"
        }, 
        s3_output = athena_output,
        boto3_session = session
    )
    results = output.head().loc[0]
    
    return results

test/conftest.py fil中,我将声明必要的模拟对象:

代码语言:javascript
复制
import pytest
import moto


TEST_BUCKET_NAME = "my_bucket"
REGION = "us-east-1"
DATABASE_NAME = "test_db"
TABLE_NAME = "test_table"
TABLE_DDL = f"""CREATE EXTERNAL TABLE IF NOT EXISTS
{DATABASE_NAME}.{TABLE_NAME} (
  a string,
  b string,
  contactid string
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '\"',
  'escapeChar' = '\\'
)
STORED AS TEXTFILE
LOCATION 's3://{TEST_BUCKET_NAME}/input/';"""


@pytest.fixture
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    os.environ["AWS_SECURITY_TOKEN"] = "testing"
    os.environ["AWS_SESSION_TOKEN"] = "testing"


@pytest.fixture
def s3_client(aws_credentials):
    with moto.mock_s3():
        conn = boto3.client("s3", region_name=REGION)
        yield conn

@pytest.fixture
def s3_client(aws_credentials):
    with moto.mock_s3():
        conn = boto3.client("s3", region_name=REGION)
        yield conn

@pytest.fixture
def athena_client(aws_credentials):
    with moto.athena.mock_athena():
        conn = boto3.client("athena", region_name=REGION)
        yield conn

@pytest.fixture
def s3_bucket(s3_client):
    s3_client.create_bucket(
        Bucket=TEST_BUCKET_NAME,
        CreateBucketConfiguration={
            'LocationConstraint': 'eu-west-1'
        }
    )
    yield boto3.resource('s3').Bucket(TEST_BUCKET_NAME)

@pytest.fixture
def athena_table(athena_client, s3_bucket):
    # create database
    _ = athena_client.start_query_execution(
        QueryString=f"create database {DATABASE_NAME}",
        ResultConfiguration={"OutputLocation": "s3://{TEST_BUCKET_NAME}/queries/"}
    )
    # create table 
    _ = athena_client.start_query_execution(
            QueryString=TABLE_DDL,
            ResultConfiguration={"OutputLocation": "s3://{TEST_BUCKET_NAME}/queries/"}
        )

然后,我将在一个单独的test/athena_test.py文件中定义一个函数的测试。这是使用created来模拟对查询的awswrangler响应,但是您可以使用在conftest.py文件中创建的模拟对象进行高级测试:

代码语言:javascript
复制
from conftest import TEST_BUCKET_NAME, DATABASE_NAME, TABLE_NAME
# import your function to test here

def test_athena_query(s3_bucket, athena_table, mocker):

    def mock_response(*args, **kwargs):
        return pd.DataFrame.from_dict({"a": [1, 2], "b": [3, 4], "contactid": [123, 123]})

    # mocking
    mock_wr_call = mocker.patch('wr.athena.read_sql_query')
    mock_wr_call.side_effect = mock_response

    response = athena_query(DATABASE_NAME, TABLE_NAME, "123", f"s3://{TEST_BUCKET_NAME}/queries/", boto3.session.Session())

    assert response.shape[0] == 2

资源:

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70166751

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档