一、go Http Post 发送文件流
package main

import (
	"bytes"
	"fmt"
	_ "io"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	postFile()
}

// post submits two form fields (num, num1) to the local endpoint and prints
// the response body. It returns early on any error so a nil response is never
// dereferenced.
func post() {
	// The endpoint returns the POSTed parameters.
	strinUrl := "http://localhost:8080/aaa"
	resp, err := http.PostForm(strinUrl, url.Values{"num": {"456"}, "num1": {"123"}})
	if err != nil {
		fmt.Println("err=", err)
		return
	}
	defer func() {
		resp.Body.Close()
		fmt.Println("finish")
	}()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Println(" post err=", err)
		return
	}
	fmt.Println(string(body))
}

// postFile reads post.txt from the working directory, POSTs its raw bytes to
// the local endpoint and prints the response body.
func postFile() {
	// The endpoint returns the POSTed parameters.
	strinUrl := "http://localhost:8080/aaa"
	data, err := ioutil.ReadFile("post.txt")
	if err != nil {
		fmt.Println("read file err=", err)
		return
	}

	// NOTE(review): the body is the raw file content, not an encoded multipart
	// payload, and the Content-Type carries no boundary — confirm the server
	// really expects this.
	resp, err := http.Post(strinUrl, "multipart/form-data", bytes.NewReader(data))
	if err != nil {
		fmt.Println("err=", err)
		return
	}
	defer func() {
		resp.Body.Close()
		fmt.Println("finish")
	}()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Println(" post err=", err)
		return
	}
	fmt.Println(string(body))
}
二、sync.Map
package main

import (
	"fmt"
	"sync"
)

// Class holds a concurrency-safe map of students; in this demo the keys are
// student names and the values are class labels.
type Class struct {
	Students sync.Map
}

// handler is the Range callback: it prints one entry and returns true so the
// iteration keeps going (returning false would stop it).
func handler(key, value interface{}) bool {
	fmt.Printf("Name :%s %s\n", key, value)
	return true
}

func main() {
	class := &Class{}
	roster := &class.Students

	// Store the initial entries.
	seed := [][2]string{
		{"Zhao", "class 1"},
		{"Qian", "class 2"},
		{"Sun", "class 3"},
	}
	for _, kv := range seed {
		roster.Store(kv[0], kv[1])
	}

	// Iterate over every entry.
	roster.Range(handler)

	// Look up a key that has not been stored yet.
	if _, found := roster.Load("Li"); !found {
		fmt.Println("-->Li not found")
	}

	// Load the key if present, otherwise store it.
	outcome := "-->Store Li success"
	if _, loaded := roster.LoadOrStore("Li", "class 4"); loaded {
		outcome = "-->Load Li success"
	}
	fmt.Println(outcome)

	// Delete one entry and iterate again.
	roster.Delete("Sun")
	fmt.Println("-->Delete Sun success")
	roster.Range(handler)
}
三、路由器
package main

import (
	"fmt"
	"log"
	"net/http"
)

// MyMux is a minimal hand-written router implementing http.Handler.
type MyMux struct {
}

// ServeHTTP serves the greeting on "/" and answers 404 for every other path.
func (p *MyMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path == "/" {
		sayhelloName(w, r)
		return
	}
	http.NotFound(w, r)
}

// sayhelloName writes the fixed greeting to the response.
func sayhelloName(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "Hello myroute!")
}

