Commit af7e915a authored by Thomas Markwalder

[#252] Removed PeriodicPuller from RPS

backend/server/apps/kea/rps.go
    Renamed RpsPuller to RpsWorker, no longer contains a PeriodicPuller
    deleted the following functions:
    Shutdown()
    pullStats()
    getStatsFromApp()

backend/server/apps/kea/rps_test.go
    Reworked tests
    Removed those which primarily only exercised pullStats()/
    getStatsFromApp()
parent 6377189f
package kea
import (
"context"
"fmt"
"time"
......@@ -15,8 +14,8 @@ import (
)
// Periodic Puller that generates RPS interval data.
type RpsPuller struct {
*agentcomm.PeriodicPuller
type RpsWorker struct {
db *pg.DB
PreviousRps map[int64]StatSample // map of last known values per Daemon
Interval1 time.Duration
Interval2 time.Duration
......@@ -74,166 +73,32 @@ type ResponseArguments6 struct {
Samples []interface{} `json:"pkt6-reply-sent"`
}
// Create a RpsPuller object that in background pulls Kea RPS stats.
// Beneath it spawns a goroutine that pulls the response sent statistics
// periodically from Kea apps (that are stored in database). These are
// used to calculate and store RPS interval data per Kea daemon in the database.
// If intervalName is blank, the RpsPuller will not start its PeriodicPuller,
// allowing it to be driven externally. If intervalName is not blank, the RpsPuller will
// start its PeriodicPuller, using the value as the name of the db setting for
// the periodic interval.
func NewRpsPuller(db *pg.DB, agents agentcomm.ConnectedAgents, intervalName string) (*RpsPuller, error) {
rpsPuller := &RpsPuller{}
// Run our own polling. Otherwise someone else is calling the shots.
if intervalName != "" {
periodicPuller, err := agentcomm.NewPeriodicPuller(db, agents, "Kea RPS Stats", intervalName, rpsPuller.pullStats)
if err != nil {
return nil, err
}
// Create a RpsWorker object for building Kea API commands and using
// their responses to populate RPS statistics.
func NewRpsWorker(db *pg.DB) (*RpsWorker, error) {
rpsWorker := &RpsWorker{}
rpsPuller.PeriodicPuller = periodicPuller
} else {
// We still need a puller instance for its Db member.
periodicPuller := &agentcomm.PeriodicPuller{
Db: db,
}
rpsPuller.PeriodicPuller = periodicPuller
}
rpsWorker.db = db
rpsWorker.PreviousRps = map[int64]StatSample{}
rpsPuller.PreviousRps = map[int64]StatSample{}
// The interval values may some day be configurable
rpsPuller.Interval1 = (time.Minute * 15)
rpsPuller.Interval2 = (time.Hour * 24)
return rpsPuller, nil
}
// Shutdown RpsPuller. It stops goroutine that pulls stats.
func (rpsPuller *RpsPuller) Shutdown() {
if rpsPuller.PeriodicPuller.Active {
rpsPuller.PeriodicPuller.Shutdown()
}
}
// Pull RPS stats periodically for all Kea apps which Stork is monitoring. The function
// returns the number of apps for which the stats were successfully pulled and last
// encountered error. This is used if the RpsPuller is running its own PeriodicPuller.
func (rpsPuller *RpsPuller) pullStats() (int, error) {
// Delete obsolete data.
err := rpsPuller.AgeOffRpsIntervals()
if err != nil {
log.Errorf("error aging of RPS interval data: %+v", err)
}
// Get list of all kea apps from database
dbApps, err := dbmodel.GetAppsByType(rpsPuller.Db, dbmodel.AppTypeKea)
if err != nil {
return 0, err
}
// Get RPS stats from each kea app
var lastErr error
appsOkCnt := 0
for _, dbApp := range dbApps {
dbApp2 := dbApp
err := rpsPuller.getStatsFromApp(&dbApp2)
if err != nil {
lastErr = err
log.Errorf("error occurred while getting RPS stats from app %d: %+v", dbApp.ID, err)
} else {
appsOkCnt++
}
}
log.Printf("completed pulling RPS stats from Kea apps: %d/%d succeeded", appsOkCnt, len(dbApps))
return appsOkCnt, lastErr
}
// Generates RPS interval data for each daemon in a given Kea app.
// This is used if the RpsPuller is running its own PeriodicPuller.
func (rpsPuller *RpsPuller) getStatsFromApp(dbApp *dbmodel.App) error {
// Prepare URL to CA
ctrlPoint, err := dbApp.GetAccessPoint(dbmodel.AccessPointControl)
if err != nil {
return err
}
// Slices for tracking commands, the daemons they're sent to, and the responses
cmds := []*agentcomm.KeaCommand{}
responses := []interface{}{}
cmdDaemons := []*dbmodel.Daemon{}
dhcp4Daemons := make(agentcomm.KeaDaemons)
dhcp6Daemons := make(agentcomm.KeaDaemons)
// Iterate over active daemons, adding commands for dhcp4 and dhcp6 daemons
// Since we might have dhcp4 only, dhcp6 only or both, we build an array
// of expected responses.
for _, d := range dbApp.Daemons {
if d.KeaDaemon != nil && d.Active {
switch d.Name {
case dhcp4:
cmdDaemons = append(cmdDaemons, d)
dhcp4Daemons[dhcp4] = true
responses = append(responses, rpsPuller.AddCmd4(&cmds, &dhcp4Daemons))
case dhcp6:
cmdDaemons = append(cmdDaemons, d)
dhcp6Daemons[dhcp6] = true
responses = append(responses, rpsPuller.AddCmd6(&cmds, &dhcp6Daemons))
}
}
}
// If there are no commands, nothing to do
if len(cmds) == 0 {
return nil
}
// forward commands to kea
ctx := context.Background()
cmdsResult, err := rpsPuller.Agents.ForwardToKeaOverHTTP(ctx, dbApp.Machine.Address, dbApp.Machine.AgentPort, ctrlPoint.Address, ctrlPoint.Port, cmds, responses...)
if err != nil {
return err
}
if cmdsResult.Error != nil {
return cmdsResult.Error
}
var lastErr error
for idx := 0; idx < len(cmds); idx++ {
switch cmdDaemons[idx].Name {
case dhcp4:
err = rpsPuller.Response4Handler(cmdDaemons[idx], responses[idx])
if err != nil {
log.Errorf("error handling statistic-get (v4) response: %+v", err)
lastErr = err
}
case dhcp6:
err = rpsPuller.Response6Handler(cmdDaemons[idx], responses[idx])
if err != nil {
log.Errorf("error handling statistic-get (v6) response: %+v", err)
lastErr = err
}
}
}
rpsWorker.Interval1 = (time.Minute * 15)
rpsWorker.Interval2 = (time.Hour * 24)
return lastErr
return rpsWorker, nil
}
// Ages off obsolete RPS interval data.
func (rpsPuller *RpsPuller) AgeOffRpsIntervals() error {
func (rpsWorker *RpsWorker) AgeOffRpsIntervals() error {
// Age off records more than Interval2 old.
deleteTime := storkutil.UTCNow().Add(-rpsPuller.Interval2)
err := dbmodel.AgeOffRpsInterval(rpsPuller.Db, deleteTime)
deleteTime := storkutil.UTCNow().Add(-rpsWorker.Interval2)
err := dbmodel.AgeOffRpsInterval(rpsWorker.db, deleteTime)
return err
}
// Appends the statistic-get command for DHCP4 to the given command list. It returns
// an instance of the expected response type.
func (rpsPuller *RpsPuller) AddCmd4(cmds *[]*agentcomm.KeaCommand, dhcp4Daemons *agentcomm.KeaDaemons) interface{} {
func (rpsWorker *RpsWorker) AddCmd4(cmds *[]*agentcomm.KeaCommand, dhcp4Daemons *agentcomm.KeaDaemons) interface{} {
dhcp4Arguments := RpsGetDhcp4Arguments()
*cmds = append(*cmds, &agentcomm.KeaCommand{
Command: "statistic-get",
......@@ -244,7 +109,7 @@ func (rpsPuller *RpsPuller) AddCmd4(cmds *[]*agentcomm.KeaCommand, dhcp4Daemons
// Appends the statistic-get command for DHCP6 to the given command list. It returns
// an instance of the expected response type.
func (rpsPuller *RpsPuller) AddCmd6(cmds *[]*agentcomm.KeaCommand, dhcp6Daemons *agentcomm.KeaDaemons) interface{} {
func (rpsWorker *RpsWorker) AddCmd6(cmds *[]*agentcomm.KeaCommand, dhcp6Daemons *agentcomm.KeaDaemons) interface{} {
dhcp6Arguments := RpsGetDhcp6Arguments()
*cmds = append(*cmds, &agentcomm.KeaCommand{
Command: "statistic-get",
......@@ -254,20 +119,20 @@ func (rpsPuller *RpsPuller) AddCmd6(cmds *[]*agentcomm.KeaCommand, dhcp6Daemons
}
// Processes the statistic-get command response for DHCP4.
func (rpsPuller *RpsPuller) Response4Handler(daemon *dbmodel.Daemon, response interface{}) error {
func (rpsWorker *RpsWorker) Response4Handler(daemon *dbmodel.Daemon, response interface{}) error {
statsResp4, ok := response.(*[]StatGetResponse4)
if !ok {
return fmt.Errorf("response type is invalid: %+v", response)
}
samples, err := rpsPuller.extractSamples4(*statsResp4)
samples, err := rpsWorker.extractSamples4(*statsResp4)
if err == nil {
// Calculate and store the RPS interval for this daemon for this cycle
err = rpsPuller.updateDaemonRpsIntervals(daemon, samples)
err = rpsWorker.updateDaemonRpsIntervals(daemon, samples)
// Now we'll update the Kea RPS statistics based on the updated interval data
if err == nil {
err = rpsPuller.updateKeaDaemonRpsStats(daemon)
err = rpsWorker.updateKeaDaemonRpsStats(daemon)
}
}
......@@ -279,20 +144,20 @@ func (rpsPuller *RpsPuller) Response4Handler(daemon *dbmodel.Daemon, response in
}
// Processes the statistic-get command response for DHCP6.
func (rpsPuller *RpsPuller) Response6Handler(daemon *dbmodel.Daemon, response interface{}) error {
func (rpsWorker *RpsWorker) Response6Handler(daemon *dbmodel.Daemon, response interface{}) error {
statsResp6, ok := response.(*[]StatGetResponse6)
if !ok {
return fmt.Errorf("response type is invalid: %+v", response)
}
samples, err := rpsPuller.extractSamples6(*statsResp6)
samples, err := rpsWorker.extractSamples6(*statsResp6)
if err == nil {
// Calculate and store the RPS interval for this daemon for this cycle
err = rpsPuller.updateDaemonRpsIntervals(daemon, samples)
err = rpsWorker.updateDaemonRpsIntervals(daemon, samples)
// Now we'll update the Kea RPS statistics based on the updated interval data
if err == nil {
err = rpsPuller.updateKeaDaemonRpsStats(daemon)
err = rpsWorker.updateKeaDaemonRpsStats(daemon)
}
}
......@@ -304,7 +169,7 @@ func (rpsPuller *RpsPuller) Response6Handler(daemon *dbmodel.Daemon, response in
}
// Extract the list of statistic samples from a dhcp4 statistic-get response if the response is valid.
func (rpsPuller *RpsPuller) extractSamples4(statsResp []StatGetResponse4) ([]interface{}, error) {
func (rpsWorker *RpsWorker) extractSamples4(statsResp []StatGetResponse4) ([]interface{}, error) {
if len(statsResp) == 0 {
err := fmt.Errorf("empty RPS response")
return nil, err
......@@ -329,7 +194,7 @@ func (rpsPuller *RpsPuller) extractSamples4(statsResp []StatGetResponse4) ([]int
}
// Extract the list of statistic samples from a dhcp6 statistic-get response if the response is valid.
func (rpsPuller *RpsPuller) extractSamples6(statsResp []StatGetResponse6) ([]interface{}, error) {
func (rpsWorker *RpsWorker) extractSamples6(statsResp []StatGetResponse6) ([]interface{}, error) {
if len(statsResp) == 0 {
err := fmt.Errorf("empty RPS response")
return nil, err
......@@ -355,7 +220,7 @@ func (rpsPuller *RpsPuller) extractSamples6(statsResp []StatGetResponse6) ([]int
// Uses the most recent Kea statistic value for packets sent to calculate and
// store an RPS interval row for the current interval for the given daemon.
func (rpsPuller *RpsPuller) updateDaemonRpsIntervals(daemon *dbmodel.Daemon, samples []interface{}) error {
func (rpsWorker *RpsWorker) updateDaemonRpsIntervals(daemon *dbmodel.Daemon, samples []interface{}) error {
// The first row of the samples is the most recent value and the only
// one we care about. Fetch it.
value, sampledAt, err := getFirstSample(samples)
......@@ -371,7 +236,7 @@ func (rpsPuller *RpsPuller) updateDaemonRpsIntervals(daemon *dbmodel.Daemon, sam
}
// If we have a previous recording, calculate a delta row for it
if previous, exist := rpsPuller.PreviousRps[daemonID]; exist {
if previous, exist := rpsWorker.PreviousRps[daemonID]; exist {
// Make a new interval
interval := &dbmodel.RpsInterval{}
interval.KeaDaemonID = daemonID
......@@ -390,11 +255,11 @@ func (rpsPuller *RpsPuller) updateDaemonRpsIntervals(daemon *dbmodel.Daemon, sam
interval.Responses = value
}
err = dbmodel.AddRpsInterval(rpsPuller.Db, interval)
err = dbmodel.AddRpsInterval(rpsWorker.db, interval)
}
// Always update the last reported values for the Daemon.
rpsPuller.PreviousRps[daemonID] = StatSample{sampledAt, value}
rpsWorker.PreviousRps[daemonID] = StatSample{sampledAt, value}
return err
}
......@@ -402,13 +267,13 @@ func (rpsPuller *RpsPuller) updateDaemonRpsIntervals(daemon *dbmodel.Daemon, sam
// Update the RPS value for both intervals for given daemon.
// Uses the RpsInterval table contents to get the total responses and duration
// for both intervals and then updates the Daemon's statistics in the db.
func (rpsPuller *RpsPuller) updateKeaDaemonRpsStats(daemon *dbmodel.Daemon) error {
func (rpsWorker *RpsWorker) updateKeaDaemonRpsStats(daemon *dbmodel.Daemon) error {
endTime := storkutil.UTCNow()
startTime1 := endTime.Add(-rpsPuller.Interval1)
startTime1 := endTime.Add(-rpsWorker.Interval1)
daemonID := daemon.KeaDaemon.DaemonID
// Fetch interval totals for interval 1.
rps1, err := dbmodel.GetTotalRpsOverIntervalForDaemon(rpsPuller.Db, startTime1, endTime, daemonID)
rps1, err := dbmodel.GetTotalRpsOverIntervalForDaemon(rpsWorker.db, startTime1, endTime, daemonID)
if err != nil {
return errors.WithMessagef(err, "query for RPS interval 1 data failed")
}
......@@ -417,8 +282,8 @@ func (rpsPuller *RpsPuller) updateKeaDaemonRpsStats(daemon *dbmodel.Daemon) erro
daemon.KeaDaemon.KeaDHCPDaemon.Stats.RPS1 = calculateRps(rps1)
// Fetch interval totals for interval 2.
startTime2 := endTime.Add(-rpsPuller.Interval2)
rps2, err := dbmodel.GetTotalRpsOverIntervalForDaemon(rpsPuller.Db, startTime2, endTime, daemonID)
startTime2 := endTime.Add(-rpsWorker.Interval2)
rps2, err := dbmodel.GetTotalRpsOverIntervalForDaemon(rpsWorker.db, startTime2, endTime, daemonID)
if err != nil {
return errors.WithMessagef(err, "query for RPS interval 2 data failed")
}
......@@ -428,7 +293,7 @@ func (rpsPuller *RpsPuller) updateKeaDaemonRpsStats(daemon *dbmodel.Daemon) erro
// Update the daemon statistics.
log.Printf("Updating KeaDHCPDaemonStats: %+v", daemon.KeaDaemon.KeaDHCPDaemon.Stats)
return dbmodel.UpdateDaemon(rpsPuller.Db, daemon)
return dbmodel.UpdateDaemon(rpsWorker.db, daemon)
}
// Calculate the RPS for the first row in a set of RpsIntervals
......
This diff is collapsed.
......@@ -14,12 +14,12 @@ import (
type StatsPuller struct {
*agentcomm.PeriodicPuller
*RpsPuller
*RpsWorker
}
// Create a StatsPuller object that in background pulls Kea stats about leases.
// Beneath it spawns a goroutine that pulls stats periodically from Kea apps (that are stored in database).
func NewStatsPuller(db *pg.DB, agents agentcomm.ConnectedAgents, includeRpsPuller bool) (*StatsPuller, error) {
func NewStatsPuller(db *pg.DB, agents agentcomm.ConnectedAgents, includeRpsWorker bool) (*StatsPuller, error) {
statsPuller := &StatsPuller{}
periodicPuller, err := agentcomm.NewPeriodicPuller(db, agents, "Kea Stats", "kea_stats_puller_interval",
statsPuller.pullStats)
......@@ -28,14 +28,14 @@ func NewStatsPuller(db *pg.DB, agents agentcomm.ConnectedAgents, includeRpsPulle
}
statsPuller.PeriodicPuller = periodicPuller
if includeRpsPuller {
// Create RpsPuller instance without its own PeriodicPuller
rpsPuller, err := NewRpsPuller(db, agents, "")
if includeRpsWorker {
// Create RpsWorker instance without its own PeriodicPuller
rpsWorker, err := NewRpsWorker(db)
if err != nil {
return nil, err
}
statsPuller.RpsPuller = rpsPuller
statsPuller.RpsWorker = rpsWorker
}
return statsPuller, nil
......@@ -305,8 +305,8 @@ func (statsPuller *StatsPuller) getStatsFromApp(dbApp *dbmodel.App) error {
}
// If we're running RPS, age off obsolete RPS data.
if statsPuller.RpsPuller != nil {
_ = statsPuller.RpsPuller.AgeOffRpsIntervals()
if statsPuller.RpsWorker != nil {
_ = statsPuller.RpsWorker.AgeOffRpsIntervals()
}
// Slices for tracking commands, the daemons they're sent to, and the responses
......@@ -330,10 +330,10 @@ func (statsPuller *StatsPuller) getStatsFromApp(dbApp *dbmodel.App) error {
responses = append(responses, &[]StatLeaseGetResponse{})
// Add daemon, cmd and response for DHCP4 RPS stats if we have an RpsPuller
if statsPuller.RpsPuller != nil {
// Add daemon, cmd and response for DHCP4 RPS stats if we have an RpsWorker
if statsPuller.RpsWorker != nil {
cmdDaemons = append(cmdDaemons, d)
responses = append(responses, statsPuller.RpsPuller.AddCmd4(&cmds, dhcp4Daemons))
responses = append(responses, statsPuller.RpsWorker.AddCmd4(&cmds, dhcp4Daemons))
}
case dhcp6:
......@@ -347,10 +347,10 @@ func (statsPuller *StatsPuller) getStatsFromApp(dbApp *dbmodel.App) error {
responses = append(responses, &[]StatLeaseGetResponse{})
// Add daemon, cmd and response for DHCP6 RPS stats if we have an RpsPuller
if statsPuller.RpsPuller != nil {
// Add daemon, cmd and response for DHCP6 RPS stats if we have an RpsWorker
if statsPuller.RpsWorker != nil {
cmdDaemons = append(cmdDaemons, d)
responses = append(responses, statsPuller.RpsPuller.AddCmd6(&cmds, dhcp6Daemons))
responses = append(responses, statsPuller.RpsWorker.AddCmd6(&cmds, dhcp6Daemons))
}
}
}
......@@ -405,7 +405,7 @@ func (statsPuller *StatsPuller) processAppResponses(dbApp *dbmodel.App, cmds []*
lastErr = err
}
case "statistic-get":
err = statsPuller.RpsPuller.Response4Handler(cmdDaemons[idx], responses[idx])
err = statsPuller.RpsWorker.Response4Handler(cmdDaemons[idx], responses[idx])
if err != nil {
log.Errorf("error handling statistic-get (v4) response: %+v", err)
lastErr = err
......@@ -421,7 +421,7 @@ func (statsPuller *StatsPuller) processAppResponses(dbApp *dbmodel.App, cmds []*
lastErr = err
}
case "statistic-get":
err = statsPuller.RpsPuller.Response6Handler(cmdDaemons[idx], responses[idx])
err = statsPuller.RpsWorker.Response6Handler(cmdDaemons[idx], responses[idx])
if err != nil {
log.Errorf("error handling statistic-get (v6) response: %+v", err)
lastErr = err
......
......@@ -30,7 +30,7 @@ func TestStatsPullerBasic(t *testing.T) {
fa := storktest.NewFakeAgents(nil, nil)
sp, _ := NewStatsPuller(db, fa, true)
require.NotEmpty(t, sp.RpsPuller)
require.NotEmpty(t, sp.RpsWorker)
sp.Shutdown()
}
......@@ -114,7 +114,7 @@ func TestStatsPullerPullStats(t *testing.T) {
err = db.Insert(&setting)
require.NoError(t, err)
// prepare stats puller without RpsPuller
// prepare stats puller without RpsWorker
sp, err := NewStatsPuller(db, fa, false)
require.NoError(t, err)
......@@ -246,7 +246,7 @@ func TestStatsPullerEmptyResponse(t *testing.T) {
err = db.Insert(&setting)
require.NoError(t, err)
// prepare stats puller without RpsPuller
// prepare stats puller without RpsWorker
sp, err := NewStatsPuller(db, fa, false)
require.NoError(t, err)
// shutdown stats puller at the end
......@@ -259,7 +259,7 @@ func TestStatsPullerEmptyResponse(t *testing.T) {
}
// Check if pulling stats works when RPS is included.
// RpsPuller has a thorough set of unit tests so in this
// RpsWorker has a thorough set of unit tests so in this
// we verify only that it has entries in its internal
// Map of statistics fetched. This is enough to demonstrate
// that it is operational. We repeat the lease stats
......@@ -365,7 +365,7 @@ func TestStatsPullerPullStatsWithRps(t *testing.T) {
err = db.Insert(&setting)
require.NoError(t, err)
// prepare stats puller with RpsPuller
// prepare stats puller with RpsWorker
sp, err := NewStatsPuller(db, fa, true)
require.NoError(t, err)
......@@ -421,16 +421,16 @@ func TestStatsPullerPullStatsWithRps(t *testing.T) {
}
require.Equal(t, 5, snCnt)
// We should have two rows in RpsPuller.PreviousRps map one for each daemon
require.Equal(t, 2, len(sp.RpsPuller.PreviousRps))
// We should have two rows in RpsWorker.PreviousRps map one for each daemon
require.Equal(t, 2, len(sp.RpsWorker.PreviousRps))
// Entry for ID 1 should be dhcp4 daemon, it should have an RPS value of 44
previous := sp.RpsPuller.PreviousRps[1]
previous := sp.RpsWorker.PreviousRps[1]
require.NotEqual(t, nil, previous)
require.EqualValues(t, 44, previous.Value)
// Entry for ID 2 should be dhcp6 daemon, it should have an RPS value of 66
previous = sp.RpsPuller.PreviousRps[2]
previous = sp.RpsWorker.PreviousRps[2]
require.NotEqual(t, nil, previous)
require.EqualValues(t, 66, previous.Value)
}
......@@ -32,7 +32,6 @@ type StorkServer struct {
KeaStatsPuller *kea.StatsPuller
KeaHostsPuller *kea.HostsPuller
StatusPuller *kea.StatusPuller
KeaRpsPuller *kea.RpsPuller
EventCenter eventcenter.EventCenter
}
......@@ -177,7 +176,6 @@ func (ss *StorkServer) Shutdown() {
ss.RestAPI.Shutdown()
ss.KeaHostsPuller.Shutdown()
ss.KeaStatsPuller.Shutdown()
ss.KeaRpsPuller.Shutdown()
ss.Bind9StatsPuller.Shutdown()
ss.StatusPuller.Shutdown()
ss.EventCenter.Shutdown()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment