monitor.go 6.83 KB
Newer Older
1 2 3
package agent

import (
4
	"fmt"
5
	"regexp"
6
	"strings"
7
	"sync"
8
	"time"
9

Matthijs Mekking's avatar
Matthijs Mekking committed
10
	"github.com/pkg/errors"
11
	"github.com/shirou/gopsutil/process"
12
	log "github.com/sirupsen/logrus"
13

14
	storkutil "isc.org/stork/util"
15 16
)

17 18
// An access point for an application to retrieve information such
// as status or metrics.
type AccessPoint struct {
	// Kind of the access point, e.g. "control" or "statistics".
	Type string
	// Address the access point is reachable at.
	Address string
	// Port the access point is reachable at.
	Port int64
	// Key associated with the access point.
	// NOTE(review): presumably a secret used to authenticate against the
	// access point - confirm against the code that fills it in.
	Key string
}

26 27 28 29 30
// Values of the access point type. Currently supported types are:
// "control" and "statistics".
const (
	AccessPointControl    = "control"
	AccessPointStatistics = "statistics"
)
31

32
type App struct {
33
	Pid          int32
34
	Type         string
35
	AccessPoints []AccessPoint
36 37
}

38 39 40 41 42
// Values of the app type. Currently supported types are: "kea" and "bind9".
const (
	AppTypeKea   = "kea"
	AppTypeBind9 = "bind9"
)
43

44
type AppMonitor interface {
45
	GetApps() []*App
46
	Start(agent *StorkAgent)
47 48 49
	Shutdown()
}

50
type appMonitor struct {
51 52
	requests chan chan []*App // input to app monitor, ie. channel for receiving requests
	quit     chan bool        // channel for stopping app monitor
53 54
	running  bool
	wg       *sync.WaitGroup
55

56
	apps []*App // list of detected apps on the host
57 58
}

59 60
// Names of the processes that are looked for when detecting apps.
const (
	keaProcName   = "kea-ctrl-agent"
	namedProcName = "named"
)

65 66
// Creates an AppMonitor instance. It used to start it as well, but this is now done
// by a dedicated method Start(). Make sure you call Start() before using app monitor.
67
func NewAppMonitor() AppMonitor {
68
	sm := &appMonitor{
69
		requests: make(chan chan []*App),
70
		quit:     make(chan bool),
71
		wg:       &sync.WaitGroup{},
72
	}
73 74 75 76 77
	return sm
}

// This function starts the actual monitor. This start is delayed in case we want to only
// do command line parameters parsing, e.g. to print version or help and quit.
78
func (sm *appMonitor) Start(storkAgent *StorkAgent) {
79
	sm.wg.Add(1)
80
	go sm.run(storkAgent)
81 82
}

83
func (sm *appMonitor) run(storkAgent *StorkAgent) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
84 85
	log.Printf("Started app monitor")

86 87
	sm.running = true
	defer sm.wg.Done()
88

89
	// run app detection one time immediately at startup
90 91
	sm.detectApps()

92 93 94 95
	// For each detected Kea app, let's gather the logs which can be viewed
	// from the UI.
	sm.detectAllowedLogs(storkAgent)

96
	// prepare ticker
97 98 99
	const detectionInterval = 10 * time.Second
	ticker := time.NewTicker(detectionInterval)
	defer ticker.Stop()
100 101 102

	for {
		select {
103
		case ret := <-sm.requests:
104
			// process user request
105
			ret <- sm.apps
106

107
		case <-ticker.C:
108
			// periodic detection
109
			sm.detectApps()
110

111
		case <-sm.quit:
112
			// exit run
113
			log.Printf("Stopped app monitor")
114
			sm.running = false
115 116 117 118 119
			return
		}
	}
}

120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
// Logs the apps from newApps that have no counterpart in oldApps, i.e. the
// apps that are new or whose access points have changed since the previous
// detection. Nothing is logged when there are no such apps.
func printNewOrUpdatedApps(newApps []*App, oldApps []*App) {
	// look for new or updated apps
	var newUpdatedApps []*App
	for _, appNew := range newApps {
		found := false
		for _, appOld := range oldApps {
			if sameAppEndpoints(appNew, appOld) {
				found = true
				break
			}
		}
		if !found {
			newUpdatedApps = append(newUpdatedApps, appNew)
		}
	}

	// if found print new or updated apps
	if len(newUpdatedApps) == 0 {
		return
	}
	log.Printf("new or updated apps detected:")
	for _, app := range newUpdatedApps {
		acPts := make([]string, 0, len(app.AccessPoints))
		for _, acPt := range app.AccessPoints {
			acPts = append(acPts, fmt.Sprintf("%s: %s:%d", acPt.Type, acPt.Address, acPt.Port))
		}
		log.Printf("   %s: %s", app.Type, strings.Join(acPts, ", "))
	}
}

