代码之家  ›  专栏  ›  技术社区  ›  user6354073

从其他可观察对象获取值的Rx可观察对象

  •  2
  • user6354073  · 技术社区  · 7 年前

    我的viewModel有一个名为 rx_fetchItems(for:) 这完成了从后端获取相关内容的繁重任务,并返回 Observable<[Item]> .

    我的目标是提供名为 collectionItems ,最后一个发出的元素从返回 ,为我的collectionView提供数据。

    Daniel T提供了我可能使用的解决方案:

    protocol ServerAPI {
        func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
    }
    
        struct ViewModel {
    
            let collectionItems: Observable<[Item]>
            let error: Observable<Error>
    
            init(controlValue: Observable<Int>, api: ServerAPI) {
                let serverItems = controlValue
                    .map { ItemCategory(rawValue: $0) }
                    .filter { $0 != nil }.map { $0! } // or use a `filterNil` operator if you already have one implemented.
                    .flatMap { api.rx_fetchItems(for: $0)
                        .materialize()
                    }
                    .filter { $0.isCompleted == false }
                    .shareReplayLatestWhileConnected()
    
                collectionItems = serverItems.filter { $0.element != nil }.dematerialize()
                error = serverItems.filter { $0.error != nil }.map { $0.error! }
            }
    
        }
    

    class FirebaseAPI {
    
        private let session: URLSession
    
        init() {
            self.session = URLSession.shared
        }
    
        /// Responsible for Making actual API requests & Handling response
        /// Returns an observable object that conforms to JSONable protocol.
        /// Entities that confrom to JSONable just means they can be initialized with json.
        func rx_fireRequest<Entity: JSONable>(_ endpoint: FirebaseEndpoint, ofType _: Entity.Type ) -> Observable<[Entity]> {
    
            return Observable.create { [weak self] observer in
                self?.session.dataTask(with: endpoint.request, completionHandler: { (data, response, error) in
    
                    /// Parse response from request.
                    let parsedResponse = Parser(data: data, response: response, error: error)
                        .parse()
    
                    switch parsedResponse {
    
                    case .error(let error):
                        observer.onError(error)
                        return
    
                    case .success(let data):
    
                        var entities = [Entity]()
    
                        switch endpoint.method {
    
                        /// Flatten JSON strucuture to retrieve a list of entities.
                        /// Denoted by 'GETALL' method.
                        case .GETALL:
    
                            /// Key (underscored) is unique identifier for each entity, which is not needed here.
                            /// value is k/v pairs of entity attributes.
                            for (_, value) in data {
                                if let value = value as? [String: AnyObject], let entity = Entity(json: value) {
                                    entities.append(entity)
                                }
                            }
    
                            // Need to force downcast for generic type inference.
                            observer.onNext(entities as! [Entity])
                            observer.onCompleted()
    
                        /// All other methods return JSON that can be used to initialize JSONable entities 
                        default:
                            if let entity = Entity(json: data) {
                            observer.onNext([entity] as! [Entity])
                            observer.onCompleted()
                        } else {
                            observer.onError(NetworkError.initializationFailure)
                            }
                        }
                    }
                }).resume()
                return Disposables.create()
            }
        }
    }
    

    最重要的是 rx_fireRequest 方法是它接受一个 FirebaseEndpoint .

    /// Conforms to Endpoint protocol in extension, so one of these enum members will be the input for FirebaseAPI's `fireRequest` method.
    
    enum FirebaseEndpoint {
    
        case saveUser(data: [String: AnyObject])
        case fetchUser(id: String)
        case removeUser(id: String)
    
        case saveItem(data: [String: AnyObject])
        case fetchItem(id: String)
        case fetchItems
        case removeItem(id: String)
    
        case saveMessage(data: [String: AnyObject])
        case fetchMessages(chatroomId: String)
        case removeMessage(id: String)
    
    }
    

    FirebaseEndpoint 进入内部方法 FirebaseAPI 。在每个方法中,调用

    如果这有助于更好的服务器API设计,我希望做出这样的改变。因此,一个简单的问题是,这个重构是否会改进我的整体API设计,以及它如何与ViewModels交互。我意识到这正在演变为代码审查。

    此外……这里是该协议方法的实现及其助手:

     func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>  {
            // fetched items returns all items in database as Observable<[Item]>
            let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
            switch category {
            case .Local:
                let localItems = fetchedItems
                .flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
                    return self!.rx_localItems(items: itemList)
                }
    
                return localItems
    
                // TODO: Handle other cases like RecentlyAdded, Trending, etc..
            }
        }
    
        // Helper method to filter items for only local items nearby user.
        private func rx_localItems(items: [Item]) -> Observable<[Item]> {
            return Observable.create { observable in
                observable.onNext(items.filter { $0.location == "LA" })
                observable.onCompleted()
                return Disposables.create()
            }
        }
    

    3 回复  |  直到 7 年前
        1
  •  1
  •   alephao    7 年前

    我知道很难开始理解RxSwift

    Subject s或 Variable s作为 Observable Driver s作为 视图模型

    下面是一个重构代码的示例

    视图模型

    // Inputs
    let didSelectItemCategory: PublishSubject<ItemCategory> = .init()
    
    // Outputs
    let items: Observable<[Item]>
    
    init() {
        let client = FirebaseAPI()
    
        let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
    
        self.items = didSelectItemCategory
            .withLatestFrom(fetchedItems, resultSelector: { itemCategory, fetchedItems in
                switch itemCategory {
                case .Local:
                    return fetchedItems.filter { $0.location == "Los Angeles" }
                default: return []
                }
            })
    }
    

    segmentedControl.rx.value
        .map(ItemCategory.init(rawValue:))
        .startWith(.Local)
        .bind(to: viewModel.didSelectItemCategory)
        .disposed(by: disposeBag)
    
    viewModel.items
        .subscribe(onNext: { items in
            // Do something
        })
        .disposed(by: disposeBag)
    
        2
  •  0
  •   Daniel T.    7 年前

    我认为你所面临的问题是,你只是在观察范式上走了一半,这让你很失望。试着一路接受它,看看这是否有帮助。例如:

    protocol ServerAPI {
        func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
    }
    
    struct ViewModel {
    
        let collectionItems: Observable<[Item]>
        let error: Observable<Error>
    
        init(controlValue: Observable<Int>, api: ServerAPI) {
            let serverItems = controlValue
                .map { ItemCategory(rawValue: $0) }
                .filter { $0 != nil }.map { $0! } // or use a `filterNil` operator if you already have one implemented.
                .flatMap { api.rx_fetchItems(for: $0)
                    .materialize()
                }
                .filter { $0.isCompleted == false }
                .shareReplayLatestWhileConnected()
    
            collectionItems = serverItems.filter { $0.element != nil }.dematerialize()
            error = serverItems.filter { $0.error != nil }.map { $0.error! }
        }
    }
    

    编辑以处理评论中提到的问题。现在需要传入具有 rx_fetchItems(for:) 方法您应该有多个这样的对象:一个指向服务器,另一个不指向任何服务器,而是返回固定数据,以便您可以测试任何可能的响应,包括错误。(视图模型不应直接与服务器对话,而应通过中介进行对话。。。

    materialize 将错误事件包装为包含错误对象的正常事件的运算符。这样可以防止网络错误导致整个系统关闭。


    为了回答您问题中的更改……您可以简单地使FirebaseAPI符合ServerAPI:

    extension FirebaseAPI: ServerAPI {
        func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>  {
            // fetched items returns all items in database as Observable<[Item]>
            let fetchedItems = self.rx_fireRequest(.fetchItems, ofType: Item.self)
            switch category {
            case .Local:
                let localItems = fetchedItems
                    .flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
                        return self!.rx_localItems(items: itemList)
                }
    
                return localItems
    
                // TODO: Handle other cases like RecentlyAdded, Trending, etc..
            }
        }
    
        // Helper method to filter items for only local items nearby user.
        private func rx_localItems(items: [Item]) -> Observable<[Item]> {
            return Observable.create { observable in
                observable.onNext(items.filter { $0.location == "LA" })
                observable.onCompleted()
                return Disposables.create()
            }
        }
    }
    

    您可能应该更改 ServerAPI 在这一点上 FetchItemsAPI

        3
  •  0
  •   Ian    7 年前

    您在这里遇到了一个棘手的情况,因为您的可观察对象可能会抛出一个错误,一旦它确实抛出一个错误,可观察序列就会出错,并且不会再发出任何事件。因此,为了处理后续的网络请求,您必须采用当前采用的方法重新分配任务。然而,这通常不利于驱动UI元素,例如集合视图,因为每次都必须绑定到重新分配的可观察对象。在驱动UI元素时,您应该倾向于保证不会出错的类型(即变量和驱动程序)。你可以 Observable<[Item]> 成为 let items = Variable<[Item]>([]) let errorMessage = Variable<String?>(nil) ,对于来自网络请求的错误消息,然后可以将errorMessage字符串绑定到标签或类似的东西以显示错误消息。