Commit 6377189f authored by Thomas Markwalder's avatar Thomas Markwalder

[#252] Addressed most of review comments

Addressed all comments except those relevant to removing RpsPuller.pullStats() and
RpsPuller.getStatsFromApp()
parent 68fcdf64
......@@ -78,6 +78,10 @@ type ResponseArguments6 struct {
// Beneath it spawns a goroutine that pulls the response sent statistics
// periodically from Kea apps (that are stored in database). These are
// used to calculate and store RPS interval data per Kea daemon in the database.
// If it is blank, the RpsPuller will not start its PeriodicPuller, allowing
// it to driven externally. If intervalName is not blank, the RpsPuller will
// start its PeriodicPuller, using the value as the name of the db setting for
// the periodic interval.
func NewRpsPuller(db *pg.DB, agents agentcomm.ConnectedAgents, intervalName string) (*RpsPuller, error) {
rpsPuller := &RpsPuller{}
......@@ -98,8 +102,9 @@ func NewRpsPuller(db *pg.DB, agents agentcomm.ConnectedAgents, intervalName stri
}
rpsPuller.PreviousRps = map[int64]StatSample{}
rpsPuller.Interval1 = (time.Second * 15 * 60)
rpsPuller.Interval2 = (time.Second * 24 * 60 * 60)
// The interval values may some day be configurable
rpsPuller.Interval1 = (time.Minute * 15)
rpsPuller.Interval2 = (time.Hour * 24)
return rpsPuller, nil
}
......@@ -267,7 +272,7 @@ func (rpsPuller *RpsPuller) Response4Handler(daemon *dbmodel.Daemon, response in
}
if err != nil {
return fmt.Errorf("could not update dhcp4 RPS data for %+v, %s", daemon, err)
return errors.WithMessagef(err, "could not update dhcp4 RPS data for %+v", daemon)
}
return nil
......@@ -292,7 +297,7 @@ func (rpsPuller *RpsPuller) Response6Handler(daemon *dbmodel.Daemon, response in
}
if err != nil {
return fmt.Errorf("could not update dhcp6 RPS data for %+v, %s", daemon, err)
return errors.WithMessagef(err, "could not update dhcp4 RPS data for %+v", daemon)
}
return nil
......@@ -353,9 +358,9 @@ func (rpsPuller *RpsPuller) extractSamples6(statsResp []StatGetResponse6) ([]int
func (rpsPuller *RpsPuller) updateDaemonRpsIntervals(daemon *dbmodel.Daemon, samples []interface{}) error {
// The first row of the samples is the most recent value and the only
// one we care about. Fetch it.
value, sampledAt, err := getSampleRow(samples, 0)
value, sampledAt, err := getFirstSample(samples)
if err != nil {
return fmt.Errorf("could not extract RPS stat: %s", err)
return errors.WithMessagef(err, "could not extract RPS statistic")
}
daemonID := daemon.KeaDaemon.DaemonID
......@@ -405,7 +410,7 @@ func (rpsPuller *RpsPuller) updateKeaDaemonRpsStats(daemon *dbmodel.Daemon) erro
// Fetch interval totals for interval 1.
rps1, err := dbmodel.GetTotalRpsOverIntervalForDaemon(rpsPuller.Db, startTime1, endTime, daemonID)
if err != nil {
return fmt.Errorf("query for RPS interval 1 data failed: %s", err)
return errors.WithMessagef(err, "query for RPS interval 1 data failed")
}
// Calculate RPS for interval 1.
......@@ -415,7 +420,7 @@ func (rpsPuller *RpsPuller) updateKeaDaemonRpsStats(daemon *dbmodel.Daemon) erro
startTime2 := endTime.Add(-rpsPuller.Interval2)
rps2, err := dbmodel.GetTotalRpsOverIntervalForDaemon(rpsPuller.Db, startTime2, endTime, daemonID)
if err != nil {
return fmt.Errorf("query for RPS interval 2 data failed: %s", err)
return errors.WithMessagef(err, "query for RPS interval 2 data failed")
}
// Calculate RPS for interval 2.
......@@ -452,21 +457,20 @@ func calculateRps(totals []*dbmodel.RpsInterval) int {
// We use current Stork server time so interval times across Daemons are
// consistent and relative to us. In other words, we don't care when Kea
// modified the value, we care about when we got it.
func getSampleRow(samples []interface{}, rowIdx int) (int64, time.Time, error) {
func getFirstSample(samples []interface{}) (int64, time.Time, error) {
sampledAt := storkutil.UTCNow()
if samples == nil {
return 0, sampledAt, errors.New("samples cannot be nil")
}
if len(samples) < (rowIdx + 1) {
if len(samples) == 0 {
// Not enough rows
return 0, sampledAt, fmt.Errorf("sampleList does not a row at idx: %d", rowIdx)
return 0, sampledAt, fmt.Errorf("sampleList is empty")
}
row, ok := samples[rowIdx].([]interface{})
row, ok := samples[0].([]interface{})
if !ok {
return 0, sampledAt, fmt.Errorf("problem with casting sample row: %+v",
samples[rowIdx])
return 0, sampledAt, fmt.Errorf("problem with casting sample row: %+v", samples[0])
}
if len(row) != 2 {
......
......@@ -111,7 +111,7 @@ func (statsPuller *StatsPuller) pullStats() (int, error) {
appsOkCnt++
}
}
fmt.Printf("completed pulling lease stats from Kea apps: %d/%d succeeded", appsOkCnt, len(dbApps))
log.Printf("completed pulling lease stats from Kea apps: %d/%d succeeded", appsOkCnt, len(dbApps))
// estimate addresses utilization for subnets
subnets, err := dbmodel.GetSubnetsWithLocalSubnets(statsPuller.Db)
......
......@@ -28,32 +28,8 @@ func GetAllRpsIntervals(db *pg.DB) ([]*RpsInterval, error) {
return rpsIntervals, nil
}
// Returns an array of the total RPS per daemon within a given time frame
// One element per daemon, ordered by daemon id where:
// RpsInterval.StartTime = 0 (unused)
// RpsInterval.Responses = total of number of responses
// RpsInterval.Duration = total of the interval durations
func GetTotalRpsOverInterval(db *pg.DB, startTime time.Time, endTime time.Time) ([]*RpsInterval, error) {
rpsTotals := []*RpsInterval{}
q := db.Model(&rpsTotals)
q = q.Column("kea_daemon_id")
q = q.ColumnExpr("sum(responses) as responses")
q = q.ColumnExpr("sum(duration) as duration")
q = q.Order("kea_daemon_id")
q = q.Group("kea_daemon_id")
q = q.Where("start_time >= ? and start_time <= ?", startTime, endTime)
err := q.Select()
if err != nil {
return nil, errors.Wrapf(err, "problem with getting all RPS intervals")
}
return rpsTotals, nil
}
// Returns an array of the total RPS per daemon within a given time frame
// One element per daemon, ordered by daemon id where:
// Returns an array of the total RPS for a given daemon within a given time frame
// One element for the given daemon id where:
// RpsInterval.StartTime = 0 (unused)
// RpsInterval.Responses = total of number of responses
// RpsInterval.Duration = total of the interval durations
......
......@@ -64,7 +64,7 @@ func TestRpsIntervalBasics(t *testing.T) {
require.Len(t, rpsIntervals, 0)
}
// Verifies operation of rps_interval.GetTotalRpsOverInterval()
// Verifies operation of rps_interval.GetTotalRpsOverIntervalForDaemon()
// It populates the RpsInterval and then tests four invocations
// with varying time frames.
func TestRpsIntervalTotals(t *testing.T) {
......@@ -81,11 +81,6 @@ func TestRpsIntervalTotals(t *testing.T) {
timeZero := storkutil.UTCNow().Round(time.Second)
timeZero = timeZero.Add(time.Duration(-60) * time.Second)
// Getting totals with no rows should return no rows.
rpsTotals, err := GetTotalRpsOverInterval(db, timeZero, storkutil.UTCNow())
require.NoError(t, err)
require.Len(t, rpsTotals, 0)
startTime := timeZero
intervals := 5
daemons := 3
......@@ -109,101 +104,54 @@ func TestRpsIntervalTotals(t *testing.T) {
// Get totals that span the whole table
startTime = timeZero
endTime := storkutil.UTCNow()
rpsTotals, err = GetTotalRpsOverInterval(db, startTime, endTime)
require.NoError(t, err)
require.Len(t, rpsTotals, 3)
expDuration := (intervals * 5)
expDuration := int64(intervals * 5)
// Verify the totals.
for row := 0; row < daemons; row++ {
interval := rpsTotals[row]
daemon := row + 1
require.EqualValues(t, daemon, interval.KeaDaemonID)
require.EqualValues(t, expDuration, interval.Duration)
for daemon := 1; daemon <= daemons; daemon++ {
var expResponses int64
for interval := 1; interval <= intervals; interval++ {
expResponses += int64(interval) * int64(math.Pow10(daemon))
}
require.EqualValues(t, expResponses, interval.Responses)
// Now check the RPS values when pulled for a single daemon.
checkIntervalPerDaemon(t, db, startTime, endTime,
interval.KeaDaemonID, interval.Responses, interval.Duration)
int64(daemon), expResponses, expDuration)
}
// Fetch totals for a time frame containing only the first two intervals
startTime = timeZero
endTime = timeZero.Add(time.Duration(7) * time.Second)
rpsTotals, err = GetTotalRpsOverInterval(db, startTime, endTime)
require.NoError(t, err)
require.Len(t, rpsTotals, 3)
// Verify the totals.
for row := 0; row < daemons; row++ {
interval := rpsTotals[row]
daemon := row + 1
require.EqualValues(t, daemon, interval.KeaDaemonID)
require.EqualValues(t, 10, interval.Duration)
var expResponses int64
for interval := 1; interval <= 2; interval++ {
expResponses += int64(interval) * int64(math.Pow10(daemon))
}
require.EqualValues(t, expResponses, interval.Responses)
// Now check the RPS values when pulled for a single daemon.
checkIntervalPerDaemon(t, db, startTime, endTime,
interval.KeaDaemonID, interval.Responses, interval.Duration)
}
// Fetch totals for a time frame containing only the middle three intervals
startTime = timeZero.Add(time.Duration(5) * time.Second)
endTime = startTime.Add(time.Duration(10) * time.Second)
rpsTotals, err = GetTotalRpsOverInterval(db, startTime, endTime)
require.NoError(t, err)
require.Len(t, rpsTotals, 3)
intervals = 2
// Verify the totals.
for row := 0; row < daemons; row++ {
interval := rpsTotals[row]
daemon := row + 1
require.EqualValues(t, daemon, interval.KeaDaemonID)
require.EqualValues(t, 15, interval.Duration)
expDuration = int64(intervals * 5)
for daemon := 1; daemon <= daemons; daemon++ {
var expResponses int64
for interval := 2; interval <= 4; interval++ {
for interval := 1; interval <= intervals; interval++ {
expResponses += int64(interval) * int64(math.Pow10(daemon))
}
require.EqualValues(t, expResponses, interval.Responses)
// Now check the RPS values when pulled for a single daemon.
checkIntervalPerDaemon(t, db, startTime, endTime,
interval.KeaDaemonID, interval.Responses, interval.Duration)
int64(daemon), expResponses, expDuration)
}
// Fetch totals for a time frame containing only the last interval
startTime = timeZero.Add(time.Duration(20) * time.Second)
endTime = storkutil.UTCNow().Round(time.Second)
rpsTotals, err = GetTotalRpsOverInterval(db, startTime, endTime)
require.NoError(t, err)
require.Len(t, rpsTotals, 3)
intervals = 1
// Verify the totals.
for row := 0; row < daemons; row++ {
interval := rpsTotals[row]
daemon := row + 1
require.EqualValues(t, daemon, interval.KeaDaemonID)
require.EqualValues(t, 5, interval.Duration)
expResponses := int64(5) * int64(math.Pow10(daemon))
require.EqualValues(t, expResponses, interval.Responses)
expDuration = int64(intervals * 5)
for daemon := 1; daemon <= daemons; daemon++ {
var expResponses int64
for interval := 1; interval <= intervals; interval++ {
expResponses += int64(5) * int64(math.Pow10(daemon))
}
// Now check the RPS values when pulled for a single daemon.
checkIntervalPerDaemon(t, db, startTime, endTime,
interval.KeaDaemonID, interval.Responses, interval.Duration)
int64(daemon), expResponses, expDuration)
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment