Commit e6aee8db authored by Petr Špaček

Merge branch 'config' into 'main'

configuration file

See merge request !2
parents e447d2e7 96967436
Pipeline #106231 passed in 1 minute and 24 seconds
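
The structure that main() below reads from the parsed configuration can be sketched as follows; the field names come from the code in this commit, while the concrete values, intervals and the on-disk format handled by config.ConfigAction are assumptions:

# Sketch of the mapping main() accesses via args.config; values are invented,
# only the key and field names are taken from the code below.
EXAMPLE_CONFIG = {
    "systemwide_paths": [
        {"path": "/proc/stat", "interval": 1.0, "parser": "ProcStat"},
        {"path": "/proc/net/sockstat", "interval": 2.0, "parser": "SockStat"},
    ],
    "cgroup_files": [
        {"name": "cpu.pressure", "interval": 1.0, "parser": "CPUPressure"},
        {"name": "memory.current", "interval": 1.0, "parser": "MemoryCurrent"},
    ],
}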
[flake8]
# default from black
max-line-length = 88
extend-ignore = E203, E501
__pycache__
*.pyc
@@ -17,4 +17,4 @@ pylint:
mypy:
script:
- mypy --ignore-missing-imports $(git ls-files '*.py')
- mypy --install-types --non-interactive --ignore-missing-imports $(git ls-files '*.py')
@@ -16,31 +16,14 @@ from pathlib import Path
import signal
import sys
import time
from typing import List, Optional, Union
from typing import List, Optional, Union, Tuple
# pylint: disable=wrong-import-order,wrong-import-position
sys.path.insert(0, str(Path(__file__).resolve().parent))
import constants # noqa
from resmon import constants, config # noqa
# pylint: enable=wrong-import-order,wrong-import-position
# files to watch for each matching cgroup separately
PATHS_CGROUP = [
"io.pressure",
"io.stat",
"cpu.pressure",
"cpu.stat",
"memory.pressure",
"memory.stat",
"memory.current",
]
PATHS_SYSTEMWIDE = [
"/proc/net/dev",
"/proc/net/sockstat",
"/proc/net/sockstat6",
"/proc/diskstats",
"/proc/stat",
]
# globals for signal handler
RUNNING = True
PRODUCERS = [] # type: List[asyncio.Task]
@@ -67,21 +50,35 @@ async def read_to_json(fileobj) -> str:
return json.dumps(result)
async def watch_file(samples_q: asyncio.Queue, file_path: str, interval: float) -> None:
async def watch_file(
samples_q: asyncio.Queue, file_path: Path, interval: float
) -> None:
"""Sample file and put log records into queue."""
with open(file_path, encoding="utf-8") as fileobj:
try:
while True:
record = await read_to_json(fileobj)
fileobj.seek(0)
await samples_q.put(record)
await asyncio.sleep(interval)
except asyncio.exceptions.CancelledError:
return
except OSError as ex:
logging.critical("file %s went away, terminating (%s)", fileobj.name, ex)
sigint(None, None)
return
try:
with open(file_path, encoding="utf-8") as fileobj:
try:
while True:
record = await read_to_json(fileobj)
fileobj.seek(0)
await samples_q.put(record)
await asyncio.sleep(interval)
except asyncio.exceptions.CancelledError:
return
except OSError as ex:
logging.critical(
"file %s went away, terminating (%s)", fileobj.name, ex
)
sigint(None, None)
return
except FileNotFoundError as ex:
logging.critical(
"file %s not found, terminating (%s)\n\n"
"If the file above is a cgroup file, refer to README for possible solution",
file_path,
ex,
)
sigint(None, None)
return
async def write_queue(output_fobj, samples_q: asyncio.Queue) -> None:
@@ -92,13 +89,14 @@ async def write_queue(output_fobj, samples_q: asyncio.Queue) -> None:
samples_q.task_done()
async def watch_and_write(interval: float, output: Optional[str], paths: List[str]):
async def watch_and_write(output: Optional[str], paths: List[Tuple[Path, float]]):
"""Watch all specified files in paralell and periodically dump them into output."""
samples_q = asyncio.Queue() # type: asyncio.Queue
global PRODUCERS # signal handler, pylint: disable=global-statement
PRODUCERS = [
asyncio.create_task(watch_file(samples_q, path, interval)) for path in paths
asyncio.create_task(watch_file(samples_q, path, interval))
for path, interval in paths
]
if output is None:
output_fobj = sys.stdout
@@ -116,7 +114,9 @@ async def watch_and_write(interval: float, output: Optional[str], paths: List[st
consumer.cancel()
def get_cgroup_paths(base: Union[str, Path], glob=None):
def get_cgroup_paths(
base: Union[str, Path], cgroup_filenames: List[Tuple[str, float]], glob=None
) -> List[Tuple[Path, float]]:
"""
Generate paths to statistics files for cgroups matching glob.
glob=None generates paths for the base cgroup.
@@ -132,44 +132,53 @@ def get_cgroup_paths(base: Union[str, Path], glob=None):
else:
cgrps = [base_path]
cgroup_paths = []
for cgrp in cgrps:
for file in PATHS_CGROUP:
yield cgrp / file
for filename, interval in cgroup_filenames:
cgroup_paths.append((cgrp / filename, interval))
return cgroup_paths
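
A hypothetical call, to show the shape of the return value; the base directory, glob and interval are made up:

# Watch memory.current once per second in every cgroup matching the glob.
watched = get_cgroup_paths(
    "/sys/fs/cgroup/system.slice", [("memory.current", 1.0)], glob="*.service"
)
# e.g. [(PosixPath("/sys/fs/cgroup/system.slice/foo.service/memory.current"), 1.0), ...]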
def wait_for_files(cgroup_base_dir: Path, cgroup_glob: Optional[str]):
def wait_for_files(
cgroup_base_dir: Path,
systemwide_paths: List[Tuple[Path, float]],
cgroup_filenames: List[Tuple[str, float]],
cgroup_glob: Optional[str],
) -> List[Tuple[Path, float]]:
"""
Wait until at least one cgroup matching specified glob exists,
and return all paths to statistical files.
Wait until at least one cgroup matching the specified glob exists, and return
all paths to statistics files together with the intervals at which to watch them.
"""
paths = list(systemwide_paths)
while RUNNING:
try:
paths = PATHS_SYSTEMWIDE + list(
get_cgroup_paths(cgroup_base_dir, cgroup_glob)
cgroup_paths = get_cgroup_paths(
cgroup_base_dir, cgroup_filenames, cgroup_glob
)
return paths
except ValueError as ex:
logging.info("waiting: %s", ex)
time.sleep(0.1)
else:
return paths + cgroup_paths
return paths  # fallback so that every code path returns a value
def main():
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
signal.signal(signal.SIGINT, sigint)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"--interval",
type=float,
default=1,
help="interval between samples in seconds",
conf_action = parser.add_argument(
"-c",
"--config",
type=Path,
default=constants.RESMON_CONFIG_PATH,
action=config.ConfigAction,
help="configuration file",
)
parser.add_argument(
"--cgroup-base-dir",
type=Path,
default="/sys/fs/cgroup/system.slice",
default=constants.CGROUP_BASE_DIR,
help="base cgroup path, glob is applied here",
)
parser.add_argument(
@@ -186,12 +195,31 @@ def main():
help="output captured resmon JSON file",
)
args = parser.parse_args()
if args.config == constants.RESMON_CONFIG_PATH:
# argparse does not apply the action to default=, so invoke it manually to parse the config
conf_action(parser, args, conf_action.default, "-c/--config")
return args
paths = wait_for_files(args.cgroup_base_dir, args.cgroup_glob)
def main():
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
signal.signal(signal.SIGINT, sigint)
args = parse_args()
systemwide_paths = [
(Path(item["path"]), item["interval"])
for item in args.config["systemwide_paths"]
]
cgroup_filenames = [
(item["name"], item["interval"]) for item in args.config["cgroup_files"]
]
paths = wait_for_files(
args.cgroup_base_dir, systemwide_paths, cgroup_filenames, args.cgroup_glob
)
if RUNNING:
logging.debug("gathering content of %s", paths)
asyncio.run(watch_and_write(args.interval, args.output, paths))
asyncio.run(watch_and_write(args.output, paths))
if __name__ == "__main__":
......
#!/usr/bin/python3
from abc import ABC, abstractmethod
import argparse
import collections
import json
import logging
from pathlib import Path
import re
import sys
from typing import Dict, List, Tuple, Union
# pylint: disable=wrong-import-order,wrong-import-position
sys.path.insert(0, str(Path(__file__).resolve().parent))
import constants # noqa
from resmon import constants, config, parsers # noqa
# pylint: enable=wrong-import-order,wrong-import-position
@@ -23,346 +22,37 @@ def load_json(infile):
yield json.loads(line)
class StatParser(ABC):
@abstractmethod
def parse(self, record):
"""Stateless parser: Parse whole input text an return data items."""
def get_parsers(conf) -> Dict[Union[str, Path], parsers.StatParser]:
path_parsers = {}
@abstractmethod
def process(self, record):
"""
Transform one record into zero or more data points in the format:
(metric name, timestamp from, timestamp to, value)
Point-in-time metrics: timestamp from == to
Cumulative metrics: timestamp (from, to]
For cumulative values, do not yield anything if the parser does not
have enough data yet.
"""
def add_parser(key: Union[str, Path], parser_name: str):
try:
parser = getattr(parsers, parser_name)
except AttributeError:
logging.warning("unknown parser: %s", parser_name)
else:
path_parsers[key] = parser
for item in conf["systemwide_paths"]:
try:
key = item["path"]
parser_name = item["parser"]
except KeyError:
continue
add_parser(Path(key), parser_name)
class MemoryCurrent(StatParser):
"""parse current memory.current value (point in time), stateless"""
def __init__(self, _):
pass
def parse(self, record):
return int(record["text"])
def process(self, record):
yield ("memory.current", record["ts"], record["ts"], self.parse(record))
class SockStat(StatParser):
"""parse socket statistics, stateless"""
def __init__(self, _):
pass
def parse(self, record):
data = {} # protocol: metric: value
for line in record["text"].split("\n"):
if not line:
continue
protocol, metrics_text = line.split(": ")
metrics = metrics_text.split()
assert len(metrics) >= 2 and len(metrics) % 2 == 0
data[protocol] = {}
for m_idx in range(0, len(metrics), 2):
metric_name = metrics[m_idx]
metric_val = int(metrics[m_idx + 1])
data[protocol][metric_name] = metric_val
return data
def process(self, record):
data = self.parse(record)
for protocol, metrics in data.items():
for metric, value in metrics.items():
yield (
f"sockstat.{protocol}.{metric}",
record["ts"],
record["ts"],
value,
)
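
A small sketch of how SockStat behaves on a made-up /proc/net/sockstat excerpt, assuming the class defined above is in scope:

record = {
    "ts": 1000.0,
    "text": "TCP: inuse 5 orphan 0 tw 2 alloc 8 mem 3\nUDP: inuse 3 mem 1\n",
}
parser = SockStat(record)
assert parser.parse(record)["TCP"]["alloc"] == 8
# First data point emitted for this record (point in time, so from == to):
assert next(parser.process(record)) == ("sockstat.TCP.inuse", 1000.0, 1000.0, 5)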
class Pressure(StatParser):
"""parse cpu/memory/io.some values; cumulative values"""
regex = re.compile("^some.*total=([0-9]+)$", flags=re.MULTILINE)
name = ""
def __init__(self, record):
self.last_ts = record["ts"]
self.last_total = self.parse(record)
def parse(self, record):
m = self.regex.search(record["text"])
assert m, "pressure file format did not match"
return int(m.group(1))
def process(self, record):
now = record["ts"]
if now == self.last_ts:
return # nothing to do, we need another data point
new_total = self.parse(record)
percent = (new_total - self.last_total) / (now - self.last_ts) / 1000000 * 100
yield (f"{self.name}.pressure.some", self.last_ts, now, percent)
self.last_ts = now
self.last_total = new_total
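
A worked example of the percentage formula above, with invented numbers:

# If the "total=" stall counter grew by 50000 microseconds over a 1-second
# sampling window, the reported pressure is 5 % of that interval.
delta_total_us = 50_000
delta_wall_s = 1.0
percent = delta_total_us / delta_wall_s / 1_000_000 * 100  # -> 5.0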
class CPUPressure(Pressure):
name = "cpu"
class IOPressure(Pressure):
name = "io"
class MemoryPressure(Pressure):
name = "memory"
class NetworkDevIO(StatParser):
def __init__(self, record):
self.last_ts = record["ts"]
self.last_data = self.parse(record)
def parse(self, record):
data = {} # interface -> tx/rx-stat -> value
lines = record["text"].split("\n")[2:]
column_names = (
"rx.bytes",
"rx.packets",
"rx.errs",
"rx.drop",
"rx.fifo",
"rx.frame",
"rx.compressed",
"rx.multicast",
"tx.bytes",
"tx.packets",
"tx.errs",
"tx.drop",
"tx.fifo",
"tx.colls",
"tx.carrier",
"tx.compressed",
)
for line in lines:
if not line:
continue
in_columns = line.split()
assert in_columns[0][-1] == ":", "unexpected interface name"
iface_name = in_columns[0][:-1]
assert len(column_names) == len(in_columns) - 1, "unexpected columns"
num_columns = list(int(val) for val in in_columns[1:])
data[iface_name] = dict(zip(column_names, num_columns))
return data
def process(self, record):
now = record["ts"]
if now == self.last_ts:
return # nothing to do, we need another data point
new_data = self.parse(record)
for iface, iface_data in new_data.items():
if iface not in self.last_data:
continue # new iface, no data
for key in iface_data:
value = iface_data[key] - self.last_data[iface][key]
yield (f"net.{iface}.{key}", self.last_ts, now, value)
self.last_ts = now
self.last_data = new_data
class CPUStat(StatParser):
def __init__(self, record):
self.last_ts = record["ts"]
self.last_data = self.parse(record)
def parse(self, record):
data = {} # metric -> value
lines = record["text"].split("\n")
line_names = ("usage_usec", "user_usec", "system_usec")
for line in lines:
if not line:
continue
in_columns = line.split()
assert len(in_columns) == 2, "unexpected line format"
name, value = in_columns
if name not in line_names:
continue
data[name] = int(value)
return data
def process(self, record):
now = record["ts"]
if now == self.last_ts:
return # nothing to do, we need another data point
new_data = self.parse(record)
for key, cpu_data in new_data.items():
utilization_pct = (
(cpu_data - self.last_data[key]) / 10**6 / (now - self.last_ts) * 100
)
yield (
f'cpu.{key.replace("usec", "percent")}.cg',
self.last_ts,
now,
utilization_pct,
)
self.last_ts = now
self.last_data = new_data
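
A worked example of the utilization formula above (numbers invented):

# 200000 us of usage_usec accumulated over a 1-second window is 20 % of one CPU.
delta_usage_us = 200_000
delta_wall_s = 1.0
utilization_pct = delta_usage_us / 10**6 / delta_wall_s * 100  # -> 20.0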
class DiskStat(StatParser):
def __init__(self, record):
self.last_ts = record["ts"]
self.last_data = self.parse(record)
self.keys_with_abs_values = set(["io.inprogress"])
def parse(self, record):
data = {}
lines = record["text"].split("\n")[2:]
column_names = ( # '_major', '_minor', '_device',
"read.completed", # count, cumulative
"read.merged", # count, cumulative
"read.sectors", # count, cumulative
"read.time", # ms, cumulative
"write.completed", # count, cumulative
"write.merged", # count, cumulative
"write.sectors", # count, cumulative
"write.time", # ms, cumulative
"io.inprogress", # count, at the moment
"io.time", # ms, cumulative
"io.time.weighted", # count, cumulative
"discard.completed", # count, cumulative
"discard.merged", # count, cumulative
"discard.sectors", # count, cumulative
"discard.time", # ms, cumulative
"flush.completed", # count, cumulative
"flush.time", # ms, cumulative
)
for line in lines:
if not line:
continue
in_columns = line.split()
disk_name = in_columns[2]
assert len(column_names) == len(in_columns) - 3, "unexpected columns"
num_columns = list(int(val) for val in in_columns[3:])
data[disk_name] = dict(zip(column_names, num_columns))
return data
def process(self, record):
now = record["ts"]
if now == self.last_ts:
return # nothing to do, we need another data point
new_data = self.parse(record)
for disk_name, disk_data in new_data.items():
if disk_name not in self.last_data:
continue # new disk_name, no data
for key in disk_data:
if key in self.keys_with_abs_values:
new_val = disk_data[key]
else: # cumulative metric
new_val = disk_data[key] - self.last_data[disk_name][key]
yield (f"disk.{disk_name}.{key}", self.last_ts, now, new_val)
self.last_ts = now
self.last_data = new_data
class DeltaDict(dict):
def __sub__(self, other):
assert len(self) == len(other)
return DeltaDict({key: my_val - other[key] for key, my_val in self.items()})
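
A quick illustration of DeltaDict, with made-up counters:

prev = DeltaDict({"user": 100, "system": 40})
curr = DeltaDict({"user": 130, "system": 45})
assert curr - prev == {"user": 30, "system": 5}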
class ProcStat(StatParser):
"""/proc/stat; cumulative values"""
def __init__(self, record):
self.last_ts = record["ts"]
self.last_data = self.parse(record)
def parse(self, record):
data = {} # metric -> value columns
lines = record["text"].split("\n")
# taken from man 5 proc
cpu_column_names = (
"user",
"nice",
"system",
"idle",
"iowait",
"irq",
"softirq",
"steal",
"guest",
"guest_nice",
)
for line in lines:
if not line:
continue
in_columns = line.split()
name = in_columns[0]
if not name.startswith("cpu"):
continue
assert len(in_columns) >= 11, "unexpected line format"
data[name] = DeltaDict(
{
col_name: int(value)
for col_name, value in zip(cpu_column_names, in_columns[1:])
}
)
return data
def process(self, record):
now = record["ts"]
if now == self.last_ts:
return # nothing to do, we need another data point
new_data = self.parse(record)
n_cpus = len(new_data) - 1 # - total
for cpu_name, cpu_data in new_data.items():
assert cpu_name in self.last_data, f"CPU {cpu_name} disappeared"
cpu_deltas = cpu_data - self.last_data[cpu_name]
cpu_sumtime = sum(cpu_deltas.values())
for metric, value in cpu_deltas.items():
percent = value / cpu_sumtime * 100
if cpu_name == "cpu":
# when dealing with aggregate stats, assume 100 % per 1 CPU
percent *= n_cpus
yield (f"{cpu_name}.{metric}_percent", self.last_ts, now, percent)
self.last_ts = now
self.last_data = new_data
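
A hypothetical illustration of the aggregate scaling above, for a 4-CPU machine:

n_cpus = 4
value, cpu_sumtime = 50, 400          # jiffies in one state vs. all states combined
percent = value / cpu_sumtime * 100   # 12.5 % of total capacity
percent *= n_cpus                     # -> 50.0, i.e. half of one CPU's worth of time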
for item in conf["cgroup_files"]:
try:
key = item["name"]
parser_name = item["parser"]
except KeyError:
continue
add_parser(key, parser_name)
path_parsers = {
"memory.current": MemoryCurrent,
"memory.pressure": MemoryPressure,
"cpu.pressure": CPUPressure,
"cpu.stat": CPUStat,
"io.pressure": IOPressure,
"dev": NetworkDevIO,
# 'diskstats': DiskStat,
"sockstat": SockStat,
"sockstat6": SockStat,
"stat": ProcStat,
}
return path_parsers
def parse_all(infile):
def parse_all(infile, path_parsers: Dict[Union[str, Path], parsers.StatParser]):