西安给公司做网站,网站更改文章标题,一般一个网站从建设到运营要多久,国内网站开发语言Go和Java实现观察者模式
在监控系统中#xff0c;我们需要采集监控指标的信息#xff0c;假设当采集的指标信息超过阈值时我们需要对该监控指标持久化到
数据库中并且进行告警。
本文通过指标采集持久化和告警来说明观察者模式的使用#xff0c;使用Go语言和Java语言实现…Go和Java实现观察者模式
在监控系统中我们需要采集监控指标的信息假设当采集的指标信息超过阈值时我们需要对该监控指标持久化到
数据库中并且进行告警。
本文通过指标采集持久化和告警来说明观察者模式的使用使用Go语言和Java语言实现。
1、观察者模式
观察者模式是一种行为型设计模式它定义了一种一对多的依赖关系当一个对象的状态发生改变时其所有依赖
者都会收到通知并自动更新。
当对象间存在一对多关系时则使用观察者模式。比如当一个对象被修改时则会自动通知依赖它的对象。观察
者模式属于行为型模式。 意图定义对象间的一种一对多的依赖关系当一个对象的状态发生改变时所有依赖于它的对象都得到通知 并被自动更新。 主要解决一个对象状态改变给其他对象通知的问题而且要考虑到易用和低耦合保证高度的协作。 何时使用一个对象目标对象的状态发生改变所有的依赖对象观察者对象都将得到通知进行广播 通知。 如何解决使用面向对象技术可以将这种依赖关系弱化。 关键代码在抽象类里有一个 ArrayList 存放观察者们。 应用实例1、拍卖的时候拍卖师观察最高标价然后通知给其他竞价者竞价。 2、西游记里面悟空请求菩 萨降服红孩儿菩萨洒了一地水招来一个老乌龟这个乌龟就是观察者他观察菩萨洒水这个动作。 优点1、观察者和被观察者是抽象耦合的。 2、建立一套触发机制。 缺点1、如果一个被观察者对象有很多的直接和间接的观察者的话将所有的观察者都通知到会花费很多时 间。 2、如果在观察者和观察目标之间有循环依赖的话观察目标会触发它们之间进行循环调用可能导致系 统崩溃。 3、观察者模式没有相应的机制让观察者知道所观察的目标对象是怎么发生变化的而仅仅只是知道 观察目标发生了变化。 使用场景 一个抽象模型有两个方面其中一个方面依赖于另一个方面。将这些方面封装在独立的对象中使它们可以各自 独立地改变和复用。 一个对象的改变将导致其他一个或多个对象也发生改变而不知道具体有多少对象将发生改变可以降低对象 之间的耦合度。 一个对象必须通知其他对象而并不知道这些对象是谁。 需要在系统中创建一个触发链A对象的行为将影响B对象B对象的行为将影响C对象……可以使用观察者 模式创建一种链式触发机制。 注意事项1、JAVA 中已经有了对观察者模式的支持类。 2、避免循环引用。 3、如果顺序执行某一观察者 错误会导致系统卡壳一般采用异步方式。 观察者模式包含以下几个核心角色 主题Subject也称为被观察者或可观察者它是具有状态的对象并维护着一个观察者列表。主题提供 了添加、删除和通知观察者的方法。 观察者Observer观察者是接收主题通知的对象。观察者需要实现一个更新方法当收到主题的通知 时调用该方法进行更新操作。 具体主题Concrete Subject具体主题是主题的具体实现类。它维护着观察者列表并在状态发生改变时 通知观察者。 具体观察者Concrete Observer具体观察者是观察者的具体实现类。它实现了更新方法定义了在收到 主题通知时需要执行的具体操作。
观察者模式通过将主题和观察者解耦实现了对象之间的松耦合。当主题的状态发生改变时所有依赖于它的观察
者都会收到通知并进行相应的更新。
2、Go实现观察者模式
package observer// ISubject通知者
type ISubject interface {Register(observer IObserver)Remove(observer IObserver)Notify(metric Metric)
}package observer// MetricSubject指标通知者
type MetricSubject struct {observers []IObserver
}type Metric struct {Name stringValue float64Time string
}func (metricSubject *MetricSubject) Register(observer IObserver) {metricSubject.observers append(metricSubject.observers, observer)
}func (metricSubject *MetricSubject) Remove(observer IObserver) {for i, ob : range metricSubject.observers {if ob observer {metricSubject.observers append(metricSubject.observers[:i], metricSubject.observers[i1:]...)}}
}func (metricSubject *MetricSubject) Notify(metric Metric) {for _, o : range metricSubject.observers {o.Update(metric)}
}package observer// IObserver观察者
type IObserver interface {Update(metric Metric)
}package observerimport fmt// alert观察者
type AlertObserver struct {
}func (alertObserver * AlertObserver) Update(metric Metric){fmt.Printf([%s] 指标 [%s] 的值为 [%.1f] 超过阈值,进行告警!\n, metric.Time, metric.Name, metric.Value)
}package observerimport fmt// db观察者
type DbObserver struct {
}func (dbObserver *DbObserver) Update(metric Metric) {fmt.Printf([%s] 指标 [%s] 的值为 [%.1f] 超过阈值,保存到数据库!\n, metric.Time, metric.Name, metric.Value)
}package mainimport (fmt. proj/observertime
)func main() {sub : MetricSubject{}alert : AlertObserver{}db : DbObserver{}sub.Register(alert)sub.Register(db)sub.Notify(Metric{Name: CPU, Value: 90, Time: time.Now().Format(2006-01-02 15:04:05)})sub.Notify(Metric{Name: Memory, Value: 80, Time: time.Now().Format(2006-01-02 15:04:05)})fmt.Println()// 后面不需要进行持久化了sub.Remove(db)sub.Notify(Metric{Name: CPU, Value: 90, Time: time.Now().Format(2006-01-02 15:04:05)})sub.Notify(Metric{Name: Memory, Value: 80, Time: time.Now().Format(2006-01-02 15:04:05)})
}# 程序输出
[2023-07-11 22:05:34] 指标 [CPU] 的值为 [90.0] 超过阈值,进行告警!
[2023-07-11 22:05:34] 指标 [CPU] 的值为 [90.0] 超过阈值,保存到数据库!
[2023-07-11 22:05:34] 指标 [Memory] 的值为 [80.0] 超过阈值,进行告警!
[2023-07-11 22:05:34] 指标 [Memory] 的值为 [80.0] 超过阈值,保存到数据库![2023-07-11 22:05:34] 指标 [CPU] 的值为 [90.0] 超过阈值,进行告警!
[2023-07-11 22:05:34] 指标 [Memory] 的值为 [80.0] 超过阈值,进行告警!3、Java实现观察者模式
package com.observer;import java.util.List;// ISubject通知者
public abstract class ISubject {ListIObserver observerList;abstract void Register(IObserver observer);abstract void Remove(IObserver observer);abstract void Notify(Metric metric);
}package com.observer;import java.util.ArrayList;// MetricSubject指标通知者
public class MetricSubject extends ISubject {public MetricSubject() {this.observerList new ArrayList();}Overridevoid Register(IObserver observer) {observerList.add(observer);}Overridevoid Remove(IObserver observer) {observerList.remove(observer);}Overridevoid Notify(Metric metric) {for (IObserver observer : observerList) {observer.Update(metric);}}
}package com.observer;// IObserver观察者
public interface IObserver {void Update(Metric metric);
}package com.observer;// db观察者
public class DbObserver implements IObserver {Overridepublic void Update(Metric metric) {String url [%s] 指标 [%s] 的值为 [%.1f] 超过阈值,保存到数据库!;System.out.println(String.format(url, metric.Time, metric.Name, metric.Value));}
}package com.observer;// alert观察者
public class AlertObserver implements IObserver {Overridepublic void Update(Metric metric) {String url [%s] 指标 [%s] 的值为 [%.1f] 超过阈值,进行告警!;System.out.println(String.format(url, metric.Time, metric.Name, metric.Value));}
}package com.observer;import java.text.SimpleDateFormat;
import java.util.Date;public class Test {public static void main(String[] args) {ISubject subject new MetricSubject();IObserver observer1 new DbObserver();IObserver observer2 new AlertObserver();subject.Register(observer1);subject.Register(observer2);subject.Notify(new Metric(CPU, 90, new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date())));subject.Notify(new Metric(Memory, 80, new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date())));System.out.println();// 后面不需要进行持久化了subject.Remove(observer1);subject.Notify(new Metric(CPU, 90, new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date())));subject.Notify(new Metric(Memory, 80, new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date())));}
}# 程序输出
[2023-07-12 20:42:04] 指标 [CPU] 的值为 [90.0] 超过阈值,保存到数据库!
[2023-07-12 20:42:04] 指标 [CPU] 的值为 [90.0] 超过阈值,进行告警!
[2023-07-12 20:42:04] 指标 [Memory] 的值为 [80.0] 超过阈值,保存到数据库!
[2023-07-12 20:42:04] 指标 [Memory] 的值为 [80.0] 超过阈值,进行告警![2023-07-12 20:42:04] 指标 [CPU] 的值为 [90.0] 超过阈值,进行告警!
[2023-07-12 20:42:04] 指标 [Memory] 的值为 [80.0] 超过阈值,进行告警!4、使用Go实现EventBus
我们实现一个支持以下功能的事件总线
1、异步不阻塞
2、支持任意参数值
package eventbusimport (fmtreflectsync
)// Bus
type Bus interface {Subscribe(topic string, handler interface{}) errorPublish(topic string, args ...interface{})
}// AsyncEventBus异步事件总线
type AsyncEventBus struct {handlers map[string][]reflect.Valuelock sync.Mutex
}// NewAsyncEventBus
func NewAsyncEventBus() *AsyncEventBus {return AsyncEventBus{handlers: map[string][]reflect.Value{},lock: sync.Mutex{},}
}// Subscribe订阅
func (bus *AsyncEventBus) Subscribe(topic string, f interface{}) error {bus.lock.Lock()defer bus.lock.Unlock()v : reflect.ValueOf(f)if v.Type().Kind() ! reflect.Func {return fmt.Errorf(handler is not a function)}handler, ok : bus.handlers[topic]if !ok {handler []reflect.Value{}}handler append(handler, v)bus.handlers[topic] handlerreturn nil
}// Publish发布
// 这里异步执行,并且不会等待返回结果
func (bus *AsyncEventBus) Publish(topic string, args ...interface{}) {handlers, ok : bus.handlers[topic]if !ok {fmt.Println(not found handlers in topic:, topic)return}params : make([]reflect.Value, len(args))for i, arg : range args {params[i] reflect.ValueOf(arg)}for i : range handlers {go handlers[i].Call(params)}
}package mainimport (fmt. proj/eventbustime
)func sub1(msg1, msg2 string) {time.Sleep(1 * time.Microsecond)fmt.Printf(sub1, %s %s\n, msg1, msg2)
}func sub2(msg1, msg2 string) {fmt.Printf(sub2, %s %s\n, msg1, msg2)
}func main() {bus : NewAsyncEventBus()bus.Subscribe(topic1, sub1)bus.Subscribe(topic1, sub2)bus.Publish(topic1, 1, 2)bus.Publish(topic1, a, b)time.Sleep(1 * time.Second)
}# 程序输出
sub2, a b
sub2, 1 2
sub1, 1 2
sub1, a b