OpenStack Source Code Study Notes 3

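I recently looked into how OpenStack collects and updates hardware resource figures such as CPU, memory, and disk. For historical reasons, the code below comes from the Newton release.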

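In short, OpenStack collects resource statistics through a periodic task. The entry point is the update_available_resource function of the ComputeManager class in nova\compute\manager.py. By default it runs once a minute: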


@periodic_task.periodic_task(spacing=CONF.update_resources_interval)
def update_available_resource(self, context):
    """See driver.get_available_resource()

    Periodic process that keeps that the compute host's understanding of
    resource availability and usage in sync with the underlying hypervisor.

    :param context: security context
    """

    compute_nodes_in_db = self._get_compute_nodes_in_db(context,
                                                        use_slave=True)
    nodenames = set(self.driver.get_available_nodes())
    for nodename in nodenames:
        self.update_available_resource_for_node(context, nodename)

    self._resource_tracker_dict = {
        k: v for k, v in self._resource_tracker_dict.items()
        if k in nodenames}

    # Delete orphan compute node not reported by driver but still in db
    for cn in compute_nodes_in_db:
        if cn.hypervisor_hostname not in nodenames:
            LOG.info(_LI("Deleting orphan compute node %s"), cn.id)
            cn.destroy()

The task first fetches all nodes. It also maintains a dict named _resource_tracker_dict that maps each node name to its ResourceTracker instance; all resource updates are handled inside ResourceTracker.
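The bookkeeping in this step can be sketched in isolation. This is a simplified illustration, not nova's code: `FakeTracker`, `sync_trackers`, and `find_orphans` are hypothetical names, database records are reduced to plain hostname strings, and the sketch assumes a tracker is created the first time a node is seen.

```python
class FakeTracker:
    """Hypothetical stand-in for ResourceTracker; it only counts audits."""

    def __init__(self, nodename):
        self.nodename = nodename
        self.audits = 0

    def update_available_resource(self):
        self.audits += 1


def sync_trackers(trackers, reported_nodes):
    """Audit every reported node, then drop trackers for vanished nodes."""
    for name in reported_nodes:
        # Reuse the existing tracker for a node, or create one on first sight.
        tracker = trackers.setdefault(name, FakeTracker(name))
        tracker.update_available_resource()
    # Same pruning idea as the dict comprehension in the periodic task.
    return {k: v for k, v in trackers.items() if k in reported_nodes}


def find_orphans(db_hostnames, reported_nodes):
    """Return DB records that the driver no longer reports."""
    return [h for h in db_hostnames if h not in reported_nodes]


trackers = {"node-old": FakeTracker("node-old")}
reported = {"node-a", "node-b"}
trackers = sync_trackers(trackers, reported)
orphans = find_orphans(["node-a", "node-old"], reported)
```

Here `node-old` is no longer reported by the driver, so its tracker is pruned and its database record is flagged as an orphan.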

Once the matching ResourceTracker instance is found in the dict, its update_available_resource function is called. It lives in nova\compute\resource_tracker.py:

def update_available_resource(self, context):
    """Override in-memory calculations of compute node resource usage based
    on data audited from the hypervisor layer.

    Add in resource claims in progress to account for operations that have
    declared a need for resources, but not necessarily retrieved them from
    the hypervisor layer yet.
    """
    LOG.info(_LI("Auditing locally available compute resources for "
                    "node %(node)s"),
                {'node': self.nodename})
    resources = self.driver.get_available_resource(self.nodename)
    resources['host_ip'] = CONF.my_ip
    # We want the 'cpu_info' to be None from the POV of the
    # virt driver, but the DB requires it to be non-null so
    # just force it to empty string
    if "cpu_info" not in resources or resources["cpu_info"] is None:
        resources["cpu_info"] = ''
    self._verify_resources(resources)
    self._report_hypervisor_resource_view(resources)
    self._update_available_resource(context, resources)

Each driver obtains resources in its own way. Taking libvirt as an example, the relevant code is the get_available_resource function in nova\virt\libvirt\driver.py:

def get_available_resource(self, nodename):
    ...
    data = {}
    data["memory_mb"] = self._host.get_memory_mb_total()
    data["memory_mb_used"] = self._host.get_memory_mb_used()
    ....
    return data

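I will use memory as the example. Getting memory information takes two steps. The first is to call the libvirt library for the total memory; that code is in nova\virt\libvirt\host.py. Here is a short snippet that verifies it directly: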

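import libvirt

# Connect to the local system hypervisor (needs sufficient privileges).
conn = libvirt.open("qemu:///system")
# getInfo() returns a list; index 1 is the host's total memory in MB.
c = conn.getInfo()[1]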

This is what get_memory_mb_total() does underneath. Run it as root when testing, otherwise it fails with a permission error.

The second step, getting the used memory, looks like this:

def get_memory_mb_used(self):
    """Get the used memory size(MB) of physical computer.

    :returns: the total usage of memory(MB).
    """
    if sys.platform.upper() not in ['LINUX2', 'LINUX3']:
        return 0

    with open('/proc/meminfo') as fp:
        m = fp.read().split()
    idx1 = m.index('MemFree:')
    idx2 = m.index('Buffers:')
    idx3 = m.index('Cached:')
    if CONF.libvirt.virt_type == 'xen':
        used = 0
        for guest in self.list_guests(only_guests=False):
            try:
                # TODO(sahid): Use get_info...
                dom_mem = int(guest._get_domain_info(self)[2])
            except libvirt.libvirtError as e:
                LOG.warning(_LW("couldn't obtain the memory from domain:"
                                " %(uuid)s, exception: %(ex)s"),
                            {"uuid": guest.uuid, "ex": e})
                continue
            # skip dom0
            if guest.id != 0:
                used += dom_mem
            else:
                # the mem reported by dom0 is be greater of what
                # it is being used
                used += (dom_mem -
                            (int(m[idx1 + 1]) +
                            int(m[idx2 + 1]) +
                            int(m[idx3 + 1])))
        # Convert it to MB
        return used // units.Ki
    else:
        avail = (int(m[idx1 + 1]) + int(m[idx2 + 1]) + int(m[idx3 + 1]))
        # Convert it to MB
        return self.get_memory_mb_total() - avail // units.Ki

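As you can see, in the non-Xen case it simply reads /proc/meminfo and subtracts MemFree, Buffers, and Cached from the total.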
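The non-Xen branch can be reproduced without a hypervisor. The sketch below is only an illustration: `SAMPLE_MEMINFO` is made-up data, `memory_mb_used` is a hypothetical helper, and the total is derived from the sample's MemTotal line rather than from libvirt as nova does.

```python
SAMPLE_MEMINFO = """\
MemTotal:        8000000 kB
MemFree:         1000000 kB
Buffers:          200000 kB
Cached:          1800000 kB
SwapCached:            0 kB
"""

KIB_PER_MIB = 1024  # same value as units.Ki in the quoted code


def memory_mb_used(meminfo_text, total_mb):
    """Return used memory in MB: total minus (free + buffers + cached)."""
    tokens = meminfo_text.split()
    # Each label is followed by its value in kB, so the value sits at index + 1.
    avail_kb = sum(int(tokens[tokens.index(label) + 1])
                   for label in ("MemFree:", "Buffers:", "Cached:"))
    return total_mb - avail_kb // KIB_PER_MIB


total_mb = 8000000 // KIB_PER_MIB
used_mb = memory_mb_used(SAMPLE_MEMINFO, total_mb)
```

Because the text is split on whitespace, `Cached:` and `SwapCached:` are separate tokens and the exact-match lookup does not confuse them.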

The update itself then happens in the _update_available_resource function:

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def _update_available_resource(self, context, resources):

    # initialize the compute node object, creating it
    # if it does not already exist.
    self._init_compute_node(context, resources)

    # if we could not init the compute node the tracker will be
    # disabled and we should quit now
    if self.disabled:
        return

    # Grab all instances assigned to this node:
    instances = objects.InstanceList.get_by_host_and_node(
        context, self.host, self.nodename,
        expected_attrs=['system_metadata',
                        'numa_topology',
                        'flavor', 'migration_context'])

    # Now calculate usage based on instance utilization:
    self._update_usage_from_instances(context, instances)

    # Grab all in-progress migrations:
    migrations = objects.MigrationList.get_in_progress_by_host_and_node(
            context, self.host, self.nodename)

    self._pair_instances_to_migrations(migrations, instances)
    self._update_usage_from_migrations(context, migrations)

    # Detect and account for orphaned instances that may exist on the
    # hypervisor, but are not in the DB:
    orphans = self._find_orphaned_instances()
    self._update_usage_from_orphans(orphans)

    # NOTE(yjiang5): Because pci device tracker status is not cleared in
    # this periodic task, and also because the resource tracker is not
    # notified when instances are deleted, we need remove all usages
    # from deleted instances.
    self.pci_tracker.clean_usage(instances, migrations, orphans)
    dev_pools_obj = self.pci_tracker.stats.to_device_pools_obj()
    self.compute_node.pci_device_pools = dev_pools_obj

    self._report_final_resource_view()

    metrics = self._get_host_metrics(context, self.nodename)
    # TODO(pmurray): metrics should not be a json string in ComputeNode,
    # but it is. This should be changed in ComputeNode
    self.compute_node.metrics = jsonutils.dumps(metrics)

    # update the compute_node
    self._update(context)
    LOG.info(_LI('Compute_service record updated for %(host)s:%(node)s'),
                    {'host': self.host, 'node': self.nodename})

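It first calls _init_compute_node() for initialization. That function reads the existing record from the database, overwrites it with the freshly collected data, and only then do the calculations below run.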

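The calculation covers three kinds of instances: instances assigned to this node, instances being migrated, and orphaned instances (I have not run into these yet). Taking the first kind as the example, follow into the _update_usage_from_instances function: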

def _update_usage_from_instances(self, context, instances):
    """Calculate resource usage based on instance utilization.  This is
    different than the hypervisor's view as it will account for all
    instances assigned to the local compute host, even if they are not
    currently powered on.
    """
    self.tracked_instances.clear()

    # set some initial values, reserve room for host/hypervisor:
    self.compute_node.local_gb_used = CONF.reserved_host_disk_mb / 1024
    self.compute_node.memory_mb_used = CONF.reserved_host_memory_mb
    self.compute_node.vcpus_used = 0
    self.compute_node.free_ram_mb = (self.compute_node.memory_mb -
                                        self.compute_node.memory_mb_used)
    self.compute_node.free_disk_gb = (self.compute_node.local_gb -
                                        self.compute_node.local_gb_used)
    self.compute_node.current_workload = 0
    self.compute_node.running_vms = 0

    for instance in instances:
        if instance.vm_state not in vm_states.ALLOW_RESOURCE_REMOVAL:
            self._update_usage_from_instance(context, instance)

    self.scheduler_client.reportclient.remove_deleted_instances(
            self.compute_node, self.tracked_instances.values())

There is a small detail here: for memory, only the total obtained through libvirt is used. The used memory is recomputed from scratch on every run.

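The function first counts the reserved memory and reserved disk from the configuration file as used, which removes them from the free figures. It then loops over the instances and accounts for each one whose state is not 'deleted' or 'shelved_offloaded':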
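The reset and the state filter can be sketched with plain dicts. This is a simplified illustration under assumptions: `reset_usage` and `countable` are hypothetical helpers, the numbers are made up, and the state strings simply follow the two values named above.

```python
ALLOW_RESOURCE_REMOVAL = ("deleted", "shelved_offloaded")


def reset_usage(total_memory_mb, total_disk_gb,
                reserved_memory_mb, reserved_disk_mb):
    """Start a fresh audit with only the host reservation counted as used."""
    memory_mb_used = reserved_memory_mb
    local_gb_used = reserved_disk_mb / 1024
    return {
        "memory_mb_used": memory_mb_used,
        "local_gb_used": local_gb_used,
        "free_ram_mb": total_memory_mb - memory_mb_used,
        "free_disk_gb": total_disk_gb - local_gb_used,
    }


def countable(instances):
    """Keep instances whose vm_state still holds resources on the host."""
    return [i for i in instances
            if i["vm_state"] not in ALLOW_RESOURCE_REMOVAL]


baseline = reset_usage(total_memory_mb=16384, total_disk_gb=100,
                       reserved_memory_mb=512, reserved_disk_mb=2048)
kept = countable([
    {"uuid": "a", "vm_state": "active"},
    {"uuid": "b", "vm_state": "stopped"},
    {"uuid": "c", "vm_state": "deleted"},
])
```

Note that the stopped instance is still counted: usage follows what is assigned to the host, not what is currently powered on.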


def _update_usage_from_instance(self, context, instance, is_removed=False):
    """Update usage for a single instance."""
    uuid = instance['uuid']
    is_new_instance = uuid not in self.tracked_instances
    # NOTE(sfinucan): Both brand new instances as well as instances that
    # are being unshelved will have is_new_instance == True
    is_removed_instance = not is_new_instance and (is_removed or
        instance['vm_state'] in vm_states.ALLOW_RESOURCE_REMOVAL)
    if is_new_instance:
        self.tracked_instances[uuid] = obj_base.obj_to_primitive(instance)
        sign = 1
    if is_removed_instance:
        self.tracked_instances.pop(uuid)
        sign = -1
    self.stats.update_stats_for_instance(instance, is_removed_instance)
    self.compute_node.stats = copy.deepcopy(self.stats)
    # if it's a new or deleted instance:
    if is_new_instance or is_removed_instance:
        if self.pci_tracker:
            self.pci_tracker.update_pci_for_instance(context,
                                                        instance,
                                                        sign=sign)
        self.scheduler_client.reportclient.update_instance_allocation(
            self.compute_node, instance, sign)
        # new instance, update compute node resource usage:
        self._update_usage(self._get_usage_dict(instance), sign=sign)

    self.compute_node.current_workload = self.stats.calculate_workload()
    if self.pci_tracker:
        obj = self.pci_tracker.stats.to_device_pools_obj()
        self.compute_node.pci_device_pools = obj
    else:
        self.compute_node.pci_device_pools = objects.PciDevicePoolList()

Because many requests are asynchronous, the function checks once more whether the instance has been deleted. If it has, sign is negative.
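The way sign is chosen can be isolated into a small function. This is a sketch under assumptions, not the tracker's code: `usage_sign` is a hypothetical helper, `tracked` is a plain set of UUIDs instead of the tracked_instances dict, and returning None stands for "no accounting change".

```python
ALLOW_RESOURCE_REMOVAL = ("deleted", "shelved_offloaded")


def usage_sign(uuid, vm_state, tracked, is_removed=False):
    """Return +1, -1, or None (no accounting change) and update `tracked`."""
    is_new = uuid not in tracked
    is_gone = not is_new and (is_removed
                              or vm_state in ALLOW_RESOURCE_REMOVAL)
    if is_new:
        tracked.add(uuid)
        return 1
    if is_gone:
        tracked.discard(uuid)
        return -1
    return None


tracked = set()
first = usage_sign("vm-1", "active", tracked)    # first sighting
second = usage_sign("vm-1", "active", tracked)   # already tracked
third = usage_sign("vm-1", "deleted", tracked)   # tracked, now deleted
```

An instance can only be removed if it was already tracked, so the two branches never both apply to the same call.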

It then calls the _update_usage function for the final calculation, where the sign parameter controls whether usage is added or subtracted:

def _update_usage(self, usage, sign=1):
    mem_usage = usage['memory_mb']
    disk_usage = usage.get('root_gb', 0)

    overhead = self.driver.estimate_instance_overhead(usage)
    mem_usage += overhead['memory_mb']
    disk_usage += overhead.get('disk_gb', 0)

    self.compute_node.memory_mb_used += sign * mem_usage
    self.compute_node.local_gb_used += sign * disk_usage
    self.compute_node.local_gb_used += sign * usage.get('ephemeral_gb', 0)
    self.compute_node.vcpus_used += sign * usage.get('vcpus', 0)

    # free ram and disk may be negative, depending on policy:
    self.compute_node.free_ram_mb = (self.compute_node.memory_mb -
                                        self.compute_node.memory_mb_used)
    self.compute_node.free_disk_gb = (self.compute_node.local_gb -
                                        self.compute_node.local_gb_used)

    self.compute_node.running_vms = self.stats.num_instances

    # Calculate the numa usage
    free = sign == -1
    updated_numa_topology = hardware.get_host_numa_usage_from_instance(
            self.compute_node, usage, free)
    self.compute_node.numa_topology = updated_numa_topology

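The mem_usage here is actually the value from the flavor chosen when the instance was created. As for estimate_instance_overhead, it seems that only the xen and hyperv drivers make real use of it; for libvirt it returns 0. Interested readers can dig into it further.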
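The sign-controlled arithmetic can be tried out with a small stand-in object. This is a minimal sketch under assumptions: `Node` and `apply_usage` are hypothetical, the flavor is a plain dict, memory overhead is passed in as a number, and disk overhead and NUMA handling are left out.

```python
from dataclasses import dataclass


@dataclass
class Node:
    """Hypothetical stand-in for the ComputeNode fields used here."""
    memory_mb: int
    local_gb: int
    memory_mb_used: int = 0
    local_gb_used: int = 0
    vcpus_used: int = 0

    @property
    def free_ram_mb(self):
        return self.memory_mb - self.memory_mb_used

    @property
    def free_disk_gb(self):
        return self.local_gb - self.local_gb_used


def apply_usage(node, flavor, sign=1, overhead_mb=0):
    """Add (sign=1) or subtract (sign=-1) one instance's flavor."""
    node.memory_mb_used += sign * (flavor["memory_mb"] + overhead_mb)
    node.local_gb_used += sign * (flavor.get("root_gb", 0)
                                  + flavor.get("ephemeral_gb", 0))
    node.vcpus_used += sign * flavor.get("vcpus", 0)


node = Node(memory_mb=8192, local_gb=100, memory_mb_used=512)
flavor = {"memory_mb": 4096, "root_gb": 40, "ephemeral_gb": 10, "vcpus": 2}
apply_usage(node, flavor, sign=1)
after_add = (node.memory_mb_used, node.free_ram_mb, node.free_disk_gb)
apply_usage(node, flavor, sign=-1)
after_remove = (node.memory_mb_used, node.free_ram_mb, node.free_disk_gb)
```

Adding and then removing the same flavor returns the node to its starting figures, which is the point of routing both directions through one function.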

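After the second and third steps (migrations and orphans) are processed in the same way, the result is written to the database.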

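One thing to note: if an overcommit ratio is configured, creating or migrating an instance does not use the figures above directly. The total is first multiplied by the configured ratio. The relevant code is in nova\scheduler\filters\ram_filter.py: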

def host_passes(self, host_state, spec_obj):
    """Only return hosts with sufficient available RAM."""
    requested_ram = spec_obj.memory_mb
    free_ram_mb = host_state.free_ram_mb
    total_usable_ram_mb = host_state.total_usable_ram_mb
    # Do not allow an instance to overcommit against itself, only against
    # other instances.
    if not total_usable_ram_mb >= requested_ram:
        LOG.debug("%(host_state)s does not have %(requested_ram)s MB "
                    "usable ram before overcommit, it only has "
                    "%(usable_ram)s MB.",
                    {'host_state': host_state,
                    'requested_ram': requested_ram,
                    'usable_ram': total_usable_ram_mb})
        return False
    ram_allocation_ratio = self._get_ram_allocation_ratio(host_state,
                                                            spec_obj)
    memory_mb_limit = total_usable_ram_mb * ram_allocation_ratio
    used_ram_mb = total_usable_ram_mb - free_ram_mb
    usable_ram = memory_mb_limit - used_ram_mb
    if not usable_ram >= requested_ram:
        LOG.debug("%(host_state)s does not have %(requested_ram)s MB "
                "usable ram, it only has %(usable_ram)s MB usable ram.",
                {'host_state': host_state,
                    'requested_ram': requested_ram,
                    'usable_ram': usable_ram})
        return False
    # save oversubscription limit for compute node to test against:
    host_state.limits['memory_mb'] = memory_mb_limit
    return True

This is also why free_ram_mb and free_disk_gb sometimes show up as negative.
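A worked example makes the negative values easier to see. The sketch below mirrors the two checks in host_passes with plain numbers; `ram_passes` is a hypothetical helper and the figures are made up.

```python
def ram_passes(total_mb, free_mb, requested_mb, ratio):
    """Mirror the two checks in host_passes; return (passes, usable_mb)."""
    if total_mb < requested_mb:
        # An instance may not overcommit against itself.
        return False, None
    limit_mb = total_mb * ratio
    used_mb = total_mb - free_mb
    usable_mb = limit_mb - used_mb
    return usable_mb >= requested_mb, usable_mb


# free_mb is already negative: 10240 MB are claimed on an 8192 MB host.
ok, usable = ram_passes(total_mb=8192, free_mb=-2048,
                        requested_mb=2048, ratio=1.5)
strict_ok, strict_usable = ram_passes(total_mb=8192, free_mb=-2048,
                                      requested_mb=2048, ratio=1.0)
```

With a ratio of 1.5 the limit is 12288 MB, so a host whose free_ram_mb is -2048 can still accept a 2048 MB instance. With a ratio of 1.0 the same host is rejected.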