func main() {
	mux := &MyMux{}
	// ListenAndServe only returns on failure (e.g. the port is in use);
	// report it instead of exiting silently.
	if err := http.ListenAndServe(":9090", mux); err != nil {
		log.Fatal(err)
	}
}
四、解析yaml文件
package mainimport ("fmt""log""gopkg.in/yaml.v2")var data = `blog: xiaorui.ccbest_authors: ["fengyun","lee","park"]desc:counter: 521plist: [3, 4]`type T struct {Blog stringAuthors []string `yaml:"best_authors,flow"`Desc struct {Counter int `yaml:"Counter"`Plist []int `yaml:",flow"`}}func main() {t := T{}//把yaml形式的字符串解析成struct类型err := yaml.Unmarshal([]byte(data), &t)//修改struct里面的记录t.Blog = "this is Blog"t.Authors = append(t.Authors, "myself")t.Desc.Counter = 99fmt.Printf("--- t:\n%v\n\n", t)//转换成yaml字符串类型d, err := yaml.Marshal(&t)if err != nil {log.Fatalf("error: %v", err)}fmt.Printf("--- t dump:\n%s\n\n", string(d))}
五、mq
package mainimport ("flag""log""strconv""time""github.com/nats-io/nats.go""github.com/pborman/uuid")const (//url = "nats://192.168.3.125:4222"url = nats.DefaultURL)var (nc *nats.Connerr error)func init() {// if nc, err = nats.Connect(url); checkErr(err) {// fmt.// }}func main() {}//发送消息func sendMessage() {var (subj = flag.String("subj", "", "subject name"))flag.Parse()log.Println(*subj)startClient(*subj)time.Sleep(time.Second)}func startClient(subj string) {for i := 0; i < 1; i++ {id := uuid.New()log.Println(id)nc.Publish(subj, []byte(id+" Sun "+strconv.Itoa(i)))nc.Publish(subj, []byte(id+" Rain "+strconv.Itoa(i)))nc.Publish(subj, []byte(id+" Fog "+strconv.Itoa(i)))nc.Publish(subj, []byte(id+" Cloudy "+strconv.Itoa(i)))}}//接收消息func receiveMessage() {var (servername = flag.String("servername", "y", "name for server")queueGroup = flag.String("group", "", "group name for Subscribe")subj = flag.String("subj", "", "subject name"))flag.Parse()log.Println(*servername, *queueGroup, *subj)startService(*subj, *servername+" worker1", *queueGroup)startService(*subj, *servername+" worker2", *queueGroup)startService(*subj, *servername+" worker3", *queueGroup)select {}}func startService(subj, name, queue string) {go async(nc, subj, name, queue)}func async(nc *nats.Conn, subj, name, queue string) {nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {log.Println(name, "Received a message From Async : ", string(msg.Data))})}func checkErr(err error) bool {if err != nil {log.Println(err)return false}return true}
package mainimport ("fmt""log""github.com/streadway/amqp")func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)panic(fmt.Sprintf("%s: %s", msg, err))}}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()failOnError(err, "Failed to declare a queue")err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, //global)failOnError(err, "Failed to set Qos")//发送消息funcSend(ch)//收消息funcRecv(ch)}//生产者func funcSend(ch *amqp.Channel) {q, err := ch.QueueDeclare("queue", //namefalse, //durablesfalse, //delete when unusedfalse, //exclusivefalse, //no waitnil, //args)failOnError(err, "Failed to declare a queue")body := "Hello World!"error := ch.Publish("", // exchangeq.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})log.Printf("[x] Sent %s", body)failOnError(error, "Failed to publish a message")}//消费者func funcRecv(ch *amqp.Channel) {q, err := ch.QueueDeclare("queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a queue")msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever}
六、map
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mTest sync.Map
	var wg sync.WaitGroup
	// Both goroutines work on the SAME map; wait for them instead of
	// sleeping for a fixed 20 seconds.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			SyncTest(&mTest)
		}()
	}
	wg.Wait()
}

// SyncTest exercises Load, Store, Delete and LoadOrStore on mTest 1000 times.
// The map is taken by pointer: a sync.Map must not be copied after first use,
// and passing it by value gave every goroutine its own private copy.
func SyncTest(mTest *sync.Map) {
	for j := 0; j < 1000; j++ {
		fmt.Println(mTest.Load("A")) // Load reads a key
		mTest.Store("A", "a")        // Store writes a key
		fmt.Println(mTest.Load("A"))
		mTest.Delete("A") // Delete removes a key
		fmt.Println(mTest.Load("A"))
		mTest.LoadOrStore("B", "b") // LoadOrStore stores only when the key is absent
		fmt.Println(mTest.Load("B"))
	}
}

// funcMap walks through the ways of creating, filling, querying and ranging
// over a built-in map.
func funcMap() {
	// Declare first ...
	var m1 map[string]string
	// ... then allocate with make; assigning into a nil map panics.
	m1 = make(map[string]string)
	// Finally assign into the allocated map.
	m1["a"] = "aa"
	m1["b"] = "bb"

	// Create directly, then assign.
	m2 := make(map[string]string)
	m2["a"] = "aa"
	m2["b"] = "bb"

	// Initialise and assign in one literal.
	m3 := map[string]string{
		"a": "aa",
		"b": "bb",
	}
	fmt.Println(m3)

	// Check whether a key exists.
	if v, ok := m1["a"]; ok {
		fmt.Println(v)
	} else {
		fmt.Println("Key Not Found")
	}

	// Iterate over the map.
	for k, v := range m1 {
		fmt.Println(k, v)
	}
}
七、多路复用
package main

import (
	"fmt"
	"log"
	"net/http"
)

// handler1, handler2 and handler3 each write a fixed greeting; they are
// registered on different paths to demonstrate multiplexing.
func handler1(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "1 欢迎访问 www.ydook.com !")
}

func handler2(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "2 欢迎访问 www.ydook.com !")
}

func handler3(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "3 欢迎访问 www.ydook.com !")
}

func main() {
	// Register the handlers on a dedicated multiplexer.
	mux := http.NewServeMux()
	mux.HandleFunc("/h1", handler1)
	mux.HandleFunc("/h2", handler2)
	mux.HandleFunc("/h3", handler3)

	// Configure the server.
	server := &http.Server{
		Addr:    "127.0.0.1:8000",
		Handler: mux,
	}

	// ListenAndServe only returns on failure; surface the reason.
	if err := server.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}
八、for循环
package mainimport ("fmt""time")func main() {str := []string{"I","am","Sergey"}for _,v := range str{go func() {fmt.Println(v)}()}time.Sleep(3 * time.Second)}
package main

import (
	"fmt"
	"time"
)

// main shows the corrected form of the capture pitfall: each goroutine gets
// the current element as an argument, so it prints its own value.
func main() {
	words := []string{"I", "am", "Sergey"}
	for _, word := range words {
		go func(own string) {
			fmt.Println(own)
		}(word)
	}
	// Crude wait so the goroutines get a chance to print.
	time.Sleep(3 * time.Second)
}
package main

