OpenStack Source Code Study Notes 2

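Last time we studied how Nova creates a virtual machine; this time let's look at how Glance uploads an image. Compared with Nova, the Glance source makes heavy use of the proxy and decorator patterns, so it is easy to get completely lost if you read carelessly. Following the OpenStack convention described last time, we use setup.cfg to go straight to the entry point, glance/cmd/api.py: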

def main():
    try:
        config.parse_args()
        config.set_config_defaults()
        wsgi.set_eventlet_hub()
        logging.setup(CONF, 'glance')
        notifier.set_defaults()
        if cfg.CONF.profiler.enabled:
            _notifier = osprofiler.notifier.create("Messaging",
                                                   oslo_messaging, {},
                                                   notifier.get_transport(),
                                                   "glance", "api",
                                                   cfg.CONF.bind_host)
            osprofiler.notifier.set(_notifier)
            osprofiler.web.enable(cfg.CONF.profiler.hmac_keys)
        else:
            osprofiler.web.disable()
        server = wsgi.Server(initialize_glance_store=True)
        server.start(config.load_paste_app('glance-api'), default_port=9292)
        server.wait()
    except KNOWN_EXCEPTIONS as e:
        print(e)
        fail(e)

Configuration Loading and Route Binding

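As with Nova, this file mainly loads the configuration, creates a WSGI server, and runs it. Note the initialize_glance_store=True argument: in recent versions the storage code has been split out into a separate project called glance_store, and that project is initialized here. Let's follow it into glance/common/wsgi.py: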


def initialize_glance_store():
    """Initialize glance store."""
    glance_store.register_opts(CONF)
    glance_store.create_stores(CONF)
    glance_store.verify_default_store()

class Server(object):
    ......

    def __init__(self, threads=1000, initialize_glance_store=False):
        ......    
        self.initialize_glance_store = initialize_glance_store
        ......

    def start(self, application, default_port):
        self.application = application
        self.default_port = default_port
        self.configure()
        self.start_wsgi()

    def configure(self, old_conf=None, has_changed=None):
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        self.client_socket_timeout = CONF.client_socket_timeout or None
        self.configure_socket(old_conf, has_changed)
        if self.initialize_glance_store:
            initialize_glance_store()

From here we follow the call into glance_store/backend.py:

def create_stores(conf=CONF):
    store_count = 0

    for (store_entry, store_instance) in _load_stores(conf):
        try:
            schemes = store_instance.get_schemes()
            store_instance.configure(re_raise_bsc=False)
        except NotImplementedError:
            continue
        if not schemes:
            raise exceptions.BackendException('Unable to register store %s. '
                                              'No schemes associated with it.'
                                              % store_entry)
        else:
            LOG.debug("Registering store %s with schemes %s",
                      store_entry, schemes)
            scheme_map = {}
            loc_cls = store_instance.get_store_location_class()
            for scheme in schemes:
                scheme_map[scheme] = {
                    'store': store_instance,
                    'location_class': loc_cls,
                    'store_entry': store_entry
                }
            location.register_scheme_map(scheme_map)
            store_count += 1
    return store_count

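Here the matching driver (located in the glance_store/_drivers directory) is found and configured according to the settings in /etc/glance/glance-api.conf, and then location.register_scheme_map() is called to bind each scheme to it: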

SCHEME_TO_CLS_MAP = {}
def register_scheme_map(scheme_map):
    for (k, v) in scheme_map.items():
        LOG.debug("Registering scheme %s with %s", k, v)
        SCHEME_TO_CLS_MAP[k] = v

That completes the required preparation.
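
To make the binding step concrete, here is a minimal sketch of a scheme registry and the lookup it enables later. FakeStore, register_store and lookup_store are hypothetical names I made up for illustration; only the "scheme maps to a dict holding the store" shape is taken from the excerpts above.

```python
# Hypothetical stand-ins; only the dict-of-dicts shape mirrors the excerpt above.
SCHEME_TO_CLS_MAP = {}


class FakeStore:
    """Illustrative driver that reports which URI schemes it can serve."""

    def __init__(self, name, schemes):
        self.name = name
        self._schemes = schemes

    def get_schemes(self):
        return self._schemes


def register_scheme_map(scheme_map):
    # A later registration overwrites an earlier one for the same scheme.
    for scheme, entry in scheme_map.items():
        SCHEME_TO_CLS_MAP[scheme] = entry


def register_store(store_entry, store):
    # Build one map entry per scheme, all pointing at the same store instance.
    scheme_map = {
        scheme: {"store": store, "store_entry": store_entry}
        for scheme in store.get_schemes()
    }
    register_scheme_map(scheme_map)


def lookup_store(scheme):
    # Hypothetical lookup helper: resolve a scheme back to its store.
    try:
        return SCHEME_TO_CLS_MAP[scheme]["store"]
    except KeyError:
        raise ValueError("unknown scheme: %s" % scheme)


register_store("file", FakeStore("filesystem", ("file", "filesystem")))
register_store("rbd", FakeStore("ceph", ("rbd",)))
```

Because the map is keyed by scheme rather than by store name, one driver that serves several schemes is registered once per scheme and every lookup returns the same instance.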

Image Upload

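Uploading an image to Glance takes two steps. First, a record is created in the database and its details are returned; at this point the glance image-list command shows an empty image whose status is queued. Then the image data is uploaded, and once the upload finishes the image enters the active status. All code here comes from the Rocky release.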

create

According to the documentation, an image is created by sending a POST request to /v2/images. Combine that with the definitions in glance-api-paste.ini:

[pipeline:glance-api]
pipeline = cors healthcheck http_proxy_to_wsgi versionnegotiation osprofiler unauthenticated-context rootapp

[composite:rootapp]
paste.composite_factory = glance.api:root_app_factory
/: apiversions
/v1: apiv1app
/v2: apiv2app

[app:apiversions]
paste.app_factory = glance.api.versions:create_resource

[app:apiv1app]
paste.app_factory = glance.api.v1.router:API.factory

[app:apiv2app]
paste.app_factory = glance.api.v2.router:API.factory
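
As a rough illustration of what the composite section expresses, the sketch below dispatches a request path to the app registered under the longest matching prefix. PrefixRouter and make_app are hypothetical names, and this is not how Paste or glance.api:root_app_factory is implemented; it only mirrors the three prefixes from the config above.

```python
def make_app(label):
    # Hypothetical stand-in for whatever an app factory would return.
    def app(path):
        return "%s handled %s" % (label, path)
    return app


class PrefixRouter:
    """Dispatch to the app registered under the longest matching path prefix."""

    def __init__(self, routes):
        # Try longer prefixes first so "/v2" wins over "/".
        self.routes = sorted(routes.items(),
                             key=lambda kv: len(kv[0]), reverse=True)

    def __call__(self, path):
        for prefix, app in self.routes:
            if prefix == "/":
                # Catch-all: pass the path through unchanged.
                return app(path)
            if path == prefix or path.startswith(prefix + "/"):
                # Strip the matched prefix before handing over the rest.
                return app(path[len(prefix):] or "/")
        raise LookupError(path)


rootapp = PrefixRouter({
    "/": make_app("apiversions"),
    "/v1": make_app("apiv1app"),
    "/v2": make_app("apiv2app"),
})
```

With this sketch, rootapp("/v2/images") reaches the v2 app with the remaining path /images, which is the part the v2 router then has to match.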

From the API() definition in glance/api/v2/router.py, the function that actually handles the POST request is the create method of the ImagesController class in glance/api/v2/images.py:

    def create(self, req, image, extra_properties, tags):
        image_factory = self.gateway.get_image_factory(req.context)
        image_repo = self.gateway.get_repo(req.context)
        try:
            image = image_factory.new_image(extra_properties=extra_properties,
                                            tags=tags, **image)
            image_repo.add(image)
        except (exception.DuplicateLocation,
                exception.Invalid) as e:
            raise webob.exc.HTTPBadRequest(explanation=e.msg)
        except (exception.ReservedProperty,
                exception.ReadonlyProperty) as e:
            raise webob.exc.HTTPForbidden(explanation=e.msg)
        except exception.Forbidden as e:
            LOG.debug("User not permitted to create image")
            raise webob.exc.HTTPForbidden(explanation=e.msg)
        except exception.LimitExceeded as e:
            LOG.warn(encodeutils.exception_to_unicode(e))
            raise webob.exc.HTTPRequestEntityTooLarge(
                explanation=e.msg, request=req, content_type='text/plain')
        except exception.Duplicate as e:
            raise webob.exc.HTTPConflict(explanation=e.msg)
        except exception.NotAuthenticated as e:
            raise webob.exc.HTTPUnauthorized(explanation=e.msg)
        except TypeError as e:
            LOG.debug(encodeutils.exception_to_unicode(e))
            raise webob.exc.HTTPBadRequest(explanation=e)
        return image

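This code looks simple, but there is a lot going on underneath. If you follow gateway.get_image_factory and gateway.get_repo, you will find that the authors rely heavily on the decorator and proxy patterns: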

# glance/gateway.py

def get_image_factory(self, context):
    image_factory = glance.domain.ImageFactory()
    store_image_factory = glance.location.ImageFactoryProxy(
        image_factory, context, self.store_api, self.store_utils)
    quota_image_factory = glance.quota.ImageFactoryProxy(
        store_image_factory, context, self.db_api, self.store_utils)
    policy_image_factory = policy.ImageFactoryProxy(
        quota_image_factory, context, self.policy)
    notifier_image_factory = glance.notifier.ImageFactoryProxy(
        policy_image_factory, context, self.notifier)
    if property_utils.is_property_protection_enabled():
        property_rules = property_utils.PropertyRules(self.policy)
        pif = property_protections.ProtectedImageFactoryProxy(
            notifier_image_factory, context, property_rules)
        authorized_image_factory = authorization.ImageFactoryProxy(
            pif, context)
    else:
        authorized_image_factory = authorization.ImageFactoryProxy(
            notifier_image_factory, context)
    return authorized_image_factory

def get_repo(self, context):
    image_repo = glance.db.ImageRepo(context, self.db_api)
    store_image_repo = glance.location.ImageRepoProxy(
        image_repo, context, self.store_api, self.store_utils)
    quota_image_repo = glance.quota.ImageRepoProxy(
        store_image_repo, context, self.db_api, self.store_utils)
    policy_image_repo = policy.ImageRepoProxy(
        quota_image_repo, context, self.policy)
    notifier_image_repo = glance.notifier.ImageRepoProxy(
        policy_image_repo, context, self.notifier)
    if property_utils.is_property_protection_enabled():
        property_rules = property_utils.PropertyRules(self.policy)
        pir = property_protections.ProtectedImageRepoProxy(
            notifier_image_repo, context, property_rules)
        authorized_image_repo = authorization.ImageRepoProxy(
            pir, context)
    else:
        authorized_image_repo = authorization.ImageRepoProxy(
            notifier_image_repo, context)

    return authorized_image_repo

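It is easy to lose the thread here if you read carelessly; you have to peel it back one layer at a time, like an onion. I won't record the tracing details. After passing through the checks in each layer, image_factory.new_image ends up in glance/domain/__init__.py and returns an Image instance.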

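This instance is then passed to image_repo.add, which likewise passes through the checks in each layer and reaches the add() method of ImageRepo in glance/db/__init__.py. That method ultimately calls image_create() in glance/db/sqlalchemy/api.py to create the new record in the database.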

If no error occurs, the information for the newly created empty image is returned to the client.
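
The layering in get_repo is easier to follow with a stripped-down model. The sketch below is my own simplification with hypothetical class names (BaseRepo, RepoProxy, PolicyRepoProxy, NotifierRepoProxy); it keeps only two of the layers and is not Glance's code, but it shows how each proxy adds one concern and forwards the call inward.

```python
class BaseRepo:
    """Innermost layer: a hypothetical in-memory stand-in for the DB repo."""

    def __init__(self):
        self.rows = {}

    def add(self, image):
        self.rows[image["id"]] = image


class RepoProxy:
    """Forwards add() to the wrapped repo; subclasses add one concern each."""

    def __init__(self, base):
        self.base = base

    def add(self, image):
        return self.base.add(image)


class PolicyRepoProxy(RepoProxy):
    def __init__(self, base, allowed):
        super().__init__(base)
        self.allowed = allowed

    def add(self, image):
        # Reject the call before it can reach any inner layer.
        if not self.allowed:
            raise PermissionError("add_image denied")
        return super().add(image)


class NotifierRepoProxy(RepoProxy):
    def __init__(self, base, events):
        super().__init__(base)
        self.events = events

    def add(self, image):
        result = super().add(image)
        # Only reached if every inner layer succeeded.
        self.events.append(("image.create", image["id"]))
        return result


def get_repo(db, events, allowed=True):
    # Wrap inside-out: the last proxy built is the first one to see the call.
    return NotifierRepoProxy(PolicyRepoProxy(db, allowed), events)
```

Calling get_repo(db, events).add({"id": "a"}) travels notifier, then policy, then the base repo, and the notification is emitted on the way back out. This is why a failure in an inner layer silently skips everything the outer layers would have done afterwards.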

upload

According to the documentation, image data is uploaded by sending a PUT request to /v2/images/{image_id}/file. The handler is the upload method of the ImageDataController class in glance/api/v2/image_data.py:

    ......
    @utils.mutating
    def upload(self, req, image_id, data, size):
        backend = None
        image_repo = self.gateway.get_repo(req.context)
        image = None
        refresher = None
        cxt = req.context
        try:
            self.policy.enforce(cxt, 'upload_image', {})
            image = image_repo.get(image_id)
            image.status = 'saving'
            try:
                image_repo.save(image, from_state='queued')
                image.set_data(data, size, backend=backend)
                try:
                    image_repo.save(image, from_state='saving')
                except exception.NotAuthenticated:
                    if refresher is not None:
                        # request a new token to update an image in database
                        cxt.auth_token = refresher.refresh_token()
                        image_repo = self.gateway.get_repo(req.context)
                        image_repo.save(image, from_state='saving')
                    else:
                        raise
        ......
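
My reading of the from_state argument in the excerpt above is that it makes the save conditional on the status currently stored, so that two concurrent uploads cannot both move the same image out of queued. I have not traced this through the database layer, so treat it as an assumption. StatusTable and Conflict below are hypothetical names in a minimal sketch of that idea:

```python
class Conflict(Exception):
    """Hypothetical error raised when the stored status is not the expected one."""


class StatusTable:
    """In-memory stand-in for the images table, keyed by image id."""

    def __init__(self):
        self.status = {}

    def save(self, image_id, new_status, from_state=None):
        current = self.status.get(image_id)
        # Apply the update only if the row is still in the expected state.
        if from_state is not None and current != from_state:
            raise Conflict("expected %s, found %s" % (from_state, current))
        self.status[image_id] = new_status
```

Under this model the first uploader moves the image from queued to saving and then to active, while a second uploader that still expects queued gets a Conflict instead of overwriting the data.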

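Recent versions of Glance support configuring multiple backends, but that feature is not yet stable, so I use a single backend as the example. The status is first changed to saving, and then set_data is called. For reasons I won't go into, I could not start the program and single-step through it; I had to rely on the IDE's jump-to-definition, which led me in circles for a long time. So here is the answer directly: what finally gets called is the method on the ImageProxy class in glance/location.py. The key point is that image is actually proxied by an ImageProxy instance. The conversion is done by the Helper class created when the object is initialized, which has a proxy method that converts a raw Image into an ImageProxy. I won't go into more detail here; back to the definition of set_data: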

def set_data(self, data, size=None, backend=None):
    ......
    hashing_algo = CONF['hashing_algorithm']
    if CONF.enabled_backends:
        (location, size, checksum,
            multihash, loc_meta) = self.store_api.add_with_multihash(
            CONF,
            self.image.image_id,
            utils.LimitingReader(utils.CooperativeReader(data),
                                    CONF.image_size_cap),
            size,
            backend,
            hashing_algo,
            context=self.context,
            verifier=verifier)
    else:
        (location, size, checksum,
            multihash, loc_meta) = self.store_api.add_to_backend_with_multihash(
            CONF,
            self.image.image_id,
            utils.LimitingReader(utils.CooperativeReader(data),
                                    CONF.image_size_cap),
            size,
            hashing_algo,
            context=self.context,
            verifier=verifier)

    self.image.locations = [{'url': location, 'metadata': loc_meta, 'status': 'active'}]
    self.image.size = size
    self.image.checksum = checksum
    self.image.os_hash_value = multihash
    self.image.os_hash_algo = hashing_algo
    self.image.status = 'active'
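
In both branches above, the upload stream is wrapped in utils.LimitingReader together with CONF.image_size_cap before it reaches the store. I did not read that class, so the following is only a minimal sketch of how a size-capping wrapper could work; CappedReader, SizeLimitExceeded and drain are hypothetical names, not Glance's implementation.

```python
import io


class SizeLimitExceeded(Exception):
    """Hypothetical error type for this sketch."""


class CappedReader:
    """Wrap a file-like object and fail once more than `limit` bytes are read."""

    def __init__(self, data, limit):
        self.data = data
        self.limit = limit
        self.bytes_read = 0

    def read(self, size=-1):
        chunk = self.data.read(size)
        self.bytes_read += len(chunk)
        # Check after every read so an oversized upload is cut off early.
        if self.bytes_read > self.limit:
            raise SizeLimitExceeded(
                "read %d bytes, cap is %d" % (self.bytes_read, self.limit))
        return chunk


def drain(reader, chunk_size=4):
    # Read until EOF and return everything that was read.
    parts = []
    while True:
        chunk = reader.read(chunk_size)
        if not chunk:
            break
        parts.append(chunk)
    return b"".join(parts)
```

The point of wrapping the stream, rather than checking a declared size up front, is that the cap still holds when the client sends no size or a wrong one.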

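Here store_api is glance_store by default. add_with_multihash is called when multiple backends are enabled, and add_to_backend_with_multihash is called when a single backend is used. Taking the single-backend case: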

def add_to_backend_with_multihash(conf, image_id, data, size, hashing_algo,
                                  scheme=None, context=None, verifier=None):
    if scheme is None:
        scheme = conf['glance_store']['default_store']
    store = get_store_from_scheme(scheme)
    return store_add_to_backend_with_multihash(
        image_id, data, size, hashing_algo, store, context, verifier)

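The get_store_from_scheme function fetches the driver that was bound into SCHEME_TO_CLS_MAP, as described at the start of this article. The call then goes through store_add_to_backend_with_multihash into the add method of that driver. Taking Ceph's block storage, RBD (RADOS Block Device), as the example, the function is in glance_store/_drivers/rbd.py: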

    @driver.back_compat_add
    @capabilities.check
    def add(self, image_id, image_file, image_size, hashing_algo, context=None,
            verifier=None):
        checksum = hashlib.md5()
        os_hash_value = hashlib.new(str(hashing_algo))
        image_name = str(image_id)
        with self.get_connection(conffile=self.conf_file,
                                 rados_id=self.user) as conn:
            fsid = None
            if hasattr(conn, 'get_fsid'):
                fsid = encodeutils.safe_decode(conn.get_fsid())
            with conn.open_ioctx(self.pool) as ioctx:
                order = int(math.log(self.WRITE_CHUNKSIZE, 2))
                LOG.debug('creating image %s with order %d and size %d',
                          image_name, order, image_size)
                if image_size == 0:
                    LOG.warning(_("since image size is zero we will be doing "
                                  "resize-before-write for each chunk which "
                                  "will be considerably slower than normal"))

                try:
                    loc = self._create_image(fsid, conn, ioctx, image_name,
                                             image_size, order)
                except rbd.ImageExists:
                    msg = _('RBD image %s already exists') % image_id
                    raise exceptions.Duplicate(message=msg)

                try:
                    with rbd.Image(ioctx, image_name) as image:
                        bytes_written = 0
                        offset = 0
                        chunks = utils.chunkreadable(image_file,
                                                     self.WRITE_CHUNKSIZE)
                        for chunk in chunks:
                            # If the image size provided is zero we need to do
                            # a resize for the amount we are writing. This will
                            # be slower so setting a higher chunk size may
                            # speed things up a bit.
                            if image_size == 0:
                                chunk_length = len(chunk)
                                length = offset + chunk_length
                                bytes_written += chunk_length
                                LOG.debug(_("resizing image to %s KiB") %
                                          (length / units.Ki))
                                image.resize(length)
                            LOG.debug(_("writing chunk at offset %s") %
                                      (offset))
                            offset += image.write(chunk, offset)
                            os_hash_value.update(chunk)
                            checksum.update(chunk)
                            if verifier:
                                verifier.update(chunk)
                        if loc.snapshot:
                            image.create_snap(loc.snapshot)
                            image.protect_snap(loc.snapshot)
        ......
        if image_size == 0:
            image_size = bytes_written
        metadata = {}
        if self.backend_group:
            metadata['store'] = u"%s" % self.backend_group
        return (loc.get_uri(),
                image_size,
                checksum.hexdigest(),
                os_hash_value.hexdigest(),
                metadata)

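This function first initializes the hash objects, then opens a connection and tries to create the RBD image, raising Duplicate if an image with that name already exists. Otherwise it writes the data chunk by chunk, updating the hashes as it goes, and returns the location, size, checksums, and metadata to the caller. Finally Glance changes the status to active, and the whole image upload process is complete.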
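
The core of the add method is the loop that writes each chunk and feeds the same chunk to both hash objects, so the data is read only once. The sketch below reproduces that loop against an in-memory buffer instead of an RBD image. store_image is a hypothetical function, and my chunkreadable is a simplified stand-in for the helper of the same name used in the driver.

```python
import hashlib
import io


def chunkreadable(fileobj, chunk_size):
    # Yield successive chunks until the file-like object is exhausted.
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            return
        yield chunk


def store_image(image_file, hashing_algo="sha512", chunk_size=4):
    """Write chunks into an in-memory buffer, hashing each chunk on the way."""
    checksum = hashlib.md5()
    os_hash_value = hashlib.new(hashing_algo)
    backend = io.BytesIO()  # stands in for the RBD image
    offset = 0
    for chunk in chunkreadable(image_file, chunk_size):
        backend.seek(offset)
        # write() returns the number of bytes written, which advances the offset.
        offset += backend.write(chunk)
        checksum.update(chunk)
        os_hash_value.update(chunk)
    return (backend.getvalue(), offset,
            checksum.hexdigest(), os_hash_value.hexdigest())
```

Hashing incrementally gives the same digests as hashing the whole payload at once, which is what lets the driver compute them without buffering the image in memory.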