Flux转Mono next()

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Demo of collapsing a {@link Flux} of per-template query results into a {@link Mono} via {@link
 * Flux#next()}.
 */
@Slf4j
public class TestFindResult {
  /** Template values keyed by database name; a {@link LinkedHashMap} keeps insertion order. */
  private static final Map<String, String> templates;

  /** Simulated query latency, in milliseconds. */
  private static final int SLEEP_MS = 1000;

  static {
    templates = new LinkedHashMap<>();
    templates.put("aDB", "a");
    templates.put("bDB", "b");
    templates.put("cDB", "c");
  }

  /**
   * Applies {@code query} to the template values and returns the first value the merged result
   * stream emits.
   *
   * <p>Because {@code flatMap} merges the inner publishers, with genuinely asynchronous queries
   * the "first" value is the first one emitted, which is not necessarily the one belonging to the
   * first template.
   *
   * @param query maps a template value to the publisher producing its query result
   * @return the first emitted result, or an empty {@link Mono} when nothing is emitted
   */
  public Mono<String> findResult(Function<String, Mono<String>> query) {
    return Flux.fromIterable(templates.values())
        .flatMap(query)
        .next()
        // Flux#next() completes empty on an empty source and ignores further elements, so the two
        // handlers below only apply to errors signalled by the query itself. They are the signals
        // Flux#single() would raise and are kept so callers see the same error mapping.
        .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
        .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new);
  }

  /**
   * Runs the demo with a mock query that sleeps for {@link #SLEEP_MS} ms and echoes its input,
   * then prints the resolved value.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    TestFindResult test = new TestFindResult();
    Function<String, Mono<String>> query = (value) -> {
      try {
        Thread.sleep(SLEEP_MS); // mock DB query
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
        log.warn("Interrupted while simulating the query for value:{}", value, e);
      }
      log.info(
          "Thread id:{}, Thread name:{}, value:{}, used ms:{}",
          Thread.currentThread().getId(),
          Thread.currentThread().getName(),
          value,
          SLEEP_MS);
      return Mono.just(value);
    };
    // block() yields the resolved value; printing subscribe() would only print the Disposable.
    System.out.println(test.findResult(query).block());
  }
}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Demo of selecting the first template that passes an asynchronous predicate using {@link
 * Flux#filterWhen} followed by {@link Flux#next()}.
 */
@Slf4j
public class TestFindMongo {
  /** Template values keyed by database name; a {@link LinkedHashMap} keeps insertion order. */
  private static final Map<String, String> templates;

  /** Simulated query latency, in milliseconds. */
  private static final int SLEEP_MS = 1000;

  static {
    templates = new LinkedHashMap<>();
    templates.put("aDB", "a");
    templates.put("bDB", "b");
    templates.put("cDB", "c");
  }

  /**
   * Finds the first template whose mock query reports a match and returns its value.
   *
   * @return the value of the first matching template, or an empty {@link Mono} when no template
   *     matches
   */
  public Mono<String> findMongo() {
    // defer() starts a fresh StopWatch per subscription, so the reported time covers the actual
    // run rather than the interval since this method was called.
    return Mono.defer(
        () -> {
          StopWatch stopWatch = StopWatch.createStarted();
          return Flux.fromIterable(templates.entrySet())
              .filterWhen(TestFindMongo::matches)
              .next()
              // doOnNext only fires when an entry exists; doOnSuccess would also be invoked with
              // null on an empty completion and fail on getKey().
              .doOnNext(templateEntry -> log.info("Match {} ", templateEntry.getKey()))
              .map(Entry::getValue)
              // Flux#next() does not raise these itself (they are Flux#single() signals); the
              // handlers only apply if an upstream stage signals them.
              .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
              .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new)
              .doOnTerminate(() -> log.info("Database recon took {} ms", stopWatch.getTime()));
        });
  }

  /**
   * Mock database lookup: sleeps for {@link #SLEEP_MS} ms, logs the call, and reports whether the
   * template value equals {@code "b"}.
   */
  private static Mono<Boolean> matches(Entry<String, String> template) {
    String key = template.getKey();
    String value = template.getValue();
    try {
      Thread.sleep(SLEEP_MS); // mock DB query
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the caller can still observe the interruption.
      Thread.currentThread().interrupt();
      log.warn("Interrupted while simulating the query for {}", key, e);
    }
    log.info(
        "Thread id:{}, Thread name:{}, query:{}, value:{} , used ms:{}",
        Thread.currentThread().getId(),
        Thread.currentThread().getName(),
        key,
        value,
        SLEEP_MS);
    return Mono.just(value.equals("b"));
  }

  /**
   * Runs the demo and prints the matched template value.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    TestFindMongo test = new TestFindMongo();
    // block() yields the resolved value; printing subscribe() would only print the Disposable.
    System.out.println(test.findMongo().block());
  }
}
import static org.springframework.http.HttpStatus.*;

import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;

/**
 * {@code 400 Bad Request} error raised when a query matches more than one upstream and therefore
 * cannot be answered in a single response.
 */
public class MultipleUpstreamException extends ResponseStatusException {

  /** Reason text sent with the {@code BAD_REQUEST} status. */
  private static final String MULTIPLE_UPSTREAM_MATCH_ERR =
      "Your query contains properties matching multiple upstreams. "
          + "Data for multiple upstreams can‘t be returned in one query. "
          + "Please either specify upstream by providing publisherSystem "
          + "(GSM,MUNI_ITICKET,MUNI_OASYS,TPSDERIV,EDLR) "
          + "and region or request deal properties matching only one upstream";

  /** Creates the exception with the standard reason text and no cause. */
  MultipleUpstreamException() {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR);
  }

  /**
   * Constructor shaped so it can be passed as a method reference to the Mono error-mapping API.
   * The triggering exception is kept as the cause so the original stack trace is not lost.
   *
   * @param indexOutOfBoundsException emitted on {@link Flux#single()} when the Flux has more than
   *     one element
   * @see reactor.core.publisher.Mono#onErrorMap(Class, java.util.function.Function)
   */
  MultipleUpstreamException(IndexOutOfBoundsException indexOutOfBoundsException) {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR, indexOutOfBoundsException);
  }
}

相关推荐