import "fmt"

// main demonstrates the "address of the range variable" pitfall: every map
// entry stores &elem, the address of the loop variable, not of a per-element
// copy. With pre-Go 1.22 loop semantics all entries alias one variable.
func main() {
	numbers := []int{0, 1, 2, 3}
	byIndex := make(map[int]*int)
	for i, elem := range numbers {
		byIndex[i] = &elem // deliberately takes the loop variable's address
	}
	prtMap(byIndex)
}

// prtMap prints every key together with the value its pointer refers to.
func prtMap(myMap map[int]*int) {
	for k, ptr := range myMap {
		fmt.Printf("map[%v]=%v\n", k, *ptr)
	}
}
package main

import "fmt"

// main shows the corrected form of the range-variable pitfall: the element is
// copied into a fresh variable each iteration, so every map entry points at
// its own value.
func main() {
	numbers := []int{0, 1, 2, 3}
	byIndex := make(map[int]*int)
	for i, elem := range numbers {
		own := elem // per-iteration copy
		byIndex[i] = &own
	}
	prtMap(byIndex)
}

// prtMap prints every key together with the value its pointer refers to.
func prtMap(myMap map[int]*int) {
	for k, ptr := range myMap {
		fmt.Printf("map[%v]=%v\n", k, *ptr)
	}
}
九、Stringer接口
package main

import "fmt"

// Person is a small value type used to demonstrate fmt.Stringer.
type Person struct {
	Name string
	Age  int
}

// String implements fmt.Stringer, rendering the person as "<name> (<age> years)".
func (p Person) String() string {
	const layout = "%v (%v years)"
	return fmt.Sprintf(layout, p.Name, p.Age)
}

func main() {
	people := []Person{
		{"Arthur Dent", 42},
		{"Zaphod Beeblebrox", 9001},
	}
	// Println picks up the String method for each value.
	fmt.Println(people[0], people[1])
}
十、抽象类实现
// IPeople is the full contract of a "people" object: name and age accessors
// plus Run.
type IPeople interface {
	GetName() string
	SetName(string)
	GetAge() int
	SetAge(int)
	Run()
}

// AbstractPeople plays the role of an abstract base class: it implements the
// accessors itself and embeds IPeople, so Run is only available through
// whatever implementation is placed in the embedded field.
type AbstractPeople struct {
	IPeople
	name string
	age  int
}

// GetName returns the stored name.
func (people AbstractPeople) GetName() string {
	return people.name
}

// SetName replaces the stored name.
func (people *AbstractPeople) SetName(newName string) {
	people.name = newName
}

// GetAge returns the stored age.
func (people AbstractPeople) GetAge() int {
	return people.age
}

// SetAge replaces the stored age.
func (people *AbstractPeople) SetAge(newAge int) {
	people.age = newAge
}

// UsePeople is the consumer side: it prints the current name and age, renames
// the person to "Alice", increments the age by one and finally calls Run.
func UsePeople(p IPeople) {
	fmt.Println(p.GetName())
	fmt.Println(p.GetAge())

	p.SetName("Alice")
	older := p.GetAge() + 1
	p.SetAge(older)

	p.Run()
}
十一、接口
package main

import (
	"fmt"
)

// DataWriter is anything that can write a piece of data.
type DataWriter interface {
	WriteData(data interface{}) error
}

// file is the concrete type used to implement DataWriter.
type file struct {
}

// WriteData implements DataWriter by printing the data (a stand-in for a real
// write) and always returns nil.
func (d *file) WriteData(data interface{}) error {
	fmt.Println("WriteData:", data)
	return nil
}

func main() {
	// Create a *file and use it through the DataWriter interface.
	var writer DataWriter = &file{}
	writer.WriteData("data")
}
十二、多线程
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	// cpunum is the number of worker goroutines (and the GOMAXPROCS value).
	cpunum = workerCount()
)

// workerCount returns NumCPU()-1 but never less than 1: on a single-core
// machine the original value was 0, which started no workers and left main
// blocked forever on the first channel send.
func workerCount() int {
	if n := runtime.NumCPU() - 1; n > 0 {
		return n
	}
	return 1
}

func main() {
	ch := make(chan string)
	runtime.GOMAXPROCS(cpunum)
	wg := sync.WaitGroup{}
	for i := 0; i < cpunum; i++ {
		go WgReadLogs(ch, &wg)
	}
	wg.Add(2)
	ch <- "./health/stat1.rec"
	ch <- "./report/stat2.rec"
	wg.Wait()
	// Closing the channel lets the workers' range loops end.
	close(ch)
}

// WgReadLogs is a worker: it processes every log name received from ch and
// calls wg.Done once per item, returning when ch is closed.
func WgReadLogs(ch chan string, wg *sync.WaitGroup) {
	for name := range ch {
		ReadLogs(name)
		wg.Done()
	}
}

