当前位置: 首页 > news >正文

专业性行业网站有哪些更改wordpress地址

专业性行业网站有哪些,更改wordpress地址,网站怎么做支付,网站开发常用图标在Java中#xff0c;经典的生产者#xff0d;消费者模式相对简单#xff0c;因为我们有java.util.concurrent.BlockingQueue 。 为了避免繁忙的等待和容易出错的手动锁定#xff0c;我们只需利用put()和take() 。 如果队列已满或为空#xff0c;它们都将阻塞。 我们需要的… 在Java中经典的生产者消费者模式相对简单因为我们有java.util.concurrent.BlockingQueue 。 为了避免繁忙的等待和容易出错的手动锁定我们只需利用put()和take() 。 如果队列已满或为空它们都将阻塞。 我们需要的是一堆线程共享对同一队列的引用一些正在生产而其他正在消耗。 当然队列必须具有有限的容量否则如果生产者的表现优于消费者我们很快就会用光内存。 格雷格·扬Greg Young在波兰Devoxx期间对这条规则的强调不够 永远不要创建无限队列 使用 这是最简单的例子。 首先我们需要一个将对象放在共享队列中的生产者 import lombok.Value; import lombok.extern.slf4j.Slf4j;Slf4j Value class Producer implements Runnable {private final BlockingQueueUser queue;Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user new User(User System.currentTimeMillis());log.info(Producing {}, user);queue.put(user);TimeUnit.SECONDS.sleep(1);}} catch (Exception e) {log.error(Interrupted, e);}} } 生产者只需每秒将User类的实例无论它是什么发布到给定队列。 显然在现实生活中将User在队列中是系统中某些操作例如用户登录的结果。 同样消费者从队列中获取新项目并进行处理 Slf4j Value class Consumer implements Runnable {private final BlockingQueueUser queue;Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user queue.take();log.info(Consuming: {}, user);}} catch (Exception e) {log.error(Interrupted, e);}} } 再次在现实生活中处理将意味着存储在数据库中或对用户运行某些欺诈检测。 我们使用队列将处理线程与消耗线程解耦例如减少延迟。 为了运行一个简单的测试让我们启动几个生产者和消费者线程 BlockingQueueUser queue new ArrayBlockingQueue(1_000); final ListRunnable runnables Arrays.asList(new Producer(queue),new Producer(queue),new Consumer(queue),new Consumer(queue),new Consumer(queue) );final ListThread threads runnables.stream().map(runnable - new Thread(runnable, threadName(runnable))).peek(Thread::start).collect(toList());TimeUnit.SECONDS.sleep(5); threads.forEach(Thread::interrupt);//...private static String threadName(Runnable runnable) {return runnable.getClass().getSimpleName() - System.identityHashCode(runnable); } 我们有2个生产者和3个消费者似乎一切正常。 在现实生活中您可能会有一些隐式生产者线程例如HTTP请求处理线程。 在使用者方面您很可能会使用线程池。 这种模式效果很好但是特别是在消费方面是很底层的。 介绍 本文的目的是介绍一种抽象其行为类似于生产者方的队列但表现为来自消费者方的RxJava的Observable 。 换句话说我们可以将添加到队列中的对象视为可以在客户端映射过滤撰写等的流。 有趣的是这不再是排在后面的队列。 ObservableQueueT仅将所有新对象直接转发给订阅的使用者并且在没有人监听“可观察到的” 热 的情况下不缓冲事件。 ObservableQueueT本身并不是队列它只是一个API与另一个API之间的桥梁。 它类似于java.util.concurrent.SynchronousQueue 但是如果没有人对使用感兴趣则将对象简单地丢弃。 这是第一个实验性实现。 这只是一个玩具代码不要认为它已准备就绪。 另外我们稍后将对其进行简化 public class ObservableQueueT implements BlockingQueueT, Closeable {private final SetSubscriber? super T subscribers Collections.newSetFromMap(new ConcurrentHashMap());private final ObservableT observable Observable.create(subscriber - {subscriber.add(new Subscription() {Overridepublic void unsubscribe() {subscribers.remove(subscriber);}Overridepublic boolean isUnsubscribed() {return false;}});subscribers.add(subscriber);});public ObservableT observe() {return observable;}Overridepublic boolean add(T t) {return offer(t);}Overridepublic boolean offer(T t) {subscribers.forEach(subscriber - subscriber.onNext(t));return true;}Overridepublic T remove() {return noSuchElement();}Overridepublic T poll() {return null;}Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}Overridepublic T peek() {return null;}Overridepublic void put(T t) throws InterruptedException {offer(t);}Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException(Use observe() instead);}Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}Overridepublic int remainingCapacity() {return 0;}Overridepublic boolean remove(Object o) {return false;}Overridepublic boolean containsAll(Collection? c) {return false;}Overridepublic boolean addAll(Collection? extends T c) {c.forEach(this::offer);return true;}Overridepublic boolean removeAll(Collection? c) {return false;}Overridepublic boolean retainAll(Collection? c) {return false;}Overridepublic void clear() {}Overridepublic int size() {return 0;}Overridepublic boolean isEmpty() {return true;}Overridepublic boolean contains(Object o) {return false;}Overridepublic IteratorT iterator() {return Collections.emptyIterator();}Overridepublic Object[] toArray() {return new Object[0];}Overridepublic T T[] toArray(T[] a) {return a;}Overridepublic int drainTo(Collection? super T c) {return 0;}Overridepublic int drainTo(Collection? super T c, int maxElements) {return 0;}Overridepublic void close() throws IOException {subscribers.forEach(rx.Observer::onCompleted);} } 关于它有两个有趣的事实 我们必须跟踪所有订户即愿意接收新商品的消费者。 如果其中一个订阅者不再感兴趣我们必须删除该订阅者否则会发生内存泄漏请继续阅读 此队列的行为就好像它始终为空。 它永远不会保存任何项目–当您将某些内容放入此队列时它会自动传递给订阅者并被遗忘 从技术上讲此队列是无界的这意味着您可以根据需要放置任意数量的项目。 但是由于将项目传递给所有订户如果有并立即丢弃因此此队列实际上始终为空请参见上文 生产者可能仍会生成太多事件而消费者可能无法跟上这一步– RxJava现在具有背压支持本文未介绍。 假设我正确实现了队列协定生产者可以像使用其他BlockingQueueT一样使用ObservableQueueT 。 但是消费者看起来更轻巧更聪明 final ObservableQueueUser users new ObservableQueue(); final ObservableUser observable users.observe();users.offer(new User(A)); observable.subscribe(user - log.info(User logged in: {}, user)); users.offer(new User(B)); users.offer(new User(C)); 上面的代码仅打印B和C 。 由于ObservableQueue会在没有人监听的情况下丢弃项目因此设计会丢失A 。 显然 Producer类现在使用users队列。 一切正常您可以随时调用users.observe()并应用数十个Observable运算符之一。 但是有一个警告默认情况下RxJava不执行任何线程处理因此消耗与产生线程在同一线程中发生 我们失去了生产者-消费者模式的最重要特征即线程去耦。 幸运的是RxJava中的所有内容都是声明性的线程调度也是如此 users.observe().observeOn(Schedulers.computation()).forEach(user -log.info(User logged in: {}, user)); 现在让我们看一下RxJava的真正功能。 假设您要计算每秒登录的用户数其中每个登录都作为事件放入队列中 users.observe().map(User::getName).filter(name - !name.isEmpty()).window(1, TimeUnit.SECONDS).flatMap(Observable::count).doOnCompleted(() - log.info(System shuts down)).forEach(c - log.info(Logins in last second: {}, c)); 性能也是可以接受的这样的队列每秒可以在我的一个订户的笔记本电脑上接受约300万个对象。 将此类视为使用队列到现代反应世界的旧系统的适配器。 可是等等 使用ObservableQueueT很容易但是使用subscribers同步集的实现似乎太底层了。 幸运的是有SubjectT, T 。 Subject是Observable “另一面” –您可以将事件推送到Subject但是它仍然实现Observable 因此您可以轻松地创建任意Observable 。 使用Subject实现之一 ObservableQueue外观如何 public class ObservableQueueT implements BlockingQueueT, Closeable {private final SubjectT, T subject PublishSubject.create();public ObservableT observe() {return subject;}Overridepublic boolean add(T t) {return offer(t);}Overridepublic boolean offer(T t) {subject.onNext(t);return true;}Overridepublic void close() throws IOException {subject.onCompleted();}Overridepublic T remove() {return noSuchElement();}Overridepublic T poll() {return null;}Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}Overridepublic T peek() {return null;}Overridepublic void put(T t) throws InterruptedException {offer(t);}Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException(Use observe() instead);}Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}Overridepublic int remainingCapacity() {return 0;}Overridepublic boolean remove(Object o) {return false;}Overridepublic boolean containsAll(Collection? c) {return false;}Overridepublic boolean addAll(Collection? extends T c) {c.forEach(this::offer);return true;}Overridepublic boolean removeAll(Collection? c) {return false;}Overridepublic boolean retainAll(Collection? c) {return false;}Overridepublic void clear() {}Overridepublic int size() {return 0;}Overridepublic boolean isEmpty() {return true;}Overridepublic boolean contains(Object o) {return false;}Overridepublic IteratorT iterator() {return Collections.emptyIterator();}Overridepublic Object[] toArray() {return new Object[0];}Overridepublic T T[] toArray(T[] a) {return a;}Overridepublic int drainTo(Collection? super T c) {return 0;}Overridepublic int drainTo(Collection? super T c, int maxElements) {return 0;}} 上面的实现更加简洁我们完全不必担心线程同步。 翻译自: https://www.javacodegeeks.com/2015/07/consuming-java-util-concurrent-blockingqueue-as-rx-observable.html
http://www.yutouwan.com/news/369141/

