
utf_16_le_decode SystemError when inserting a pandas DataFrame with fast_executemany

  •  1
  • Chielio  · Tech Community  · 6 years ago

    Here is my code:

    def insertDataFrameInDB(cursor, dataFrame, toTable, fieldNames = None):
        if fieldNames:
            dataFrame = dataFrame[fieldNames]
        else:
            fieldNames = dataFrame.columns
    
        for r in dataFrame.columns.values:
            dataFrame[r] = dataFrame[r].map(str)
            dataFrame[r] = dataFrame[r].map(str.strip)   
        params = [tuple(x) for x in dataFrame.values]
    
        fieldNameStr = ",".join(fieldNames)
        valueStr = ",".join(["?"] * len(fieldNames))
        sql = "INSERT INTO {} ({}) VALUES({})".format(toTable, fieldNameStr, valueStr)
        cursor.fast_executemany = True
        cursor.executemany(sql, params)
        cursor.commit()
    
    
    insertDataFrameInDB(cursor, df, "table")
    

    It gives the following error, which I have not been able to resolve:

    DataError                                 Traceback (most recent call last)
    DataError: ('String data, right truncation: length 24 buffer 20', '22001')
    
    The above exception was the direct cause of the following exception:
    
    SystemError                               Traceback (most recent call last)
    ~\AppData\Local\Continuum\anaconda3\lib\encodings\utf_16_le.py in decode(input, errors)
         15 def decode(input, errors='strict'):
    ---> 16     return codecs.utf_16_le_decode(input, errors, True)
         17 
    
    SystemError: <built-in function utf_16_le_decode> returned a result with an error set
    
    The above exception was the direct cause of the following exception:
    
    SystemError                               Traceback (most recent call last)
    SystemError: decoding with 'utf-16le' codec failed (SystemError: <built-in function utf_16_le_decode> returned a result with an error set)
    
    The above exception was the direct cause of the following exception:
    
    SystemError                               Traceback (most recent call last)
    ~\AppData\Local\Continuum\anaconda3\lib\encodings\utf_16_le.py in decode(input, errors)
         15 def decode(input, errors='strict'):
    ---> 16     return codecs.utf_16_le_decode(input, errors, True)
         17 
    
    SystemError: <built-in function utf_16_le_decode> returned a result with an error set
    
    The above exception was the direct cause of the following exception:
    
    SystemError                               Traceback (most recent call last)
    SystemError: decoding with 'utf-16le' codec failed (SystemError: <built-in function utf_16_le_decode> returned a result with an error set)
    
    The above exception was the direct cause of the following exception:
    
    SystemError                               Traceback (most recent call last)
    <ipython-input-6-f73d9346f943> in <module>()
         12 
         13 cursor = getCursor(conData)
    ---> 14 insertDataFrameInDB(cursor, df, "snowplow.sankey")
    
    <ipython-input-1-69ecbca20fc8> in insertDataFrameInDB(cursor, dataFrame, toTable, fieldNames)
         29     sql = "INSERT INTO {} ({}) VALUES({})".format(toTable, fieldNameStr, valueStr)
         30     cursor.fast_executemany = True
    ---> 31     cursor.executemany(sql, params)
         32     cursor.commit()
    SystemError: <class 'pyodbc.Error'> returned a result with an error set
    

    Searching on this error makes me think it has something to do with a missing BOM (byte order mark). I tried decoding the strings in the 'params' tuples, and I also tried str.astype('U'). Does anyone know the cause of the problem and a possible fix?
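
    For reference, a minimal diagnostic sketch (the DataFrame and column name below are made up for illustration): the "String data, right truncation: length 24 buffer 20" line in the traceback suggests that at least one value is longer than its target column allows, so printing the longest string per column makes it easy to compare against the table definition.

    import pandas as pd
    
    # Hypothetical stand-in for the DataFrame being inserted.
    df = pd.DataFrame(
        {"txt": ["short", "a value that is clearly longer than twenty characters"]}
    )
    
    # Report the longest string in each column; compare these numbers against the
    # corresponding column sizes in the target table (e.g. an NVARCHAR(20) column
    # can only hold 20 characters).
    for col in df.columns:
        max_len = df[col].astype(str).str.len().max()
        print(f"{col}: longest value is {max_len} characters")
    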

    1 Answer  |  6 years ago
        1
  •  0
  •   Gord Thompson    6 years ago

    You are using Microsoft's "ODBC Driver … for SQL Server", so fast_executemany should work with pyodbc 4.0.21. However, you can still use DataFrame#to_sql and take advantage of fast_executemany by using a SQLAlchemy execution event, as shown in this question.

    Example: the following code does not take advantage of fast_executemany:

    import pandas as pd
    from sqlalchemy import create_engine
    import time
    
    engine = create_engine('mssql+pyodbc://@SQL_panorama')
    
    # test environment
    num_rows = 1000
    df = pd.DataFrame(
        [[x, f'row{x:03}'] for x in range(num_rows)],
        columns=['id', 'txt']
    )
    #
    cnxn = engine.connect()
    try:
        cnxn.execute("DROP TABLE df_to_sql_test")
    except:
        pass
    cnxn.execute("CREATE TABLE df_to_sql_test (id INT PRIMARY KEY, txt NVARCHAR(50))")
    
    # timing test
    t0 = time.time()
    df.to_sql("df_to_sql_test", engine, if_exists='append', index=False)
    print(f"{num_rows} rows written in {(time.time() - t0):.1f} seconds")
    

    Result:

    1000 rows written in 25.2 seconds
    

    Adding a SQLAlchemy execution event handler reduces the execution time significantly:

    import pandas as pd
    from sqlalchemy import create_engine, event
    import time
    
    engine = create_engine('mssql+pyodbc://@SQL_panorama')
    
    @event.listens_for(engine, 'before_cursor_execute')
    def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
        if executemany:
            cursor.fast_executemany = True
    
    
    # test environment
    num_rows = 1000
    df = pd.DataFrame(
        [[x, f'row{x:03}'] for x in range(num_rows)],
        columns=['id', 'txt']
    )
    #
    cnxn = engine.connect()
    try:
        cnxn.execute("DROP TABLE df_to_sql_test")
    except:
        pass
    cnxn.execute("CREATE TABLE df_to_sql_test (id INT PRIMARY KEY, txt NVARCHAR(50))")
    
    # timing test
    t0 = time.time()
    df.to_sql("df_to_sql_test", engine, if_exists='append', index=False)
    print(f"{num_rows} rows written in {(time.time() - t0):.1f} seconds")
    

    Result:

    1000 rows written in 1.6 seconds
    

    For a more complete discussion of this approach, see this answer.