我想使用日期作为标识,将两个不同的表链接在一起。我搜索了很多次,找到了一些替代解决方案,但我得到的错误消息如下:
pyodbc.Error: ('21S01', '[21S01] [Microsoft][ODBC SQL Server Driver][SQL Server]There are more columns in the INSERT statement than values specified in the VALUES clause. The number of values in the VALUES clause must match the number of columns specified in the INSERT statement. (109) (SQLExecDirectW)')

这是我使用的代码:
from src.server.connectToDB import get_sql_conn
import pandas as pd
from datetime import datetime
if __name__ == '__main__':
cursor = get_sql_conn().cursor()
localFile = 'C:\\Users\\dersimw\\Source\Repos\\nordpoolAnalyse\\data\\2011-3.xlsx'
excelFile = pd.ExcelFile(localFile)
rowsID = []
a = ["01"]
for sheets in a:
df = excelFile.parse(sheets).head(5)
df.dropna(axis=1, how='all', inplace=True)
df.fillna(0, inplace=True)
print(df)
now = datetime.now()
DateDateTime = now.strftime('?Y-?m-?d ?H:?M:?S')
for key, rows in df.items():
print("## Column: ", key, "\n")
columnInsertSql = "INSERT INTO Table11 (DateDateTime, AcceptedBlockBuy, AcceptedBlockSell, RejectedBlockBuy, RejectedBlockSell, NetImports) VALUES("
columnCounter = 1
columnHasData = False
for key, column in rows.items():
if isinstance(column, int) or isinstance(column, float):
columnHasData = True
columnInsertSql += str(column)
if columnCounter != len(list(rows.items())):
columnInsertSql += ", "
columnCounter += 1
columnInsertSql += ")"
if columnHasData == True:
cursor.execute(columnInsertSql)
cursor.commit()这就是我所拥有的:
Id A.BlockBuy A.BlockSell R.BlockBuy R.BlockSell NetImports
1 112 1 14 655 65
2 123 1 54 3 654
3 122 1 65 43 43
.
.
122 21 12 54 54 54

这就是我想要的:
Id DateDate A.BlockBuy A.BlockSell R.BlockBuy R.BlockSell NetImports
1 2018-08-1 112 1 14 655 65
2 2018-08-1 123 1 54 3 654
3 2018-08-1 122 1 65 43 43
.
.
122 2018-08-01 21 12 54 54 54

