wordpress tag 排序,seo工作流程,wordpress 批量创建目录结构,理解wordpress轮翻图代码分布式系统中某些节点任务当满足某个条件时才允许继续运行#xff0c;如果不满足则当前节点需要等待。这个时候就需要一个屏障来阻止节点的处理。ZooKeeper Barrier是ZooKeeper提供的一种用于分布式环境中实现同步和协调的机制。具体逻辑就是#xff1a;
1、检测某个barrier…分布式系统中某些节点任务当满足某个条件时才允许继续运行如果不满足则当前节点需要等待。这个时候就需要一个屏障来阻止节点的处理。ZooKeeper Barrier是ZooKeeper提供的一种用于分布式环境中实现同步和协调的机制。具体逻辑就是
1、检测某个barrier node是否存在
2、如果屏障节点不存在则屏障不存在可以继续执行
3、如果屏障存在则需要watcher屏障节点的删除事件当屏障节点删除当前程序才可继续删除之前当前程序一直阻塞等待。
这里使用Curator框架API写一个简单的样例程序
CuratorFramework client CuratorFrameworkFactory.newClient(localhost:2181, new ExponentialBackoffRetry(1000, 3));
client.start();
String path /barrier;
//创建屏障节点
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
//另一个业务执行逻辑完成后删除屏障
new Thread(()-{try {Thread.sleep(2000);client.delete().forPath(path);} catch (Exception e) {e.printStackTrace();}
}).start();
Object obj new Object();
//主线程wathcer方式监听屏障删除事件
Stat stat client.checkExists().usingWatcher(new Watcher() {Overridepublic void process(WatchedEvent event) {if (event.getType() Event.EventType.NodeDeleted) {System.out.println(节点删除);//节点删除 通知主线程synchronized (obj){obj.notifyAll();}}}
}).forPath(path);
//主线程同步等待屏障删除
synchronized (obj){obj.wait();
}System.out.println(barrier delete);这里最后主线程会等待另一个线程执行完成才继续。这里在同一个程序里模拟同时运行两个程序可能更直观。
另外Curator的recipes也提供的对barrier的封装DistributedBarrier类。
使用例子
CuratorFramework client CuratorFrameworkFactory.newClient(localhost:2181, new RetryOneTime(1));
try {client.start();final DistributedBarrier barrier new DistributedBarrier(client, /barrier);//添加屏障barrier.setBarrier();new Thread(()-{try {Thread.sleep(2000);//屏障移除barrier.removeBarrier();}catch (Exception e) {e.printStackTrace();}}).start();//阻塞等待屏障移除barrier.waitOnBarrier(10, TimeUnit.SECONDS);System.out.println(end);
} catch (Exception e) {e.printStackTrace();
} finally {client.close();
}其内部阻塞原理还是使用的基础的waitnotify机制。这里封装方法可以设置等待时间。
Double Barriers
double barriers即多屏障。在某些情况下需要多个条件同时满足程序才可以继续。比如批量任务并行分成5个线程任务去做同一阶段工作。5个线程都执行完成才可进入下一阶段。
还是使用Curator封装的DistributedDoubleBarrier来演示
CuratorFramework client CuratorFrameworkFactory.newClient(localhost:2181, new RetryOneTime(1));
client.start();
ExecutorService executor Executors.newFixedThreadPool(5);
int quantity 5;//屏障数量
String barrierPath /barrier1;//屏障节点路径
for (int i 0; i 5; i) {executor.execute(()-{DistributedDoubleBarrier barrier new DistributedDoubleBarrier(client,barrierPath,quantity);try {int time 1000*new Random().nextInt(10);System.out.println(time sleeps for enter,Thread.currentThread().getName());Thread.sleep(time);/**当前参与者执行完前置逻辑进入屏障等待其它参与者到达阻塞等待当所有参与者都到达屏障点后屏障会通知所有参与者继续执行解除阻塞*/barrier.enter(10,TimeUnit.SECONDS);System.out.println(System.currentTimeMillis() do sth,Thread.currentThread().getName());time 1000*new Random().nextInt(10);System.out.println(time sleeps for leave,Thread.currentThread().getName());Thread.sleep(time);/**完成同步操作后调用leave()方法告知屏障该参与者已经离开屏障点并等待其他参与者也离开当所有参与者都离开屏障点后屏障会通知所有参与者继续执行后续操作*/barrier.leave();//删除屏障System.out.println(System.currentTimeMillis() leave,Thread.currentThread().getName());} catch (Exception e) {e.printStackTrace();}});
}executor.shutdown();这里定义了5个屏障所有的程序会在barrier.enter()处阻塞等待直到所有的线程都执行到该方法才会继续执行。这时候查看/barrier1节点会发现其下有5个uuid类型的子节点和一个ready节点。每个enter的线程创建了一个子节点barrier判断达到屏障数量时自动创建一个ready节点。
[zk: localhost:2181(CONNECTED) 20] ls /barrier1
[0ffbe0f0-0bf6-4098-a494-912ce57d8f5f, 10e54092-9909-4a0a-a764-774e55584b1d, 4b72c6bb-bff4-4bf4-9762-dbb56daaaf87, 8d8ecafa-31a8-4799-8b49-35f61098a05f, cbe75b72-8fdb-48dd-9620-2d34b59d411e, ready]后面leave()方法离开屏障点又是一个阻塞点屏障会等到所有的参与者都调用leave方法后才会通知所有参与者继续执行。最后 enter()-leave()方法之间是多个参与者同步操作业务逻辑。