// ReadLogs prints the log name (a stand-in for real processing).
func ReadLogs(logname string) {
	fmt.Println(logname)
}
十三、websocket
package mainimport ("bytes""fmt""log""net/http""unsafe""golang.org/x/net/websocket")func BytePtrToString(p *byte) string {bufs := bytes.NewBufferString("")for ps := p; *ps != byte(0); ps = (*byte)(unsafe.Pointer(1 + (uintptr)(unsafe.Pointer(ps)))) {bufs.WriteByte(*ps)}return bufs.String()}func main() {fmt.Println("websocket地址: ws://127.0.0.1:8080/runtime")http.Handle("/runtime", websocket.Handler(echo))if err := http.ListenAndServe(":8080", nil); err != nil {log.Fatal(err)}}//测试地址: http://coolaf.com/tool/chattest 首字母大写是公有的 Echo,首字母小写是私有的 echofunc echo(w *websocket.Conn) {var error errorfor {//只支持string类型var reply stringif error = websocket.Message.Receive(w, &reply); error != nil {log.Println("websocket出现异常", error)break}fmt.Println("收到客户端消息:" + reply)msg := reply + ", 我是服务端"fmt.Println("发送客户端消息:" + msg)if error = websocket.Message.Send(w, msg); error != nil {log.Println("websocket出现异常", error)break}}}
十四、协程池
package main

import (
	"fmt"
	"sync"
	"time"
)

// Task wraps one unit of business logic.
type Task struct {
	f func() error // the business function: no arguments, returns an error
}

// NewTask creates a Task around arg_f.
func NewTask(arg_f func() error) *Task {
	return &Task{f: arg_f}
}

// Execute runs the bound business function and reports a returned error
// instead of silently discarding it.
func (task *Task) Execute() {
	if err := task.f(); err != nil {
		fmt.Println("task failed:", err)
	}
}

// ------------------------------------------------

// Pool is a fixed-size goroutine pool.
type Pool struct {
	EntryChannel chan *Task // public entry point for submitting tasks
	JobsChannel  chan *Task // internal queue the workers read from
	workerNum    int        // number of worker goroutines
}

// NewPool creates a pool with cap workers.
func NewPool(cap int) *Pool {
	return &Pool{
		EntryChannel: make(chan *Task),
		JobsChannel:  make(chan *Task),
		workerNum:    cap,
	}
}

// worker executes tasks from JobsChannel until that channel is closed.
func (pool *Pool) worker(workID int) {
	for task := range pool.JobsChannel {
		task.Execute()
		fmt.Println("work ID ", workID, " has executed")
	}
}

// run starts the workers and forwards tasks from EntryChannel to JobsChannel.
// It blocks until EntryChannel is closed; it then closes JobsChannel and waits
// for every worker to finish, so no goroutine is leaked and all accepted tasks
// have completed when run returns.
func (pool *Pool) run() {
	var workers sync.WaitGroup
	for i := 0; i < pool.workerNum; i++ {
		workers.Add(1)
		go func(id int) {
			defer workers.Done()
			pool.worker(id)
		}(i)
	}

	// Forward tasks; this is where priority/ordering logic could be added.
	for task := range pool.EntryChannel {
		pool.JobsChannel <- task
	}

	close(pool.JobsChannel)
	workers.Wait()
}

// ------------------------------------------------

func main() {
	// One task that prints the current time.
	task := NewTask(func() error {
		fmt.Println(time.Now())
		return nil
	})

	// A pool with four workers.
	pool := NewPool(4)

	// Feed the pool from a separate goroutine so run is not blocked.
	go func() {
		for {
			pool.EntryChannel <- task
		}
	}()

	// Start the pool (runs until EntryChannel is closed, i.e. forever here).
	pool.run()
}
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is the body of each goroutine. The WaitGroup must be passed by
// pointer so that Done acts on the caller's counter.
func worker(id int, wg *sync.WaitGroup) {
	// Tell the WaitGroup this goroutine has finished once the function returns.
	defer wg.Done()

	fmt.Printf("Worker %d starting\n", id)
	time.Sleep(time.Second) // stand-in for an expensive task
	fmt.Printf("Worker %d done\n", id)
}

func main() {
	// group tracks every goroutine started below.
	var group sync.WaitGroup

	const workers = 5
	for id := 1; id <= workers; id++ {
		group.Add(1)
		go worker(id, &group)
	}

	// Block until the counter is back to zero, i.e. all workers are done.
	group.Wait()
}
package main

import (
	"fmt"
	"time"
)

// worker receives jobs from the jobs channel and sends twice the job number
// to results. Each job sleeps for a second to imitate expensive work. The
// loop ends when jobs is closed.
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Println("worker", id, "processing job", job)
		time.Sleep(time.Second)
		results <- job * 2
	}
}

func main() {
	const (
		workerTotal = 3
		jobTotal    = 9
	)

	// Two buffered channels: one to hand out work, one to collect results.
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Start the workers; they block until jobs arrive.
	for id := 1; id <= workerTotal; id++ {
		go worker(id, jobs, results)
	}

	// Queue all jobs, then close the channel to signal there are no more.
	for job := 1; job <= jobTotal; job++ {
		jobs <- job
	}
	close(jobs)

	// Collect one result per job.
	for received := 0; received < jobTotal; received++ {
		<-results
	}
}
package mainimport ("context""fmt""strconv""sync/atomic"pool "github.com/jolestar/go-commons-pool")func Example_simple() {type myPoolObject struct {s string}v := uint64(0)factory := pool.NewPooledObjectFactorySimple(func(context.Context) (interface{}, error) {return &myPoolObject{s: strconv.FormatUint(atomic.AddUint64(&v, 1), 10),},nil})ctx := context.Background()p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)obj, err := p.BorrowObject(ctx)if err != nil {panic(err)}o := obj.(*myPoolObject)fmt.Println(o.s)err = p.ReturnObject(ctx, obj)if err != nil {panic(err)}// Output: 1}
package mainimport ("io/ioutil""net/http""runtime""github.com/Jeffail/tunny")func main() {numCPUs := runtime.NumCPU()pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {var result []byte// TODO: Something CPU heavy with payloadreturn result})defer pool.Close()http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {input, err := ioutil.ReadAll(r.Body)if err != nil {http.Error(w, "Internal error", http.StatusInternalServerError)}defer r.Body.Close()// Funnel this work into our pool. This call is synchronous and will// block until the job is completed.result := pool.Process(input)w.Write(result.([]byte))})http.ListenAndServe(":8080", nil)}
package main

import (
	"errors"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
	"time"
)

var (
	// ErrInvalidPoolCap is returned by NewPool when the capacity is 0.
	ErrInvalidPoolCap = errors.New("invalid pool cap")
	// ErrPoolAlreadyClosed is returned by Put once the pool has been closed.
	ErrPoolAlreadyClosed = errors.New("pool already closed")
)

const (
	// RUNNING marks a pool that accepts tasks.
	RUNNING = 1
	// STOPED marks a pool that has been closed.
	STOPED = 0
)

// Task is one unit of work: Handler is invoked with Params.
type Task struct {
	Handler func(v ...interface{})
	Params  []interface{}
}

// Pool is a task pool that starts workers on demand, up to capacity.
type Pool struct {
	capacity       uint64
	runningWorkers uint64
	state          int64
	taskC          chan *Task
	PanicHandler   func(interface{})
	sync.Mutex
}

// NewPool creates a running pool with the given capacity (maximum number of
// workers and size of the task buffer). It returns ErrInvalidPoolCap for 0.
func NewPool(capacity uint64) (*Pool, error) {
	if capacity == 0 {
		return nil, ErrInvalidPoolCap
	}
	return &Pool{
		capacity: capacity,
		state:    RUNNING,
		taskC:    make(chan *Task, capacity),
	}, nil
}

// GetCap returns the pool capacity.
func (p *Pool) GetCap() uint64 {
	return p.capacity
}

// GetRunningWorkers returns the number of live workers.
func (p *Pool) GetRunningWorkers() uint64 {
	return atomic.LoadUint64(&p.runningWorkers)
}

func (p *Pool) incRunning() {
	atomic.AddUint64(&p.runningWorkers, 1)
}

func (p *Pool) decRunning() {
	// Adding ^uint64(0) is the atomic way to subtract one.
	atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}

// Put queues a task, starting a new worker if the pool is below capacity.
// It returns ErrPoolAlreadyClosed if the pool is stopped. The state check and
// the send happen under one lock, so a task can no longer be dropped silently
// (nil returned without queueing) when Close runs between the two.
func (p *Pool) Put(task *Task) error {
	p.Lock()
	defer p.Unlock()

	if p.state == STOPED {
		return ErrPoolAlreadyClosed
	}
	if p.GetRunningWorkers() < p.GetCap() {
		p.run()
	}
	p.taskC <- task
	return nil
}

// run starts one worker goroutine that processes tasks until taskC is closed.
// A panicking task ends that worker; the panic is passed to PanicHandler if
// set, otherwise logged.
func (p *Pool) run() {
	p.incRunning()
	go func() {
		defer func() {
			p.decRunning()
			if r := recover(); r != nil {
				if p.PanicHandler != nil {
					p.PanicHandler(r)
				} else {
					log.Printf("Worker panic: %v\n", r)
				}
			}
		}()
		for task := range p.taskC {
			task.Handler(task.Params...)
		}
	}()
}

func (p *Pool) getState() int64 {
	p.Lock()
	defer p.Unlock()
	return p.state
}

func (p *Pool) setState(state int64) {
	p.Lock()
	defer p.Unlock()
	p.state = state
}

// close closes the task channel under the lock.
func (p *Pool) close() {
	p.Lock()
	defer p.Unlock()
	close(p.taskC)
}

// Close stops the pool gracefully: it rejects new tasks, waits until the
// queued ones have been picked up and then closes the task channel. The state
// is checked and flipped in one critical section, so concurrent or repeated
// calls cannot close the channel twice.
func (p *Pool) Close() {
	p.Lock()
	if p.state == STOPED {
		p.Unlock()
		return
	}
	p.state = STOPED // stop accepting tasks
	p.Unlock()

	for len(p.taskC) > 0 { // wait for all queued tasks to be consumed
		time.Sleep(time.Millisecond) // reduce CPU load
	}
	p.close()
}

//https://github.com/wazsmwazsm/mortar
func main() {
	// Create a task pool with capacity 10.
	pool, err := NewPool(10)
	if err != nil {
		panic(err)
	}

	wg := new(sync.WaitGroup)
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		// Create the task; Done is deferred so the output is written before
		// main can move on.
		task := &Task{
			Handler: func(v ...interface{}) {
				defer wg.Done()
				fmt.Println(v)
			},
		}
		// Set the task's arguments.
		task.Params = []interface{}{i, i * 2, "hello"}
		// Queue the task.
		pool.Put(task)
	}

	wg.Add(1)
	// One more task, with the arguments set at construction time.
	pool.Put(&Task{
		Handler: func(v ...interface{}) {
			defer wg.Done()
			fmt.Println(v)
		},
		Params: []interface{}{"hi!"},
	})

	wg.Wait()

	// Close gracefully (queued tasks are consumed first).
	pool.Close()

	// Put on a closed pool returns ErrPoolAlreadyClosed.
	err = pool.Put(&Task{
		Handler: func(v ...interface{}) {},
	})
	if err != nil {
		fmt.Println(err) // print: pool already closed
	}
}
十五、go调用C语言
注意:
rpc(远程函数调用)提供client端通过网络调用远程server端的函数的服务。
rpc-server端需要提供较高的吞吐能力,支持较大的并发连接。
epoll监听多个连接fd,实现IO复用
1. epoll没有最大并发连接的限制,上限是最大可以打开文件的数目,一般远大于2048
2. epoll效率高,只管活跃的连接,而跟连接总数无关
3. 使用共享内存,省去了内存拷贝
生产者&消费者模式对请求任务进行管理
一个IO-thread线程维护连接池中的连接,连接有3种状态:NO_USED、READY、BUSY。

1. NO_USED状态:fd位置空闲,连接位尚未使用
2. READY状态:已经与client端建立连接的fd,等待请求事件
3. BUSY状态:已经收到请求事件,将fd放入任务队列,等待work-thread处理
```go package main
/*
include
include
void say(){ printf(“hello world\n”); } */ import “C”
import ( “fmt” “net/http” “runtime” “syscall” “time”
"github.com/gin-gonic/gin"
)
func main() { runtime.LockOSThread() router := gin.Default() router.GET(“/ping”, func(c gin.Context) { fmt.Println(“worker”, syscall.Getegid()) c.JSON(200, gin.H{ “message”: “pong”, }) }) s := &http.Server{ Addr: “:18282”, Handler: router, ReadTimeout: 600 time.Second, WriteTimeout: 600 * time.Second, MaxHeaderBytes: 1 << 20, } fmt.Println(“listen:”, syscall.Getegid()) s.ListenAndServe() fmt.Println(“main”, C.say()) }
func testMain() { runtime.GOMAXPROCS(runtime.NumCPU()) ch1 := make(chan bool) ch2 := make(chan bool) fmt.Println(“main”, C.say()) go func() { runtime.LockOSThread() fmt.Println(“locked”, C.say()) go func() { fmt.Println(“locked child”, C.say()) ch1 <- true }() ch2 <- true }() <-ch1 <-ch2 }
<a name="AlQ0C"></a># 十六、go RPC```gopackage mainimport ("context""log""net""os""time""google.golang.org/grpc"pb "google.golang.org/grpc/examples/helloworld/helloworld""google.golang.org/grpc/reflection")func main() {println(f(1))}const (port = ":50051"address = "localhost:50051"defaultName = "world")type server struct{} //服务对象// SayHello 实现服务的接口 在proto中定义的所有服务都是接口func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {return &pb.HelloReply{Message: "Hello " + in.Name}, nil}func serverMain() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer() //起一个服务pb.RegisterGreeterServer(s, &server{})// 注册反射服务 这个服务是CLI使用的 跟服务本身没有关系reflection.Register(s)if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}}func clientMain() {//建立链接conn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewGreeterClient(conn)// Contact the server and print out its response.name := defaultNameif len(os.Args) > 1 {name = os.Args[1]}// 1秒的上下文ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.Message)}func f(x int) (_, __ int) {_, __ = x, xreturn}
十七、go调用IoT物联网
package mainimport ("fmt""sync""time"mqtt "github.com/eclipse/paho.mqtt.golang")const broker = "tcp://你的服务器.mqtt.iot.gz.baidubce.com:1883"const username = "你的服务器/client"const password = "UJq**密码**msq"const ClientID = "随意"//message的回调var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("[%s] -> %s\n", msg.Topic(), msg.Payload())}var wg sync.WaitGroupvar client mqtt.Clientfunc main() {//连接MQTT服务器mqttConnect()defer client.Disconnect(250) //注册销毁wg.Add(1)go mqttSubScribe("test1")wg.Add(1)go testPublish()wg.Wait()}func mqttConnect() {//配置clinetOptions := mqtt.NewClientOptions().AddBroker(broker).SetUsername(username).SetPassword(password)clinetOptions.SetClientID(ClientID)clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)//连接client = mqtt.NewClient(clinetOptions)//客户端连接判断if token := client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {panic(token.Error())}}func mqttSubScribe(topic string) {defer wg.Done()for {token := client.Subscribe(topic, 1, onMessage)token.Wait()}}//测试 3秒发送一次,然后自己接收func testPublish() {defer wg.Done()for {client.Publish("test1", 1, false, "TEST")time.Sleep(time.Duration(3) * time.Second)}}
package mainimport ("crypto/tls""crypto/x509""fmt""io/ioutil""log""time"mqtt "github.com/eclipse/paho.mqtt.golang")var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())}var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {fmt.Println("Connected")}var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {fmt.Printf("Connect lost: %v", err)}//https://cloud.emqx.cn/console/deployments/0?oper=new// ClientOptions:用于设置 broker,端口,客户端 id ,用户名密码等选项// messagePubHandler:全局 MQTT pub 消息处理// connectHandler:连接的回调// connectLostHandler:连接丢失的回调func main() {var broker = "broker.emqx.io"var port = 1883opts := mqtt.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))opts.SetClientID("go_mqtt_client")opts.SetUsername("emqx")opts.SetPassword("public")opts.SetDefaultPublishHandler(messagePubHandler)opts.OnConnect = connectHandleropts.OnConnectionLost = connectLostHandlerclient := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}sub(client)publish(client)client.Disconnect(250)}//发布消息func publish(client mqtt.Client) {num := 10for i := 0; i < num; i++ {text := fmt.Sprintf("Message %d", i)token := client.Publish("topic/test", 0, false, text)token.Wait()time.Sleep(time.Second)}}//订阅func sub(client mqtt.Client) {topic := "topic/test"token := client.Subscribe(topic, 1, nil)token.Wait()fmt.Printf("Subscribed to topic: %s", topic)}//如果想使用 TLS 连接,可以如下设置:func NewTlsConfig() *tls.Config {certpool := x509.NewCertPool()ca, err := ioutil.ReadFile("ca.pem")if err != nil {log.Fatalln(err.Error())}certpool.AppendCertsFromPEM(ca)// Import client certificate/key pairclientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")if err != nil {panic(err)}return &tls.Config{RootCAs: certpool,ClientAuth: tls.NoClientCert,ClientCAs: nil,InsecureSkipVerify: 
true,Certificates: []tls.Certificate{clientKeyPair},}}//如果不设置客户端证书,可以如下设置:func NewTlsConfigs() *tls.Config {certpool := x509.NewCertPool()ca, err := ioutil.ReadFile("ca.pem")if err != nil {log.Fatalln(err.Error())}certpool.AppendCertsFromPEM(ca)return &tls.Config{RootCAs: certpool,ClientAuth: tls.NoClientCert,ClientCAs: nil,InsecureSkipVerify: true,}}
十八、乐观锁 参考:https://github.com/crossoverJie/gorm-optimistic
Quick start
func BenchmarkUpdateWithOptimistic(b *testing.B) {dsn := "root:abc123@/test?charset=utf8&parseTime=True&loc=Local"db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})if err != nil {fmt.Println(err)return}b.RunParallel(func(pb *testing.PB) {var out Optimisticdb.First(&out, Optimistic{Id: 1})out.Amount = out.Amount + 10err = UpdateWithOptimistic(db, &out, func(model Lock) Lock {bizModel := model.(*Optimistic)bizModel.Amount = bizModel.Amount + 10return bizModel}, 5, 0)if err != nil {fmt.Println(err)}})}
Model
// Optimistic is the row model of table t_optimistic; Version is the
// optimistic-lock counter exposed through GetVersion/SetVersion.
type Optimistic struct {
	Id      int64   `gorm:"column:id;primary_key;AUTO_INCREMENT" json:"id"`
	UserId  string  `gorm:"column:user_id;default:0;NOT NULL" json:"user_id"` // user ID
	Amount  float32 `gorm:"column:amount;NOT NULL" json:"amount"`             // amount
	Version int64   `gorm:"column:version;default:0;NOT NULL" json:"version"` // version
}

// TableName returns the table this model maps to.
func (row *Optimistic) TableName() string {
	const table = "t_optimistic"
	return table
}

// GetVersion returns the current version.
func (row *Optimistic) GetVersion() int64 {
	return row.Version
}

// SetVersion overwrites the version.
func (row *Optimistic) SetVersion(version int64) {
	row.Version = version
}
十九、并发条件下的有序输出
package main

import (
	"fmt"
	"sync"
	"time"
)

// main runs one producer (1..100) and three printing consumers over an
// unbuffered channel and waits for all of them with a WaitGroup.
func main() {
	q := make(chan int)
	var wg sync.WaitGroup

	// Producer.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(q)
		for i := 1; i <= 100; i++ {
			q <- i
			// Sleeping throttles the producer, but this is not a reliable
			// way to enforce ordering.
			time.Sleep(time.Microsecond)
		}
	}()

	// Consumers.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range q {
				fmt.Println(v)
			}
		}()
	}

	wg.Wait()
}

// The WaitGroup gives a clean exit and the producer is throttled by sleeping;
// if the consumers' work is slow or of unpredictable duration, the ordering is
// no longer reliable.
package main

import (
	"fmt"
	"time"
)

// print prints every value received from ch, forever.
func print(ch <-chan int) {
	for {
		v := <-ch
		fmt.Println(v)
	}
}

// main starts three printers, each with its own channel, and distributes
// 1..100 among them by i%3, sleeping briefly after every send.
func main() {
	var lanes [3]chan int
	for k := range lanes {
		lanes[k] = make(chan int)
	}
	for _, lane := range lanes {
		go print(lane)
	}

	for i := 1; i < 101; i++ {
		lanes[i%3] <- i
		time.Sleep(time.Microsecond)
	}
}

// The program exits when the for loop ends; the producer is again throttled by
// sleeping, so if the consumers' work is slow or of unpredictable duration the
// ordering is no longer reliable.
package main

import (
	"fmt"
	"sync"
	"time"
)

// main pre-fills a buffered channel with 1..100 and lets three goroutines
// print from it, each one waiting on a condition variable until main signals.
func main() {
	q := make(chan int, 100)
	// Fill the channel.
	for i := 1; i <= 100; i++ {
		q <- i
	}

	cond := sync.NewCond(&sync.Mutex{})
	consume := func(c *sync.Cond, in <-chan int) {
		for {
			c.L.Lock()
			c.Wait()
			v, ok := <-in
			if ok {
				fmt.Println(v)
			}
			c.L.Unlock()
			if !ok {
				return
			}
		}
	}
	for i := 0; i < 3; i++ {
		go consume(cond, q)
	}

	time.Sleep(time.Second) // let every goroutine reach Wait
	for i := 1; i <= 100; i++ {
		cond.Signal() // wake one waiting goroutine
		time.Sleep(time.Microsecond)
	}
	close(q)
	time.Sleep(time.Second)
}

