This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Welcome to nmid

欢迎使用nmid微服务框架

nmid经典架构

nmid architecture

nmid-discovery架构

nmid architecture

nmid作为K8S sidecar架构

nmid architecture

1 - 🚀开始

👋nmid介绍

nmid意思为中场指挥官,足球场上的中场就是统领进攻防守的核心。咱们这里是服务程序的调度核心。是一个轻量级分布式微服务RPC框架。

1.pkg/server目录为nmid微服务调度服务端go实现,采用协程以及管道的异步通信,带有连接池,自有I/O通信协议,msgpack做通信数据格式。

2.pkg/worker目录为nmid的工作端go实现,目前也有c语言实现,以及php扩展实现,可以实现golang, php, c等作为工作端,从而实现跨语言平台提供功能服务。

3.pkg/client目录为nmid的客户端go实现,目前也有c语言实现,以及php扩展实现,可以实现golang, php, c等作为客户端,从而实现跨语言平台调用功能服务。

4.example目录为demo运行目录。为go实现的客户端示例,调度服务端示例,工作端示例。目前调度服务端只有golang的实现。

5.C语言版本:https://github.com/HughNian/nmid-c

6.PHP扩展:https://github.com/HughNian/nmid-php-ext

7.支持http请求nmid服务

💪what can do

1.作为rpc微服务使用

2.作为http微服务使用

3.作为k8s微服务的sidecar使用

4.作为简单faas的函数运行时

📐建议配置

cat /proc/version
Linux version 3.10.0-957.21.3.el7.x86_64 ...(centos7)

go version
go1.18.5 linux/amd64

gcc --version
gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-36)

cmake --version
cmake version 3.11.4

🔨编译安装步骤

git clone https://github.com/HughNian/nmid.git

1.client
cd nmid/example/client/testclient
make

2.server
cd nmid/cmd/server
make

3.worker
cd nmid/example/worker/worker1
make

2 - 🌰例子

nmid的使用包含client和worker

Client

普通golang代码中运行

// Host and port of the nmid server the client connects to.
const (
	SERVERHOST = "127.0.0.1"
	SERVERPORT = "6808"
)

func main() {
	var client *cli.Client
	var err error

	serverAddr := SERVERHOST + ":" + SERVERPORT
	client, err = cli.NewClient("tcp", serverAddr).Start()
	if nil == client || err != nil {
		log.Println(err)
		return
	}
	defer client.Close()

	client.ErrHandler = func(e error) {
		if model.RESTIMEOUT == e {
			log.Println("time out here")
		} else {
			log.Println(e)
		}
		fmt.Println("client err here")
	}

	respHandler := func(resp *cli.Response) {
		if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 {
			if resp.RetLen == 0 {
				log.Println("ret empty")
				return
			}

			var retStruct model.RetStruct
			err := msgpack.Unmarshal(resp.Ret, &retStruct)
			if nil != err {
				log.Fatalln(err)
				return
			}

			if retStruct.Code != 0 {
				log.Println(retStruct.Msg)
				return
			}

			fmt.Println(string(retStruct.Data))
		}
	}

	paramsName1 := make(map[string]interface{})
	paramsName1["name"] = "nmid"
	params1, err := msgpack.Marshal(&paramsName1)
	if err != nil {
		log.Fatalln("params msgpack error:", err)
		os.Exit(1)
	}
	err = client.Do("ToUpper", params1, respHandler)
	if nil != err {
		fmt.Println(err)
	}
}

Worker

普通golang代码中运行

// Host and port of the nmid server the worker connects to.
const (
	NMIDSERVERHOST = "127.0.0.1"
	NMIDSERVERPORT = "6808"
)

// ToUpper is the worker function registered under the name "ToUpper". It
// reads the "name" parameter of the job, upper-cases it and returns the
// result wrapped in a msgpack-encoded model.RetStruct. The encoded bytes are
// also stored in the job response (Ret / RetLen).
//
// An error is returned when the job has no response object, carries no
// parameters, or its "name" parameter is missing or not a string.
func ToUpper(job wor.Job) ([]byte, error) {
	resp := job.GetResponse()
	if nil == resp {
		return []byte(``), fmt.Errorf("response data error")
	}

	if len(resp.ParamsMap) == 0 {
		return nil, fmt.Errorf("response data error")
	}

	// Comma-ok assertion: a bare .(string) would panic the worker when
	// "name" is absent or holds a non-string value.
	name, ok := resp.ParamsMap["name"].(string)
	if !ok {
		return nil, fmt.Errorf("response data error")
	}

	retStruct := model.GetRetStruct()
	retStruct.Msg = "ok"
	retStruct.Data = []byte(strings.ToUpper(name))
	ret, err := msgpack.Marshal(retStruct)
	if nil != err {
		return []byte(``), err
	}

	resp.RetLen = uint32(len(ret))
	resp.Ret = ret

	return ret, nil
}

