单凭一句话是做不到的。你可以做的是从本质上识别每个可能的“最后12个”
"docId"
值,然后通过将列表添加到
$nin
您没有指定首选的编程环境,但以下是使用NodeJ的一般过程:
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost/test';
(async function() {
let db;
// Calculate date cutoff
let oneDay = 1000 * 60 * 60 * 24,
thirtyDays = oneDay * 30,
now = Date.now(),
cutoff = new Date(
( now - ( now % oneDay ) ) - thirtyDays
);
try {
db = await MongoClient.connect(uri);
let log = db.collection('log');
let doc = db.collection('doc');
await new Promise((resolve,reject) => {
let ops = [];
let stream = doc.find();
stream.on('error', reject);
stream.on('end', async () => {
if ( ops.length > 0 ) {
await log.bulkWrite(ops);
ops = [];
}
resolve();
});
stream.on('data', async (data) => {
// Pause processing input stream
stream.pause();
// get last 12 for doc
let last = await (log.find({ docId: data._id })
.project({ _id: 1 })
.sort({ date: -1 }).limit(12)).map(d => d._id);
ops.push({
deleteMany: {
filter: {
_id: { $nin: last },
docId: data._id,
date: { $lt: cutoff }
}
}
});
if ( ops.length >= 1000 ) {
await log.bulkWrite(ops);
ops = [];
}
// Resume processing input stream
stream.resume()
});
});
} catch(e) {
console.error(e);
} finally {
db.close();
}
})();
_id
找到的每个文件中的值(如果有)。
deleteMany
.bulkWrite()
而不是每次迭代源文档时都发出请求。这大大减少了网络流量和延迟。
然后,基本语句是删除
“文档ID”
从光标中的源匹配当前文档,并且日期早于30天的截止点。
$nin
从上一个查询中“排除”任何被标识为“最近12个”的文档。这样可以确保始终保留这些文档,因为它们不会被删除。
ops.push({
deleteMany: {
filter: {
_id: { $nin: last },
docId: data._id,
date: { $lt: cutoff }
}
}
});
MongoDB 3.6预览版
您可以从当前“即将发布的”MongoDB中得到一件事,那就是它允许一种“非相关”形式的
$lookup
这意味着我们基本上可以在一个请求中获得每个目标文档的“前12个”,而不是发出多个查询。
它这样做是因为这种形式的
$查找
$match
,
$sort
和
$limit
结果返回。
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost/test';
(async function() {
let db;
// Calculate date cutoff
let oneDay = 1000 * 60 * 60 * 24,
thirtyDays = oneDay * 30,
now = Date.now(),
cutoff = new Date(
( now - ( now % oneDay ) ) - thirtyDays
);
try {
db = await MongoClient.connect(uri);
await new Promise((resolve,reject) => {
let ops = [];
let stream = db.collection('doc').aggregate([
{ "$lookup": {
"from": "log",
"let": {
"id": "$_id"
},
"pipeline": [
{ "$match": {
"docId": { "$eq": { "$expr": "$$id" } }
}},
{ "$sort": { "date": -1 } },
{ "$limit": 12 },
{ "$project": { "_id": 1 } }
],
"as": 'docs'
}},
]);
stream.on('error', reject);
stream.on('end', async () => {
if ( ops.length > 0 ) {
await db.collection('log').bulkWrite(ops);
ops = [];
}
resolve();
});
stream.on('data', async (data) => {
stream.pause();
ops.push({
deleteMany: {
filter: {
_id: { $nin: data.docs.map(d => d._id) },
docId: data._id,
date: { $lt: cutoff }
}
}
});
if ( ops.length >= 1000 ) {
await db.collection('log').bulkWrite(ops);
ops = [];
}
stream.resume();
});
});
} catch(e) {
console.error(e);
} finally {
db.close();
}
})();
$expr
,该版本仅在3.5.12开发版本中最终确定。这使得
$匹配
当然,你真的想等待它准备好投入生产。但是,了解这一点很好,这样您就可以在最终升级基础MongoDB时过渡到这样一个过程。