agent.py


一般来说,neutron中的RPC API分为两个部分,client端和server端,DHCP agent包括一个client API,neutron.agent.dhcp.agent.DhcpPluginAPI。DHCP agent利用这个类回调在neutron server端的远程方法,这个server端定义在neutron.api.rpc.handlers.dhcp_rpc.DhcpRpcCallback,通常由 neutron plugin来决定是否暴露这个DhcpRpcCallback接口。

DhcpAgent类

该类主要继承自manager.Manager,该类中的方法主要实现了RPC接口中的server端,neutron server(RESTful API)主要利用neutron.api.rpc.agentnotifiers.dhcp_rpc_agent_api.DhcpAgentNotifyApi来作为client端执行这里的方法。

# RPC消息通信方式,一般是RabbitMQ实现
target = oslo_messaging.Target(version='1.0')

初始化的方法主要读取配置文件中的driver参数。

    def __init__(self, host=None, conf=None):
        super(DhcpAgent, self).__init__(host=host)
        self.needs_resync_reasons = collections.defaultdict(list)
        # 读取配置信息conf
        self.conf = conf or cfg.CONF
        self.cache = NetworkCache()
        self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
        ctx = context.get_admin_context_without_session()
        # 根据这个创建的DhcpPluginApi类,应该是作为client端向plugin发送RPC消息,topic类型为topics.PLUGIN
        self.plugin_rpc = DhcpPluginApi(topics.PLUGIN,
                                        ctx, self.conf.use_namespaces,
                                        self.conf.host)
        # create dhcp dir to store dhcp info
        # DHCP路径 state_path/dhcp
        dhcp_dir = os.path.dirname("/%s/dhcp/" % self.conf.state_path)
        utils.ensure_dir(dhcp_dir)
        # 检查版本信息
        self.dhcp_version = self.dhcp_driver_cls.check_version()
        self._populate_networks_cache()
        self._process_monitor = external_process.ProcessMonitor(
            config=self.conf,
            resource_type='dhcp')
    def init_host(self):
        self.sync_state()

after_start()方法调用run()方法,主要执行两个动作,一是将neutron中状态同步到本地,另外就是创建一个新的协程来周期性的同步状态。

    def after_start(self):
        self.run()
        LOG.info(_LI("DHCP agent started"))

    def run(self):
        """Activate the DHCP agent."""
        # 同步neutron的状态
        self.sync_state()
        # 周期性的同步状态
        self.periodic_resync()

作为client端,调用sync_state()会发出rpc消息给server端的plugin,通过DhcpRpcCallback获取最新的网络状态,然后更新本地信息,调用dnsmasq进程使之生效。该方法在启动后运行一次。 periodic_resync()方法则孵化一个协程来运行_periodic_resync_helper()方法,该函数是一个无限循环,它周期性的调用sync_state()

sync_state()

    def sync_state(self, networks=None):
        """Sync the local DHCP state with Neutron. If no networks are passed,
        or 'None' is one of the networks, sync all of the networks.
        """
        only_nets = set([] if (not networks or None in networks) else networks)
        LOG.info(_LI('Synchronizing state'))
        # 创建一个协程池
        pool = eventlet.GreenPool(self.conf.num_sync_threads)
        # 得到known_network_ids,active_network_ids计算出deleted_id,来获取最新的网络状态
        known_network_ids = set(self.cache.get_network_ids())

        try:
            active_networks = self.plugin_rpc.get_active_networks_info()
            active_network_ids = set(network.id for network in active_networks)
            for deleted_id in known_network_ids - active_network_ids:
                try:
                    self.disable_dhcp_helper(deleted_id)
                except Exception as e:
                    self.schedule_resync(e, deleted_id)
                    LOG.exception(_LE('Unable to sync network state on '
                                      'deleted network %s'), deleted_id)

            for network in active_networks:
                if (not only_nets or  # specifically resync all
                        network.id not in known_network_ids or  # missing net
                        network.id in only_nets):  # specific network to sync
                    pool.spawn(self.safe_configure_dhcp_for_network, network)
            pool.waitall()
            LOG.info(_LI('Synchronizing state complete'))

        except Exception as e:
            if only_nets:
                for network_id in only_nets:
                    self.schedule_resync(e, network_id)
            else:
                self.schedule_resync(e)
            LOG.exception(_LE('Unable to sync network state.'))

_periodic_resync_helper()

    @utils.exception_logger()
    def _periodic_resync_helper(self):
        """Resync the dhcp state at the configured interval."""
        # 无限循环
        while True:
        # 协程的休眠时间为同步间隔
            eventlet.sleep(self.conf.resync_interval)
            if self.needs_resync_reasons:
                # be careful to avoid a race with additions to list
                # from other threads
                reasons = self.needs_resync_reasons
                self.needs_resync_reasons = collections.defaultdict(list)
                for net, r in reasons.items():
                    if not net:
                        net = "*"
                    LOG.debug("resync (%(network)s): %(reason)s",
                              {"reason": r, "network": net})
        # 周期间隔调用sync_state()
                self.sync_state(reasons.keys())
    # 生成同步的协程
    def periodic_resync(self):
        """Spawn a thread to periodically resync the dhcp state."""
        eventlet.spawn(self._periodic_resync_helper)

DhcpPluginApi类

主要实现RPC通信的client端,向server端的plugin发送RPC消息,plugin通过neutron.api.rpc.handlers.dhcp_rpc.DhcpRpcCallback来实现接口

    def __init__(self, topic, context, use_namespaces, host):
        self.context = context
        self.host = host
        self.use_namespaces = use_namespaces
        # RPC消息实例的实现,获得client端的实例
        target = oslo_messaging.Target(
                topic=topic,
                namespace=constants.RPC_NAMESPACE_DHCP_PLUGIN,
                version='1.0')
        self.client = n_rpc.get_client(target)

主要包含了几个方法

    def get_active_networks_info(self):
        """Make a remote process call to retrieve all network info."""
        cctxt = self.client.prepare(version='1.1')
        networks = cctxt.call(self.context, 'get_active_networks_info',
                              host=self.host)
        return [dhcp.NetModel(self.use_namespaces, n) for n in networks]

    def get_network_info(self, network_id):
        """Make a remote process call to retrieve network info."""
        cctxt = self.client.prepare()
        network = cctxt.call(self.context, 'get_network_info',
                             network_id=network_id, host=self.host)
        if network:
            return dhcp.NetModel(self.use_namespaces, network)

    def create_dhcp_port(self, port):
        """Make a remote process call to create the dhcp port."""
        cctxt = self.client.prepare(version='1.1')
        port = cctxt.call(self.context, 'create_dhcp_port',
                          port=port, host=self.host)
        if port:
            return dhcp.DictModel(port)

    def update_dhcp_port(self, port_id, port):
        """Make a remote process call to update the dhcp port."""
        cctxt = self.client.prepare(version='1.1')
        port = cctxt.call(self.context, 'update_dhcp_port',
                          port_id=port_id, port=port, host=self.host)
        if port:
            return dhcp.DictModel(port)

    def release_dhcp_port(self, network_id, device_id):
        """Make a remote process call to release the dhcp port."""
        cctxt = self.client.prepare()
        return cctxt.call(self.context, 'release_dhcp_port',
                          network_id=network_id, device_id=device_id,
                          host=self.host)

    def release_port_fixed_ip(self, network_id, device_id, subnet_id):
        """Make a remote process call to release a fixed_ip on the port."""
        cctxt = self.client.prepare()
        return cctxt.call(self.context, 'release_port_fixed_ip',
                          network_id=network_id, subnet_id=subnet_id,
                          device_id=device_id, host=self.host)

NetworkCache类

主要作为当前network信息的缓存

    def put_port(self, port):
        network = self.get_network_by_id(port.network_id)
        for index in range(len(network.ports)):
            if network.ports[index].id == port.id:
                network.ports[index] = port
                break
        else:
            network.ports.append(port)

        self.port_lookup[port.id] = network.id

    def remove_port(self, port):
        network = self.get_network_by_port_id(port.id)

        for index in range(len(network.ports)):
            if network.ports[index] == port:
                del network.ports[index]
                del self.port_lookup[port.id]
                break

    def get_port_by_id(self, port_id):
        network = self.get_network_by_port_id(port_id)
        if network:
            for port in network.ports:
                if port.id == port_id:
                    return port

    def get_state(self):
        net_ids = self.get_network_ids()
        num_nets = len(net_ids)
        num_subnets = 0
        num_ports = 0
        for net_id in net_ids:
            network = self.get_network_by_id(net_id)
            num_subnets += len(network.subnets)
            num_ports += len(network.ports)
        return {'networks': num_nets,
                'subnets': num_subnets,
                'ports': num_ports}

DhcpAgentWithStateReport

该类继承自DhcpAgent,主要添加了状态汇报。 汇报状态主要是DhcpAgentWithStateReport初始化中指定了一个agent_rpc.PluginReportStateAPI(topics.PLUGIN)类作为状态汇报rpc消息的处理handler

 # 作为RPC的client端,topic类型为topics.PLUGIN,汇报RPC消息
 self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

这些代码会让_report_state()定期执行来汇报自身状态。

        report_interval = self.conf.AGENT.report_interval
        if report_interval:
            self.heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            self.heartbeat.start(interval=report_interval)

_report_state()具体实现方式

    def _report_state(self):
        try:
            self.agent_state.get('configurations').update(
                self.cache.get_state())
            ctx = context.get_admin_context_without_session()
            agent_status = self.state_rpc.report_state(
                ctx, self.agent_state, True)
            if agent_status == constants.AGENT_REVIVED:
                LOG.info(_LI("Agent has just been revived. "
                             "Scheduling full sync"))
                self.schedule_resync("Agent has just been revived")
        except AttributeError:
            # This means the server does not support report_state
            LOG.warn(_LW("Neutron server does not support state report."
                         " State report for this agent will be disabled."))
            self.heartbeat.stop()
            self.run()
            return
        except Exception:
            LOG.exception(_LE("Failed reporting state!"))
            return
        if self.agent_state.pop('start_flag', None):
            self.run()

results matching ""

    No results matching ""