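I have created a user repository (a Lagom read side) in my user-impl service, but for some reason the Cassandra users table is not created at runtime when I run sbt lagom:runAll. I don't know what I'm missing.
My UserRepository class: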
@Singleton
public class UserRepository {

    private final CassandraSession session;

    @Inject
    public UserRepository(CassandraSession session, ReadSide readSide) {
        this.session = session;
        // Register the processor so that Lagom runs its global prepare (table creation).
        readSide.register(PUserEventProcessor.class);
    }

    private static class PUserEventProcessor extends ReadSideProcessor<PUserEvent> {

        private final CassandraSession session;
        private final CassandraReadSide readSide;

        private PreparedStatement insertUserStatement;
        private PreparedStatement updateUserStatement;

        @Inject
        public PUserEventProcessor(CassandraSession session, CassandraReadSide readSide) {
            this.session = session;
            this.readSide = readSide;
        }

        @Override
        public ReadSideHandler<PUserEvent> buildHandler() {
            return readSide.<PUserEvent>builder("pUserEventOffset")
                    .setGlobalPrepare(this::createTables)
                    .setPrepare(tag -> prepareStatements())
                    .setEventHandler(PUserEvent.PUserCreated.class,
                            e -> insertUser(e.getUser()))
                    .setEventHandler(PUserEvent.PUserUpdated.class,
                            e -> updateUser(e.getUser()))
                    .build();
        }

        private void registerCodec(Session session, InstantCodec codec) {
            session.getCluster().getConfiguration().getCodecRegistry().register(codec);
        }

        @Override
        public PSequence<AggregateEventTag<PUserEvent>> aggregateTags() {
            return PUserEvent.TAG.allTags();
        }

        private CompletionStage<Done> createTables() {
            return doAll(
                    session.executeCreateTable(
                            "CREATE TABLE IF NOT EXISTS users (" +
                                    "userId UUID, " +
                                    "email text, " +
                                    "firstName text, " +
                                    "lastName text, " +
                                    "gender text, " +
                                    // Written by the INSERT/UPDATE statements below, so declared here too (type text is assumed).
                                    "refreshToken text, " +
                                    "PRIMARY KEY (userId) " +
                                    ")"
                    )
            );
        }

        private CompletionStage<Done> prepareStatements() {
            return doAll(
                    session.underlying()
                            .thenAccept(s -> registerCodec(s, InstantCodec.instance))
                            .thenApply(x -> Done.getInstance()),
                    prepareInsertUserStatement(),
                    prepareUpdateUserStatement());
        }

        // Insert users
        private CompletionStage<Done> prepareInsertUserStatement() {
            return session.
                    prepare("INSERT INTO users(" +
                            "userId, " +
                            "email, " +   // comma added: it was missing between email and firstName
                            "firstName, " +
                            "lastName, " +
                            "gender, " +
                            "refreshToken" +
                            ") " +
                            "VALUES (?, ?, ?, ?, ?, ?)"
                    )
                    .thenApply(accept(s -> insertUserStatement = s));
        }

        private CompletionStage<List<BoundStatement>> insertUser(DbUser user) {
            return completedStatements(
                    insertUserCreator(user)
            );
        }

        private BoundStatement insertUserCreator(DbUser user) {
            // Bind order matches the column list of the INSERT statement.
            return insertUserStatement.bind(
                    user.getUserId(),
                    user.getEmail(),
                    user.getFirstName(),
                    user.getLastName(),
                    user.getGender(),
                    user.getRefreshToken()
            );
        }

        // Update user by userId
        private CompletionStage<Done> prepareUpdateUserStatement() {
            return session.
                    prepare("UPDATE users " +
                            "SET email = ?, " +
                            " firstName = ?, " +
                            " lastName = ?, " +
                            " gender = ?, " +
                            " refreshToken = ? " +   // trailing space added so that WHERE is a separate token
                            "WHERE userId = ?;"
                    )
                    .thenApply(accept(s -> updateUserStatement = s));
        }

        private CompletionStage<List<BoundStatement>> updateUser(DbUser user) {
            return completedStatements(
                    updateUserCreator(user)
            );
        }

        private BoundStatement updateUserCreator(DbUser user) {
            // Bind order must follow the placeholders: the SET columns first, then the WHERE key.
            return updateUserStatement.bind(
                    user.getEmail(),
                    user.getFirstName(),
                    user.getLastName(),
                    user.getGender(),
                    user.getRefreshToken(),
                    user.getUserId()
            );
        }
    }
}
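The INSERT and UPDATE statements above are assembled from many string fragments, where a missing comma or space is easy to overlook. As a minimal sketch (plain Java, no Lagom or driver calls), the same CQL text could be generated from a single column list. CqlSketch, insertInto and updateByKey are hypothetical names, and the column list assumes the users table also declares refreshToken.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Lagom or the Cassandra driver.
final class CqlSketch {

    // Column names taken from the statements in the question.
    static final List<String> COLUMNS = Arrays.asList(
            "userId", "email", "firstName", "lastName", "gender", "refreshToken");

    // Builds "INSERT INTO t (a, b) VALUES (?, ?)" with one placeholder per column.
    static String insertInto(String table, List<String> columns) {
        String names = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + names + ") VALUES (" + marks + ")";
    }

    // Builds "UPDATE t SET a = ?, b = ? WHERE key = ?"; the key column is excluded from SET.
    static String updateByKey(String table, String key, List<String> columns) {
        String assignments = columns.stream()
                .filter(c -> !c.equals(key))
                .map(c -> c + " = ?")
                .collect(Collectors.joining(", "));
        return "UPDATE " + table + " SET " + assignments + " WHERE " + key + " = ?";
    }

    public static void main(String[] args) {
        System.out.println(insertInto("users", COLUMNS));
        System.out.println(updateByKey("users", "userId", COLUMNS));
    }
}
```

With this shape, the bind order for the UPDATE is the non-key columns in list order followed by the key, which makes a swapped argument easier to spot.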
Added to UserModule.java:
public class UserModule extends AbstractModule implements ServiceGuiceSupport {

    @Override
    protected void configure() {
        bindService(UserService.class, UserServiceImpl.class);
        bind(UserRepository.class);
    }
}
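One thing that may be worth ruling out with this module: a plain bind(UserRepository.class) does not by itself guarantee that the constructor runs at startup. Guice can create singletons lazily, and as far as I know Play does so in dev mode. If no other class injects UserRepository, readSide.register(...) may therefore never be called, and the global prepare that creates the table would never run. The plain-Java sketch below models only that effect. FakeReadSide, FakeUserRepository and LazySingleton are hypothetical stand-ins, not Lagom or Guice classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class LazyRegistrationSketch {

    // Stand-in for Lagom's ReadSide: it only records what was registered.
    static final class FakeReadSide {
        final List<String> registered = new ArrayList<>();

        void register(String processorName) {
            registered.add(processorName);
        }
    }

    // Stand-in for UserRepository: registration happens in the constructor.
    static final class FakeUserRepository {
        FakeUserRepository(FakeReadSide readSide) {
            readSide.register("PUserEventProcessor");
        }
    }

    // Stand-in for a lazily created singleton: built on the first get(), reused afterwards.
    static final class LazySingleton<T> implements Supplier<T> {
        private final Supplier<T> factory;
        private T instance;

        LazySingleton(Supplier<T> factory) {
            this.factory = factory;
        }

        @Override
        public synchronized T get() {
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }
    }

    public static void main(String[] args) {
        FakeReadSide readSide = new FakeReadSide();
        LazySingleton<FakeUserRepository> binding =
                new LazySingleton<>(() -> new FakeUserRepository(readSide));

        // Nothing has asked for the repository yet, so nothing is registered.
        System.out.println("before first use: " + readSide.registered);

        // The first request constructs the repository and triggers the registration.
        binding.get();
        System.out.println("after first use: " + readSide.registered);
    }
}
```

If this turns out to be the cause, injecting UserRepository into UserServiceImpl, or binding it with bind(UserRepository.class).asEagerSingleton(), should force the constructor to run at startup.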
The application.conf file:
play.modules.enabled += UserModule
lagom.persistence.ask-timeout = 1000s
user.cassandra.keyspace = user
cassandra-journal.keyspace = ${user.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${user.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${user.cassandra.keyspace}
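My setup is standard:
The "user" keyspace is created, and the messages table even stores events.
Any help on why the "users" table does not appear, or on how to troubleshoot this?
This is driving me crazy.
No errors are thrown when the service starts. I checked the Lagom online auction example, and the keyspace is available there. I don't see any difference in the code…
Edit:
More attempts to get my table:
- sbt clean / manually deleted the embedded Cassandra
- upgraded lagom-sbt-plugin to version 1.4.8
Many thanks for any help or even a hint. I can't do much without the read side, haha.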