800cb95821
TBD: Queue not converted yet Signed-off-by: Elliot Pahl <elliot.pahl@gmail.com>
184 lines
5.4 KiB
Go
184 lines
5.4 KiB
Go
package notifications
|
|
|
|
import (
|
|
"expvar"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
|
|
prometheus "github.com/docker/distribution/metrics"
|
|
events "github.com/docker/go-events"
|
|
"github.com/docker/go-metrics"
|
|
)
|
|
|
|
var (
|
|
// eventsCounter counts total events of incoming, success, failure, and errors
|
|
eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "endpoint")
|
|
// pendingGauge measures the pending queue size
|
|
pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "endpoint")
|
|
// statusCounter counts the total notification call per each status code
|
|
statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "endpoint")
|
|
)
|
|
|
|
// EndpointMetrics holds the counters kept for a single endpoint, mostly
// expressed as numbers of events. They are currently exported through expvar,
// although a better reporting mechanism may replace that in the future.
type EndpointMetrics struct {
	Pending   int            // events currently waiting in the queue
	Events    int            // events received in total
	Successes int            // events written successfully in total
	Failures  int            // events that failed in total
	Errors    int            // events that errored in total
	Statuses  map[string]int // histogram of calls keyed by status code string
}
|
|
|
|
// safeMetrics guards the metrics implementation with a lock and provides a
|
|
// safe update function.
|
|
type safeMetrics struct {
|
|
EndpointName string
|
|
EndpointMetrics
|
|
sync.Mutex // protects statuses map
|
|
}
|
|
|
|
// newSafeMetrics returns safeMetrics with map allocated.
|
|
func newSafeMetrics(name string) *safeMetrics {
|
|
var sm safeMetrics
|
|
sm.Statuses = make(map[string]int)
|
|
sm.EndpointName = name
|
|
return &sm
|
|
}
|
|
|
|
// httpStatusListener returns the listener for the http sink that updates the
|
|
// relevant counters.
|
|
func (sm *safeMetrics) httpStatusListener() httpStatusListener {
|
|
return &endpointMetricsHTTPStatusListener{
|
|
safeMetrics: sm,
|
|
}
|
|
}
|
|
|
|
// eventQueueListener returns a listener that maintains queue related counters.
|
|
func (sm *safeMetrics) eventQueueListener() eventQueueListener {
|
|
return &endpointMetricsEventQueueListener{
|
|
safeMetrics: sm,
|
|
}
|
|
}
|
|
|
|
// endpointMetricsHTTPStatusListener increments counters related to http sinks
|
|
// for the relevant events.
|
|
type endpointMetricsHTTPStatusListener struct {
|
|
*safeMetrics
|
|
}
|
|
|
|
var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
|
|
|
|
func (emsl *endpointMetricsHTTPStatusListener) success(status int, event events.Event) {
|
|
emsl.safeMetrics.Lock()
|
|
defer emsl.safeMetrics.Unlock()
|
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++
|
|
emsl.Successes++
|
|
|
|
statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
|
|
eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1)
|
|
}
|
|
|
|
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, event events.Event) {
|
|
emsl.safeMetrics.Lock()
|
|
defer emsl.safeMetrics.Unlock()
|
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++
|
|
emsl.Failures++
|
|
|
|
statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
|
|
eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1)
|
|
}
|
|
|
|
func (emsl *endpointMetricsHTTPStatusListener) err(err error, event events.Event) {
|
|
emsl.safeMetrics.Lock()
|
|
defer emsl.safeMetrics.Unlock()
|
|
emsl.Errors++
|
|
|
|
eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1)
|
|
}
|
|
|
|
// endpointMetricsEventQueueListener maintains the incoming events counter and
|
|
// the queues pending count.
|
|
type endpointMetricsEventQueueListener struct {
|
|
*safeMetrics
|
|
}
|
|
|
|
func (eqc *endpointMetricsEventQueueListener) ingress(event events.Event) {
|
|
eqc.Lock()
|
|
defer eqc.Unlock()
|
|
eqc.Events++
|
|
eqc.Pending++
|
|
|
|
eventsCounter.WithValues("Events", eqc.EndpointName).Inc()
|
|
pendingGauge.WithValues(eqc.EndpointName).Inc(1)
|
|
}
|
|
|
|
func (eqc *endpointMetricsEventQueueListener) egress(event events.Event) {
|
|
eqc.Lock()
|
|
defer eqc.Unlock()
|
|
eqc.Pending--
|
|
|
|
pendingGauge.WithValues(eqc.EndpointName).Dec(1)
|
|
}
|
|
|
|
// endpoints is global registry of endpoints used to report metrics to expvar
|
|
var endpoints struct {
|
|
registered []*Endpoint
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// register places the endpoint into expvar so that stats are tracked.
|
|
func register(e *Endpoint) {
|
|
endpoints.mu.Lock()
|
|
defer endpoints.mu.Unlock()
|
|
|
|
endpoints.registered = append(endpoints.registered, e)
|
|
}
|
|
|
|
func init() {
|
|
// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
|
|
// Ideally, we do more metrics through logging but we need some nice
|
|
// realtime metrics for queue state for now.
|
|
|
|
registry := expvar.Get("registry")
|
|
|
|
if registry == nil {
|
|
registry = expvar.NewMap("registry")
|
|
}
|
|
|
|
var notifications expvar.Map
|
|
notifications.Init()
|
|
notifications.Set("endpoints", expvar.Func(func() interface{} {
|
|
endpoints.mu.Lock()
|
|
defer endpoints.mu.Unlock()
|
|
|
|
var names []interface{}
|
|
for _, v := range endpoints.registered {
|
|
var epjson struct {
|
|
Name string `json:"name"`
|
|
URL string `json:"url"`
|
|
EndpointConfig
|
|
|
|
Metrics EndpointMetrics
|
|
}
|
|
|
|
epjson.Name = v.Name()
|
|
epjson.URL = v.URL()
|
|
epjson.EndpointConfig = v.EndpointConfig
|
|
|
|
v.ReadMetrics(&epjson.Metrics)
|
|
|
|
names = append(names, epjson)
|
|
}
|
|
|
|
return names
|
|
}))
|
|
|
|
registry.(*expvar.Map).Set("notifications", ¬ifications)
|
|
|
|
// register prometheus metrics
|
|
metrics.Register(prometheus.NotificationsNamespace)
|
|
}
|