发布于 2018-08-01 22:12:40
你尝试的方式不是做 ETL 的好方法。我已经使用 postgres 和 python 为我的一个项目构建了自己的包。对于 SQL Server,该过程应完全相同。您应该将 datetime 列(`etl_run_time`)添加到数据中。在上传到数据库之前,我总是将其添加到 dataframe/数据中,然后就可以对数据库进行批量插入了。
主要的事情是,数据加载到python和插入到数据库应该是分开的任务。然后,如果需要的话,应该有一些更新任务。我无法抽出时间来准确地复制您的任务。但你可以在博客中详细阅读:https://datapsycho.github.io/PsychoBlog/dataparrot-18-01
# import datetime
import time
# from dateutil.relativedelta import relativedelta
import json
# that function has username and password for db connection
# you can create your own which will be used as cursor
from auths.auth import db_connection
import os
import pandas as pd
class DataLoader():
    """ETL helper: extract survey-sent text files, stage them as JSON,
    and bulk-insert the staged data into the database in chunks."""

    @classmethod
    def process_survey_sent_data(cls):
        """Read the raw .txt extracts, aggregate per (country, month),
        tag each row with an etl_run_time, and dump the result as JSON."""
        # My file path is very long, so it is split into 3 parts.
        input_path_1 = r'path\to\your\file'
        input_path_2 = r'\path\to\your\file'
        input_path_3 = r'\path\to\your\file'
        file_path = input_path_1 + input_path_2 + input_path_3
        file_list = os.listdir(os.path.join(file_path))
        file_list = [file_name for file_name in file_list if '.txt' in file_name]
        field_names = ['country', 'ticket_id']
        pd_list = []
        for file_name in file_list:
            # The file name carries the send date; keep it as a column.
            date_ = file_name.replace(" ", "-")[:-4]
            file_path_ = file_path + '\\' + file_name
            df_ = pd.read_csv(os.path.join(file_path_), sep='\t',
                              usecols=field_names).assign(sent_date=date_)
            # Normalize the date to the first of the month, then to str.
            df_['sent_date'] = pd.to_datetime(df_['sent_date'])
            df_['sent_date'] = df_['sent_date'].values.astype('datetime64[M]')
            df_['sent_date'] = df_['sent_date'].astype(str)
            pd_list.append(df_)
        df_ = pd.concat(pd_list)
        # Aggregate and build a unique ID per (country, sent_date).
        df_ = df_[['country', 'sent_date', 'ticket_id']].groupby(
            ['country', 'sent_date']).agg('count').reset_index()
        df_['sent_id'] = df_['country'] + '_' + df_['sent_date']
        # BUG FIX: drop_duplicates is not in-place; the original discarded
        # the result, so duplicates were never actually removed.
        df_ = df_.drop_duplicates(keep='first', subset='sent_id')
        print(df_.head())
        output_path_1 = r'\\long\output\path1'
        output_path_2 = r'\lont\output\path2'
        output_path = output_path_1 + output_path_2
        # Staged file name.
        survey_sent_file = 'survey_sent.json'
        # Add the ETL run time so every load is traceable.
        df_['etl_run_time'] = pd.to_datetime('today').strftime('%Y-%m-%d')
        # Write the staged data to JSON (one record per row).
        df_.to_json(os.path.join(output_path, survey_sent_file), orient='records')
        return print('Survey Sent data stored as json dump')

    @classmethod
    def insert_string(cls, column_list, table_name):
        """Return a pyformat-style INSERT statement for *table_name*.

        Columns are now named explicitly in the statement, so the insert no
        longer depends on the physical column order of the target table.
        """
        columns = ', '.join(column_list)
        placeholders = ', '.join('%({})s'.format(col) for col in column_list)
        return 'INSERT INTO {} ({}) VALUES ({}) ;'.format(
            table_name, columns, placeholders)

    @classmethod
    def empty_table(cls, table_name):
        """Delete all rows from *table_name* before a fresh load."""
        conn = db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("delete from {} ;".format(table_name))
            conn.commit()
        finally:
            # Always release the connection, even if the delete fails.
            conn.close()

    # # function to run post_sql code after the data load
    # @classmethod
    # def run_post_sql(cls):
    #     # database query which can run after the insertion of data
    #     post_sql = """
    #     INSERT INTO schema.target_table -- target
    #     select * from schema.extract_table -- extract
    #     WHERE
    #         schema.extract_table.sent_id -- primary key of extract
    #         NOT IN (SELECT DISTINCT sent_id FROM schema.target_table) -- not in target
    #     """
    #     conn = db_connection()
    #     cursor = conn.cursor()
    #     cursor.execute(post_sql)
    #     conn.commit()
    #     conn.close()
    #     return print("Post SQL for survey sent has run for Survey Sent.")

    @classmethod
    def insert_survey_sent_data(cls):
        """Load the staged JSON and bulk-insert it into the extract table
        in chunks of 60 rows."""
        output_path_1 = r'new\json\file\path1'
        output_path_2 = r'\new\json\file\path2'
        output_path = output_path_1 + output_path_2
        output_survey_file = 'survey_sent.json'
        full_path = os.path.join(output_path, output_survey_file)
        # Column names expected in each JSON record.
        table_def = ['sent_id', 'country', 'ticket_id', 'sent_date', 'etl_run_time']
        # Load the data as JSON and partition it into chunks.
        with open(full_path, 'r') as file:
            chunk = 60
            json_data = json.loads(file.read())
            json_data = [json_data[i * chunk:(i + 1) * chunk]
                         for i in range((len(json_data) + chunk - 1) // chunk)]
        # Delete existing data, then insert chunk by chunk.
        table_name = 'schema.extract_table'
        cls.empty_table(table_name)
        print('{} total chunk will be inserted, each chunk have {} rows.'.format(
            len(json_data), chunk))
        insert_statement = cls.insert_string(table_def, table_name)
        # FIX: open one connection for the whole load instead of
        # reconnecting for every chunk.
        conn = db_connection()
        try:
            cursor = conn.cursor()
            for iteration, chunk_ in enumerate(json_data, 1):
                start_time = time.time()
                cursor.executemany(insert_statement, chunk_)
                conn.commit()
                print(iteration, " %s seconds" % round((time.time() - start_time), 2))
        finally:
            conn.close()
        return print('Insert happened for survey sent.')
if __name__ == "__main__":
DataLoader.process_survey_sent_data()
DataLoader.insert_survey_sent_data()
# DataLoader.run_post_sql()https://stackoverflow.com/questions/51633117
复制相似问题