func main() {
	wname := "Worker1"

	var worker *wor.Worker
	var err error

	serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
	worker = wor.NewWorker().SetWorkerName(wname)
	err = worker.AddServer("tcp", serverAddr)
	if err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	worker.AddFunction("ToUpper", ToUpper)
	//register to discovery server
	worker.Register(wor.EtcdConfig{Addrs: discoverys, Username: disUsername, Password: disPassword})

	if err = worker.WorkerReady(); err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	go worker.WorkerDo()

	quits := make(chan os.Signal, 1)
	signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
	switch <-quits {
	case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
		worker.WorkerClose()
	}
}

3 - 🌙SkyWalking例子

client调用worker1,worker1调用worker2,调用层级2层,后面有多层调用以此类推,本示例展示多层调用,并使用 skywalking进行调用链的跟踪查看

Client

普通golang代码中运行

// Host and port of the nmid server the client connects to.
const (
	SERVERHOST = "127.0.0.1"
	SERVERPORT = "6808"
)

func main() {
	var client *cli.Client
	var err error

	serverAddr := SERVERHOST + ":" + SERVERPORT
	client, err = cli.NewClient("tcp", serverAddr).Start()
	if nil == client || err != nil {
		log.Println(err)
		return
	}
	defer client.Close()

	client.ErrHandler = func(e error) {
		if model.RESTIMEOUT == e {
			log.Println("time out here")
		} else {
			log.Println(e)
		}
		fmt.Println("client err here")
	}

	respHandler := func(resp *cli.Response) {
		if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 {
			if resp.RetLen == 0 {
				log.Println("ret empty")
				return
			}

			var retStruct model.RetStruct
			err := msgpack.Unmarshal(resp.Ret, &retStruct)
			if nil != err {
				log.Fatalln(err)
				return
			}

			if retStruct.Code != 0 {
				log.Println(retStruct.Msg)
				return
			}

			fmt.Println(string(retStruct.Data))
		}
	}

	paramsName1 := make(map[string]interface{})
	paramsName1["name"] = "nmid"
	params1, err := msgpack.Marshal(&paramsName1)
	if err != nil {
		log.Fatalln("params msgpack error:", err)
		os.Exit(1)
	}
	err = client.Do("ToUpper", params1, respHandler)
	if nil != err {
		fmt.Println(err)
	}
}

Worker1

worker1使用SkyWalking进行链路追踪

const (
	// Host and port of the nmid server the worker connects to.
	NMIDSERVERHOST = "127.0.0.1"
	NMIDSERVERPORT = "6808"

	// SkyWalking gRPC address.
	SKYREPORTERURL = "192.168.10.176:11800"
)

// ToUpper is the worker function registered under the name "ToUpper". It
// reads the "name" parameter of the job, upper-cases it and returns the
// result wrapped in a msgpack-encoded model.RetStruct. The encoded bytes are
// also stored in the job response (Ret / RetLen).
//
// An error is returned when the job has no response object, carries no
// parameters, or its "name" parameter is missing or not a string.
func ToUpper(job wor.Job) ([]byte, error) {
	resp := job.GetResponse()
	if nil == resp {
		return []byte(``), fmt.Errorf("response data error")
	}

	if len(resp.ParamsMap) == 0 {
		return nil, fmt.Errorf("response data error")
	}

	// Comma-ok assertion: a bare .(string) would panic the worker when
	// "name" is absent or holds a non-string value.
	name, ok := resp.ParamsMap["name"].(string)
	if !ok {
		return nil, fmt.Errorf("response data error")
	}

	retStruct := model.GetRetStruct()
	retStruct.Msg = "ok"
	retStruct.Data = []byte(strings.ToUpper(name))
	ret, err := msgpack.Marshal(retStruct)
	if nil != err {
		return []byte(``), err
	}

	resp.RetLen = uint32(len(ret))
	resp.Ret = ret

	return ret, nil
}

func main() {
	wname := "Worker1"

	var worker *wor.Worker
	var err error

	serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
	worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL)
	err = worker.AddServer("tcp", serverAddr)
	if err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	worker.AddFunction("ToUpper", ToUpper)

	if err = worker.WorkerReady(); err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	go worker.WorkerDo()

	quits := make(chan os.Signal, 1)
	signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
	switch <-quits {
	case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
		worker.WorkerClose()
	}
}

