代码之家  ›  专栏  ›  技术社区  ›  prototype

按组保留N条最新记录,同时删除其他记录

  •  2
  • prototype  · 技术社区  · 7 年前

    我们有一个MongoDB集合,其中保存了“文档”和“日志”的集合。每个日志都保留对文档ID和日期的引用:

    • 文件 ({_id: ObjectId})
    • ({ docId: String, date: Date})

    日志数据库越来越大。我想删除所有日志,但以下日志除外:

    • 在给定docId的最近12个日志中

    我知道如何删除所有超过30天的日志。但是我不知道如何保存文档的最新N个日志。

    1 回复  |  直到 7 年前
        1
  •  3
  •   Neil Lunn    7 年前

    单凭一句话是做不到的。你可以做的是从本质上识别每个可能的“最后12个” "docId" 值,然后通过将列表添加到 $nin

    您没有指定首选的编程环境,但以下是使用NodeJ的一般过程:

    const MongoClient = require('mongodb').MongoClient;
    
    const uri = 'mongodb://localhost/test';
    
    (async function() {
    
      let db;
    
      // Calculate date cutoff
      let oneDay = 1000 * 60 * 60 * 24,
          thirtyDays = oneDay * 30,
          now = Date.now(),
          cutoff = new Date(
            ( now - ( now % oneDay ) ) - thirtyDays
          );
    
      try {
    
        db = await MongoClient.connect(uri);
    
        let log = db.collection('log');
        let doc = db.collection('doc');
    
        await new Promise((resolve,reject) => {
    
          let ops = [];
    
          let stream = doc.find();
    
          stream.on('error', reject);
          stream.on('end', async () => {
            if ( ops.length > 0 ) {
              await log.bulkWrite(ops);
              ops = [];
            }
            resolve();
          });
    
          stream.on('data', async (data) => {
    
            // Pause processing input stream
            stream.pause();
    
            // get last 12 for doc
            let last = await (log.find({ docId: data._id })
              .project({ _id: 1 })
              .sort({ date: -1 }).limit(12)).map(d => d._id);
    
            ops.push({
              deleteMany: {
                filter: {
                  _id: { $nin: last },
                  docId: data._id,
                  date: { $lt: cutoff }
                }
              }
            });
    
            if ( ops.length >= 1000 ) {
              await log.bulkWrite(ops);
              ops = [];
            }
    
            // Resume processing input stream
            stream.resume()
    
          });
    
        });
    
      } catch(e) {
        console.error(e);
      } finally {
        db.close();
      }
    
    })();
    

    _id 找到的每个文件中的值(如果有)。

    deleteMany .bulkWrite() 而不是每次迭代源文档时都发出请求。这大大减少了网络流量和延迟。

    然后,基本语句是删除 “文档ID” 从光标中的源匹配当前文档,并且日期早于30天的截止点。

    $nin 从上一个查询中“排除”任何被标识为“最近12个”的文档。这样可以确保始终保留这些文档,因为它们不会被删除。

            ops.push({
              deleteMany: {
                filter: {
                  _id: { $nin: last },
                  docId: data._id,
                  date: { $lt: cutoff }
                }
              }
            });
    


    MongoDB 3.6预览版

    您可以从当前“即将发布的”MongoDB中得到一件事,那就是它允许一种“非相关”形式的 $lookup 这意味着我们基本上可以在一个请求中获得每个目标文档的“前12个”,而不是发出多个查询。

    它这样做是因为这种形式的 $查找 $match , $sort $limit 结果返回。

    const MongoClient = require('mongodb').MongoClient;
    
    const uri = 'mongodb://localhost/test';
    
    (async function() {
    
      let db;
    
      // Calculate date cutoff
      let oneDay = 1000 * 60 * 60 * 24,
          thirtyDays = oneDay * 30,
          now = Date.now(),
          cutoff = new Date(
            ( now - ( now % oneDay ) ) - thirtyDays
          );
    
      try {
    
        db = await MongoClient.connect(uri);
    
        await new Promise((resolve,reject) => {
    
          let ops = [];
    
          let stream = db.collection('doc').aggregate([
            { "$lookup": {
              "from": "log",
              "let": {
                "id": "$_id"
              },
              "pipeline": [
                { "$match": {
                  "docId": { "$eq": { "$expr": "$$id" } }
                }},
                { "$sort": { "date": -1 } },
                { "$limit": 12 },
                { "$project": { "_id": 1 } }
              ],
              "as": 'docs'
            }},
          ]);
    
          stream.on('error', reject);
          stream.on('end', async () => {
            if ( ops.length > 0 ) {
              await db.collection('log').bulkWrite(ops);
              ops = [];
            }
            resolve();
          });
    
          stream.on('data', async (data) => {
            stream.pause();
    
            ops.push({
              deleteMany: {
                filter: {
                  _id: { $nin: data.docs.map(d => d._id) },
                  docId: data._id,
                  date: { $lt: cutoff }
                }
              }
            });
    
            if ( ops.length >= 1000 ) {
              await db.collection('log').bulkWrite(ops);
              ops = [];
            }
            stream.resume();
    
          });
    
        });
    
    
      } catch(e) {
        console.error(e);
      } finally {
        db.close();
      }
    
    })();
    

    $expr ,该版本仅在3.5.12开发版本中最终确定。这使得 $匹配

    当然,你真的想等待它准备好投入生产。但是,了解这一点很好,这样您就可以在最终升级基础MongoDB时过渡到这样一个过程。