如何给k8s服务做单元测试

时间 : 19-12-21 栏目 : 根目录 作者 : eekuang 评论 : 0 点击 : 6,780 次

由于工作原因,最近半年在使用Go语言写一些k8s相关的服务,这些服务有的需要往k8s集群写入Pod资源,有的需要监听Node、Pod和其它资源的变更。在早期的时候,我是使用了官方提供的用于与k8s通信的SDK是client-go来做实际代码实现,但在单元测试上是自己实现了一个Mock对象,来做测试。最近仔细研究了client-go的源码后发现,其实client-go官方已经实现了相关的mock对象,在目录 k8s.io/client-go/kubernetes/fake 下。接下来,我们就以实际例子来看下具体怎么使用官方提供的fake包做相关的单元测试。

1.添加k8s资源对象

本文以测试添加一个pod作为例子,向集群中写入一个名为test-pod的pod。首先,给出指定pod名和namespace,往k8s集群写入一个pod资源的代码。特别说明,为了简洁方法的介绍,Pod信息做了精简,在实际情况中需要填入更多的pod参数才能写入到k8s集群。

package k8s

import (

  v1 "k8s.io/api/core/v1"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

  "k8s.io/client-go/kubernetes"

)

//API for operate k8s

type API struct {

  Client kubernetes.Interface

}

// NewPodWithNs creates a new pod with namespace
func (k API) NewPodWithNs(name, namespace string) error {

  p := &v1.Pod{

​    ObjectMeta: metav1.ObjectMeta{

​      Name:      name,

​      Namespace: namespace,

​    },

  }

  _, err := k.Client.CoreV1().Pods(namespace).Create(p)
  if err != nil {

​    return err

  }
  return nil
}

在上述例子中,我们定义了一个名为API的对象,其有一个方法NewPodWithNs用于往k8s写入一个pod资源对象。我们就对这个方法进行单元测试。在对k8s应用进行单元测试时,我们一般可选择kubernetes.Interface或者kubernetes.Clientset来进行Mock测试。我选择的是kubernetes.Interface,这样的话就可以对整个k8s的API接口都mock掉,其fake包位置为k8s.io/client-go/kubernetes/fake。具体测试代码如下:

package k8s

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	testclient "k8s.io/client-go/kubernetes/fake"
)

const (
	testPod       = "test-pod"
	testNamespace = "test-ns"
)

func TestNewPodWithNs(t *testing.T) {
	cases := []struct {
		name string
		ns   string
	}{
		{
			name: testPod,
			ns:   testNamespace,
		},
	}

	api := &API{
		Client: testclient.NewSimpleClientset(),
	}

	for _, c := range cases {
		// create the postfixed namespace
		err := api.NewPodWithNs(c.name, c.ns)
		if err != nil {
			t.Fatal(err.Error())
		}

		if p, err := api.Client.CoreV1().Pods(c.ns).Get(c.name, metav1.GetOptions{}); nil != err {
			t.Errorf("get pod  err %v", err)
		} else if p.Name != c.name {
			t.Errorf("pod name err")
		}

	}
}

整个测试代码非常简单,使用官方的fake包生成一个client对象,然后往这个虚拟的k8s 集群的指定namespaces写入一个pod。通过Get接口进行获取,如果能获取到Pod则表示写入成功。通过短短几十行代码,就能验证我们内部的逻辑是否正常工作。如果你写入的Pod参数非常多的话,那测试用例也要更多才行。

这是一个简单的例子来验证写入Pod和查询Pod。删除和修改Pod及其它资源对象(namespace,node等)的增删改查也是类似的。如果你有相关的的需求,可以参照上述代码动手试试看。

2.监听资源对象的变更

我们以监听pod添加这一事件作为例子来示范如何做监听资源对象变更的单元测试。具体实现代码如下:

package k8s

import (
	v1 "k8s.io/api/core/v1"
)

//Cache for operate k8s
type Cache struct {
	Pods map[string]*v1.Pod
}

// AddPod add a pod
func (c *Cache) AddPod(p *v1.Pod) error {
	c.Pods[p.Name] = p
	return nil
}

上述代码非常简单,就是本地做一份cache,然后保存Pod信息。测试代码则会复杂很多,具体的测试代码如下:

package k8s

import (
	"context"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

// TestAddPodEvent aim to test add pod event.
func TestAddPodEvent(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the fake client.
	client := fake.NewSimpleClientset()

	c := Cache{Pods: make(map[string]*v1.Pod)}
	// We will create an informer that writes added pods to a channel.
	pods := make(chan *v1.Pod, 1)
	informers := informers.NewSharedInformerFactory(client, 0)
	podInformer := informers.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			pods <- pod
			t.Logf("pod added: %s/%s", pod.Namespace, pod.Name)
			if err := c.AddPod(pod); nil != err {
				t.Errorf("add pod err %v", err)
			}
		},
	})

	// Make sure informers are running.
	informers.Start(ctx.Done())

	// This is not required in tests, but it serves as a proof-of-concept by
	// ensuring that the informer goroutine have warmed up and called List before
	// we send any events to it.
	cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)

	// Inject an event into the fake client.
	p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: TestPod}}
	_, err := client.CoreV1().Pods(TestNamespace).Create(p)
	if err != nil {
		t.Fatalf("error injecting pod add: %v", err)
	}

	select {
	case pod := <-pods:
		if _, ok := c.Pods[pod.Name]; !ok {
			t.Errorf("no pod after add event")
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Error("Informer did not get the added pod")
	}
}

上述例子仅对Pod的添加事件进行了监听,如果还需要监听更新和删除事件的话(一般的应用中都需要同时监听增删改事件),需要再注册两个方法到informer中,当然其实现也是类似的。同样,这种测试方式可以推广到Node,Namespace等其他方法。

3.再说两句

上述两种测试的方法不仅仅适用于k8s自有的资源对象,同样也适用于CRD。有兴趣的同学可以研究下通过代码工具生成的CRD代码,其中也包含了fake的的代码实现。如有任何问题,欢迎交流~

anyShare分享到:

除非注明,文章均为( eekuang )原创,转载请保留链接: http://www.14en.com/?p=181

如何给k8s服务做单元测试:等您坐沙发呢!

发表评论


-----===== 博主信息 =====-----
腾讯后台开发工程师
介绍:目前从事容器云相关开发工作,主要使用C++/go语言。


0