// The condition variable parks every goroutine until it is woken, which is
// what produces the ordered output. Unlike a plain lock, goroutines blocked on
// a Cond do not poll for the resource; they wait to be scheduled, which keeps
// the overhead low.
package main

import (
	"fmt"
	"sync"
	"time"
)

// main runs one producer (1..100) and three consumers; each consumer holds a
// shared mutex while it receives and prints one value.
func main() {
	q := make(chan int)
	mu := new(sync.Mutex)

	// Producer.
	go func(out chan<- int) {
		defer close(out)
		for i := 1; i <= 100; i++ {
			out <- i
		}
	}(q)

	// Consumers.
	for i := 0; i < 3; i++ {
		go func(lock *sync.Mutex, in <-chan int) {
			for {
				lock.Lock()
				v, ok := <-in
				if ok {
					fmt.Println(v)
				}
				lock.Unlock()
				if !ok {
					return
				}
			}
		}(mu, q)
	}

	time.Sleep(time.Second) // crude wait before exiting
}

// The mutex blocks the other goroutines so that only one receives and prints
// at a time, and sleeping is used to exit. This approach is reliable, but a
// WaitGroup would be the better way to wait before exiting.
package main

import (
	"fmt"
	"sync"
)

// main prints 1..100 in order from three goroutines that take turns through a
// one-slot channel used as a lock.
//
// Changes from the original: the token is released before a goroutine exits,
// the channel is no longer closed while other goroutines may still be blocked
// sending on it (which panics with "send on closed channel"), and a WaitGroup
// replaces the one-second sleep.
func main() {
	a := 0
	fn := func() int {
		a++
		return a
	}

	ch := make(chan int, 1) // capacity 1: holding the slot means holding the lock
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				ch <- 1 // acquire
				n := fn()
				if n > 100 {
					<-ch // release before leaving so the others can finish too
					return
				}
				fmt.Println(n)
				<-ch // release
			}
		}()
	}
	wg.Wait()
}

// The channel blocks the other goroutines so that only one increments and
// prints at a time; the WaitGroup provides a safe exit.
go.mod
module datacenter

go 1.15

require (
	github.com/Jeffail/tunny v0.0.0-20181108205650-4921fff29480
	github.com/brianvoe/gofakeit/v6 v6.0.0
	github.com/chanxuehong/wechat v0.0.0-20201110083048-0180211b69fd
	github.com/coreos/go-semver v0.3.0 // indirect
	github.com/dgrijalva/jwt-go v3.2.0+incompatible
	github.com/fortytw2/leaktest v1.3.0 // indirect
	github.com/gin-gonic/gin v1.6.3
	github.com/go-playground/validator/v10 v10.4.1 // indirect
	github.com/go-redis/redis v6.15.9+incompatible
	github.com/go-sql-driver/mysql v1.5.0
	github.com/gobwas/httphead v0.1.0 // indirect
	github.com/gobwas/pool v0.2.1 // indirect
	github.com/gobwas/ws v1.0.3
	github.com/golang/protobuf v1.4.2
	github.com/gotomicro/ego v0.3.7
	github.com/jmoiron/sqlx v1.3.1
	github.com/jolestar/go-commons-pool v2.0.0+incompatible
	github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
	github.com/nats-io/nats-server/v2 v2.1.7 // indirect
	github.com/nats-io/nats.go v1.10.0
	github.com/pborman/uuid v1.2.1
	github.com/qiniu/api.v7/v7 v7.7.0
	github.com/spf13/pflag v1.0.5 // indirect
	github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
	github.com/tal-tech/go-zero v1.0.28
	github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
	golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
	golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
	golang.org/x/time v0.0.0-20191024005414-555d28b269f0
	google.golang.org/grpc v1.29.1
	google.golang.org/protobuf v1.25.0
	github.com/eclipse/paho.mqtt.golang v1.2.0
)
