代码之家  ›  专栏  ›  技术社区  ›  Alexey Romanov

为什么Erlang驱动程序输出与预期不同?

  •  3
  • Alexey Romanov  · 技术社区  · 14 年前

    我尝试将对64位整数的支持添加到我的erlang sqlite 3包装器的分支(因为这是sqlite3使用的主要整数类型)。虽然看起来很简单,但我遇到了一个问题。

    下面是驱动程序 C 端的代码,其中加入了调试语句(其他文件请参见 https://github.com/alexeyr/erlang-sqlite3/tree/64bit ,但我认为问题就出在这段代码里):

    #include "sqlite3_drv.h"
    
    // Callback Array
    static ErlDrvEntry basic_driver_entry = {
        NULL,                             /* init */
        start,                            /* startup (defined below) */
        stop,                             /* shutdown (defined below) */
        NULL,                             /* output */
        NULL,                             /* ready_input */
        NULL,                             /* ready_output */
        "sqlite3_drv",                    /* the name of the driver */
        NULL,                             /* finish */
        NULL,                             /* handle */
        control,                          /* control */
        NULL,                             /* timeout */
        NULL,                             /* outputv (defined below) */
        ready_async,                      /* ready_async */
        NULL,                             /* flush */
        NULL,                             /* call */
        NULL,                             /* event */
        ERL_DRV_EXTENDED_MARKER,          /* ERL_DRV_EXTENDED_MARKER */
        ERL_DRV_EXTENDED_MAJOR_VERSION,   /* ERL_DRV_EXTENDED_MAJOR_VERSION */
        ERL_DRV_EXTENDED_MAJOR_VERSION,   /* ERL_DRV_EXTENDED_MINOR_VERSION */
        ERL_DRV_FLAG_USE_PORT_LOCKING     /* ERL_DRV_FLAGs */
    };
    
    DRIVER_INIT(basic_driver) {
      return &basic_driver_entry;
    }
    
    // Driver Start
    //
    // Allocates the per-port driver state, opens the log file and the SQLite
    // database, and creates the atoms used when building reply terms.
    //
    // port - the port this driver instance is attached to; stored in the state.
    // cmd  - command string; the text after its first space is used as the
    //        database path, and DB_PATH is used when it contains no space.
    //
    // Returns the new state cast to ErlDrvData, or ERL_DRV_ERROR_GENERAL when the
    // state cannot be allocated. A log file that cannot be opened is reported on
    // stderr and logging is skipped; a database that cannot be opened is only
    // logged and the state is still returned.
    static ErlDrvData start(ErlDrvPort port, char* cmd) {
      sqlite3_drv_t* retval = (sqlite3_drv_t*) driver_alloc(sizeof(sqlite3_drv_t));
      struct sqlite3 *db = 0;
      int status = 0;
      FILE *log;

      if (!retval) {
        return ERL_DRV_ERROR_GENERAL;
      }

      retval->log = fopen ("/tmp/erlang-sqlite3-drv.log", "a+");
      if (!retval->log) {
        fprintf (stderr, "Can't create log file\n");
      }
      // Every use of the log below is guarded: passing a NULL stream to
      // fprintf/fflush is undefined behaviour.
      log = retval->log;

      if (log) {
        fprintf (log, "--- Start erlang-sqlite3 driver\nCommand line: [%s]\n", cmd);
      }

      const char *db_name = strstr (cmd, " ");
      if (!db_name) {
        if (log) {
          fprintf (log, "ERROR: DB name should be passed at command line\n");
        }
        db_name = DB_PATH;
      } else {
        ++db_name;
      }

      // Create and open the database
      sqlite3_open(db_name, &db);
      status = sqlite3_errcode(db);

      if (log) {
        if(status != SQLITE_OK) {
          // Report the path that was actually opened, not always DB_PATH.
          fprintf(log, "ERROR: Unabled to open file: %s because %s\n\n", db_name, sqlite3_errmsg(db));
        } else {
          fprintf(log, "Opened file %s\n", db_name);
        }
      }

      // Set the state for the driver
      retval->port = port;
      retval->db = db;
      retval->key = 42; //FIXME: Just a magic number, make real key
      retval->async_handle = 0; // driver_alloc does not zero the state

    #define STR_(ARG) #ARG
    #define STR(ARG) STR_(ARG)
      retval->atom_error = driver_mk_atom ("error");
      retval->atom_columns = driver_mk_atom ("columns");
      retval->atom_rows = driver_mk_atom ("rows");
      retval->atom_null = driver_mk_atom (STR (NULL_ATOM));
      retval->atom_id = driver_mk_atom ("id");
      retval->atom_ok = driver_mk_atom ("ok");
      // NOTE(review): "uknown_command" looks like a typo, but the atom is part of
      // the reply protocol, so it is left unchanged here - confirm against the
      // Erlang side before renaming.
      retval->atom_unknown_cmd = driver_mk_atom ("uknown_command");

      if (log) {
        fflush (log);
      }
      return (ErlDrvData) retval;
    }
    
    
    // Driver Stop
    //
    // Closes the database and the log file held in the driver state and then
    // frees the state itself. handle is the value returned by start.
    static void stop(ErlDrvData handle) {
      sqlite3_drv_t* driver_data = (sqlite3_drv_t*) handle;

      sqlite3_close(driver_data->db);

      // The log is NULL when start could not open it; fclose(NULL) is undefined.
      if (driver_data->log) {
        fclose (driver_data->log);
        driver_data->log = 0;
      }

      driver_free(driver_data);
    }
    
    // Handle input from Erlang VM
    //
    // Dispatches on the command code: CMD_SQL_EXEC runs the buffer as SQL through
    // sql_exec, any other code is answered by unknown. rbuf and rlen are not
    // used; the function always returns 0.
    static int control(ErlDrvData drv_data, unsigned int command, char *buf, 
                       int len, char **rbuf, int rlen) {
      sqlite3_drv_t *state = (sqlite3_drv_t *) drv_data;

      if (command == CMD_SQL_EXEC) {
        sql_exec(state, buf, len);
      } else {
        unknown(state, buf, len);
      }

      return 0;
    }
    
    
    // Builds the term spec for {error, Message}.
    //
    // drv         - driver state; supplies the `error` atom.
    // error       - NUL-terminated message. The spec stores this pointer rather
    //               than a copy, so the string must stay valid until the spec has
    //               been output.
    // spec        - receives a calloc-allocated array of 7 terms that the caller
    //               releases with free(); set to NULL on allocation failure.
    // terms_count - receives the number of terms in *spec (7, or 0 on failure).
    //
    // Returns 0 on success and -1 when the spec cannot be allocated.
    static inline int return_error(sqlite3_drv_t *drv, const char *error, ErlDrvTermData **spec, int *terms_count) {
      ErlDrvTermData *terms = (ErlDrvTermData *)calloc(7, sizeof(ErlDrvTermData));

      if (!terms) {
        // Leave the outputs in a well-defined state instead of writing through NULL.
        *spec = NULL;
        *terms_count = 0;
        return -1;
      }

      terms[0] = ERL_DRV_ATOM;
      terms[1] = drv->atom_error;
      terms[2] = ERL_DRV_STRING;
      terms[3] = (ErlDrvTermData)error;
      terms[4] = strlen(error);
      terms[5] = ERL_DRV_TUPLE;
      terms[6] = 2;

      *spec = terms;
      *terms_count = 7;
      return 0;
    }
    
    // Prepares the SQL text in command (command_size bytes) and runs it.
    //
    // When preparation fails, an {error, Message} term is output on the port
    // straight away. Otherwise the prepared statement is wrapped in an
    // async_sqlite3_command and executed by sql_exec_async: through driver_async
    // when sqlite3_threadsafe() is non-zero, or inline followed by ready_async
    // when it is zero.
    //
    // Always returns 0.
    static int sql_exec(sqlite3_drv_t *drv, char *command, int command_size) {
      int result;
      const char *rest = NULL;
      sqlite3_stmt *statement = NULL;
      ErlDrvTermData *dataset = NULL;
      int term_count = 0;
      async_sqlite3_command *async_command;

      // fprintf(drv->log, "Preexec: %.*s\n", command_size, command);
      // fflush (drv->log);
      result = sqlite3_prepare_v2(drv->db, command, command_size, &statement, &rest);
      if(result != SQLITE_OK) { 
        return_error(drv, sqlite3_errmsg(drv->db), &dataset, &term_count); 
        driver_output_term(drv->port, dataset, term_count);
        // The spec is heap-allocated by return_error and is no longer needed once
        // it has been output; it used to be leaked here.
        free(dataset);
        return 0;
      }

      async_command = (async_sqlite3_command *)calloc(1, sizeof(async_sqlite3_command));
      if (!async_command) {
        // Nothing will ever own the statement, so release it before reporting.
        sqlite3_finalize(statement);
        return_error(drv, "Out of memory", &dataset, &term_count);
        if (dataset) {
          driver_output_term(drv->port, dataset, term_count);
          free(dataset);
        }
        return 0;
      }
      async_command->driver_data = drv;
      async_command->statement = statement;

      // fprintf(drv->log, "Driver async: %d %p\n", SQLITE_VERSION_NUMBER, async_command->statement);
      // fflush (drv->log);

      if (sqlite3_threadsafe()) {
        drv->async_handle = driver_async(drv->port, &drv->key, sql_exec_async, async_command, sql_free_async);
      } else {
        sql_exec_async(async_command);
        ready_async((ErlDrvData)drv, (ErlDrvThreadData)async_command);
      }
      return 0;
    }
    
    // Releases everything owned by an async_sqlite3_command: the term dataset,
    // the int64 and float value arrays, every driver binary plus the array that
    // lists them, the prepared statement, and finally the command itself. Also
    // resets async_handle in the owning driver state to 0.
    static void sql_free_async(void *_async_command)
    {
      async_sqlite3_command *cmd = (async_sqlite3_command *)_async_command;
      int idx;

      free(cmd->dataset);

      cmd->driver_data->async_handle = 0;

      // free(NULL) is a no-op, so the value arrays need no NULL guard.
      free(cmd->int64s);
      free(cmd->floats);

      for (idx = 0; idx < cmd->binaries_count; idx++) {
        driver_free_binary(cmd->binaries[idx]);
      }
      free(cmd->binaries);

      if (cmd->statement) {
        sqlite3_finalize(cmd->statement);
      }

      free(cmd);
    }
    
    
    static void sql_exec_async(void *_async_command) {
      async_sqlite3_command *async_command = (async_sqlite3_command *)_async_command;
      ErlDrvTermData *dataset = async_command->dataset;
      int term_count = async_command->term_count;
      int row_count = async_command->row_count;
      sqlite3_drv_t *drv = async_command->driver_data;
    
      int result, next_row, column_count;
      char *error = NULL;
      char *rest = NULL;
      sqlite3_stmt *statement = async_command->statement;
    
      sqlite3_int64 *int64s = NULL;
      int int64_count = 0;
    
      double *floats = NULL;
      int float_count = 0;
    
      ErlDrvBinary **binaries = NULL;
      int binaries_count = 0;
      int i;
    
    
      column_count = sqlite3_column_count(statement);
      dataset = NULL;
    
      term_count += 2;
      dataset = realloc(dataset, sizeof(*dataset) * term_count);
      dataset[term_count - 2] = ERL_DRV_PORT;
      dataset[term_count - 1] = driver_mk_port(drv->port);
    
      if (column_count > 0) {
        int base = term_count;
        term_count += 2 + column_count*3 + 1 + 2 + 2 + 2;
        dataset = realloc(dataset, sizeof(*dataset) * term_count);
        dataset[base] = ERL_DRV_ATOM;
        dataset[base + 1] = drv->atom_columns;
        for (i = 0; i < column_count; i++) {
          char *column_name = (char *)sqlite3_column_name(statement, i);
          // fprintf(drv->log, "Column: %s\n", column_name);
          // fflush (drv->log);
    
          dataset[base + 2 + (i*3)] = ERL_DRV_STRING;
          dataset[base + 2 + (i*3) + 1] = (ErlDrvTermData) column_name;
          dataset[base + 2 + (i*3) + 2] = strlen (column_name);
        }
        dataset[base + 2 + column_count*3 + 0] = ERL_DRV_NIL;
        dataset[base + 2 + column_count*3 + 1] = ERL_DRV_LIST;
        dataset[base + 2 + column_count*3 + 2] = column_count + 1;
        dataset[base + 2 + column_count*3 + 3] = ERL_DRV_TUPLE;
        dataset[base + 2 + column_count*3 + 4] = 2;
    
        dataset[base + 2 + column_count*3 + 5] = ERL_DRV_ATOM;
        dataset[base + 2 + column_count*3 + 6] = drv->atom_rows;
      }
    
      // fprintf(drv->log, "Exec: %s\n", sqlite3_sql(statement));
      // fflush (drv->log);
    
      while ((next_row = sqlite3_step(statement)) == SQLITE_ROW) {
    
        for (i = 0; i < column_count; i++) {
          // fprintf(drv->log, "Column %d type: %d\n", i, sqlite3_column_type(statement, i));
          // fflush (drv->log);
          switch (sqlite3_column_type(statement, i)) {
            case SQLITE_INTEGER: {
              int64_count++;
              int64s = realloc(int64s, sizeof(sqlite3_int64) * int64_count);
              int64s[int64_count - 1] = sqlite3_column_int64(statement, i);
    
              term_count += 2;
              dataset = realloc(dataset, sizeof(*dataset) * term_count);
              dataset[term_count - 2] = ERL_DRV_INT64;
              dataset[term_count - 1] = (ErlDrvTermData)&int64s[int64_count - 1];
              printf("Dataset element: %lld\n", *((sqlite3_int64 *) dataset[term_count - 1]));
              break;
            }
            case SQLITE_FLOAT: {
              float_count++;
              floats = realloc(floats, sizeof(double) * float_count);
              floats[float_count - 1] = sqlite3_column_double(statement, i);
    
              term_count += 2;
              dataset = realloc(dataset, sizeof(*dataset) * term_count);
              dataset[term_count - 2] = ERL_DRV_FLOAT;
              dataset[term_count - 1] = (ErlDrvTermData)&floats[float_count - 1];
              break;
            }
            case SQLITE_BLOB: 
            case SQLITE_TEXT: {
              int bytes = sqlite3_column_bytes(statement, i);
              binaries_count++;
              binaries = realloc(binaries, sizeof(*binaries) * binaries_count);
              binaries[binaries_count - 1] = driver_alloc_binary(bytes);
              binaries[binaries_count - 1]->orig_size = bytes;
              memcpy(binaries[binaries_count - 1]->orig_bytes, sqlite3_column_blob(statement, i), bytes);
    
              term_count += 4;
              dataset = realloc(dataset, sizeof(*dataset) * term_count);
              dataset[term_count - 4] = ERL_DRV_BINARY;
              dataset[term_count - 3] = (ErlDrvTermData)binaries[binaries_count - 1];
              dataset[term_count - 2] = bytes;
              dataset[term_count - 1] = 0;
              break;
            }
            case SQLITE_NULL: {
              term_count += 2;
              dataset = realloc (dataset, sizeof (*dataset) * term_count);
              dataset[term_count - 2] = ERL_DRV_ATOM;
              dataset[term_count - 1] = drv->atom_null;
              break;
            }
          }
        }
        term_count += 2;
        dataset = realloc(dataset, sizeof(*dataset) * term_count);
        dataset[term_count - 2] = ERL_DRV_TUPLE;
        dataset[term_count - 1] = column_count;
    
        row_count++;
      }
      async_command->row_count = row_count;
      async_command->int64s = int64s;
      async_command->floats = floats;
      async_command->binaries = binaries;
      async_command->binaries_count = binaries_count;
    
    
      if (next_row == SQLITE_BUSY) {
        return_error(drv, "SQLite3 database is busy", &async_command->dataset, &async_command->term_count);
        return;
      }
      if (next_row != SQLITE_DONE) {
        return_error(drv, sqlite3_errmsg(drv->db), &async_command->dataset, &async_command->term_count);
        return;
      }
    
    
      if (column_count > 0) {
        term_count += 3;
        dataset = realloc(dataset, sizeof(*dataset) * term_count);
        dataset[term_count - 3] = ERL_DRV_NIL;
        dataset[term_count - 2] = ERL_DRV_LIST;
        dataset[term_count - 1] = row_count + 1;
    
        term_count += 2;
        dataset = realloc(dataset, sizeof(*dataset) * term_count);
        dataset[term_count - 2] = ERL_DRV_TUPLE;
        dataset[term_count - 1] = 2;
    
    
        term_count += 3;
        dataset = realloc(dataset, sizeof(*dataset) * term_count);
        dataset[term_count - 3] = ERL_DRV_NIL;
        dataset[term_count - 2] = ERL_DRV_LIST;
        dataset[term_count - 1] = 3;
    
      } else if (strcasestr(sqlite3_sql(statement), "INSERT"))  {
    
        sqlite3_int64 rowid = sqlite3_last_insert_rowid(drv->db);
        term_count += 6;
        dataset = realloc(dataset, sizeof(*dataset) * term_count);
        dataset[term_count - 6] = ERL_DRV_ATOM;
        dataset[term_count - 5] = drv->atom_id;
        dataset[term_count - 4] = ERL_DRV_INT;
        dataset[term_count - 3] = rowid;
        dataset[term_count - 2] = ERL_DRV_TUPLE;
        dataset[term_count - 1] = 2;
      } else {
        term_count += 6;
        dataset = realloc(dataset, sizeof(*dataset) * term_count);
        dataset[term_count - 6] = ERL_DRV_ATOM;
        dataset[term_count - 5] = drv->atom_ok;
        dataset[term_count - 4] = ERL_DRV_INT;
        dataset[term_count - 3] = next_row;
        dataset[term_count - 2] = ERL_DRV_TUPLE;
        dataset[term_count - 1] = 2;
      }
    
      term_count += 2;
      dataset = realloc(dataset, sizeof(*dataset) * term_count);
      dataset[term_count - 2] = ERL_DRV_TUPLE;
      dataset[term_count - 1] = 2;
    
    
    
    
      async_command->dataset = dataset;
      async_command->term_count = term_count;
      // fprintf(drv->log, "Total term count: %p %d, rows count: %dx%d\n", statement, term_count, column_count, row_count);
      // fflush (drv->log);
    }
    
    // Completion callback for an async SQL command: outputs the term spec stored
    // in the command on the driver's port and then releases the command through
    // sql_free_async. drv_data is not used; the driver state is taken from the
    // command itself.
    static void ready_async(ErlDrvData drv_data, ErlDrvThreadData thread_data)
    {
      async_sqlite3_command *cmd = (async_sqlite3_command *)thread_data;
      sqlite3_drv_t *state = cmd->driver_data;

      // The result of driver_output_term is not acted upon.
      (void) driver_output_term(state->port, cmd->dataset, cmd->term_count);
      // fprintf(drv->log, "Total term count: %p %d, rows count: %d (%d)\n", async_command->statement, async_command->term_count, async_command->row_count, res);
      // fflush (drv->log);
      sql_free_async(cmd);
    }
    
    
    // Unknown Command
    //
    // Outputs a two-element tuple made of the driver's `error` atom and its
    // unknown-command atom on the port, and returns the result of
    // driver_output_term. command and command_size are not used.
    static int unknown(sqlite3_drv_t *drv, char *command, int command_size) {
      ErlDrvTermData reply[6];

      reply[0] = ERL_DRV_ATOM;
      reply[1] = drv->atom_error;
      reply[2] = ERL_DRV_ATOM;
      reply[3] = drv->atom_unknown_cmd;
      reply[4] = ERL_DRV_TUPLE;
      reply[5] = 2;

      return driver_output_term(drv->port, reply, sizeof(reply) / sizeof(reply[0]));
    }
    

    测试输出:

    module 'sqlite3_test'
      sqlite3_test: basic_functionality_test...Dataset element: 1
    Dataset element: 20
    Dataset element: 2000
    Dataset element: 2
    Dataset element: 30
    Dataset element: 2000
    *failed*
    ::error:{assertEqual_failed,
              [{module,sqlite3_test},
               {line,58},
               {expression,"sqlite3 : sql_exec ( ct , \"select * from user;\" )"},
               {expected,
                   [{columns,["id","name","age","wage"]},
                    {rows,[{1,<<"abby">>,20,2000},{2,<<"marge">>,30,2000}]}]},
               {value,
                   [{columns,["id","name","age","wage"]},
                    {rows,[{0,<<"abby">>,20,2000},{2,<<"marge">>,30,2000}]}]}]}
      in function sqlite3_test:'-basic_functionality_test/0-fun-5-'/1
      in call from sqlite3_test:basic_functionality_test/0
    
    
      sqlite3_test: large_number_test...Dataset element: 4294967298
    Dataset element: 4294967298
    *failed*
    ::error:{assertEqual_failed,
              [{module,sqlite3_test},
               {line,141},
               {expression,"rows ( sqlite3 : sql_exec ( ct , Query1 ) )"},
               {expected,[{4294967298,4294967298}]},
               {value,[{4294967296,4294967298}]}]}
      in function sqlite3_test:'-large_number_test/0-fun-0-'/2
    

    也就是说,当第一列是整数时,放入数据集的值是正确的,但 Erlang 从驱动程序收到的值却不一样。为什么会这样?

    1 回复  |  直到 14 年前
        1
  •  0
  •   Alexey Romanov    14 年前

    问题是由数组的重新分配(realloc)引起的。当我在打印 int64 的值的同时也打印它们的地址时,得到的输出类似于:

    While filling row: (0x8a9c010:4294967298 )
    dataset (22 terms):
    int64: 0x8a9c010:4294967298
    While filling row: (0x8a9bfb8:4294967298 0x8a9bfc0:4294967298 )
    dataset (24 terms):
    int64: 0x8a9c010:4294967296
    int64: 0x8a9bfc0:4294967298
    While filling row: (0x8a9bfb8:4294967298 0x8a9bfc0:4294967298 0x8a9bfc8:4294967298 )
    dataset (26 terms):
    int64: 0x8a9c010:4294967296
    int64: 0x8a9bfc0:4294967298
    int64: 0x8a9bfc8:4294967298
    

    (数组 int64s 在“填充行时”后的括号中打印,下面的值取自数据集)。因此,当重新分配数组时,数据集中列出的指针指向旧数组。

    改用链表后,问题得以解决。