📡Discovery注册中心
目前nmid只使用etcd为注册中心,client和worker的使用处需要使用相应discovery的方法
Client
client使用discovery,下面是一个http api服务列子
const SERVERHOST = "127.0.0.1"
const SERVERPORT = "6808"
var client *cli.Client
var err error
var consumer *cli.Consumer
func getClient() *cli.Client {
serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
client, err := cli.NewClient("tcp", serverAddr).SetIoTimeOut(30 * time.Second).Start()
if nil == client || err != nil {
logger.Error(err)
}
return client
}
func discovery(funcName string) *cli.Client {
client := consumer.Discovery(funcName)
if client != nil {
client, err := client.SetIoTimeOut(30 * time.Second).Start()
if nil == client || err != nil {
logger.Error(err)
}
} else {
client = getClient()
}
return client
}
func Test(ctx *fasthttp.RequestCtx) {
funcName := "ToUpper"
client := discovery(funcName)
defer client.Close()
if nil == client {
fmt.Fprint(ctx, "nmid client error")
return
}
client.SetParamsType(model.PARAMS_TYPE_JSON)
client.ErrHandler = func(e error) {
if model.RESTIMEOUT == e {
logger.Warn("time out here")
} else {
logger.Error(e)
}
fmt.Fprint(ctx, e.Error())
}
respHandler := func(resp *cli.Response) {
if resp.DataType == model.PDT_S_RETURN_DATA && resp.RetLen != 0 {
if resp.RetLen == 0 {
logger.Info("ret empty")
return
}
var retStruct model.RetStruct
err := msgpack.Unmarshal(resp.Ret, &retStruct)
if nil != err {
log.Fatalln(err)
return
}
if retStruct.Code != 0 {
log.Println(retStruct.Msg)
return
}
fmt.Println(string(retStruct.Data))
fmt.Fprint(ctx, string(retStruct.Data))
}
}
paramsName := make(map[string]interface{})
paramsName["name"] = "niansong1"
//params, err := msgpack.Marshal(¶msName)
params1, err := json.Marshal(¶msName)
if err != nil {
logger.Fatal("params msgpack error:", err)
}
err = client.Do(funcName, params1, respHandler)
if nil != err {
logger.Error(`do err`, err)
}
}
func main() {
consumer = &cli.Consumer{
EtcdAddrs: discoverys,
Username: disUsername,
Password: disPassword,
}
consumer.EtcdCli = consumer.EtcdClient()
consumer.EtcdWatch()
router := fasthttprouter.New()
router.GET("/test", Test)
err := fasthttp.ListenAndServe(":5981", router.Handler)
fmt.Println(`err info:`, err)
}
Worker1
worker1注册到discovery
const NMIDSERVERHOST = "127.0.0.1"
const NMIDSERVERPORT = "6808"
var discoverys = []string{"localhost:2379"}
var disUsername = "root"
var disPassword = "123456"
func ToUpper(job wor.Job) ([]byte, error) {
resp := job.GetResponse()
if nil == resp {
return []byte(``), fmt.Errorf("response data error")
}
if len(resp.ParamsMap) > 0 {
name := resp.ParamsMap["name"].(string)
retStruct := model.GetRetStruct()
retStruct.Msg = "ok"
retStruct.Data = []byte(strings.ToUpper(name))
ret, err := msgpack.Marshal(retStruct)
if nil != err {
return []byte(``), err
}
resp.RetLen = uint32(len(ret))
resp.Ret = ret
return ret, nil
}
return nil, fmt.Errorf("response data error")
}
func main() {
wname := "Worker1"
var worker *wor.Worker
var err error
serverAddr := NMIDSERVERHOST + ":" + NMIDSERVERPORT
worker = wor.NewWorker().SetWorkerName(wname).WithTrace(SKYREPORTERURL)
err = worker.AddServer("tcp", serverAddr)
if err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
worker.AddFunction("ToUpper", ToUpper)
//worker注册到注册中心
worker.Register(wor.EtcdConfig{Addrs: discoverys, Username: disUsername, Password: disPassword})
if err = worker.WorkerReady(); err != nil {
log.Fatalln(err)
worker.WorkerClose()
return
}
go worker.WorkerDo()
quits := make(chan os.Signal, 1)
signal.Notify(quits, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT /*syscall.SIGUSR1*/)
switch <-quits {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
worker.WorkerClose()
}
}
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.
Last modified December 8, 2023: commit (f504b2b)