代码之家  ›  专栏  ›  技术社区  ›  Chet

需要将大型QueryRunner结果流式传输到文件,似乎正在内存中存储

  •  0
  • Chet  · 技术社区  · 6 年前

    我正在尝试构建一个Java应用程序,它可以将非常大的任意SQL查询结果集流到JSOL文件中,特别是通过SQLServer,但希望与任何JDBC一起运行。 DataSource . 在Python中,只需将SQL客户机结果视为生成器,然后调用 json.dumps() . 然而,在这段代码中,它似乎在写出之前将所有内容放入内存中,这通常会导致堆和垃圾收集异常。我需要这个来运行的查询非常大,可以恢复到10GB的原始数据。执行时间不是主要的问题,只要它每次都工作。

    我试过称之为“每行后刷新”(这很荒谬),这似乎有助于小数据集,但不适用于大数据集。有人能提出一个我可以轻松实现的策略吗?

    在我的SQL客户机类中,我使用apache dbutils QueryRunner MapListHandler 创建 Map 这是我需要的灵活性(与爪哇需要指定模式和类型的更传统的方法):

    public List<Map<String, Object>> query(String queryText) {
        try {
            DbUtils.loadDriver("com.microsoft.sqlserver.jdbc.Driver");
    
            // this function just sets up all the connection properties. Ommitted for clarity
            DataSource ds = this.initDataSource();
    
            StatementConfiguration sc = new StatementConfiguration.Builder().fetchSize(10000).build();
            QueryRunner queryRunner = new QueryRunner(ds, sc);
            MapListHandler handler = new MapListHandler();
            return queryRunner.query(queryText, handler);
        } catch (Exception e) {
            logger.error(e.getMessage());
            e.printStackTrace();
            return null;
        }
    }
    

    JsonLOutputWriter 班级:

    JsonLOutputWriter(String filename) {
        GsonBuilder gsonBuilder = new GsonBuilder();
        gsonBuilder.serializeNulls();
        this.gson = gsonBuilder.create();
        try {
            this.writer = new PrintWriter(new File(filename), ENCODING);
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    
    void writeRow(Map row) {
        this.writer.println(this.gson.toJson(row));
    }
    
    void flush() {
        this.writer.flush();
    }
    

    主要方法:

    JsonLOutputWriter writer = new JsonLOutputWriter(outputFile)
    for (Map row : client.query(inputSql)) {
        writer.writeRow(row);
    }
    writer.flush()
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   Chet    6 年前

    基本上这不能用 DbUtils 开箱即用。我摆脱了 QueryRunner MapListHandler 因为处理程序创建了 ArrayList . 我把它变成了推式的,创建了一个非常相似的 MyQueryRunner 那需要一个 MyRowHandler 而不是返回一个集合 ResultSet 调用输出函数。

    我确信有更优雅的方法来实现这一点并返回某种类型的行缓冲区,但这是我需要的80/20,并且适用于大型数据集。

    罗汉德勒

    public class RowHandler {
        private static final RowProcessor ROW_PROCESSOR = new BasicRowProcessor();
        private JsonLOutputWriter writer;
    
        public RowHandler(JsonLOutputWriter writer) {
            this.writer = writer;
        }
    
        int handle(ResultSet rs) throws SQLException {
            AtomicInteger counter = new AtomicInteger();
            while (rs.next()) {
                writer.writeRow(this.handleRow(rs));
                counter.getAndIncrement();
            }
            return counter.intValue();
        }
    
        protected Map<String, Object> handleRow(ResultSet rs) throws SQLException {
            return this.ROW_PROCESSOR.toMap(rs);
        }
    
    }
    

    查询处理器

    class CustomQueryRunner extends AbstractQueryRunner {
    
        private final RowHandler rh;
    
        CustomQueryRunner(DataSource ds, StatementConfiguration stmtConfig, RowHandler rh) {
            super(ds, stmtConfig);
            this.rh = rh;
        }
    
        int query(String sql) throws SQLException {
            Connection conn = this.prepareConnection();
            return this.query(conn, true, sql);
        }
    
        private int query(Connection conn, boolean closeConn, String sql, Object... params)
                throws SQLException {
            if (conn == null) {
                throw new SQLException("Null connection");
            }
            PreparedStatement stmt = null;
            ResultSet rs = null;
            int count = 0;
            try {
                stmt = this.prepareStatement(conn, sql);
                this.fillStatement(stmt, params);
                rs = this.wrap(stmt.executeQuery());
                count = rh.handle(rs);
            } catch (SQLException e) {
                this.rethrow(e, sql, params);
            } finally {
                try {
                    close(rs);
                } finally {
                    close(stmt);
                    if (closeConn) {
                        close(conn);
                    }
                }
            }
            return count;
        }
    }