Worker2

worker2调用worker1,使用SkyWalking进行链路追踪

const (
	// Host and port of the nmid server the worker connects to.
	NMIDSERVERHOST = "127.0.0.1"
	NMIDSERVERPORT = "6808"

	// SkyWalking gRPC address.
	SKYREPORTERURL = "192.168.10.176:11800"
)

// ToUpper2 is the worker function registered under the name "ToUpper2". It
// forwards the job's "name" parameter to the "ToUpper" function through
// job.ClientCall and returns that function's data re-wrapped in a
// msgpack-encoded model.RetStruct.
//
// Errors reported by either handler are returned through the named result
// err.
// NOTE(review): this relies on job.ClientCall invoking the handlers before
// it returns — confirm against the wor package.
func ToUpper2(job wor.Job) (ret []byte, err error) {
	resp := job.GetResponse()
	if nil == resp {
		return []byte(``), fmt.Errorf("response data error")
	}

	// Comma-ok assertion: a missing or non-string "name" is forwarded as ""
	// instead of panicking the worker.
	var name string
	if len(resp.ParamsMap) > 0 {
		name, _ = resp.ParamsMap["name"].(string)
	}

	// errHandler logs a failed downstream call and hands the error back to
	// the caller instead of silently dropping it.
	errHandler := func(e error) {
		if model.RESTIMEOUT == e {
			log.Println("time out here")
		} else {
			log.Println(e)
		}
		err = e
	}

	// respHandler processes the downstream reply. Its parameter is named
	// cresp so that it does not shadow resp, the job's own response, which
	// receives the final result below.
	respHandler := func(cresp *cli.Response) {
		if cresp.DataType != model.PDT_S_RETURN_DATA {
			return
		}
		if cresp.RetLen == 0 {
			log.Println("ret empty")
			err = errors.New("ret empty")
			return
		}

		var cretStruct model.RetStruct
		if uerr := msgpack.Unmarshal(cresp.Ret, &cretStruct); uerr != nil {
			// Return the error: log.Fatalln would terminate the whole
			// worker process because of a single bad reply.
			log.Println(uerr)
			err = uerr
			return
		}

		if cretStruct.Code != 0 {
			log.Println(cretStruct.Msg)
			err = errors.New(cretStruct.Msg)
			return
		}
		fmt.Println(string(cretStruct.Data))

		wretStruct := model.GetRetStruct()
		wretStruct.Msg = "ok"
		wretStruct.Data = cretStruct.Data
		if ret, err = msgpack.Marshal(wretStruct); err != nil {
			return
		}

		resp.RetLen = uint32(len(ret))
		resp.Ret = ret
	}

	callAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
	funcName := "ToUpper"
	paramsName1 := make(map[string]interface{})
	paramsName1["name"] = name
	job.ClientCall(callAddr, funcName, paramsName1, respHandler, errHandler)

	return ret, err
}

func main() {
	wname := "Worker2"

	var worker *wor.Worker
	var err error

	serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
	worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL)
	err = worker.AddServer("tcp", serverAddr)
	if err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	worker.AddFunction("ToUpper2", ToUpper2)

	if err = worker.WorkerReady(); err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	go worker.WorkerDo()

	quits := make(chan os.Signal, 1)
	signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
	switch <-quits {
	case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
		worker.WorkerClose()
	}
}

SkyWalking 后台展示

拓扑结构

nmid architecture

worker1链路

nmid architecture

worker2链路

nmid architecture

4 - 🔥Prometheus指标

1.nmid server内置prometheus监控服务,需要配置文件开启使用,默认metric数据为

1️⃣nmid func调用次数(qps)
2️⃣nmid func调用耗时(during)
3️⃣func总数量
4️⃣每个worker中func数量
5️⃣func调用成功次数
6️⃣func调用失败次数
7️⃣worker关闭次数

2.同时可以使用nmid中已经封装好的prometheus pkg包对你自身服务进行各项prometheus metric监控

config

在配置文件夹中的server.yaml最后加入以下内容,当然配置参数根据自身情况进行修改

# Settings for the Prometheus metrics service built into the nmid server.
Prometheus:
  enable: true     # whether the metrics service is enabled
  host: "0.0.0.0"
  port: "9081"
  path: "/metrics"

Grafana展示

nmid dashboard

nmid architecture

func qps

nmid architecture

func during

nmid architecture

5 - 📡Discovery注册中心

目前nmid只使用etcd为注册中心,client和worker的使用处需要使用相应discovery的方法

Client

client使用discovery,下面是一个http api服务例子

// Host and port of the nmid server.
const (
	SERVERHOST = "127.0.0.1"
	SERVERPORT = "6808"
)

// Package-level variables; consumer is assigned in main and used by
// discovery.
var (
	client   *cli.Client
	err      error
	consumer *cli.Consumer
)

// getClient connects directly to the nmid server at SERVERHOST:SERVERPORT
// with a 30 second I/O timeout. It returns nil when the client could not be
// started, so callers only need a nil check.
func getClient() *cli.Client {
	serverAddr := SERVERHOST + ":" + SERVERPORT
	c, err := cli.NewClient("tcp", serverAddr).SetIoTimeOut(30 * time.Second).Start()
	if nil == c || err != nil {
		logger.Error(err)
		return nil
	}

	return c
}

// discovery returns a started client for funcName. It asks the consumer for
// a client first and falls back to getClient when the consumer returns
// none. It returns nil when the discovered client fails to start.
func discovery(funcName string) *cli.Client {
	found := consumer.Discovery(funcName)
	if found == nil {
		return getClient()
	}

	// Distinct names avoid shadowing: a Start failure must reach the caller
	// rather than being hidden behind the unstarted client.
	started, err := found.SetIoTimeOut(30 * time.Second).Start()
	if nil == started || err != nil {
		logger.Error(err)
		return nil
	}

	return started
}

// Test is the fasthttp handler registered for GET /test. It obtains a client
// for the "ToUpper" function, calls it with a JSON-encoded
// {"name": "niansong1"} parameter map, and writes the returned data (or the
// error text) to the HTTP response.
func Test(ctx *fasthttp.RequestCtx) {
	funcName := "ToUpper"

	client := discovery(funcName)
	if nil == client {
		fmt.Fprint(ctx, "nmid client error")
		return
	}
	// Deferred only after the nil check, so Close is never invoked on a nil
	// client.
	defer client.Close()

	client.SetParamsType(model.PARAMS_TYPE_JSON)

	// Log errors handed to the client's ErrHandler and echo them to the
	// HTTP response.
	client.ErrHandler = func(e error) {
		if model.RESTIMEOUT == e {
			logger.Warn("time out here")
		} else {
			logger.Error(e)
		}

		fmt.Fprint(ctx, e.Error())
	}

	// respHandler decodes the msgpack-encoded model.RetStruct carried by a
	// PDT_S_RETURN_DATA response and writes its Data field to the HTTP
	// response.
	respHandler := func(resp *cli.Response) {
		if resp.DataType != model.PDT_S_RETURN_DATA {
			return
		}
		if resp.RetLen == 0 {
			logger.Info("ret empty")
			return
		}

		var retStruct model.RetStruct
		if err := msgpack.Unmarshal(resp.Ret, &retStruct); err != nil {
			// Log and return: log.Fatalln would terminate the whole HTTP
			// server because of one undecodable reply.
			logger.Error(err)
			return
		}

		if retStruct.Code != 0 {
			log.Println(retStruct.Msg)
			return
		}

		fmt.Println(string(retStruct.Data))

		fmt.Fprint(ctx, string(retStruct.Data))
	}

	// Parameters are JSON-encoded to match PARAMS_TYPE_JSON set above.
	paramsName := make(map[string]interface{})
	paramsName["name"] = "niansong1"
	params1, err := json.Marshal(&paramsName)
	if err != nil {
		// A per-request failure must not bring the server down.
		logger.Error("params json error:", err)
		fmt.Fprint(ctx, err.Error())
		return
	}
	if err = client.Do(funcName, params1, respHandler); err != nil {
		logger.Error(`do err`, err)
	}
}

// main builds the consumer from the etcd addresses and credentials, calls
// EtcdWatch on it, and serves the Test handler for GET /test on port 5981.
func main() {
	consumer = &cli.Consumer{EtcdAddrs: discoverys, Username: disUsername, Password: disPassword}
	consumer.EtcdCli = consumer.EtcdClient()
	consumer.EtcdWatch()

	routes := fasthttprouter.New()
	routes.GET("/test", Test)

	// The result of ListenAndServe is printed unconditionally.
	fmt.Println(`err info:`, fasthttp.ListenAndServe(":5981", routes.Handler))
}

Worker1

worker1注册到discovery

// Host and port of the nmid server the worker connects to.
const (
	NMIDSERVERHOST = "127.0.0.1"
	NMIDSERVERPORT = "6808"
)