相关文章:

  • 南通企业建站模板上海手机网站建设公司
  • 做开箱的网站网站使用arial字体下载
  • 建设工程信息哪个网站有详细信息wordpress下一篇
  • 自己搭建网站需要什么产品网站定制
  • 南宁百度网站公司吗折800网站模板
  • 教人做策划的网站企业在什么网站推广
  • 做亚马逊外国网站需要语言好吗怎么下载网站程序
  • 抖音小程序代理郑州网站制作选择乐云seo
  • 在线做试卷的网站17我们一起做网站
  • wordpress整站无刷新网站登录后不显示内容
  • 无锡建设网站wordpress如何重装
  • 高端大气上档次的网站模板网站服务器设置
  • 建设一个一般网站需要多少时间建设银行博士后招聘网站
  • 俄罗斯网站建设公司网站备案后在百度上多长时间可以搜索到
  • 建设阅读网站的研究意义网站开发项目的心得体会
  • 网站导航html源码天津网站开发网站
  • 拟定网站优化方案免费国外服务器地址
  • 广州网站制青岛正规品牌网站制作策划
  • 网站seo具体怎么做个人网站建站教程
  • c#网站开发网易云课堂百度云下载做古建的那些网站比较适合
  • 河北雄安建设投资集团网站当今做啥网站致富
  • 宁夏电力建设工程公司外部网站杭州做营销型网站
  • 外贸网站建站莆田企业自助建站
  • 营销型企业网站策划方案网站全网建设 莱芜
  • 网站跳出率的衡量标准互动网站
  • 哪有免费的网站网站建设对企业经营
  • 个人做电商网站赚钱吗e点互动网站
  • 公司网站建设的改进的建议好的网站设计特点
  • 全网营销型网站 新闻青岛电子商务网站建设
  • 简单网站建设软件如何建立官方网站