package ngalert import ( "context" "fmt" "sync" "time" "github.com/benbjohnson/clock" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/alerting" "github.com/grafana/grafana/pkg/services/ngalert/eval" "golang.org/x/sync/errgroup" ) type scheduleService interface { Ticker(context.Context) error Pause() error Unpause() error // the following are used by tests only used for tests evalApplied(alertDefinitionKey, time.Time) stopApplied(alertDefinitionKey) overrideCfg(cfg schedulerCfg) } func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key alertDefinitionKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error { sch.log.Debug("alert definition routine started", "key", key) evalRunning := false var start, end time.Time var attempt int64 var alertDefinition *AlertDefinition for { select { case ctx := <-evalCh: if evalRunning { continue } evaluate := func(attempt int64) error { start = timeNow() // fetch latest alert definition version if alertDefinition == nil || alertDefinition.Version < ctx.version { q := getAlertDefinitionByUIDQuery{OrgID: key.orgID, UID: key.definitionUID} err := sch.store.getAlertDefinitionByUID(&q) if err != nil { sch.log.Error("failed to fetch alert definition", "key", key) return err } alertDefinition = q.Result sch.log.Debug("new alert definition version fetched", "title", alertDefinition.Title, "key", key, "version", alertDefinition.Version) } condition := eval.Condition{ RefID: alertDefinition.Condition, OrgID: alertDefinition.OrgID, QueriesAndExpressions: alertDefinition.Data, } results, err := sch.evaluator.ConditionEval(&condition, ctx.now) end = timeNow() if err != nil { // consider saving alert instance on error sch.log.Error("failed to evaluate alert definition", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err) return err } for _, r := range results { sch.log.Debug("alert definition result", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String()) cmd := saveAlertInstanceCommand{DefinitionOrgID: key.orgID, DefinitionUID: key.definitionUID, State: InstanceStateType(r.State.String()), Labels: InstanceLabels(r.Instance), LastEvalTime: ctx.now} err := sch.store.saveAlertInstance(&cmd) if err != nil { sch.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err) } } return nil } func() { evalRunning = true defer func() { evalRunning = false sch.evalApplied(key, ctx.now) }() for attempt = 0; attempt < sch.maxAttempts; attempt++ { err := evaluate(attempt) if err == nil { break } } }() case <-stopCh: sch.stopApplied(key) sch.log.Debug("stopping alert definition routine", "key", key) // interrupt evaluation if it's running return nil case <-grafanaCtx.Done(): return grafanaCtx.Err() } } } type schedule struct { // base tick rate (fastest possible configured check) baseInterval time.Duration // each alert definition gets its own channel and routine registry alertDefinitionRegistry maxAttempts int64 clock clock.Clock heartbeat *alerting.Ticker // evalApplied is only used for tests: test code can set it to non-nil // function, and then it'll be called from the event loop whenever the // message from evalApplied is handled. evalAppliedFunc func(alertDefinitionKey, time.Time) // stopApplied is only used for tests: test code can set it to non-nil // function, and then it'll be called from the event loop whenever the // message from stopApplied is handled. stopAppliedFunc func(alertDefinitionKey) log log.Logger evaluator eval.Evaluator store store } type schedulerCfg struct { c clock.Clock baseInterval time.Duration logger log.Logger evalAppliedFunc func(alertDefinitionKey, time.Time) stopAppliedFunc func(alertDefinitionKey) evaluator eval.Evaluator store store } // newScheduler returns a new schedule. func newScheduler(cfg schedulerCfg) *schedule { ticker := alerting.NewTicker(cfg.c.Now(), time.Second*0, cfg.c, int64(cfg.baseInterval.Seconds())) sch := schedule{ registry: alertDefinitionRegistry{alertDefinitionInfo: make(map[alertDefinitionKey]alertDefinitionInfo)}, maxAttempts: maxAttempts, clock: cfg.c, baseInterval: cfg.baseInterval, log: cfg.logger, heartbeat: ticker, evalAppliedFunc: cfg.evalAppliedFunc, stopAppliedFunc: cfg.stopAppliedFunc, evaluator: cfg.evaluator, store: cfg.store, } return &sch } func (sch *schedule) overrideCfg(cfg schedulerCfg) { sch.clock = cfg.c sch.baseInterval = cfg.baseInterval sch.heartbeat = alerting.NewTicker(cfg.c.Now(), time.Second*0, cfg.c, int64(cfg.baseInterval.Seconds())) sch.evalAppliedFunc = cfg.evalAppliedFunc sch.stopAppliedFunc = cfg.stopAppliedFunc } func (sch *schedule) evalApplied(alertDefKey alertDefinitionKey, now time.Time) { if sch.evalAppliedFunc == nil { return } sch.evalAppliedFunc(alertDefKey, now) } func (sch *schedule) stopApplied(alertDefKey alertDefinitionKey) { if sch.stopAppliedFunc == nil { return } sch.stopAppliedFunc(alertDefKey) } func (sch *schedule) Pause() error { if sch == nil { return fmt.Errorf("scheduler is not initialised") } sch.heartbeat.Pause() sch.log.Info("alert definition scheduler paused", "now", sch.clock.Now()) return nil } func (sch *schedule) Unpause() error { if sch == nil { return fmt.Errorf("scheduler is not initialised") } sch.heartbeat.Unpause() sch.log.Info("alert definition scheduler unpaused", "now", sch.clock.Now()) return nil } func (sch *schedule) Ticker(grafanaCtx context.Context) error { dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx) for { select { case tick := <-sch.heartbeat.C: tickNum := tick.Unix() / int64(sch.baseInterval.Seconds()) alertDefinitions := sch.fetchAllDetails(tick) sch.log.Debug("alert definitions fetched", "count", len(alertDefinitions)) // registeredDefinitions is a map used for finding deleted alert definitions // initially it is assigned to all known alert definitions from the previous cycle // each alert definition found also in this cycle is removed // so, at the end, the remaining registered alert definitions are the deleted ones registeredDefinitions := sch.registry.keyMap() type readyToRunItem struct { key alertDefinitionKey definitionInfo alertDefinitionInfo } readyToRun := make([]readyToRunItem, 0) for _, item := range alertDefinitions { if item.Paused { continue } key := item.getKey() itemVersion := item.Version newRoutine := !sch.registry.exists(key) definitionInfo := sch.registry.getOrCreateInfo(key, itemVersion) invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0 if newRoutine && !invalidInterval { dispatcherGroup.Go(func() error { return sch.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh) }) } if invalidInterval { // this is expected to be always false // give that we validate interval during alert definition updates sch.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", sch.baseInterval) continue } itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { readyToRun = append(readyToRun, readyToRunItem{key: key, definitionInfo: definitionInfo}) } // remove the alert definition from the registered alert definitions delete(registeredDefinitions, key) } var step int64 = 0 if len(readyToRun) > 0 { step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun)) } for i := range readyToRun { item := readyToRun[i] time.AfterFunc(time.Duration(int64(i)*step), func() { item.definitionInfo.evalCh <- &evalContext{now: tick, version: item.definitionInfo.version} }) } // unregister and stop routines of the deleted alert definitions for key := range registeredDefinitions { definitionInfo, err := sch.registry.get(key) if err != nil { sch.log.Error("failed to get alert definition routine information", "err", err) continue } definitionInfo.stopCh <- struct{}{} sch.registry.del(key) } case <-grafanaCtx.Done(): err := dispatcherGroup.Wait() return err } } } type alertDefinitionRegistry struct { mu sync.Mutex alertDefinitionInfo map[alertDefinitionKey]alertDefinitionInfo } // getOrCreateInfo returns the channel for the specific alert definition // if it does not exists creates one and returns it func (r *alertDefinitionRegistry) getOrCreateInfo(key alertDefinitionKey, definitionVersion int64) alertDefinitionInfo { r.mu.Lock() defer r.mu.Unlock() info, ok := r.alertDefinitionInfo[key] if !ok { r.alertDefinitionInfo[key] = alertDefinitionInfo{evalCh: make(chan *evalContext), stopCh: make(chan struct{}), version: definitionVersion} return r.alertDefinitionInfo[key] } info.version = definitionVersion r.alertDefinitionInfo[key] = info return info } // get returns the channel for the specific alert definition // if the key does not exist returns an error func (r *alertDefinitionRegistry) get(key alertDefinitionKey) (*alertDefinitionInfo, error) { r.mu.Lock() defer r.mu.Unlock() info, ok := r.alertDefinitionInfo[key] if !ok { return nil, fmt.Errorf("%v key not found", key) } return &info, nil } func (r *alertDefinitionRegistry) exists(key alertDefinitionKey) bool { r.mu.Lock() defer r.mu.Unlock() _, ok := r.alertDefinitionInfo[key] return ok } func (r *alertDefinitionRegistry) del(key alertDefinitionKey) { r.mu.Lock() defer r.mu.Unlock() delete(r.alertDefinitionInfo, key) } func (r *alertDefinitionRegistry) iter() <-chan alertDefinitionKey { c := make(chan alertDefinitionKey) f := func() { r.mu.Lock() defer r.mu.Unlock() for k := range r.alertDefinitionInfo { c <- k } close(c) } go f() return c } func (r *alertDefinitionRegistry) keyMap() map[alertDefinitionKey]struct{} { definitionsIDs := make(map[alertDefinitionKey]struct{}) for k := range r.iter() { definitionsIDs[k] = struct{}{} } return definitionsIDs } type alertDefinitionInfo struct { evalCh chan *evalContext stopCh chan struct{} version int64 } type evalContext struct { now time.Time version int64 }