由于 moto 不支持 AWS Wrangler,我卡在了这里,不知道该如何对它进行模拟(mock)。
我正在尝试对我的 Lambda 代码进行单元测试,该代码使用 AWS Wrangler 执行 Athena 查询。
import awswrangler as wr
import boto3
def athena_query(dbtable, contact_id, athena_output, session):
    """Return the first row of ``dbtable`` whose ``contactid`` matches ``contact_id``.

    The query is executed through ``wr.athena.read_sql_query``.

    Args:
        dbtable: Table name substituted for the ``:dbtable;`` placeholder.
        contact_id: Value compared against the ``contactid`` column; it is
            rendered as a single-quoted SQL string literal.
        athena_output: Passed to awswrangler as ``s3_output``.
        session: Passed to awswrangler as ``boto3_session``.

    Returns:
        The row labelled ``0`` of the frame returned by awswrangler.

    Raises:
        KeyError: If the returned frame has no row labelled ``0`` (for
            example when the query matches nothing).
    """
    # NOTE(review): awswrangler 2.x declares `database` as a required argument
    # of read_sql_query, which this call does not pass - confirm against the
    # installed version.
    sql = (
        "\n"
        "SELECT\n"
        "*\n"
        "FROM\n"
        ":dbtable;\n"
        "WHERE\n"
        "contactid=:contactid;\n"
    )
    # The value is handed over already wrapped in quotes, i.e. it ends up in
    # the SQL text as-is. Doubling embedded single quotes keeps a value such
    # as "o'neil" from closing the string literal early.
    contact_literal = "'" + str(contact_id).replace("'", "''") + "'"
    frame = wr.athena.read_sql_query(
        sql,
        params={
            "contactid": contact_literal,
            "dbtable": str(dbtable),
        },
        s3_output=athena_output,
        boto3_session=session,
    )
    first_row = frame.head().loc[0]
    return first_row
response = athena_query("table_name", "123", "s3://bucket", boto3.session.Session())
我参考了 AWS Wrangler 的 GitHub issue,但在尝试其中链接的一些测试时,发现它们调用的是真实的 AWS 服务,而不是在本地运行。
发布于 2022-08-02 13:21:17
下面是使用moto和pytest实现此函数的示例。
首先,我将根据当前版本(2.16.1)中的awswrangler必需参数来更正您的函数。
import awswrangler as wr
import boto3
def athena_query(database, dbtable, contact_id, athena_output, session):
    """Return the first row of ``dbtable`` whose ``contactid`` matches ``contact_id``.

    The query is executed through ``wr.athena.read_sql_query``.

    Args:
        database: Passed to awswrangler as the database to query.
        dbtable: Table name substituted for the ``:dbtable;`` placeholder.
        contact_id: Value compared against the ``contactid`` column; it is
            rendered as a single-quoted SQL string literal.
        athena_output: Passed to awswrangler as ``s3_output``.
        session: Passed to awswrangler as ``boto3_session``.

    Returns:
        The row labelled ``0`` of the frame returned by awswrangler.

    Raises:
        KeyError: If the returned frame has no row labelled ``0`` (for
            example when the query matches nothing).
    """
    sql = (
        "\n"
        "SELECT\n"
        "*\n"
        "FROM\n"
        ":dbtable;\n"
        "WHERE\n"
        "contactid=:contactid;\n"
    )
    # The value is handed over already wrapped in quotes, i.e. it ends up in
    # the SQL text as-is. Doubling embedded single quotes keeps a value such
    # as "o'neil" from closing the string literal early.
    contact_literal = "'" + str(contact_id).replace("'", "''") + "'"
    frame = wr.athena.read_sql_query(
        sql,
        database,
        params={
            "contactid": contact_literal,
            "dbtable": str(dbtable),
        },
        s3_output=athena_output,
        boto3_session=session,
    )
    first_row = frame.head().loc[0]
    return first_row


# Next, in the test/conftest.py file, I declare the necessary mock objects:
import os

import boto3
import moto
import pytest
# Names shared by the fixtures in this file and imported by the tests.
TEST_BUCKET_NAME = "my_bucket"
REGION = "us-east-1"
DATABASE_NAME = "test_db"
TABLE_NAME = "test_table"

