代码之家  ›  专栏  ›  技术社区  ›  Earthling

使用sqlalchemy将一个对象从一个数据库持久化到另一个数据库

  •  1
  • Earthling  · 技术社区  · 7 年前

    我有两个数据库(都是Mysql),它们有完全相同的表,我想使用Sqlalchemy将一些数据从一个数据库复制到另一个数据库。

    我可以根据这个问题的答案复制简单对象:

    Cannot move object from one database to another

    问题是当对象具有来自另一个表的依赖项时,我也想复制这些依赖项。

    因此,更清楚地说,这是我的模型(两个数据库相同,但使用不同的bind\u键指向不同的数据库):

    db1 = SQLAlchemy()
    
    Class Payment(db.Model):
        __tablename__ = 'payments'
        __bind_key__ = 'db1'
    
        id = db1.Column(db.Integer, primary_key=True)
        paymethod_id = db1.Column(db.Integer(), db1.ForeignKey(PaymentMethod.id))
        payment_method = db1.relationship(PaymentMethod)
    

    我想做的是:

    from models1 import Payment as Payment1
    from models2 import Payment as Payment2
    
    # query from one database
    payment1 = db1.session.query(Payment1).first()
    
    # create and add it to the other database
    payment2 = Payment2(**payment1.__dict__.copy())
    db2.session.add(payment)
    db2.session.commit()
    

    但在这种情况下,外键失败,因为我还没有存储PaymentMethod。

    有没有不同的方法来实现这一点,或者我必须为我的对象的每个依赖项执行这个过程,并确保我事先存储了子对象?

    感谢您的帮助:)

    1 回复  |  直到 7 年前
        1
  •  1
  •   Earthling    7 年前

    我提出了一个解决方案,将对象重新映射到正确的模型,并存储其所有子对象。你调用这个方法 save_obj 并传递要映射的对象。然后,它将检索具有相同名称的表,但从要将对象重新映射到的模型中,它将递归地对其所有子对象执行相同的操作。您必须在方法中定义正确的模型 get_model

    要运行此功能,需要禁用自动刷新以防止在正确形成对象之前提交,还需要在调用该方法后提交。我在使用炼金术。

    希望这能对面临类似问题的人有所帮助或提供一些见解:)

    def save_obj(obj, checked=[]):
        if obj in checked:
            # if the object was already converted, retrieve the right object
            model = get_model(obj.__mapper__.mapped_table.name)
            return get_obj(obj, model)
    
        checked.append(obj)
        children = []
    
        relations = obj.__mapper__.relationships.items()
    
        # get all the relationships of this model (foreign keys)
        for relation in relations:
            model = get_model(relation[1].table.name)
            if model:
                # remove the cascade option for this object, so the children are not stored automatically in the session
                relation[1]._cascade = CascadeOptions('')
                child = getattr(obj, relation[0])
    
                if not child:
                    continue
    
                # if the child is a set of children
                if isinstance(child, list):
                    new_children = []
                    for ch in copy(child):
                        # convert the child
                        new_child = save_obj(ch, checked)
                        new_children.append(new_child)
    
                    children.append((relation[0], new_children))
                else:
                    new_child = save_obj(child, checked)
                    children.append((relation[0], new_child))
    
        # get the model of the object passed
        model = get_model(obj.__mapper__.mapped_table.name)
        new_obj = get_obj(obj, model)
    
        # set all the children in this object
        for child in children:
            if child[1]:
                setattr(new_obj, child[0], child[1])
    
        checked.append(new_obj)
    
        session.add(new_obj)
        return new_obj
    
    
    def get_model(table_name):
        # get the right model for this object
        for table in db.Model._decl_class_registry.values():
            if hasattr(table, '__tablename__') and table.__tablename__ == table_name:
                return table
        return None
    
    
    def create_new_obj(obj, model):
        params = obj.__dict__.copy()
        params.pop('_sa_instance_state')
        return model(**params)
    
    
    def get_obj(child, model):
        # check if the object is already stored in the db
        child_in_db = session.query(model).get(child.id)
    
        if child_in_db:
            return child_in_db
    
        # check if the object is already in the session
        for s in session.new:
            if type(s) == model and s.id == child.id:
                return s
    
        return create_new_obj(child, model)