Commit d72fdded authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Merge #2737

Using of constants from proto_defs instead of hard-coded magic keywords.
parents 75af5cd5 d3a0ac22
......@@ -42,6 +42,7 @@ import random
import time
import signal
from isc.config import ccsession
import isc.cc.proto_defs
import isc.util.process
import isc.net.parse
from optparse import OptionParser, OptionValueError
......@@ -459,7 +460,8 @@ class CommandControl():
else:
return rcode, {}
else:
errstr = str(answer['result'][1])
errstr = \
str(answer[isc.cc.proto_defs.CC_PAYLOAD_RESULT][1])
except ccsession.ModuleCCSessionError as mcse:
errstr = str("Error in ccsession answer:") + str(mcse)
......
......@@ -143,7 +143,7 @@ class SubscriptionManager:
this group, instance pair. This includes wildcard subscriptions."""
target = (group, instance)
partone = self.find_sub(group, instance)
parttwo = self.find_sub(group, "*")
parttwo = self.find_sub(group, CC_INSTANCE_WILDCARD)
return list(set(partone + parttwo))
class MsgQ:
......@@ -429,19 +429,19 @@ class MsgQ:
"""Process a single command. This will split out into one of the
other functions."""
logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
cmd = routing["type"]
if cmd == 'send':
cmd = routing[CC_HEADER_TYPE]
if cmd == CC_COMMAND_SEND:
self.process_command_send(sock, routing, data)
elif cmd == 'subscribe':
elif cmd == CC_COMMAND_SUBSCRIBE:
self.process_command_subscribe(sock, routing, data)
elif cmd == 'unsubscribe':
elif cmd == CC_COMMAND_UNSUBSCRIBE:
self.process_command_unsubscribe(sock, routing, data)
elif cmd == 'getlname':
elif cmd == CC_COMMAND_GET_LNAME:
self.process_command_getlname(sock, routing, data)
elif cmd == 'ping':
elif cmd == CC_COMMAND_PING:
# Command for testing purposes
self.process_command_ping(sock, routing, data)
elif cmd == 'stop':
elif cmd == CC_COMMAND_STOP:
self.stop()
else:
logger.error(MSGQ_INVALID_CMD, cmd)
......@@ -570,11 +570,12 @@ class MsgQ:
return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
def process_command_ping(self, sock, routing, data):
self.sendmsg(sock, { "type" : "pong" }, data)
self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_PONG }, data)
def process_command_getlname(self, sock, routing, data):
lname = [ k for k, v in self.lnames.items() if v == sock ][0]
self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_GET_LNAME },
{ CC_PAYLOAD_LNAME : lname })
def process_command_send(self, sock, routing, data):
group = routing[CC_HEADER_GROUP]
......@@ -638,15 +639,15 @@ class MsgQ:
self.send_prepared_msg(sock, errmsg)
def process_command_subscribe(self, sock, routing, data):
group = routing["group"]
instance = routing["instance"]
group = routing[CC_HEADER_GROUP]
instance = routing[CC_HEADER_INSTANCE]
if group == None or instance == None:
return # ignore invalid packets entirely
self.subs.subscribe(group, instance, sock)
def process_command_unsubscribe(self, sock, routing, data):
group = routing["group"]
instance = routing["instance"]
group = routing[CC_HEADER_GROUP]
instance = routing[CC_HEADER_INSTANCE]
if group == None or instance == None:
return # ignore invalid packets entirely
self.subs.unsubscribe(group, instance, sock)
......
......@@ -34,12 +34,22 @@ const char* const CC_HEADER_WANT_ANSWER = "want_answer";
const char* const CC_HEADER_REPLY = "reply";
// The commands in the "type" header
const char* const CC_COMMAND_SEND = "send";
const char* const CC_COMMAND_SUBSCRIBE = "subscribe";
const char* const CC_COMMAND_UNSUBSCRIBE = "unsubscribe";
const char* const CC_COMMAND_GET_LNAME = "getlname";
const char* const CC_COMMAND_PING = "ping";
const char* const CC_COMMAND_PONG = "pong";
const char* const CC_COMMAND_STOP = "stop";
// The wildcards of some headers
const char* const CC_TO_WILDCARD = "*";
const char* const CC_INSTANCE_WILDCARD = "*";
// Reply codes
const int CC_REPLY_NO_RECPT = -1;
const int CC_REPLY_SUCCESS = 0;
// Payload in the message
const char *const CC_PAYLOAD_LNAME = "lname";
const char *const CC_PAYLOAD_RESULT = "result";
const char *const CC_PAYLOAD_COMMAND = "command";
}
}
......@@ -325,14 +325,14 @@ Session::establish(const char* socket_file) {
//
// send a request for our local name, and wait for a response
//
ConstElementPtr get_lname_msg =
Element::fromJSON("{ \"type\": \"getlname\" }");
ElementPtr get_lname_msg(Element::createMap());
get_lname_msg->set(CC_HEADER_TYPE, Element::create(CC_COMMAND_GET_LNAME));
sendmsg(get_lname_msg);
ConstElementPtr routing, msg;
recvmsg(routing, msg, false);
impl_->lname_ = msg->get("lname")->stringValue();
impl_->lname_ = msg->get(CC_PAYLOAD_LNAME)->stringValue();
LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_LNAME_RECEIVED).arg(impl_->lname_);
// At this point there's no risk of resource leak.
......@@ -387,10 +387,10 @@ Session::recvmsg(ConstElementPtr& env, ConstElementPtr& msg,
for (size_t i = 0; i < impl_->queue_->size(); i++) {
q_el = impl_->queue_->get(i);
if (( seq == -1 &&
!q_el->get(0)->contains("reply")
!q_el->get(0)->contains(CC_HEADER_REPLY)
) || (
q_el->get(0)->contains("reply") &&
q_el->get(0)->get("reply")->intValue() == seq
q_el->get(0)->contains(CC_HEADER_REPLY) &&
q_el->get(0)->get(CC_HEADER_REPLY)->intValue() == seq
)
) {
env = q_el->get(0);
......@@ -429,10 +429,10 @@ Session::recvmsg(ConstElementPtr& env, ConstElementPtr& msg,
ConstElementPtr l_msg =
Element::fromWire(body_wire_stream, length - header_length);
if ((seq == -1 &&
!l_env->contains("reply")
!l_env->contains(CC_HEADER_REPLY)
) || (
l_env->contains("reply") &&
l_env->get("reply")->intValue() == seq
l_env->contains(CC_HEADER_REPLY) &&
l_env->get(CC_HEADER_REPLY)->intValue() == seq
)
) {
env = l_env;
......@@ -453,9 +453,9 @@ Session::subscribe(std::string group, std::string instance) {
LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_SUBSCRIBE).arg(group);
ElementPtr env = Element::createMap();
env->set("type", Element::create("subscribe"));
env->set("group", Element::create(group));
env->set("instance", Element::create(instance));
env->set(CC_HEADER_TYPE, Element::create(CC_COMMAND_SUBSCRIBE));
env->set(CC_HEADER_GROUP, Element::create(group));
env->set(CC_HEADER_INSTANCE, Element::create(instance));
sendmsg(env);
}
......@@ -465,9 +465,9 @@ Session::unsubscribe(std::string group, std::string instance) {
LOG_DEBUG(logger, DBG_TRACE_DETAILED, CC_UNSUBSCRIBE).arg(group);
ElementPtr env = Element::createMap();
env->set("type", Element::create("unsubscribe"));
env->set("group", Element::create(group));
env->set("instance", Element::create(instance));
env->set(CC_HEADER_TYPE, Element::create(CC_COMMAND_UNSUBSCRIBE));
env->set(CC_HEADER_GROUP, Element::create(group));
env->set(CC_HEADER_INSTANCE, Element::create(instance));
sendmsg(env);
}
......@@ -516,13 +516,17 @@ Session::reply(ConstElementPtr envelope, ConstElementPtr newmsg) {
ElementPtr env = Element::createMap();
long int nseq = ++impl_->sequence_;
env->set("type", Element::create("send"));
env->set("from", Element::create(impl_->lname_));
env->set("to", Element::create(envelope->get("from")->stringValue()));
env->set("group", Element::create(envelope->get("group")->stringValue()));
env->set("instance", Element::create(envelope->get("instance")->stringValue()));
env->set("seq", Element::create(nseq));
env->set("reply", Element::create(envelope->get("seq")->intValue()));
env->set(CC_HEADER_TYPE, Element::create(CC_COMMAND_SEND));
env->set(CC_HEADER_FROM, Element::create(impl_->lname_));
env->set(CC_HEADER_TO,
Element::create(envelope->get(CC_HEADER_FROM)->stringValue()));
env->set(CC_HEADER_GROUP,
Element::create(envelope->get(CC_HEADER_GROUP)->stringValue()));
env->set(CC_HEADER_INSTANCE,
Element::create(envelope->get(CC_HEADER_INSTANCE)->stringValue()));
env->set(CC_HEADER_SEQ, Element::create(nseq));
env->set(CC_HEADER_REPLY,
Element::create(envelope->get(CC_HEADER_SEQ)->intValue()));
sendmsg(env, newmsg);
......
......@@ -57,10 +57,10 @@ namespace config {
/// Creates a standard config/command protocol answer message
ConstElementPtr
createAnswer() {
ElementPtr answer = Element::fromJSON("{\"result\": [] }");
ElementPtr answer = Element::createMap();
ElementPtr answer_content = Element::createList();
answer_content->add(Element::create(0));
answer->set("result", answer_content);
answer_content->add(Element::create(isc::cc::CC_REPLY_SUCCESS));
answer->set(isc::cc::CC_PAYLOAD_RESULT, answer_content);
return (answer);
}
......@@ -70,22 +70,22 @@ createAnswer(const int rcode, ConstElementPtr arg) {
if (rcode != 0 && (!arg || arg->getType() != Element::string)) {
isc_throw(CCSessionError, "Bad or no argument for rcode != 0");
}
ElementPtr answer = Element::fromJSON("{\"result\": [] }");
ElementPtr answer = Element::createMap();
ElementPtr answer_content = Element::createList();
answer_content->add(Element::create(rcode));
answer_content->add(arg);
answer->set("result", answer_content);
answer->set(isc::cc::CC_PAYLOAD_RESULT, answer_content);
return (answer);
}
ConstElementPtr
createAnswer(const int rcode, const std::string& arg) {
ElementPtr answer = Element::fromJSON("{\"result\": [] }");
ElementPtr answer = Element::createMap();
ElementPtr answer_content = Element::createList();
answer_content->add(Element::create(rcode));
answer_content->add(Element::create(arg));
answer->set("result", answer_content);
answer->set(isc::cc::CC_PAYLOAD_RESULT, answer_content);
return (answer);
}
......@@ -94,8 +94,8 @@ ConstElementPtr
parseAnswer(int &rcode, ConstElementPtr msg) {
if (msg &&
msg->getType() == Element::map &&
msg->contains("result")) {
ConstElementPtr result = msg->get("result");
msg->contains(isc::cc::CC_PAYLOAD_RESULT)) {
ConstElementPtr result = msg->get(isc::cc::CC_PAYLOAD_RESULT);
if (result->getType() != Element::list) {
isc_throw(CCSessionError, "Result element in answer message is not a list");
} else if (result->get(0)->getType() != Element::integer) {
......@@ -133,7 +133,7 @@ createCommand(const std::string& command, ConstElementPtr arg) {
if (arg) {
cmd_parts->add(arg);
}
cmd->set("command", cmd_parts);
cmd->set(isc::cc::CC_PAYLOAD_COMMAND, cmd_parts);
return (cmd);
}
......@@ -141,8 +141,8 @@ std::string
parseCommand(ConstElementPtr& arg, ConstElementPtr command) {
if (command &&
command->getType() == Element::map &&
command->contains("command")) {
ConstElementPtr cmd = command->get("command");
command->contains(isc::cc::CC_PAYLOAD_COMMAND)) {
ConstElementPtr cmd = command->get(isc::cc::CC_PAYLOAD_COMMAND);
if (cmd->getType() == Element::list &&
cmd->size() > 0 &&
cmd->get(0)->getType() == Element::string) {
......@@ -463,10 +463,13 @@ ModuleCCSession::ModuleCCSession(
isc_throw(CCSessionInitError, answer->str());
}
setLocalConfig(Element::fromJSON("{}"));
setLocalConfig(Element::createMap());
// get any stored configuration from the manager
if (config_handler_) {
ConstElementPtr cmd = Element::fromJSON("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }");
ConstElementPtr cmd =
createCommand("get_config",
Element::fromJSON("{\"module_name\":\"" +
module_name_ + "\"}"));
seq = session_.group_sendmsg(cmd, "ConfigManager");
session_.group_recvmsg(env, answer, false, seq);
ConstElementPtr new_config = parseAnswer(rcode, answer);
......@@ -608,14 +611,16 @@ ModuleCCSession::checkCommand() {
/* ignore result messages (in case we're out of sync, to prevent
* pingpongs */
if (data->getType() != Element::map || data->contains("result")) {
if (data->getType() != Element::map ||
data->contains(isc::cc::CC_PAYLOAD_RESULT)) {
return (0);
}
ConstElementPtr arg;
ConstElementPtr answer;
try {
std::string cmd_str = parseCommand(arg, data);
std::string target_module = routing->get("group")->stringValue();
std::string target_module =
routing->get(isc::cc::CC_HEADER_GROUP)->stringValue();
if (cmd_str == "config_update") {
answer = checkConfigUpdateCommand(target_module, arg);
} else {
......@@ -832,19 +837,19 @@ bool
ModuleCCSession::requestMatch(const AsyncRecvRequest& request,
const ConstElementPtr& envelope) const
{
if (request.is_reply != envelope->contains("reply")) {
if (request.is_reply != envelope->contains(isc::cc::CC_HEADER_REPLY)) {
// Wrong type of message
return (false);
}
if (request.is_reply &&
(request.seq == -1 ||
request.seq == envelope->get("reply")->intValue())) {
request.seq == envelope->get(isc::cc::CC_HEADER_REPLY)->intValue())) {
// This is the correct reply
return (true);
}
if (!request.is_reply &&
(request.recipient.empty() ||
request.recipient == envelope->get("group")->stringValue())) {
(request.recipient.empty() || request.recipient ==
envelope->get(isc::cc::CC_HEADER_GROUP)->stringValue())) {
// This is the correct command
return (true);
}
......
......@@ -369,8 +369,8 @@ public:
*/
int groupSendMsg(isc::data::ConstElementPtr msg,
std::string group,
std::string instance = "*",
std::string to = "*",
std::string instance = isc::cc::CC_INSTANCE_WILDCARD,
std::string to = isc::cc::CC_TO_WILDCARD,
bool want_answer = false) {
return (session_.group_sendmsg(msg, group, instance, to, want_answer));
};
......
......@@ -57,11 +57,11 @@ class Session:
try:
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._socket.connect(self.socket_file)
self.sendmsg({ "type": "getlname" })
self.sendmsg({ CC_HEADER_TYPE: CC_COMMAND_GET_LNAME })
env, msg = self.recvmsg(False)
if not env:
raise ProtocolError("Could not get local name")
self._lname = msg["lname"]
self._lname = msg[CC_PAYLOAD_LNAME]
if not self._lname:
raise ProtocolError("Could not get local name")
logger.debug(logger.DBGLVL_TRACE_BASIC, PYCC_LNAME_RECEIVED,
......@@ -125,9 +125,10 @@ class Session:
if len(self._queue) > 0:
i = 0;
for env, msg in self._queue:
if seq != None and "reply" in env and seq == env["reply"]:
if seq != None and CC_HEADER_REPLY in env and \
seq == env[CC_HEADER_REPLY]:
return self._queue.pop(i)
elif seq == None and "reply" not in env:
elif seq == None and CC_HEADER_REPLY not in env:
return self._queue.pop(i)
else:
i = i + 1
......@@ -141,7 +142,9 @@ class Session:
if data_length > 0:
env = isc.cc.message.from_wire(data[2:header_length+2])
msg = isc.cc.message.from_wire(data[header_length + 2:])
if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
if (seq == None and CC_HEADER_REPLY not in env) or \
(seq != None and CC_HEADER_REPLY in env and
seq == env[CC_HEADER_REPLY]):
return env, msg
else:
self._queue.append((env,msg))
......@@ -248,18 +251,18 @@ class Session:
self._sequence += 1
return self._sequence
def group_subscribe(self, group, instance = "*"):
def group_subscribe(self, group, instance=CC_INSTANCE_WILDCARD):
self.sendmsg({
"type": "subscribe",
"group": group,
"instance": instance,
CC_HEADER_TYPE: CC_COMMAND_SUBSCRIBE,
CC_HEADER_GROUP: group,
CC_HEADER_INSTANCE: instance,
})
def group_unsubscribe(self, group, instance = "*"):
def group_unsubscribe(self, group, instance=CC_INSTANCE_WILDCARD):
self.sendmsg({
"type": "unsubscribe",
"group": group,
"instance": instance,
CC_HEADER_TYPE: CC_COMMAND_UNSUBSCRIBE,
CC_HEADER_GROUP: group,
CC_HEADER_INSTANCE: instance,
})
def group_sendmsg(self, msg, group, instance=CC_INSTANCE_WILDCARD,
......@@ -308,13 +311,13 @@ class Session:
def group_reply(self, routing, msg):
seq = self._next_sequence()
self.sendmsg({
"type": "send",
"from": self._lname,
"to": routing["from"],
"group": routing["group"],
"instance": routing["instance"],
"seq": seq,
"reply": routing["seq"],
CC_HEADER_TYPE: CC_COMMAND_SEND,
CC_HEADER_FROM: self._lname,
CC_HEADER_TO: routing[CC_HEADER_FROM],
CC_HEADER_GROUP: routing[CC_HEADER_GROUP],
CC_HEADER_INSTANCE: routing[CC_HEADER_INSTANCE],
CC_HEADER_SEQ: seq,
CC_HEADER_REPLY: routing[CC_HEADER_SEQ],
}, isc.cc.message.to_wire(msg))
return seq
......
......@@ -82,22 +82,22 @@ def parse_answer(msg):
containing an error message"""
if type(msg) != dict:
raise ModuleCCSessionError("Answer message is not a dict: " + str(msg))
if 'result' not in msg:
if CC_PAYLOAD_RESULT not in msg:
raise ModuleCCSessionError("answer message does not contain 'result' element")
elif type(msg['result']) != list:
elif type(msg[CC_PAYLOAD_RESULT]) != list:
raise ModuleCCSessionError("wrong result type in answer message")
elif len(msg['result']) < 1:
elif len(msg[CC_PAYLOAD_RESULT]) < 1:
raise ModuleCCSessionError("empty result list in answer message")
elif type(msg['result'][0]) != int:
elif type(msg[CC_PAYLOAD_RESULT][0]) != int:
raise ModuleCCSessionError("wrong rcode type in answer message")
else:
if len(msg['result']) > 1:
if (msg['result'][0] != CC_REPLY_SUCCESS and
type(msg['result'][1]) != str):
if len(msg[CC_PAYLOAD_RESULT]) > 1:
if (msg[CC_PAYLOAD_RESULT][0] != CC_REPLY_SUCCESS and
type(msg[CC_PAYLOAD_RESULT][1]) != str):
raise ModuleCCSessionError("rcode in answer message is non-zero, value is not a string")
return msg['result'][0], msg['result'][1]
return msg[CC_PAYLOAD_RESULT][0], msg[CC_PAYLOAD_RESULT][1]
else:
return msg['result'][0], None
return msg[CC_PAYLOAD_RESULT][0], None
def create_answer(rcode, arg = None):
"""Creates an answer packet for config&commands. rcode must be an
......@@ -109,9 +109,9 @@ def create_answer(rcode, arg = None):
if rcode != CC_REPLY_SUCCESS and type(arg) != str:
raise ModuleCCSessionError("arg in create_answer for rcode != 0 must be a string describing the error")
if arg != None:
return { 'result': [ rcode, arg ] }
return { CC_PAYLOAD_RESULT: [ rcode, arg ] }
else:
return { 'result': [ rcode ] }
return { CC_PAYLOAD_RESULT: [ rcode ] }
# 'fixed' commands
"""Fixed names for command and configuration messages"""
......@@ -133,7 +133,7 @@ def parse_command(msg):
string. If it is not, this function returns None, None"""
if type(msg) == dict and len(msg.items()) == 1:
cmd, value = msg.popitem()
if cmd == "command" and type(value) == list:
if cmd == CC_PAYLOAD_COMMAND and type(value) == list:
if len(value) == 1 and type(value[0]) == str:
return value[0], None
elif len(value) > 1 and type(value[0]) == str:
......@@ -150,7 +150,7 @@ def create_command(command_name, params = None):
cmd = [ command_name ]
if params:
cmd.append(params)
msg = { 'command': cmd }
msg = { CC_PAYLOAD_COMMAND: cmd }
return msg
def default_logconfig_handler(new_config, config_data):
......@@ -215,7 +215,7 @@ class ModuleCCSession(ConfigData):
self._session = Session(socket_file)
else:
self._session = cc_session
self._session.group_subscribe(self._module_name, "*")
self._session.group_subscribe(self._module_name, CC_INSTANCE_WILDCARD)
self._remote_module_configs = {}
self._remote_module_callbacks = {}
......@@ -228,7 +228,8 @@ class ModuleCCSession(ConfigData):
# If the CC Session obejct has been closed, it returns
# immediately.
if self._session._closed: return
self._session.group_unsubscribe(self._module_name, "*")
self._session.group_unsubscribe(self._module_name,
CC_INSTANCE_WILDCARD)
for module_name in self._remote_module_configs:
self._session.group_unsubscribe(module_name)
......@@ -294,10 +295,10 @@ class ModuleCCSession(ConfigData):
functions if present. Responds on the channel if the
handler returns a message."""
# should we default to an answer? success-by-default? unhandled error?
if msg is not None and not 'result' in msg:
if msg is not None and not CC_PAYLOAD_RESULT in msg:
answer = None
try:
module_name = env['group']
module_name = env[CC_HEADER_GROUP]
cmd, arg = isc.config.ccsession.parse_command(msg)
if cmd == COMMAND_CONFIG_UPDATE:
new_config = arg
......
......@@ -28,6 +28,7 @@ import tempfile
import json
import errno
from isc.cc import data
from isc.cc.proto_defs import *
from isc.config import ccsession, config_data, module_spec
from isc.util.file import path_search
import bind10_config
......@@ -603,7 +604,7 @@ class ConfigManager:
# ignore 'None' value (even though they should not occur)
# and messages that are answers to questions we did
# not ask
if msg is not None and not 'result' in msg:
if msg is not None and not CC_PAYLOAD_RESULT in msg:
answer = self.handle_msg(msg);
# Only respond if there actually is something to respond with
if answer is not None:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment