This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Welcome to nmid

欢迎使用nmid微服务框架

nmid经典架构

nmid architecture

nmid-discovery架构

nmid architecture

nmid作为K8S sidecar架构

nmid architecture

1 - 🚀开始

👋nmid介绍

nmid意思为中场指挥官,足球场上的中场就是统领进攻防守的核心。咱们这里是服务程序的调度核心。是一个轻量级分布式微服务RPC框架。

1.pkg/server目录为nmid微服务调度服务端go实现,采用协程以及管道的异步通信,带有连接池,自有I/O通信协议,msgpack做通信数据格式。

2.pkg/worker目录为nmid的工作端go实现,目前也有c语言实现,以及php扩展实现,可以实现golang, php, c等作为工作端,从而实现跨语言平台提供功能服务。

3.pkg/client目录为nmid的客户端go实现,目前也有c语言实现,以及php扩展实现,可以实现golang, php, c等作为客户端,从而实现跨语言平台调用功能服务。

4.example目录为demo运行目录。为go实现的客户端示例,调度服务端示例,客户端示例。目前调度服务端只有golang的实现。

5.C语言版本:https://github.com/HughNian/nmid-c

6.PHP扩展:https://github.com/HughNian/nmid-php-ext

7.支持http请求nmid服务

💪what can do

1.作为rpc微服务使用

2.作为http微服务使用

2.作为k8s微服务的sidecar使用

4.作为简单faas的函数运行时

📐建议配置

cat /proc/version Linux version 3.10.0-957.21.3.el7.x86_64 ...(centos7) go version go1.18.5 linux/amd64 gcc --version gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-36) cmake --version cmake version 3.11.4

🔨编译安装步骤

git clone https://github.com/HughNian/nmid.git 1.client cd nmid/example/client/testclient make 2.server cd nmid/cmd/server make 3.worker cd nmid/example/worker/worker1 make

2 - 🌰例子

nmid的使用包含client和worker

Client

普通golang代码中运行

const SERVERHOST = "127.0.0.1" const SERVERPORT = "6808" func main() { var client *cli.Client var err error serverAddr := SERVERHOST + ":" + SERVERPORT client, err = cli.NewClient("tcp", serverAddr).Start() if nil == client || err != nil { log.Println(err) return } defer client.Close() client.ErrHandler = func(e error) { if model.RESTIMEOUT == e { log.Println("time out here") } else { log.Println(e) } fmt.Println("client err here") } respHandler := func(resp *cli.Response) { if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 { if resp.RetLen == 0 { log.Println("ret empty") return } var retStruct model.RetStruct err := msgpack.Unmarshal(resp.Ret, &retStruct) if nil != err { log.Fatalln(err) return } if retStruct.Code != 0 { log.Println(retStruct.Msg) return } fmt.Println(string(retStruct.Data)) } } paramsName1 := make(map[string]interface{}) paramsName1["name"] = "nmid" params1, err := msgpack.Marshal(&paramsName1) if err != nil { log.Fatalln("params msgpack error:", err) os.Exit(1) } err = client.Do("ToUpper", params1, respHandler) if nil != err { fmt.Println(err) } }

Worker

普通golang代码中运行

