
Lagom Cassandra read-side table not created

  •  4
  • Trace  · 6 years ago

    I have created a user repository as a Lagom read side in my user-impl service, but for some reason the Cassandra users table is not created when I run sbt lagom:runAll. I don't know what I'm missing.

    My UserRepository class:

    @Singleton
    public class UserRepository {
    
        private final CassandraSession session;
    
        @Inject
        public UserRepository(CassandraSession session, ReadSide readSide) {
            this.session = session;
            readSide.register(PUserEventProcessor.class);
        }
    
        private static class PUserEventProcessor extends ReadSideProcessor<PUserEvent> {
    
            private final CassandraSession session;
            private final CassandraReadSide readSide;
    
            private PreparedStatement insertUserStatement;
            private PreparedStatement updateUserStatement;
    
            @Inject
            public PUserEventProcessor(CassandraSession session, CassandraReadSide readSide) {
                this.session = session;
                this.readSide = readSide;
            }
    
            @Override
            public ReadSideHandler<PUserEvent> buildHandler() {
                return readSide.<PUserEvent>builder("pUserEventOffset")
                        .setGlobalPrepare(this::createTables)
                        .setPrepare(tag -> prepareStatements())
                        .setEventHandler(PUserEvent.PUserCreated.class,
                                e -> insertUser(e.getUser()))
                        .setEventHandler(PUserEvent.PUserUpdated.class,
                                e -> updateUser(e.getUser()))
                        .build();
            }
    
            private void registerCodec(Session session, InstantCodec codec) {
                session.getCluster().getConfiguration().getCodecRegistry().register(codec);
            }
    
            @Override
            public PSequence<AggregateEventTag<PUserEvent>> aggregateTags() {
                return PUserEvent.TAG.allTags();
            }
    
            private CompletionStage<Done> createTables() {
                return doAll(
                        session.executeCreateTable(
                                "CREATE TABLE IF NOT EXISTS users (" +
                                        "userId UUID, " +
                                        "email text, " +
                                        "firstName text, " +
                                        "lastName text, " +
                                        "gender text, " +
                                        "refreshToken text, " + // column used by the INSERT/UPDATE statements below
                                        "PRIMARY KEY (userId) " +
                                        ")"
                        )
                );
            }
    
            private CompletionStage<Done> prepareStatements() {
                return doAll(
                        session.underlying()
                                .thenAccept(s -> registerCodec(s, InstantCodec.instance))
                                .thenApply(x -> Done.getInstance()),
                        prepareInsertUserStatement(),
                        prepareUpdateUserStatement());
            }
    
    
            // Insert users
    
            private CompletionStage<Done> prepareInsertUserStatement() {
                return session.
                        prepare("INSERT INTO users(" +
                                "userId, " +
                                "email, " +
                                "firstName, " +
                                "lastName, " +
                                "gender, " +
                                "refreshToken" +
                                ") " +
                                "VALUES (?, ?, ?, ?, ?, ?)"
                        )
                        .thenApply(accept(s -> insertUserStatement = s));
            }
    
    
            private CompletionStage<List<BoundStatement>> insertUser(DbUser user) {
                return completedStatements(
                        insertUserCreator(user)
                );
            }
    
            private BoundStatement insertUserCreator(DbUser user) {
                return insertUserStatement.bind(
                        user.getUserId(),
                        user.getEmail(),
                        user.getFirstName(),
                        user.getLastName(),
                        user.getGender(),
                        user.getRefreshToken()
                );
            }
    
            // Update user by userId
    
            private CompletionStage<Done> prepareUpdateUserStatement() {
                return session.
                        prepare("UPDATE users " +
                                "SET email = ?, " +
                                "   firstName = ?, " +
                                "   lastName = ?, " +
                                "   gender = ?, " +
                                "   refreshToken = ? " +
                                "WHERE userId = ?;"
                        )
                        .thenApply(accept(s -> updateUserStatement = s));
            }
    
    
            private CompletionStage<List<BoundStatement>> updateUser(DbUser user) {
                return completedStatements(
                        updateUserCreator(user)
                );
            }
    
            private BoundStatement updateUserCreator(DbUser user) {
                return updateUserStatement.bind(
                        user.getEmail(),
                        user.getFirstName(),
                        user.getLastName(),
                        user.getGender(),
                        user.getRefreshToken(),
                        user.getUserId() // bound last: matches the WHERE userId = ? placeholder
                );
            }
    
        }
    } 
    

    Added to UserModule.java:

    public class UserModule extends AbstractModule implements ServiceGuiceSupport {
    
        @Override
        protected void configure() {
            bindService(UserService.class, UserServiceImpl.class);
            bind(UserRepository.class);
        }
    }
    

    application.conf:

    play.modules.enabled += UserModule
    lagom.persistence.ask-timeout = 1000s
    
    user.cassandra.keyspace = user
    
    cassandra-journal.keyspace = ${user.cassandra.keyspace}
    cassandra-snapshot-store.keyspace = ${user.cassandra.keyspace}
    lagom.persistence.read-side.cassandra.keyspace = ${user.cassandra.keyspace}
    

    My setup is otherwise standard.

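    The "user" keyspace is created, and the messages table even stores events.
    Any help on why the "users" table does not show up, or on how to troubleshoot this?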

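    This is driving me crazy.
    No errors are thrown when the service starts. I checked the Lagom online-auction example, where the keyspace is available, and I see no difference in the code.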

    Edit: further attempts to get my table created:
    - sbt clean / manually deleting the embedded Cassandra
    - upgrading lagom-sbt-plugin to version 1.4.8

    Thanks a lot for any help or even hints. I can't do much without the read side, lol.

    1 answer  |  latest 6 years ago
  •  0
  •   erip Jigar Trivedi    6 years ago

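    You need to bind UserRepository as an eager singleton. This forces it to be initialized eagerly rather than lazily. In prod you'll notice that this doesn't matter, but in dev it does (which is probably why you are seeing this problem in dev).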
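
    A minimal sketch of that change, assuming the UserModule, UserService, UserServiceImpl and UserRepository classes from the question and the Lagom 1.4 Java DSL on the classpath. Only the last binding differs from the question's module; asEagerSingleton() is Guice's standard way to request eager construction.

```java
import com.google.inject.AbstractModule;
import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport;

public class UserModule extends AbstractModule implements ServiceGuiceSupport {

    @Override
    protected void configure() {
        bindService(UserService.class, UserServiceImpl.class);
        // Construct UserRepository when the injector is created, so its
        // constructor runs readSide.register(...) even though no other
        // class injects the repository yet.
        bind(UserRepository.class).asEagerSingleton();
    }
}
```

    With this binding, the repository's constructor should run at service startup, which registers the processor and lets its global prepare step (createTables in the question) create the users table.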

    For an example, see the Lagom online-auction sample project.
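
    To illustrate why the binding style matters, here is a standard-library-only sketch of lazy versus eager construction. It does not use Guice or Lagom; EagerVsLazyDemo, Repository, LazyBinding and EagerBinding are hypothetical stand-ins, with Repository mimicking a class whose constructor registers a read-side processor as a side effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class EagerVsLazyDemo {

    /** Stand-in for the read-side registry: records registered processors. */
    static final List<String> registered = new ArrayList<>();

    /** Stand-in for UserRepository: registration happens in the constructor. */
    static class Repository {
        Repository() {
            registered.add("PUserEventProcessor");
        }
    }

    /** Lazy binding: the instance is built only when something asks for it. */
    static class LazyBinding<T> {
        private final Supplier<T> factory;
        private T instance;

        LazyBinding(Supplier<T> factory) {
            this.factory = factory;
        }

        T get() {
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }
    }

    /** Eager binding: the instance is built as soon as the binding is declared. */
    static class EagerBinding<T> {
        private final T instance;

        EagerBinding(Supplier<T> factory) {
            this.instance = factory.get();
        }

        T get() {
            return instance;
        }
    }

    static int registeredAfterLazyBinding() {
        registered.clear();
        // Nothing ever calls get(), just as nothing injects UserRepository.
        new LazyBinding<Repository>(Repository::new);
        return registered.size();
    }

    static int registeredAfterEagerBinding() {
        registered.clear();
        new EagerBinding<Repository>(Repository::new);
        return registered.size();
    }

    public static void main(String[] args) {
        System.out.println("lazy:  " + registeredAfterLazyBinding());
        System.out.println("eager: " + registeredAfterEagerBinding());
    }
}
```

    In this sketch the lazy binding leaves the registry empty because the constructor never runs, while the eager binding registers the processor immediately. This mirrors the situation in the question, where no other class injects UserRepository.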