对令牌范围的查询被执行,spark使用它们来有效地获取数据。但你需要记住-
getTokenRanges
将为您提供所有现有的令牌范围,但也有一些边缘情况-最后一个范围将是表示第一个范围的正数到负数,因此,您的查询将不会执行任何操作。基本上你错过了
MIN_TOKEN
和第一个令牌,以及在最后一个令牌和
MAX_TOKEN
. 火花连接器
generates different CQL statements
基于令牌。另外,您需要将查询路由到正确的节点-这可以通过
setRoutingToken
.
类似的方法可以在Java代码中使用(
full code
):
Metadata metadata = cluster.getMetadata();
Metadata metadata = cluster.getMetadata();
List<TokenRange> ranges = new ArrayList(metadata.getTokenRanges());
Collections.sort(ranges);
System.out.println("Processing " + (ranges.size()+1) + " token ranges...");
Token minToken = ranges.get(0).getStart();
String baseQuery = "SELECT id, col1 FROM test.range_scan WHERE ";
Map<String, Token> queries = new HashMap<>();
for (int i = 0; i < ranges.size(); i++) {
TokenRange range = ranges.get(i);
Token rangeStart = range.getStart();
Token rangeEnd = range.getEnd();
if (i == 0) {
queries.put(baseQuery + "token(id) <= " + minToken, minToken);
queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
} else if (rangeEnd.equals(minToken)) {
queries.put(baseQuery + "token(id) > " + rangeStart, rangeEnd);
} else {
queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
}
}
long rowCount = 0;
for (Map.Entry<String, Token> entry: queries.entrySet()) {
SimpleStatement statement = new SimpleStatement(entry.getKey());
statement.setRoutingToken(entry.getValue());
ResultSet rs = session.execute(statement);
}