// Reports whether two apps have the same type and the same access points
// (compared position by position on type, address and port).
func sameAppEndpoints(a, b *App) bool {
	if a.Type != b.Type || len(a.AccessPoints) != len(b.AccessPoints) {
		return false
	}
	for i := range a.AccessPoints {
		x, y := &a.AccessPoints[i], &b.AccessPoints[i]
		if x.Type != y.Type || x.Address != y.Address || x.Port != y.Port {
			return false
		}
	}
	return true
}

164 165
func (sm *appMonitor) detectApps() {
	// Kea app is being detected by browsing list of processes in the systam
166
	// where cmdline of the process contains given pattern with kea-ctrl-agent
167 168
	// substring. Such found processes are being processed further and all other
	// Kea daemons are discovered and queried for their versions, etc.
169
	keaPtrn := regexp.MustCompile(`(.*?)kea-ctrl-agent\s+.*-c\s+(\S+)`)
170
	// BIND 9 app is being detecting by browsing list of processes in the system
171
	// where cmdline of the process contains given pattern with named substring.
172
	bind9Ptrn := regexp.MustCompile(`(.*?)named\s+(.*)`)
173

174
	var apps []*App
175 176 177

	procs, _ := process.Processes()
	for _, p := range procs {
178
		procName, _ := p.Name()
179 180 181
		cmdline := ""
		cwd := ""
		var err error
182
		if procName == keaProcName || procName == namedProcName {
183
			cmdline, err = p.Cmdline()
184
			if err != nil {
185 186
				log.Warnf("cannot get process command line: %+v", err)
				continue
187
			}
188 189 190 191 192 193
			cwd, err = p.Cwd()
			if err != nil {
				log.Warnf("cannot get process current working directory: %+v", err)
				cwd = ""
			}
		}
194

195
		if procName == keaProcName {
196 197 198
			// detect kea
			m := keaPtrn.FindStringSubmatch(cmdline)
			if m != nil {
199
				keaApp := detectKeaApp(m, cwd)
200
				if keaApp != nil {
201
					keaApp.Pid = p.Pid
202
					apps = append(apps, keaApp)
203
				}
204
			}
205 206 207
			continue
		}

208
		if procName == namedProcName {
209 210 211
			// detect bind9
			m := bind9Ptrn.FindStringSubmatch(cmdline)
			if m != nil {
212 213
				cmdr := &storkutil.RealCommander{}
				bind9App := detectBind9App(m, cwd, cmdr)
214
				if bind9App != nil {
215
					bind9App.Pid = p.Pid
216
					apps = append(apps, bind9App)
217 218 219
				}
			}
			continue
220 221 222
		}
	}

223
	// check changes in apps and print them
224
	printNewOrUpdatedApps(apps, sm.apps)
225

226
	// remember detected apps
227
	sm.apps = apps
228 229
}

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
// Collects the log files configured for the detected apps so that they can
// be viewed from the UI. Only Kea apps are currently supported, and for each
// of them only the first control access point is queried. Failures are
// logged as warnings and do not stop the processing of the remaining apps.
func (sm *appMonitor) detectAllowedLogs(storkAgent *StorkAgent) {
	// The agent may be nil when running some tests; there is nothing to do
	// in that case.
	if storkAgent == nil {
		return
	}
	for _, app := range sm.apps {
		if app.Type != AppTypeKea {
			continue
		}
		for i := range app.AccessPoints {
			ctrl := &app.AccessPoints[i]
			if ctrl.Type != AccessPointControl {
				continue
			}
			if err := detectKeaAllowedLogs(storkAgent, ctrl.Address, ctrl.Port); err != nil {
				fields := log.Fields{
					"address": ctrl.Address,
					"port":    ctrl.Port,
				}
				log.WithFields(fields).Warn(errors.WithMessagef(err, "failed to detect log files for Kea"))
			}
			// Only the first control access point of the app is used.
			break
		}
	}
}

260 261
func (sm *appMonitor) GetApps() []*App {
	ret := make(chan []*App)
262
	sm.requests <- ret
263
	srvs := <-ret
264 265 266
	return srvs
}

267
// Stops the monitor: signals the monitor goroutine to quit and waits until
// it has finished. The quit channel is only read by the monitor goroutine,
// so the call blocks until that goroutine receives the signal.
func (sm *appMonitor) Shutdown() {
	sm.quit <- true
	sm.wg.Wait()
}
Matthijs Mekking's avatar
Matthijs Mekking committed
271

272 273
// getAccessPoint retrieves the requested type of access point from the app.
func getAccessPoint(app *App, accessType string) (*AccessPoint, error) {
Matthijs Mekking's avatar
Matthijs Mekking committed
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
	for _, point := range app.AccessPoints {
		if point.Type != accessType {
			continue
		}

		if point.Port == 0 {
			return nil, errors.Errorf("%s access point does not have port number", accessType)
		} else if len(point.Address) == 0 {
			return nil, errors.Errorf("%s access point does not have address", accessType)
		}

		// found a good access point
		return &point, nil
	}

	return nil, errors.Errorf("%s access point not found", accessType)
}