使用fast\u ExecuteMy插入pandas数据帧时发生utf\u 16\u le\u解码系统错误

  6 年前


    def insertDataFrameInDB(cursor, dataFrame, toTable, fieldNames = None):
        if fieldNames:
            dataFrame = dataFrame[fieldNames]
            fieldNames = dataFrame.columns
        for r in dataFrame.columns.values:
            dataFrame[r] = dataFrame[r].map(str)
            dataFrame[r] = dataFrame[r].map(str.strip)   
        params = [tuple(x) for x in dataFrame.values]
        fieldNameStr = ",".join(fieldNames)
        valueStr = ",".join(["?"] * len(fieldNames))
        sql = "INSERT INTO {} ({}) VALUES({})".format(toTable, fieldNameStr, valueStr)
        cursor.fast_executemany = True
        cursor.executemany(sql, params)
    insertDataFrameInDB(cursor, df, "table")


    DataError                                 Traceback (most recent call last)
    DataError: ('String data, right truncation: length 24 buffer 20', '22001')
    The above exception was the direct cause of the following exception:
    SystemError                               Traceback (most recent call last)
    ~\AppData\Local\Continuum\anaconda3\lib\encodings\utf_16_le.py in decode(input, errors)
         15 def decode(input, errors='strict'):
    ---> 16     return codecs.utf_16_le_decode(input, errors, True)
    SystemError: <built-in function utf_16_le_decode> returned a result with an error set
    The above exception was the direct cause of the following exception:
    SystemError                               Traceback (most recent call last)
    SystemError: decoding with 'utf-16le' codec failed (SystemError: <built-in function utf_16_le_decode> returned a result with an error set)
    The above exception was the direct cause of the following exception:
    SystemError                               Traceback (most recent call last)
    ~\AppData\Local\Continuum\anaconda3\lib\encodings\utf_16_le.py in decode(input, errors)
         15 def decode(input, errors='strict'):
    ---> 16     return codecs.utf_16_le_decode(input, errors, True)
    SystemError: <built-in function utf_16_le_decode> returned a result with an error set
    The above exception was the direct cause of the following exception:
    SystemError                               Traceback (most recent call last)
    SystemError: decoding with 'utf-16le' codec failed (SystemError: <built-in function utf_16_le_decode> returned a result with an error set)
    The above exception was the direct cause of the following exception:
    SystemError                               Traceback (most recent call last)
    <ipython-input-6-f73d9346f943> in <module>()
         13 cursor = getCursor(conData)
    ---> 14 insertDataFrameInDB(cursor, df, "snowplow.sankey")
    <ipython-input-1-69ecbca20fc8> in insertDataFrameInDB(cursor, dataFrame, toTable, fieldNames)
         29     sql = "INSERT INTO {} ({}) VALUES({})".format(toTable, fieldNameStr, valueStr)
         30     cursor.fast_executemany = True
    ---> 31     cursor.executemany(sql, params)
         32     cursor.commit()
    SystemError: <class 'pyodbc.Error'> returned a result with an error set


  6 年前

    您正在使用Microsoft的“ODBC驱动程序…用于SQL Server”,因此 fast_executemany 应使用pyodbc 4.0.21。但是,您可以在仍使用 DataFrame#to_sql 通过使用SQLAlchemy执行事件,如所示 this question .

    示例:以下代码没有利用 fast\u ExecuteMy

    import pandas as pd
    from sqlalchemy import create_engine
    import time
    engine = create_engine('mssql+pyodbc://@SQL_panorama')
    # test environment
    num_rows = 1000
    df = pd.DataFrame(
        [[x, f'row{x:03}'] for x in range(num_rows)],
        columns=['id', 'txt']
    cnxn = engine.connect()
        cnxn.execute("DROP TABLE df_to_sql_test")
    cnxn.execute("CREATE TABLE df_to_sql_test (id INT PRIMARY KEY, txt NVARCHAR(50))")
    # timing test
    t0 = time.time()
    df.to_sql("df_to_sql_test", engine, if_exists='append', index=False)
    print(f"{num_rows} rows written in {(time.time() - t0):.1f} seconds")


    1000 rows written in 25.2 seconds


    import pandas as pd
    from sqlalchemy import create_engine, event
    import time
    engine = create_engine('mssql+pyodbc://@SQL_panorama')
    @event.listens_for(engine, 'before_cursor_execute')
    def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
        if executemany:
            cursor.fast_executemany = True
    # test environment
    num_rows = 1000
    df = pd.DataFrame(
        [[x, f'row{x:03}'] for x in range(num_rows)],
        columns=['id', 'txt']
    cnxn = engine.connect()
        cnxn.execute("DROP TABLE df_to_sql_test")
    cnxn.execute("CREATE TABLE df_to_sql_test (id INT PRIMARY KEY, txt NVARCHAR(50))")
    # timing test
    t0 = time.time()
    df.to_sql("df_to_sql_test", engine, if_exists='append', index=False)
    print(f"{num_rows} rows written in {(time.time() - t0):.1f} seconds")


    1000 rows written in 1.6 seconds

    有关此方法的更完整讨论,请参阅 this answer .