// etcd addresses and credentials passed to worker.Register.
var (
	discoverys  = []string{"localhost:2379"}
	disUsername = "root"
	disPassword = "123456"
)

// ToUpper is the worker function registered under the name "ToUpper". It
// reads the "name" parameter of the job, upper-cases it and returns the
// result wrapped in a msgpack-encoded model.RetStruct. The encoded bytes are
// also stored in the job response (Ret / RetLen).
//
// An error is returned when the job has no response object, carries no
// parameters, or its "name" parameter is missing or not a string.
func ToUpper(job wor.Job) ([]byte, error) {
	resp := job.GetResponse()
	if nil == resp {
		return []byte(``), fmt.Errorf("response data error")
	}

	if len(resp.ParamsMap) == 0 {
		return nil, fmt.Errorf("response data error")
	}

	// Comma-ok assertion: a bare .(string) would panic the worker when
	// "name" is absent or holds a non-string value.
	name, ok := resp.ParamsMap["name"].(string)
	if !ok {
		return nil, fmt.Errorf("response data error")
	}

	retStruct := model.GetRetStruct()
	retStruct.Msg = "ok"
	retStruct.Data = []byte(strings.ToUpper(name))
	ret, err := msgpack.Marshal(retStruct)
	if nil != err {
		return []byte(``), err
	}

	resp.RetLen = uint32(len(ret))
	resp.Ret = ret

	return ret, nil
}

func main() {
	wname := "Worker1"

	var worker *wor.Worker
	var err error

	serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
	worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL)
	err = worker.AddServer("tcp", serverAddr)
	if err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	worker.AddFunction("ToUpper", ToUpper)
	//worker注册到注册中心
	worker.Register(wor.EtcdConfig{Addrs: discoverys, Username: disUsername, Password: disPassword})

	if err = worker.WorkerReady(); err != nil {
		log.Fatalln(err)
		worker.WorkerClose()
		return
	}

	go worker.WorkerDo()

	quits := make(chan os.Signal, 1)
	signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
	switch <-quits {
	case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
		worker.WorkerClose()
	}
}

6 - ☀部署使用

nmid的使用可以分为两类,一个是作为微服务的rpc框架使用,同时兼具调度和分发服务的作用,这是最主要的使用场景。 还有一个是可以结合k8s使用,作为k8s微服务容器的sidecar使用。

rpc使用

作为rpc使用,就是典型的client,nmid-server,worker三者的使用关系。当然可以中间加入注册中心进行使用。 具体的示例可以查看🌰例子和📡Discovery注册中心

docker使用

从docker.io拉取镜像,开通rpc的6808端口和http的6809端口

docker run -d -p 7808:6808 -p 7809:6809 --name nmid nmid:56

在k8s作为sidecar使用

这里以npartword分词微服务为例

# Deployment running the npartword container together with an nmid sidecar
# container in the same pod.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: npw-deployment
  namespace: npw
  labels:
    app: npw
spec:
  replicas: 3 # adjust the replica count to your needs
  selector:
    matchLabels:
      app: npw
  template:
    metadata:
      labels:
        app: npw
    spec:
      containers:
        - name: nmid-sidecar # nmid acts as the sidecar of the npartword service
          image: docker.io/hughnian/nmid:56 # set the tag to the image tag you actually use
          command: ["/etc/service/run"]
          ports:
            - name: tcp
              containerPort: 6808
            - name: http
              containerPort: 6809
        - name: npartword
          image: docker.io/hughnian/npartword:3 # set the tag to the image tag you actually use
          command: ["/etc/service/run"]
---
# NodePort Service selecting the pods labelled app: npw. It exposes port 6808
# on node port 30688 and port 6809 on node port 30689.
apiVersion: v1
kind: Service
metadata:
  name: npw-service
  namespace: npw
spec:
  selector:
    app: npw
  ports:
    - name: npw-tcp
      nodePort: 30688
      port: 6808
      protocol: TCP
      targetPort: 6808
    - name: npw-http
      nodePort: 30689
      port: 6809
      protocol: TCP
      targetPort: 6809
  type: NodePort
---
# Ingress routing HTTP traffic for the configured host (and, through
# defaultBackend, any unmatched traffic) to port 6809 of npw-service.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    field.cattle.io/description: npw-service-http
  name: npw-service-http
  namespace: npw
spec:
  defaultBackend:
    service:
      name: npw-service
      port:
        number: 6809
  rules:
    - host: npw.mytest.com # replace with your own domain
      http:
        paths:
          - backend:
              service:
                name: npw-service
                port:
                  number: 6809
            path: /
            pathType: Prefix