
Flowable does not deliver anything to onNext() of a DisposableSubscriber inside an Android Worker (WorkManager)

  • amin mahmoudi  ·  5 years ago

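    I want to upload a file on Android and show the upload progress, blocking the worker thread until the upload task has finished.

    The upload to the server succeeds and the emitter emits the current value, but the DisposableSubscriber's onNext() method is never called!

    The code below shows how the file is uploaded and how the current progress is emitted: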

    @Override
    public Flowable<Double> uploadFile(Message message, String ticket_id) {
        // Create the Flowable
        return Flowable.create((FlowableEmitter<Double> emitter) -> {
            try {
                // Call the API
                Timber.w("upload file");
                ResponseBody response = api.postFile(
                        createMultipartBody(message, emitter),
                        RequestBody.create(MediaType.parse("text/plain"), "file"),
                        RequestBody.create(MediaType.parse("text/plain"), message.getFileType()),
                        ticket_id,
                        getApiKey()
                )
                        .doOnError(emitter::tryOnError)
                        .blockingGet();
                emitter.onComplete();
            } catch (Exception e) {
                emitter.tryOnError(e);
            }
        }, BackpressureStrategy.LATEST);
    
    
    }
    
    private MultipartBody.Part createMultipartBody(Message message, FlowableEmitter<Double> emitter) {
        File file;
        if (message.getFileType().equals("File")) {
            file = new File(message.getUrlFile());
    
            return MultipartBody.Part.createFormData("file", file.getName(),
                    createCountingRequestBody(file, emitter));
        } else {
            file = new File(message.getImageUrl());
    
            return MultipartBody.Part.createFormData("file", file.getName(),
                    createCountingRequestBody(file, emitter));
    
        }
    }
    
    
    private RequestBody createCountingRequestBody(File file, FlowableEmitter<Double> emitter) {
        RequestBody requestBody = createRequestBody(file);
        return new CountingRequestBody(requestBody, (bytesWritten, contentLength) -> {
            emitter.onNext( (1.0 * bytesWritten) / contentLength);
        });
    }
    
    
    private RequestBody createRequestBody(File file) {
        return RequestBody.create(MediaType.parse("multipart/form-data"), file);
    }
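
    The CountingRequestBody class is not shown in the question. As a rough sketch of the idea only, assuming it wraps the output sink and reports (bytesWritten, contentLength) after each write, a plain-Java equivalent built on FilterOutputStream might look like the following. CountingOutputStream, ProgressListener and demo are hypothetical names for illustration, not OkHttp APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for CountingRequestBody: counts bytes as they pass through. */
final class CountingOutputStream extends FilterOutputStream {

    /** Hypothetical callback, mirroring the (bytesWritten, contentLength) lambda in the question. */
    interface ProgressListener {
        void onProgress(long bytesWritten, long contentLength);
    }

    private final long contentLength;
    private final ProgressListener listener;
    private long bytesWritten;

    CountingOutputStream(OutputStream delegate, long contentLength, ProgressListener listener) {
        super(delegate);
        this.contentLength = contentLength;
        this.listener = listener;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        report(1);
    }

    @Override
    public void write(byte[] buffer, int offset, int length) throws IOException {
        // Hand the whole chunk to the delegate at once, then report it.
        out.write(buffer, offset, length);
        report(length);
    }

    private void report(long count) {
        bytesWritten += count;
        listener.onProgress(bytesWritten, contentLength);
    }

    /** Writes the payload in fixed-size chunks and returns the progress fractions reported. */
    static List<Double> demo(byte[] payload, int chunkSize) {
        List<Double> fractions = new ArrayList<>();
        try (CountingOutputStream stream = new CountingOutputStream(
                new ByteArrayOutputStream(), payload.length,
                (written, total) -> fractions.add((1.0 * written) / total))) {
            for (int off = 0; off < payload.length; off += chunkSize) {
                stream.write(payload, off, Math.min(chunkSize, payload.length - off));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fractions;
    }

    public static void main(String[] args) {
        System.out.println(demo(new byte[1000], 250));
    }
}
```

    Every chunk written triggers one callback, so a large file can produce far more progress values than the subscriber needs to handle.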
    

    Here is my Worker class, which calls the uploadFile() method:

    public class SenderService extends Worker {
    // Define the parameter keys:
    public static final String FILE_ADDRESS = "file_address";
    public static final String FILE_TYPE = "type";
    public static final String TICKET_ID = "ticket_id";
    public static final String MESSAGE_ID = "message_id";
    private final Message message;
    private Data data;
    @Inject
    Repository repository;
    private CountDownLatch latch;
    private Result result;
    private String ticketId;
    
    
    public SenderService(@NonNull Context context, @NonNull WorkerParameters workerParams) {
        super(context, workerParams);
        setupComponent(((App) context.getApplicationContext()).getAppComponent());
        message=new Message();
        data=workerParams.getInputData();
    
        message.setFileType(data.getString(FILE_TYPE));
        message.setUrlFile(data.getString(FILE_ADDRESS));
        message.setId(data.getString(MESSAGE_ID));
        ticketId=data.getString(TICKET_ID);
        Timber.w("service called");
        latch = new CountDownLatch(1);
    
    
    }
    
    @NonNull
    @Override
    public Result doWork() {
        // Check whether this is a new task or a retry of a failed one
        if (message.getId() == null) {
            try (Realm realm = Realm.getDefaultInstance()) {
                TicketDetail ticket = realm.where(TicketDetail.class).equalTo("id", ticketId).findFirst();
                String message_id = UUID.randomUUID().toString();
                message.setId(message_id);
                // Data passed to Result.failure() if the upload fails
                data = new Data.Builder()
                        .putString(SenderService.FILE_ADDRESS, message.getUrlFile())
                        .putString(SenderService.FILE_TYPE, message.getFileType())
                        .putString(SenderService.TICKET_ID, ticketId)
                        .putString(SenderService.MESSAGE_ID, message_id)
                        .build();
                realm.executeTransaction(realm1 -> {
                    realm1.insert(message);
                    RealmList<Message> messages = ticket
                            .getMessages();
                    if (messages != null) {
                        messages.add(message);
                        ticket.setMessages(messages);
                    } else {
                        messages = new RealmList<>();
                        messages.add(message);
                        ticket.setMessages(messages);
                    }
                });
    
            }
        }
        // Call the repository and upload the file
        repository.uploadFile(message, ticketId)
                .subscribeOn(Schedulers.io())
                .onBackpressureLatest()
                .subscribe(new DisposableSubscriber<Double>() {
                    @Override
                    protected void onStart() {
                        super.onStart();
                    }

                    @Override
                    public void onNext(Double aDouble) {
                        // Persist the latest progress value on the message
                        try (Realm realm = Realm.getDefaultInstance()) {
                            realm.executeTransaction(realm1 -> {
                                realm1.where(TicketDetail.class).equalTo("id", ticketId).findFirst().getMessages()
                                        .where().equalTo("id", message.getId()).findFirst().setProgress(aDouble);
                            });
                        }
                        Timber.w("onNext:" + aDouble);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Timber.e(t);
                        result = Result.failure(data);
                        latch.countDown();
                    }

                    @Override
                    public void onComplete() {
                        Timber.w("onComplete");
                        repository.getTicketDetail(ticketId);
                        result = Result.success();
                        latch.countDown();
                    }
                });
    
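        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report failure rather than returning a null result
            Thread.currentThread().interrupt();
            return Result.failure(data);
        }
        return result;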
    }
    
    
    
    protected void setupComponent(@NonNull AppComponent parentComponent) {
        DaggerSenderServiceComponent.builder()
                .appComponent(parentComponent)
                .senderServiceModule(new SenderServiceModule(this))
                .build()
                .inject(this);
    }
    
    
    
    
    }
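
    The worker blocks on a CountDownLatch until the asynchronous upload signals completion or failure. A minimal plain-Java sketch of that pattern follows, with the upload replaced by an arbitrary background task. LatchWait, runAndWait and the "success"/"failure" strings are illustrative stand-ins for the Worker and its Result, and the timeout is an assumption added here; the question's code waits without one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LatchWait {

    /** Runs the task on another thread and blocks until it finishes or the timeout elapses. */
    static String runAndWait(Runnable task, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        // Start from "failure" so the caller never sees a null result.
        AtomicReference<String> result = new AtomicReference<>("failure");

        Thread worker = new Thread(() -> {
            try {
                task.run();
                result.set("success");
            } catch (RuntimeException e) {
                result.set("failure");
            } finally {
                // Always release the waiting thread, whatever the outcome.
                latch.countDown();
            }
        });
        worker.setDaemon(true); // a stuck task should not keep the JVM alive
        worker.start();

        try {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return "failure"; // timed out before the task signalled
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            return "failure";
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(() -> { }, 1000));
        System.out.println(runAndWait(() -> { throw new IllegalStateException("boom"); }, 1000));
    }
}
```

    Counting down in a finally block means the waiting thread is released on every path, which is the property the onError() and onComplete() callbacks provide in the worker above.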
    

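    Answer: I used throttleLatest(200, TimeUnit.MILLISECONDS) and my problem was solved :) throttleLatest emits the next item emitted by the reactive source and then periodically emits the latest item, which limits the number of progress updates per unit of time :)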
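
    To illustrate the behaviour described in the answer, here is a simplified, single-threaded model of "emit the first item, then periodically emit the latest one". It is not RxJava's implementation: timestamps are passed in explicitly instead of using a scheduler, and ThrottleLatestModel is a hypothetical class. In the real chain the operator would presumably be applied as .throttleLatest(200, TimeUnit.MILLISECONDS) before subscribe(...).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of throttle-latest semantics, driven by explicit timestamps. */
final class ThrottleLatestModel {
    private final long timeoutMillis;
    private final List<Double> emitted = new ArrayList<>();
    private boolean timerRunning;
    private long timerFiresAt;
    private Double pending; // latest value seen while the timer is running

    ThrottleLatestModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Feeds one upstream value arriving at the given time; times must be non-decreasing. */
    void onNext(long timeMillis, double value) {
        advanceTo(timeMillis);
        if (!timerRunning) {
            emit(value, timeMillis); // no window open: pass the value straight through
        } else {
            pending = value;         // window open: remember only the latest value
        }
    }

    /** Processes every timer expiry that happens up to the given time. */
    private void advanceTo(long timeMillis) {
        while (timerRunning && timerFiresAt <= timeMillis) {
            if (pending != null) {
                double value = pending;
                pending = null;
                emit(value, timerFiresAt); // emit the latest value and restart the timer
            } else {
                timerRunning = false;      // nothing arrived during the window
            }
        }
    }

    private void emit(double value, long atMillis) {
        emitted.add(value);
        timerRunning = true;
        timerFiresAt = atMillis + timeoutMillis;
    }

    List<Double> emitted() {
        return emitted;
    }

    /** Six progress values pushed through a 200 ms window. */
    static List<Double> demo() {
        ThrottleLatestModel model = new ThrottleLatestModel(200);
        model.onNext(0, 0.1);
        model.onNext(50, 0.2);
        model.onNext(120, 0.3);
        model.onNext(250, 0.4);
        model.onNext(260, 0.5);
        model.onNext(700, 1.0);
        return model.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0.1, 0.3, 0.5, 1.0]
    }
}
```

    In this model, six upstream values result in four downstream updates, so the Realm transaction in onNext() would run less often. A value still pending when the stream ends is never emitted by the model.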