# DDL for the CSV-backed external table used by the tests.
# The escapeChar backslashes are doubled at the Python level so that the SQL
# text contains '\\' (an escaped backslash). A lone backslash would escape the
# closing quote and leave the SQL string literal unterminated.
TABLE_DDL = f"""CREATE EXTERNAL TABLE IF NOT EXISTS
{DATABASE_NAME}.{TABLE_NAME} (
a string,
b string,
contactid string
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '\"',
'escapeChar' = '\\\\'
)
STORED AS TEXTFILE
LOCATION 's3://{TEST_BUCKET_NAME}/input/';"""
@pytest.fixture
def aws_credentials():
    """Export fake AWS credentials and a default region for moto-backed tests.

    The values are written straight into ``os.environ`` and are not restored
    when the test finishes.
    """
    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    os.environ["AWS_SECURITY_TOKEN"] = "testing"
    os.environ["AWS_SESSION_TOKEN"] = "testing"
    # Sessions and resources created without an explicit region (for example
    # ``boto3.session.Session()``) would otherwise depend on whatever AWS
    # configuration happens to exist on the machine running the tests.
    os.environ["AWS_DEFAULT_REGION"] = REGION
@pytest.fixture
def s3_client(aws_credentials):
    """Yield a boto3 S3 client created inside ``moto.mock_s3()``.

    Requesting ``aws_credentials`` makes pytest run that fixture first, so the
    fake credentials are exported before the client is built. The mock stays
    active until the requesting test finishes.
    """
    with moto.mock_s3():
        yield boto3.client("s3", region_name=REGION)
@pytest.fixture
def athena_client(aws_credentials):
    """Yield a boto3 Athena client created inside ``moto.mock_athena()``.

    Requesting ``aws_credentials`` makes pytest run that fixture first, so the
    fake credentials are exported before the client is built. The mock stays
    active until the requesting test finishes.
    """
    # Use the top-level ``moto.mock_athena`` export, the same way the S3
    # fixture uses ``moto.mock_s3``; ``moto.athena`` is a submodule that a
    # bare ``import moto`` is not guaranteed to have loaded.
    with moto.mock_athena():
        yield boto3.client("athena", region_name=REGION)
@pytest.fixture
def s3_bucket(s3_client):
    """Create the test bucket through ``s3_client`` and yield a resource handle to it."""
    bucket_config = {'LocationConstraint': 'eu-west-1'}
    s3_client.create_bucket(
        Bucket=TEST_BUCKET_NAME,
        CreateBucketConfiguration=bucket_config,
    )
    s3_resource = boto3.resource('s3')
    yield s3_resource.Bucket(TEST_BUCKET_NAME)
@pytest.fixture
def athena_table(athena_client, s3_bucket):
    """Submit the database and table DDL statements through ``athena_client``.

    Both statements write their results under ``queries/`` in the test bucket.
    The fixture itself provides no value to the requesting test.
    """
    # Must be an f-string: a plain literal would send the text
    # "{TEST_BUCKET_NAME}" as the bucket name.
    output_location = f"s3://{TEST_BUCKET_NAME}/queries/"
    # Create the database first, then the table that lives in it.
    for statement in (f"create database {DATABASE_NAME}", TABLE_DDL):
        athena_client.start_query_execution(
            QueryString=statement,
            ResultConfiguration={"OutputLocation": output_location},
        )


# Then, in a separate test/athena_test.py file, I define a test for the
# function. It uses a mock to stand in for the awswrangler response to the
# query, but you can use the mock objects created in conftest.py for more
# advanced tests:
from conftest import TEST_BUCKET_NAME, DATABASE_NAME, TABLE_NAME
# import your function to test here
def test_athena_query(s3_bucket, athena_table, mocker):
    """Check that ``athena_query`` returns the first row of the mocked result frame."""
    import boto3
    import pandas as pd

    canned_frame = pd.DataFrame.from_dict(
        {"a": [1, 2], "b": [3, 4], "contactid": [123, 123]}
    )
    # Patch by the real module path: ``wr`` is only an import alias inside the
    # module under test and cannot be resolved by ``mocker.patch``.
    mock_wr_call = mocker.patch(
        "awswrangler.athena.read_sql_query", return_value=canned_frame
    )

    response = athena_query(
        DATABASE_NAME,
        TABLE_NAME,
        "123",
        f"s3://{TEST_BUCKET_NAME}/queries/",
        boto3.session.Session(),
    )

    mock_wr_call.assert_called_once()
    # athena_query returns one row (the one labelled 0), not the whole frame,
    # so compare its fields rather than counting rows.
    assert response.to_dict() == {"a": 1, "b": 3, "contactid": 123}


# Resources:
https://stackoverflow.com/questions/70166751
复制相似问题