
Why does Azure Data Factory seemingly insist on inserting DateTimes as string?

Stack Overflow user
Asked on 2019-07-09 08:08:11
2 answers · 1.8K views · 0 following · 4 votes

I'm trying to set up an Azure Data Factory to copy and denormalize data from one Azure SQL database to another Azure SQL database for reporting/BI purposes with a data flow, but I ran into a problem inserting dates.

This is the definition of my data flow:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable2",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tBucketId as string,\n\t\tStreamId as string,\n\t\tStreamIdOriginal as string,\n\t\tStreamRevision as integer,\n\t\tItems as integer,\n\t\tCommitId as string,\n\t\tCommitSequence as integer,\n\t\tCommitStamp as timestamp,\n\t\tCheckpointNumber as long,\n\t\tDispatched as boolean,\n\t\tHeaders as binary,\n\t\tPayload as binary\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\tmapColumn(\n\t\tBucketId,\n\t\tCommitStamp\n\t)) ~> sink1"
        }
    }
}

And this is the definition of my source dataset:

{
    "name": "AzureSqlTable1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Source_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "BucketId",
                "type": "varchar"
            },
            {
                "name": "StreamId",
                "type": "char"
            },
            {
                "name": "StreamIdOriginal",
                "type": "nvarchar"
            },
            {
                "name": "StreamRevision",
                "type": "int",
                "precision": 10
            },
            {
                "name": "Items",
                "type": "tinyint",
                "precision": 3
            },
            {
                "name": "CommitId",
                "type": "uniqueidentifier"
            },
            {
                "name": "CommitSequence",
                "type": "int",
                "precision": 10
            },
            {
                "name": "CommitStamp",
                "type": "datetime2",
                "scale": 7
            },
            {
                "name": "CheckpointNumber",
                "type": "bigint",
                "precision": 19
            },
            {
                "name": "Dispatched",
                "type": "bit"
            },
            {
                "name": "Headers",
                "type": "varbinary"
            },
            {
                "name": "Payload",
                "type": "varbinary"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[Commits]"
        }
    }
}

And the sink dataset:

{
    "name": "AzureSqlTable2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Dest_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [],
        "typeProperties": {
            "tableName": "dbo.Test2"
        }
    }
}

When I run the pipeline with this data flow, I get the following error:

Activity dataflow1 failed: DF-EXEC-1 Conversion failed when converting date and/or time from character string.
com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
    at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:256)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:108)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:28)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1611)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$200(SQLServerBulkCopy.java:58)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:709)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:739)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1684)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:124)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:459)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

My Azure SQL audit log shows the following failing statement (which is hardly surprising, considering it uses VARCHAR(50) as the type for [CommitStamp]):

INSERT BULK dbo.T_301fcb5e4a4148d4a48f2943011b2f04 (
  [BucketId] NVARCHAR(MAX), 
  [CommitStamp] VARCHAR(50), 
  [StreamId] NVARCHAR(MAX), 
  [StreamIdOriginal] NVARCHAR(MAX),
  [StreamRevision] INT,
  [Items] INT,
  [CommitId] NVARCHAR(MAX),
  [CommitSequence] INT, 
  [CheckpointNumber] BIGINT, 
  [Dispatched] BIT,
  [Headers] VARBINARY(MAX),
  [Payload] VARBINARY(MAX),
  [r8e440f7252bb401b9ead107597de6293] INT) 
with (ROWS_PER_BATCH = 4096, TABLOCK)
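The failure mode matches SQL Server's string-to-datetime conversion rules: the staging table types [CommitStamp] as VARCHAR(50), so the later move into the datetime2 destination column has to parse the string value. A minimal T-SQL sketch of the same error (illustrative only; the literal values are made up):

```sql
-- An ISO 8601 string converts cleanly to datetime2:
SELECT CONVERT(datetime2(7), '2019-07-09T08:08:11.0000000');

-- A string the parser cannot interpret raises the same error as the bulk load:
-- Msg 241: Conversion failed when converting date and/or time from character string.
SELECT CONVERT(datetime2(7), 'not a timestamp');
```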

I have absolutely no idea why this happens. The schema information appears to be correct, yet somehow Data Factory / the data flow wants to insert CommitStamp as a string type.

As requested, here is the output from the data flow's code/plan view:

source(output(
        BucketId as string,
        StreamId as string,
        StreamIdOriginal as string,
        StreamRevision as integer,
        Items as integer,
        CommitId as string,
        CommitSequence as integer,
        CommitStamp as timestamp,
        CheckpointNumber as long,
        Dispatched as boolean,
        Headers as binary,
        Payload as binary
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[Commits]',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Application',
    user: 'Sign2025Admin',
    password: '**********') ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    mapColumn(
        BucketId,
        CommitStamp
    ),
    schemaName: 'dbo',
    tableName: 'Test2',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Reporting',
    user: 'Sign2025Admin',
    password: '**********') ~> sink1

2 Answers

Stack Overflow user

Accepted answer

Posted on 2019-07-09 09:51:39

I created a data flow to copy data from one Azure SQL database to another Azure SQL database, and it did successfully convert datetime2 to VARCHAR(50).

This is the definition of my data flow:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_sto",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_mex",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as timestamp\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(input(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false) ~> sink1"
        }
    }
}

The definition of my source dataset:

{
    "name": "DestinationDataset_sto",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "datetime2",
                "scale": 7
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

My sink dataset settings:

{
    "name": "DestinationDataset_mex",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "varchar"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo1]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

Here are my data flow steps.

Step 1: source settings:

Step 2: sink settings:

The run succeeds:

The tables demo and demo1 have almost identical schemas, except for myTime.
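For reference, the two tables could be created with DDL like the following (a sketch inferred from the dataset schemas shown above; the varchar lengths are assumptions):

```sql
-- Source table: myTime is a real date/time column
CREATE TABLE dbo.demo  (ID int, tName varchar(50), myTime datetime2(7));

-- Sink table: myTime is stored as text, which the data flow converts into
CREATE TABLE dbo.demo1 (ID int, tName varchar(50), myTime varchar(50));
```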

My source table and its data:

My sink table and the data copied over from demo:

Data flow plan:

source(output(
        ID as integer,
        tName as string,
        myTime as timestamp
    ),
    allowSchemaDrift: true,
    validateSchema: true,
    isolationLevel: 'SERIALIZABLE',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[demo]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> source1
source1 sink(input(
        ID as integer,
        tName as string,
        myTime as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    schemaName: '[dbo]',
    tableName: '[demo1]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> sink1

Update 1:

I created the sink table manually and found that:

The data flow can convert datetime2 to VARCHAR(n) (probably NVARCHAR(n) as well), date, and datetimeoffset.

When I try the target types time, datetime, datetime2, or smalldatetime, the data flow always fails with the error:

"message": "DF-EXEC-1 Conversion failed when converting date and/or time from character 

Update 2019-7-11:

I asked Azure Support for help, and they replied that this is a bug in Data Flow with no solution at present.

Update 2019-7-12:

I tested together with Azure Support, and they agree it is a bug. Here is the new email:

They also told me that the fix has already been made and will ship with the next deployment train, probably by the end of next week.

Hope this helps.

1 vote

Stack Overflow user

Posted on 2019-07-10 21:40:42

It looks like your sink dataset defines myTime as a string:

sink(input(
    ID as integer,
    tName as string,
    myTime as string
))

Can you change that to timestamp or date?

Alternatively, you can land the data in a temporary staging table in SQL by setting "Recreate table" on the sink, and let ADF generate a new table definition on the fly using the data types of the mapped fields in your data flow.
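Following the first suggestion, the sink script block would declare myTime as a timestamp so the generated bulk insert targets a date/time column rather than VARCHAR(50). A sketch based on the plan shown in the accepted answer (table and column names as above):

```
source1 sink(input(
        ID as integer,
        tName as string,
        myTime as timestamp
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false) ~> sink1
```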

1 vote
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/56948054