@3皱襞,
GitHub Repo
I use the following method to create the partitioned collection:
static internal async Task<DocumentCollection> CreatePartitionedCollectionAsync(DocumentClient client, string databaseName,
    string collectionName, int collectionThroughput)
{
    PartitionKeyDefinition partitionKey = new PartitionKeyDefinition
    {
        Paths = new Collection<string> { ConfigurationManager.AppSettings["CollectionPartitionKey"] }
    };
    DocumentCollection collection = new DocumentCollection { Id = collectionName, PartitionKey = partitionKey };

    try
    {
        collection = await client.CreateDocumentCollectionAsync(
            UriFactory.CreateDatabaseUri(databaseName),
            collection,
            new RequestOptions { OfferThroughput = collectionThroughput });
    }
    catch (Exception)
    {
        // Rethrow without resetting the stack trace (the original `throw e;` would lose it).
        throw;
    }

    return collection;
}
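For reference, I create the client and call this helper roughly like the sketch below (the endpoint/key/name settings shown are placeholders for whatever my config holds, not values from the repo):
DocumentClient client = new DocumentClient(
    new Uri(ConfigurationManager.AppSettings["EndPointUrl"]),      // placeholder setting
    ConfigurationManager.AppSettings["AuthorizationKey"],          // placeholder setting
    new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp });

DocumentCollection dataCollection = await CreatePartitionedCollectionAsync(
    client,
    ConfigurationManager.AppSettings["DatabaseName"],               // placeholder setting
    ConfigurationManager.AppSettings["CollectionName"],             // placeholder setting
    int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]));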
Then, in the main method, I do the bulk import as shown below:
// Create the documents with a simple partition key here.
string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");
long numberOfDocumentsToGenerate = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToImport"]);
int numberOfBatches = int.Parse(ConfigurationManager.AppSettings["NumberOfBatches"]);
long numberOfDocumentsPerBatch = (long)Math.Floor(((double)numberOfDocumentsToGenerate) / numberOfBatches);
// Set retry options high for initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();
// Set retries to 0 to pass control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
BulkImportResponse bulkImportResponse = null;
long totalNumberOfDocumentsInserted = 0;
double totalRequestUnitsConsumed = 0;
double totalTimeTakenSec = 0;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var tasks = new List<Task>();
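// NOTE: the block below runs once per batch; in the full program it is assumed to sit
// inside a loop such as `for (int i = 0; i < numberOfBatches; i++) { ... }` that also
// builds `documentsToImportInBatch` (the list of serialized documents for batch i).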
tasks.Add(Task.Run(async () =>
{
    Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", i));
    do
    {
        try
        {
            bulkImportResponse = await bulkExecutor.BulkImportAsync(
                documents: documentsToImportInBatch,
                enableUpsert: true,
                disableAutomaticIdGeneration: true,
                maxConcurrencyPerPartitionKeyRange: null,
                maxInMemorySortingBatchSize: null,
                cancellationToken: token);
        }
        catch (DocumentClientException de)
        {
            Trace.TraceError("Document client exception: {0}", de);
            break;
        }
        catch (Exception e)
        {
            Trace.TraceError("Exception: {0}", e);
            break;
        }
    } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);

    totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
    totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
    totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
},
token));
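After all the batch tasks are queued, I just wait for them and log the running totals, roughly like this:
await Task.WhenAll(tasks);

Trace.TraceInformation(String.Format(
    "Inserted {0} documents, consumed {1} RUs in {2} sec",
    totalNumberOfDocumentsInserted,
    totalRequestUnitsConsumed,
    totalTimeTakenSec));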