欢迎使用nmid微服务框架
This is the multi-page printable view of this section. Click here to print.
Welcome to nmid
- 1: 🚀开始
- 2: 🌰例子
- 3: 🌙SkyWalking例子
- 4: 🔥Prometeus指标
- 5: 📡Discovery注册中心
- 6: ☀部署使用
1 - 🚀开始
👋nmid介绍
nmid意思为中场指挥官,足球场上的中场就是统领进攻防守的核心。咱们这里是服务程序的调度核心。是一个轻量级分布式微服务RPC框架。
1.pkg/server目录为nmid微服务调度服务端go实现,采用协程以及管道的异步通信,带有连接池,自有I/O通信协议,msgpack做通信数据格式。
2.pkg/worker目录为nmid的工作端go实现,目前也有c语言实现,以及php扩展实现,可以实现golang, php, c等作为工作端,从而实现跨语言平台提供功能服务。
3.pkg/client目录为nmid的客户端go实现,目前也有c语言实现,以及php扩展实现,可以实现golang, php, c等作为客户端,从而实现跨语言平台调用功能服务。
4.example目录为demo运行目录。为go实现的客户端示例,调度服务端示例,客户端示例。目前调度服务端只有golang的实现。
5.C语言版本:https://github.com/HughNian/nmid-c
6.PHP扩展:https://github.com/HughNian/nmid-php-ext
7.支持http请求nmid服务
💪what can do
1.作为rpc微服务使用
2.作为http微服务使用
2.作为k8s微服务的sidecar使用
4.作为简单faas的函数运行时
📐建议配置
cat /proc/version
Linux version 3.10.0-957.21.3.el7.x86_64 ...(centos7)
go version
go1.18.5 linux/amd64
gcc --version
gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-36)
cmake --version
cmake version 3.11.4
🔨编译安装步骤
git clone https://github.com/HughNian/nmid.git
1.client
cd nmid/example/client/testclient
make
2.server
cd nmid/cmd/server
make
3.worker
cd nmid/example/worker/worker1
make
2 - 🌰例子
nmid的使用包含client和worker
Client
普通golang代码中运行
const SERVERHOST = "127.0.0.1"
const SERVERPORT = "6808"
func main() {
var client *cli.Client
var err error
serverAddr := SERVERHOST + ":" + SERVERPORT
client, err = cli.NewClient("tcp", serverAddr).Start()
if nil == client || err != nil {
log.Println(err)
return
}
defer client.Close()
client.ErrHandler = func(e error) {
if model.RESTIMEOUT == e {
log.Println("time out here")
} else {
log.Println(e)
}
fmt.Println("client err here")
}
respHandler := func(resp *cli.Response) {
if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 {
if resp.RetLen == 0 {
log.Println("ret empty")
return
}
var retStruct model.RetStruct
err := msgpack.Unmarshal(resp.Ret, &retStruct)
if nil != err {
log.Fatalln(err)
return
}
if retStruct.Code != 0 {
log.Println(retStruct.Msg)
return
}
fmt.Println(string(retStruct.Data))
}
}
paramsName1 := make(map[string]interface{})
paramsName1["name"] = "nmid"
params1, err := msgpack.Marshal(¶msName1)
if err != nil {
log.Fatalln("params msgpack error:", err)
os.Exit(1)
}
err = client.Do("ToUpper", params1, respHandler)
if nil != err {
fmt.Println(err)
}
}
Worker
普通golang代码中运行
const NMIDSERVERHOST = "127.0.0.1"
const NMIDSERVERPORT = "6808"
func ToUpper(job wor.Job) ([]byte, error) {
resp := job.GetResponse()
if nil == resp {
return []byte(``), fmt.Errorf("response data error")
}
if len(resp.ParamsMap) > 0 {
name := resp.ParamsMap["name"].(string)
retStruct := model.GetRetStruct()
retStruct.Msg = "ok"
retStruct.Data = []byte(strings.ToUpper(name))
ret, err := msgpack.Marshal(retStruct)
if nil != err {
return []byte(``), err
}
resp.RetLen = uint32(len(ret))
resp.Ret = ret
return ret, nil
}
return nil, fmt.Errorf("response data error")
}
func main() {
wname := "Worker1"
var worker *wor.Worker
var err error
serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
worker = wor.NewWorker().SetWorkerName(wname)
err = worker.AddServer("tcp", serverAddr)
if err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
worker.AddFunction("ToUpper", ToUpper)
//register to discovery server
worker.Register(wor.EtcdConfig{Addrs: discoverys, Username: disUsername, Password: disPassword})
if err = worker.WorkerReady(); err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
go worker.WorkerDo()
quits := make(chan os.Signal, 1)
signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
switch <-quits {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
worker.WorkerClose()
}
}
3 - 🌙SkyWalking例子
client调用worker1,worker1调用worker2,调用层级2层,后面有多层调用以此类推,本示例展示多层调用,并使用 skywalking进行调用链的跟踪查看
Client
普通golang代码中运行
const SERVERHOST = "127.0.0.1"
const SERVERPORT = "6808"
func main() {
var client *cli.Client
var err error
serverAddr := SERVERHOST + ":" + SERVERPORT
client, err = cli.NewClient("tcp", serverAddr).Start()
if nil == client || err != nil {
log.Println(err)
return
}
defer client.Close()
client.ErrHandler = func(e error) {
if model.RESTIMEOUT == e {
log.Println("time out here")
} else {
log.Println(e)
}
fmt.Println("client err here")
}
respHandler := func(resp *cli.Response) {
if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 {
if resp.RetLen == 0 {
log.Println("ret empty")
return
}
var retStruct model.RetStruct
err := msgpack.Unmarshal(resp.Ret, &retStruct)
if nil != err {
log.Fatalln(err)
return
}
if retStruct.Code != 0 {
log.Println(retStruct.Msg)
return
}
fmt.Println(string(retStruct.Data))
}
}
paramsName1 := make(map[string]interface{})
paramsName1["name"] = "nmid"
params1, err := msgpack.Marshal(¶msName1)
if err != nil {
log.Fatalln("params msgpack error:", err)
os.Exit(1)
}
err = client.Do("ToUpper", params1, respHandler)
if nil != err {
fmt.Println(err)
}
}
Worker1
worker1使用SkyWalking进行链路追踪
const NMIDSERVERHOST = "127.0.0.1"
const NMIDSERVERPORT = "6808"
const SKYREPORTERURL = "192.168.10.176:11800" //skywalking的grpc地址
func ToUpper(job wor.Job) ([]byte, error) {
resp := job.GetResponse()
if nil == resp {
return []byte(``), fmt.Errorf("response data error")
}
if len(resp.ParamsMap) > 0 {
name := resp.ParamsMap["name"].(string)
retStruct := model.GetRetStruct()
retStruct.Msg = "ok"
retStruct.Data = []byte(strings.ToUpper(name))
ret, err := msgpack.Marshal(retStruct)
if nil != err {
return []byte(``), err
}
resp.RetLen = uint32(len(ret))
resp.Ret = ret
return ret, nil
}
return nil, fmt.Errorf("response data error")
}
func main() {
wname := "Worker1"
var worker *wor.Worker
var err error
serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL)
err = worker.AddServer("tcp", serverAddr)
if err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
worker.AddFunction("ToUpper", ToUpper)
if err = worker.WorkerReady(); err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
go worker.WorkerDo()
quits := make(chan os.Signal, 1)
signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
switch <-quits {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
worker.WorkerClose()
}
}
Worker2
worker2调用worker1,使用SkyWalking进行链路追踪
const NMIDSERVERHOST = "127.0.0.1"
const NMIDSERVERPORT = "6808"
const SKYREPORTERURL = "192.168.10.176:11800" //skywalking的grpc地址
func ToUpper2(job wor.Job) (ret []byte, err error) {
resp := job.GetResponse()
if nil == resp {
return []byte(``), fmt.Errorf("response data error")
}
var name string
if len(resp.ParamsMap) > 0 {
name = resp.ParamsMap["name"].(string)
}
errHandler := func(e error) {
if model.RESTIMEOUT == e {
log.Println("time out here")
} else {
log.Println(e)
}
}
respHandler := func(resp *cli.Response) {
if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 {
if resp.RetLen == 0 {
log.Println("ret empty")
err = errors.New("ret empty")
return
}
var cretStruct model.RetStruct
uerr := msgpack.Unmarshal(resp.Ret, &cretStruct)
if nil != uerr {
log.Fatalln(uerr)
err = uerr
return
}
if cretStruct.Code != 0 {
log.Println(cretStruct.Msg)
err = errors.New(cretStruct.Msg)
return
}
fmt.Println(string(cretStruct.Data))
wretStruct := model.GetRetStruct()
wretStruct.Msg = "ok"
wretStruct.Data = cretStruct.Data
ret, err = msgpack.Marshal(wretStruct)
resp.RetLen = uint32(len(ret))
resp.Ret = ret
}
}
callAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
funcName := "ToUpper"
paramsName1 := make(map[string]interface{})
paramsName1["name"] = name
job.ClientCall(callAddr, funcName, paramsName1, respHandler, errHandler)
return
}
func main() {
wname := "Worker2"
var worker *wor.Worker
var err error
serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL)
err = worker.AddServer("tcp", serverAddr)
if err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
worker.AddFunction("ToUpper2", ToUpper2)
if err = worker.WorkerReady(); err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
go worker.WorkerDo()
quits := make(chan os.Signal, 1)
signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
switch <-quits {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
worker.WorkerClose()
}
}
SkyWalking 后台展示
拓扑结构
worker1链路
worker2链路
4 - 🔥Prometeus指标
1.nmid server内置prometheus监控服务,需要配置文件开启使用,默认metric数据为
1️⃣nmid func调用次数(qps)
2️⃣nmid func调用耗时(during)
3️⃣func总数量
4️⃣每个worker中func数量
5️⃣func调用成功次数
6️⃣func调用失败次数
7️⃣worker关闭次数
2.同时可以使用nmid中已经封装好的prometheus pkg包对你自身服务进行各项prometheus metric监控
config
在配置文件夹中的server.yaml最后加入以下内容,当然配置参数根据自身情况进行修改
Prometheus:
enable: true #是否开启
host: "0.0.0.0"
port: "9081"
path: "/metrics"
Grafana展示
nmid dashboard
func qps
func during
5 - 📡Discovery注册中心
目前nmid只使用etcd为注册中心,client和worker的使用处需要使用相应discovery的方法
Client
client使用discovery,下面是一个http api服务列子
const SERVERHOST = "127.0.0.1"
const SERVERPORT = "6808"
var client *cli.Client
var err error
var consumer *cli.Consumer
func getClient() *cli.Client {
serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
client, err := cli.NewClient("tcp", serverAddr).SetIoTimeOut(30 * time.Second).Start()
if nil == client || err != nil {
logger.Error(err)
}
return client
}
func discovery(funcName string) *cli.Client {
client := consumer.Discovery(funcName)
if client != nil {
client, err := client.SetIoTimeOut(30 * time.Second).Start()
if nil == client || err != nil {
logger.Error(err)
}
} else {
client = getClient()
}
return client
}
func Test(ctx *fasthttp.RequestCtx) {
funcName := "ToUpper"
client := discovery(funcName)
defer client.Close()
if nil == client {
fmt.Fprint(ctx, "nmid client error")
return
}
client.SetParamsType(model.PARAMS_TYPE_JSON)
client.ErrHandler = func(e error) {
if model.RESTIMEOUT == e {
logger.Warn("time out here")
} else {
logger.Error(e)
}
fmt.Fprint(ctx, e.Error())
}
respHandler := func(resp *cli.Response) {
if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 {
if resp.RetLen == 0 {
logger.Info("ret empty")
return
}
var retStruct model.RetStruct
err := msgpack.Unmarshal(resp.Ret, &retStruct)
if nil != err {
log.Fatalln(err)
return
}
if retStruct.Code != 0 {
log.Println(retStruct.Msg)
return
}
fmt.Println(string(retStruct.Data))
fmt.Fprint(ctx, string(retStruct.Data))
}
}
paramsName := make(map[string]interface{})
paramsName["name"] = "niansong1"
//params, err := msgpack.Marshal(¶msName)
params1, err := json.Marshal(¶msName)
if err != nil {
logger.Fatal("params msgpack error:", err)
}
err = client.Do(funcName, params1, respHandler)
if nil != err {
logger.Error(`do err`, err)
}
}
func main() {
consumer = &cli.Consumer{
EtcdAddrs: discoverys,
Username: disUsername,
Password: disPassword,
}
consumer.EtcdCli = consumer.EtcdClient()
consumer.EtcdWatch()
router := fasthttprouter.New()
router.GET("/test", Test)
err := fasthttp.ListenAndServe(":5981", router.Handler)
fmt.Println(`err info:`, err)
}
Worker1
worker1注册到discovery
const NMIDSERVERHOST = "127.0.0.1"
const NMIDSERVERPORT = "6808"
var discoverys = []string{"localhost:2379"}
var disUsername = "root"
var disPassword = "123456"
func ToUpper(job wor.Job) ([]byte, error) {
resp := job.GetResponse()
if nil == resp {
return []byte(``), fmt.Errorf("response data error")
}
if len(resp.ParamsMap) > 0 {
name := resp.ParamsMap["name"].(string)
retStruct := model.GetRetStruct()
retStruct.Msg = "ok"
retStruct.Data = []byte(strings.ToUpper(name))
ret, err := msgpack.Marshal(retStruct)
if nil != err {
return []byte(``), err
}
resp.RetLen = uint32(len(ret))
resp.Ret = ret
return ret, nil
}
return nil, fmt.Errorf("response data error")
}
func main() {
wname := "Worker1"
var worker *wor.Worker
var err error
serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL)
err = worker.AddServer("tcp", serverAddr)
if err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
worker.AddFunction("ToUpper", ToUpper)
//worker注册到注册中心
worker.Register(wor.EtcdConfig{Addrs: discoverys, Username: disUsername, Password: disPassword})
if err = worker.WorkerReady(); err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
go worker.WorkerDo()
quits := make(chan os.Signal, 1)
signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
switch <-quits {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
worker.WorkerClose()
}
}
6 - ☀部署使用
nmid的使用可以分为两类,一个是作为微服务的rpc框架使用,同时兼具调度和分发服务的作用,这是最主要的使用场景。 还有一个是可以结合k8s使用,作为k8s微服务容器的sidecar使用。
rpc使用
作为rpc使用,就是典型的cliet,nmid-server,worker三者的使用关系。当然可以中间加入注册中心进行使用。 具体的示例可以查看🌰例子和📡Discovery注册中心
docker使用
从docker.io拉取镜像,开通rpc的6808端口和http的6809端口
docker run -d -p 7808:6808 -p 7809:6809 --name nmid nmid:56
在k8s作为sidecar使用
这里以npartword分词微服务为例
apiVersion: apps/v1
kind: Deployment
metadata:
name: npw-deployment
namespace: npw
labels:
app: npw
spec:
replicas: 3 #根据情况定义副本数
selector:
matchLabels:
app: npw
template:
metadata:
labels:
app: npw
spec:
containers:
- name: nmid-sidecar #使用nmid作为npartword服务sidecar
image: docker.io/hughnian/nmid:56 #这里的tag根据具体镜像tag取值
command: ["/etc/service/run"]
ports:
- name: tcp
containerPort: 6808
- name: http
containerPort: 6809
- name: npartword
image: docker.io/hughnian/npartword:3 #这里的tag根据具体镜像tag取值
command: ["/etc/service/run"]
---
apiVersion: v1
kind: Service
metadata:
name: npw-service
namespace: npw
spec:
selector:
app: npw
ports:
- name: npw-tcp
nodePort: 30688
port: 6808
protocol: TCP
targetPort: 6808
- name: npw-http
nodePort: 30689
port: 6809
protocol: TCP
targetPort: 6809
type: NodePort
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
field.cattle.io/description: npw-service-http
name: npw-service-http
namespace: npw
spec:
defaultBackend:
service:
name: npw-service
port:
number: 6809
rules:
- host: npw.mytest.com #使用自己相应域名
http:
paths:
- backend:
service:
name: npw-service
port:
number: 6809
path: /
pathType: Prefix