const NMIDSERVERHOST = "127.0.0.1" const NMIDSERVERPORT = "6808" func ToUpper(job wor.Job) ([]byte, error) { resp := job.GetResponse() if nil == resp { return []byte(``), fmt.Errorf("response data error") } if len(resp.ParamsMap) > 0 { name := resp.ParamsMap["name"].(string) retStruct := model.GetRetStruct() retStruct.Msg = "ok" retStruct.Data = []byte(strings.ToUpper(name)) ret, err := msgpack.Marshal(retStruct) if nil != err { return []byte(``), err } resp.RetLen = uint32(len(ret)) resp.Ret = ret return ret, nil } return nil, fmt.Errorf("response data error") } func main() { wname := "Worker1" var worker *wor.Worker var err error serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT worker = wor.NewWorker().SetWorkerName(wname) err = worker.AddServer("tcp", serverAddr) if err != nil { log.Fatalln(err) worker.WorkerClose() return } worker.AddFunction("ToUpper", ToUpper) //register to discovery server worker.Register(wor.EtcdConfig{Addrs: discoverys, Username: disUsername, Password: disPassword}) if err = worker.WorkerReady(); err != nil { log.Fatalln(err) worker.WorkerClose() return } go worker.WorkerDo() quits := make(chan os.Signal, 1) signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/) switch <-quits { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: worker.WorkerClose() } }

3 - 🌙SkyWalking例子

client调用worker1,worker1调用worker2,调用层级2层,后面有多层调用以此类推,本示例展示多层调用,并使用 skywalking进行调用链的跟踪查看

Client

普通golang代码中运行

const SERVERHOST = "127.0.0.1" const SERVERPORT = "6808" func main() { var client *cli.Client var err error serverAddr := SERVERHOST + ":" + SERVERPORT client, err = cli.NewClient("tcp", serverAddr).Start() if nil == client || err != nil { log.Println(err) return } defer client.Close() client.ErrHandler = func(e error) { if model.RESTIMEOUT == e { log.Println("time out here") } else { log.Println(e) } fmt.Println("client err here") } respHandler := func(resp *cli.Response) { if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 { if resp.RetLen == 0 { log.Println("ret empty") return } var retStruct model.RetStruct err := msgpack.Unmarshal(resp.Ret, &retStruct) if nil != err { log.Fatalln(err) return } if retStruct.Code != 0 { log.Println(retStruct.Msg) return } fmt.Println(string(retStruct.Data)) } } paramsName1 := make(map[string]interface{}) paramsName1["name"] = "nmid" params1, err := msgpack.Marshal(&paramsName1) if err != nil { log.Fatalln("params msgpack error:", err) os.Exit(1) } err = client.Do("ToUpper", params1, respHandler) if nil != err { fmt.Println(err) } }

Worker1

worker1使用SkyWalking进行链路追踪

const NMIDSERVERHOST = "127.0.0.1" const NMIDSERVERPORT = "6808" const SKYREPORTERURL = "192.168.10.176:11800" //skywalking的grpc地址 func ToUpper(job wor.Job) ([]byte, error) { resp := job.GetResponse() if nil == resp { return []byte(``), fmt.Errorf("response data error") } if len(resp.ParamsMap) > 0 { name := resp.ParamsMap["name"].(string) retStruct := model.GetRetStruct() retStruct.Msg = "ok" retStruct.Data = []byte(strings.ToUpper(name)) ret, err := msgpack.Marshal(retStruct) if nil != err { return []byte(``), err } resp.RetLen = uint32(len(ret)) resp.Ret = ret return ret, nil } return nil, fmt.Errorf("response data error") } func main() { wname := "Worker1" var worker *wor.Worker var err error serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL) err = worker.AddServer("tcp", serverAddr) if err != nil { log.Fatalln(err) worker.WorkerClose() return } worker.AddFunction("ToUpper", ToUpper) if err = worker.WorkerReady(); err != nil { log.Fatalln(err) worker.WorkerClose() return } go worker.WorkerDo() quits := make(chan os.Signal, 1) signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/) switch <-quits { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: worker.WorkerClose() } }

Worker2

worker2调用worker1,使用SkyWalking进行链路追踪

const NMIDSERVERHOST = "127.0.0.1" const NMIDSERVERPORT = "6808" const SKYREPORTERURL = "192.168.10.176:11800" //skywalking的grpc地址 func ToUpper2(job wor.Job) (ret []byte, err error) { resp := job.GetResponse() if nil == resp { return []byte(``), fmt.Errorf("response data error") } var name string if len(resp.ParamsMap) > 0 { name = resp.ParamsMap["name"].(string) } errHandler := func(e error) { if model.RESTIMEOUT == e { log.Println("time out here") } else { log.Println(e) } } respHandler := func(resp *cli.Response) { if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 { if resp.RetLen == 0 { log.Println("ret empty") err = errors.New("ret empty") return } var cretStruct model.RetStruct uerr := msgpack.Unmarshal(resp.Ret, &cretStruct) if nil != uerr { log.Fatalln(uerr) err = uerr return } if cretStruct.Code != 0 { log.Println(cretStruct.Msg) err = errors.New(cretStruct.Msg) return } fmt.Println(string(cretStruct.Data)) wretStruct := model.GetRetStruct() wretStruct.Msg = "ok" wretStruct.Data = cretStruct.Data ret, err = msgpack.Marshal(wretStruct) resp.RetLen = uint32(len(ret)) resp.Ret = ret } } callAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT funcName := "ToUpper" paramsName1 := make(map[string]interface{}) paramsName1["name"] = name job.ClientCall(callAddr, funcName, paramsName1, respHandler, errHandler) return } func main() { wname := "Worker2" var worker *wor.Worker var err error serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL) err = worker.AddServer("tcp", serverAddr) if err != nil { log.Fatalln(err) worker.WorkerClose() return } worker.AddFunction("ToUpper2", ToUpper2) if err = worker.WorkerReady(); err != nil { log.Fatalln(err) worker.WorkerClose() return } go worker.WorkerDo() quits := make(chan os.Signal, 1) signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/) switch <-quits { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: worker.WorkerClose() } }

SkyWalking 后台展示

拓扑结构

nmid architecture

worker1链路

nmid architecture

worker2链路

nmid architecture

4 - 🔥Prometeus指标

1.nmid server内置prometheus监控服务,需要配置文件开启使用,默认metric数据为

1️⃣nmid func调用次数(qps)
2️⃣nmid func调用耗时(during)
3️⃣func总数量
4️⃣每个worker中func数量
5️⃣func调用成功次数
6️⃣func调用失败次数
7️⃣worker关闭次数

2.同时可以使用nmid中已经封装好的prometheus pkg包对你自身服务进行各项prometheus metric监控

config

在配置文件夹中的server.yaml最后加入以下内容,当然配置参数根据自身情况进行修改

Prometheus: enable: true #是否开启 host: "0.0.0.0" port: "9081" path: "/metrics"

Grafana展示

nmid dashboard

nmid architecture

func qps

nmid architecture

func during

nmid architecture

5 - 📡Discovery注册中心

目前nmid只使用etcd为注册中心,client和worker的使用处需要使用相应discovery的方法

Client

client使用discovery,下面是一个http api服务列子

const SERVERHOST = "127.0.0.1" const SERVERPORT = "6808" var client *cli.Client var err error var consumer *cli.Consumer func getClient() *cli.Client { serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT client, err := cli.NewClient("tcp", serverAddr).SetIoTimeOut(30 * time.Second).Start() if nil == client || err != nil { logger.Error(err) } return client } func discovery(funcName string) *cli.Client { client := consumer.Discovery(funcName) if client != nil { client, err := client.SetIoTimeOut(30 * time.Second).Start() if nil == client || err != nil { logger.Error(err) } } else { client = getClient() } return client } func Test(ctx *fasthttp.RequestCtx) { funcName := "ToUpper" client := discovery(funcName) defer client.Close() if nil == client { fmt.Fprint(ctx, "nmid client error") return } client.SetParamsType(model.PARAMS_TYPE_JSON) client.ErrHandler = func(e error) { if model.RESTIMEOUT == e { logger.Warn("time out here") } else { logger.Error(e) } fmt.Fprint(ctx, e.Error()) } respHandler := func(resp *cli.Response) { if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 { if resp.RetLen == 0 { logger.Info("ret empty") return } var retStruct model.RetStruct err := msgpack.Unmarshal(resp.Ret, &retStruct) if nil != err { log.Fatalln(err) return } if retStruct.Code != 0 { log.Println(retStruct.Msg) return } fmt.Println(string(retStruct.Data)) fmt.Fprint(ctx, string(retStruct.Data)) } } paramsName := make(map[string]interface{}) paramsName["name"] = "niansong1" //params, err := msgpack.Marshal(&paramsName) params1, err := json.Marshal(&paramsName) if err != nil { logger.Fatal("params msgpack error:", err) } err = client.Do(funcName, params1, respHandler) if nil != err { logger.Error(`do err`, err) } } func main() { consumer = &cli.Consumer{ EtcdAddrs: discoverys, Username: disUsername, Password: disPassword, } consumer.EtcdCli = consumer.EtcdClient() consumer.EtcdWatch() router := fasthttprouter.New() router.GET("/test", Test) err := fasthttp.ListenAndServe(":5981", router.Handler) fmt.Println(`err info:`, err) }

Worker1

worker1注册到discovery

const NMIDSERVERHOST = "127.0.0.1" const NMIDSERVERPORT = "6808" var discoverys = []string{"localhost:2379"} var disUsername = "root" var disPassword = "123456" func ToUpper(job wor.Job) ([]byte, error) { resp := job.GetResponse() if nil == resp { return []byte(``), fmt.Errorf("response data error") } if len(resp.ParamsMap) > 0 { name := resp.ParamsMap["name"].(string) retStruct := model.GetRetStruct() retStruct.Msg = "ok" retStruct.Data = []byte(strings.ToUpper(name)) ret, err := msgpack.Marshal(retStruct) if nil != err { return []byte(``), err } resp.RetLen = uint32(len(ret)) resp.Ret = ret return ret, nil } return nil, fmt.Errorf("response data error") } func main() { wname := "Worker1" var worker *wor.Worker var err error serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL) err = worker.AddServer("tcp", serverAddr) if err != nil { log.Fatalln(err) worker.WorkerClose() return } worker.AddFunction("ToUpper", ToUpper) //worker注册到注册中心 worker.Register(wor.EtcdConfig{Addrs: discoverys, Username: disUsername, Password: disPassword}) if err = worker.WorkerReady(); err != nil { log.Fatalln(err) worker.WorkerClose() return } go worker.WorkerDo() quits := make(chan os.Signal, 1) signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/) switch <-quits { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: worker.WorkerClose() } }

6 - ☀部署使用

nmid的使用可以分为两类,一个是作为微服务的rpc框架使用,同时兼具调度和分发服务的作用,这是最主要的使用场景。 还有一个是可以结合k8s使用,作为k8s微服务容器的sidecar使用。

rpc使用

作为rpc使用,就是典型的cliet,nmid-server,worker三者的使用关系。当然可以中间加入注册中心进行使用。 具体的示例可以查看🌰例子📡Discovery注册中心

docker使用

从docker.io拉取镜像,开通rpc的6808端口和http的6809端口

docker run -d -p 7808:6808 -p 7809:6809 --name nmid nmid:56

在k8s作为sidecar使用

这里以npartword分词微服务为例

apiVersion: apps/v1 kind: Deployment metadata: name: npw-deployment namespace: npw labels: app: npw spec: replicas: 3 #根据情况定义副本数 selector: matchLabels: app: npw template: metadata: labels: app: npw spec: containers: - name: nmid-sidecar #使用nmid作为npartword服务sidecar image: docker.io/hughnian/nmid:56 #这里的tag根据具体镜像tag取值 command: ["/etc/service/run"] ports: - name: tcp containerPort: 6808 - name: http containerPort: 6809 - name: npartword image: docker.io/hughnian/npartword:3 #这里的tag根据具体镜像tag取值 command: ["/etc/service/run"] --- apiVersion: v1 kind: Service metadata: name: npw-service namespace: npw spec: selector: app: npw ports: - name: npw-tcp nodePort: 30688 port: 6808 protocol: TCP targetPort: 6808 - name: npw-http nodePort: 30689 port: 6809 protocol: TCP targetPort: 6809 type: NodePort --- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: annotations: field.cattle.io/description: npw-service-http name: npw-service-http namespace: npw spec: defaultBackend: service: name: npw-service port: number: 6809 rules: - host: npw.mytest.com #使用自己相应域名 http: paths: - backend: service: name: npw-service port: number: 6809 path: / pathType: Prefix