
Azure Cosmos BulkExecutor method not found

  • r3plica · Tech Community · 6 years ago

    I am using Microsoft.Azure.DocumentDB together with Microsoft.Azure.CosmosDB.BulkExecutor. When I run this code:

    public async Task CreateMultipleAsync(IEnumerable<JObject> models)
    {
        var collectionLink = UriFactory.CreateDocumentCollectionUri(_databaseName, _collectionName);
        // ReadDocumentCollectionAsync returns a ResourceResponse<DocumentCollection>;
        // the BulkExecutor constructor needs the DocumentCollection itself.
        DocumentCollection collection = await _client.ReadDocumentCollectionAsync(collectionLink);
        var bulkExecutor = new BulkExecutor(_client, collection);

        try
        {
            await bulkExecutor.InitializeAsync();
            var response = await bulkExecutor.BulkImportAsync(models, enableUpsert: true);
        }
        catch (Exception ex)
        {
            // Swallowing the exception hides import failures; at minimum log it here.
        }
    }
    

    I get this error:

    Method not found: 'System.Threading.Tasks.Task`1 Microsoft.Azure.Documents.Routing.PartitionKeyRangeCache.TryLookupAsync(System.String, Microsoft.Azure.Documents.Routing.CollectionRoutingMap, System.Threading.CancellationToken, Boolean)'.
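A runtime "Method not found" error like this usually means the BulkExecutor library was compiled against a different Microsoft.Azure.DocumentDB assembly version than the one actually loaded. If the compatible package is installed but an older assembly still wins resolution, an assembly binding redirect in app.config can force the expected version. A minimal sketch; the assembly version numbers below are illustrative assumptions, so check the actual version of the Microsoft.Azure.Documents.Client assembly the project references:

```xml
<!-- app.config: illustrative binding redirect; verify the real assembly
     version and public key token of Microsoft.Azure.Documents.Client first -->
<configuration>
  <runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="Microsoft.Azure.Documents.Client"
                          publicKeyToken="31bf3856ad364e35" culture="neutral" />
        <bindingRedirect oldVersion="0.0.0.0-2.1.3.0" newVersion="2.1.3.0" />
      </dependentAssembly>
    </assemblyBinding>
  </runtime>
</configuration>
```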

    I have seen a few people with this problem (it seems to keep coming up), and the suggestion is to use v2.1.3 and v1.4.0 of the two packages respectively, which I am already doing.

    I'm on .NET 4.6.2. Does anyone know why this is happening?
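For reference, pinning the package combination cited above in packages.config would look something like this (a sketch based on the versions mentioned in the question; the net462 target framework moniker is assumed from the .NET 4.6.2 statement):

```xml
<?xml version="1.0" encoding="utf-8"?>
<!-- packages.config: pin the versions cited in the question (net462 assumed) -->
<packages>
  <package id="Microsoft.Azure.DocumentDB" version="2.1.3" targetFramework="net462" />
  <package id="Microsoft.Azure.CosmosDB.BulkExecutor" version="1.4.0" targetFramework="net462" />
</packages>
```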

    1 reply · 6 years ago

  • Mohit Verma · 6 years ago

    @r3plica,

    Github Repo

    I use the method below to create the partitioned collection:

    internal static async Task<DocumentCollection> CreatePartitionedCollectionAsync(DocumentClient client, string databaseName,
        string collectionName, int collectionThroughput)
    {
        PartitionKeyDefinition partitionKey = new PartitionKeyDefinition
        {
            Paths = new Collection<string> { ConfigurationManager.AppSettings["CollectionPartitionKey"] }
        };
        DocumentCollection collection = new DocumentCollection { Id = collectionName, PartitionKey = partitionKey };

        try
        {
            collection = await client.CreateDocumentCollectionAsync(
                UriFactory.CreateDatabaseUri(databaseName),
                collection,
                new RequestOptions { OfferThroughput = collectionThroughput });
        }
        catch (Exception)
        {
            // "throw;" preserves the original stack trace ("throw e;" would reset it).
            throw;
        }

        return collection;
    }
    

    Then in the main method I do the bulk import as shown below:

    // Create documents with a simple partition key here.
    string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");

        long numberOfDocumentsToGenerate = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToImport"]);
        int numberOfBatches = int.Parse(ConfigurationManager.AppSettings["NumberOfBatches"]);
        long numberOfDocumentsPerBatch = (long)Math.Floor(((double)numberOfDocumentsToGenerate) / numberOfBatches);
    
        // Set retry options high for initialization (default values).
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
        IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
        await bulkExecutor.InitializeAsync();
    
        // Set retries to 0 to pass control to bulk executor.
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
        BulkImportResponse bulkImportResponse = null;
        long totalNumberOfDocumentsInserted = 0;
        double totalRequestUnitsConsumed = 0;
        double totalTimeTakenSec = 0;
    
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;
    

        // The generic argument was lost in translation; List<Task> is required
        // since the list collects the Task.Run results added below.
        var tasks = new List<Task>();

        // (This runs inside a loop over the batches: each iteration prepares
        //  documentsToImportInBatch and captures the batch index i.)
        tasks.Add(Task.Run(async () =>
        {
            Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", i));
                do
                {
                    try
                    {
                        bulkImportResponse = await bulkExecutor.BulkImportAsync(
                            documents: documentsToImportInBatch,
                            enableUpsert: true,
                            disableAutomaticIdGeneration: true,
                            maxConcurrencyPerPartitionKeyRange: null,
                            maxInMemorySortingBatchSize: null,
                            cancellationToken: token);
                    }
                    catch (DocumentClientException de)
                    {
                        Trace.TraceError("Document client exception: {0}", de);
                        break;
                    }
                    catch (Exception e)
                    {
                        Trace.TraceError("Exception: {0}", e);
                        break;
                    }
                } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
    
    
    
                totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
                totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
                totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
            },
            token));