一、go Http Post 发送文件流
package main
import (
"net/http"
"net/url"
"fmt"
"io/ioutil"
_ "io"
"bytes"
)
// main runs the file-upload demo (postFile); post is defined below but is not
// called from here.
func main() {
postFile()
}
// post submits the form fields num=456 and num1=123 to a local endpoint and
// prints the response body. Failures are printed and end the call early;
// nothing is returned to the caller.
func post() {
	// Address of an endpoint that echoes the POST parameters back.
	strinUrl := "http://localhost:8080/aaa"
	resopne, err := http.PostForm(strinUrl, url.Values{"num": {"456"}, "num1": {"123"}})
	if err != nil {
		fmt.Println("err=", err)
		// The response is nil when the request fails, so stop before using it.
		return
	}
	defer func() {
		resopne.Body.Close()
		fmt.Println("finish")
	}()
	body, err := ioutil.ReadAll(resopne.Body)
	if err != nil {
		fmt.Println(" post err=", err)
		return
	}
	fmt.Println(string(body))
}
// postFile reads post.txt from the working directory, sends its bytes as the
// body of a POST request to a local endpoint and prints the response body.
// Failures are printed and end the call early; nothing is returned.
func postFile() {
	// Address of an endpoint that echoes the POST parameters back.
	strinUrl := "http://localhost:8080/aaa"
	data, err := ioutil.ReadFile("post.txt")
	if err != nil {
		fmt.Println("err=", err)
		return
	}
	// NOTE(review): the body is the raw file content while the declared type is
	// "multipart/form-data" without a boundary; confirm the server accepts this.
	resopne, err := http.Post(strinUrl, "multipart/form-data", bytes.NewReader(data))
	if err != nil {
		fmt.Println("err=", err)
		// The response is nil when the request fails, so stop before using it.
		return
	}
	defer func() {
		resopne.Body.Close()
		fmt.Println("finish")
	}()
	body, err := ioutil.ReadAll(resopne.Body)
	if err != nil {
		fmt.Println(" post err=", err)
		return
	}
	fmt.Println(string(body))
}
二、sync.Map
package main
import (
"fmt"
"sync"
)
// Class wraps a sync.Map of students; main stores student names as keys and
// class names as values.
type Class struct {
Students sync.Map
}
// handler is the callback handed to sync.Map.Range. It prints one key/value
// pair and always returns true, so the traversal never stops early.
func handler(key, value interface{}) bool {
	line := fmt.Sprintf("Name :%s %s\n", key, value)
	fmt.Print(line)
	return true
}
// main walks through the basic sync.Map operations on a Class: Store, Range,
// Load, LoadOrStore and Delete.
func main() {
	class := new(Class)
	roster := &class.Students

	// Store values.
	for _, entry := range [][2]string{
		{"Zhao", "class 1"},
		{"Qian", "class 2"},
		{"Sun", "class 3"},
	} {
		roster.Store(entry[0], entry[1])
	}

	// Iterate; the traversal stops as soon as the callback returns false.
	roster.Range(handler)

	// Look up a key.
	_, found := roster.Load("Li")
	if !found {
		fmt.Println("-->Li not found")
	}

	// Load the key, or store it when it is absent.
	if _, loaded := roster.LoadOrStore("Li", "class 4"); loaded {
		fmt.Println("-->Load Li success")
	} else {
		fmt.Println("-->Store Li success")
	}

	// Delete a key.
	roster.Delete("Sun")
	fmt.Println("-->Delete Sun success")

	// Iterate again to show the final content.
	roster.Range(handler)
}
三、路由器
package main
import (
"fmt"
"net/http"
)
// MyMux is a hand-written router with a single route.
type MyMux struct{}

// ServeHTTP answers requests for "/" with sayhelloName and every other path
// with a 404 response.
func (p *MyMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/":
		sayhelloName(w, r)
	default:
		http.NotFound(w, r)
	}
}
// sayhelloName writes a fixed greeting to the response; the request is unused.
func sayhelloName(w http.ResponseWriter, r *http.Request) {
	const greeting = "Hello myroute!"
	fmt.Fprint(w, greeting)
}
// main serves HTTP on port 9090 using MyMux as the only handler and reports
// the error that makes the server stop.
func main() {
	mux := &MyMux{}
	// ListenAndServe only returns on failure; surface the reason instead of
	// exiting silently.
	if err := http.ListenAndServe(":9090", mux); err != nil {
		fmt.Println("ListenAndServe:", err)
	}
}
四、解析yaml文件
package main
import (
"fmt"
"log"
"gopkg.in/yaml.v2"
)
// data is the sample YAML document decoded by main. counter and plist are
// nested under desc, so they must be indented below it.
var data = `
blog: xiaorui.cc
best_authors: ["fengyun","lee","park"]
desc:
  counter: 521
  plist: [3, 4]
`
// T is the Go shape the YAML document in data is decoded into and later
// re-encoded from.
type T struct {
Blog string
// Authors is bound to the "best_authors" key; the flow option asks for flow style on output.
Authors []string `yaml:"best_authors,flow"`
Desc struct {
// NOTE(review): the tag is "Counter" while the sample document spells the key
// "counter"; presumably these do not match on decode — confirm this is intended.
Counter int `yaml:"Counter"`
Plist []int `yaml:",flow"`
}
}
// main decodes the YAML document in data into a T, modifies a few fields,
// prints the struct and then prints it re-encoded as YAML.
func main() {
	t := T{}
	// Decode the YAML text into the struct.
	if err := yaml.Unmarshal([]byte(data), &t); err != nil {
		log.Fatalf("error: %v", err)
	}
	// Modify some of the decoded fields.
	t.Blog = "this is Blog"
	t.Authors = append(t.Authors, "myself")
	t.Desc.Counter = 99
	fmt.Printf("--- t:\n%v\n\n", t)
	// Encode the struct back into YAML text.
	d, err := yaml.Marshal(&t)
	if err != nil {
		log.Fatalf("error: %v", err)
	}
	fmt.Printf("--- t dump:\n%s\n\n", string(d))
}
五、mq
package main
import (
"flag"
"log"
"strconv"
"time"
"github.com/nats-io/nats.go"
"github.com/pborman/uuid"
)
const (
// Address of a specific NATS server, kept commented out for reference.
//url = "nats://192.168.3.125:4222"
// url is the NATS server address; it is set to the client library's default URL.
url = nats.DefaultURL
)
var (
// nc is the connection used by the publish and subscribe helpers below.
// NOTE(review): nothing visible in this snippet assigns nc, so it stays nil.
nc *nats.Conn
// err is only referenced by the commented-out connect code in init.
err error
)
// init is currently a no-op: the code that would connect to the NATS server
// is commented out (and incomplete).
func init() {
// if nc, err = nats.Connect(url); checkErr(err) {
// fmt.
// }
}
// main is empty; neither sendMessage nor receiveMessage is called from it.
func main() {
}
// sendMessage parses the -subj flag, logs its value, publishes the demo
// messages to that subject and then waits one second before returning.
func sendMessage() {
	subject := flag.String("subj", "", "subject name")
	flag.Parse()

	log.Println(*subject)
	startClient(*subject)
	time.Sleep(time.Second)
}
// startClient publishes four weather messages to subj on the package-level
// connection nc. Each message is prefixed with a freshly generated UUID and
// suffixed with the loop counter. Publish failures are logged.
func startClient(subj string) {
	for i := 0; i < 1; i++ {
		id := uuid.New()
		log.Println(id)
		for _, weather := range []string{" Sun ", " Rain ", " Fog ", " Cloudy "} {
			payload := []byte(id + weather + strconv.Itoa(i))
			// Report failed publishes instead of dropping them silently.
			if err := nc.Publish(subj, payload); err != nil {
				log.Println(err)
			}
		}
	}
}
// receiveMessage parses the -servername, -group and -subj flags, starts three
// queue subscribers named "<servername> worker1..3" and then blocks forever.
func receiveMessage() {
	servername := flag.String("servername", "y", "name for server")
	queueGroup := flag.String("group", "", "group name for Subscribe")
	subj := flag.String("subj", "", "subject name")
	flag.Parse()

	log.Println(*servername, *queueGroup, *subj)
	for _, suffix := range []string{" worker1", " worker2", " worker3"} {
		startService(*subj, *servername+suffix, *queueGroup)
	}
	// Keep the process alive so the subscribers can receive messages.
	select {}
}
// startService runs async in a new goroutine, using the package-level
// connection nc, to subscribe name to subj within the given queue group.
func startService(subj, name, queue string) {
go async(nc, subj, name, queue)
}
// async registers a queue subscription on subj for the given queue group.
// Every received message is logged together with the subscriber name.
// A failed subscription is logged.
func async(nc *nats.Conn, subj, name, queue string) {
	_, err := nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
		log.Println(name, "Received a message From Async : ", string(msg.Data))
	})
	if err != nil {
		// Without this check a failed subscription would go unnoticed.
		log.Println(name, "QueueSubscribe failed:", err)
	}
}
// checkErr reports whether err is nil. A non-nil error is logged and yields
// false; a nil error yields true.
func checkErr(err error) bool {
	ok := err == nil
	if !ok {
		log.Println(err)
	}
	return ok
}
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
// failOnError terminates the program with the message "<msg>: <err>" when err
// is non-nil and does nothing otherwise.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	// log.Fatal exits the process, so no statement after it can run; the
	// former panic that followed the fatal log was unreachable.
	log.Fatal(fmt.Sprintf("%s: %s", msg, err))
}
// main connects to a local RabbitMQ broker, opens a channel, limits the
// prefetch count to one, publishes one message and then consumes messages.
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// The queue itself is declared inside funcSend and funcRecv, so no
	// queue-declaration check belongs here.

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set Qos")

	// Send a message.
	funcSend(ch)
	// Receive messages.
	funcRecv(ch)
}
// funcSend is the producer: it declares the queue named "queue" and publishes
// a single "Hello World!" text message to it through the default exchange.
func funcSend(ch *amqp.Channel) {
	q, err := ch.QueueDeclare(
		"queue", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no wait
		nil,     // args
	)
	failOnError(err, "Failed to declare a queue")

	body := "Hello World!"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	// Check the result before claiming the message was sent.
	failOnError(err, "Failed to publish a message")
	log.Printf("[x] Sent %s", body)
}
// funcRecv is the consumer: it declares the queue named "queue", registers an
// auto-acknowledging consumer, logs every delivery from a background
// goroutine and never returns.
func funcRecv(ch *amqp.Channel) {
	q, err := ch.QueueDeclare("queue", false, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	// Arguments: queue, consumer, auto-ack, exclusive, no-local, no-wait, args.
	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	logDeliveries := func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}
	go logDeliveries()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	// Nothing ever sends on this channel, so the receive blocks forever.
	done := make(chan struct{})
	<-done
}
六、map
package main
import "fmt"
import "sync"
import "time"
// main runs SyncTest in two goroutines against the same sync.Map and gives
// them twenty seconds to finish.
func main() {
	var mTest sync.Map
	// Pass a pointer: a sync.Map must not be copied, and passing it by value
	// would hand every goroutine its own private map.
	go SyncTest(&mTest)
	go SyncTest(&mTest)
	time.Sleep(time.Second * 20)
}

// SyncTest exercises Load, Store, Delete and LoadOrStore on mTest 1000 times,
// printing the result of each lookup.
func SyncTest(mTest *sync.Map) {
	for j := 0; j < 1000; j++ {
		fmt.Println(mTest.Load("A")) // Load reads a key
		mTest.Store("A", "a")        // Store writes a key
		fmt.Println(mTest.Load("A"))
		mTest.Delete("A") // Delete removes a key
		fmt.Println(mTest.Load("A"))
		mTest.LoadOrStore("B", "b") // LoadOrStore stores only when the key is absent
		fmt.Println(mTest.Load("B"))
	}
}
// funcMap demonstrates the ways to create a map (declare then make, make
// directly, composite literal), a presence check and iteration.
func funcMap() {
	// Declare first, then allocate with make; assigning into a nil map is not allowed.
	var m1 map[string]string
	m1 = make(map[string]string)
	m1["a"], m1["b"] = "aa", "bb"

	// Allocate directly, then fill.
	m2 := make(map[string]string)
	m2["a"], m2["b"] = "aa", "bb"

	// Create and fill in a single literal.
	m3 := map[string]string{"a": "aa", "b": "bb"}
	fmt.Println(m3)

	// Check whether a key is present.
	value, present := m1["a"]
	if !present {
		fmt.Println("Key Not Found")
	} else {
		fmt.Println(value)
	}

	// Iterate over all entries.
	for key := range m1 {
		fmt.Println(key, m1[key])
	}
}
七、多路复用
package main
import (
"fmt"
"net/http"
)
// The three handlers below each write a fixed, numbered welcome message.

// handler1 answers with welcome message number 1.
func handler1(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "1 欢迎访问 www.ydook.com !")
}

// handler2 answers with welcome message number 2.
func handler2(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "2 欢迎访问 www.ydook.com !")
}

// handler3 answers with welcome message number 3.
func handler3(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "3 欢迎访问 www.ydook.com !")
}
// main registers the three handlers on a ServeMux and serves them on
// 127.0.0.1:8000, reporting the error that makes the server stop.
func main() {
	// Route each path to its handler.
	mux := http.NewServeMux()
	mux.HandleFunc("/h1", handler1)
	mux.HandleFunc("/h2", handler2)
	mux.HandleFunc("/h3", handler3)

	// Configure the server.
	server := &http.Server{
		Addr:    "127.0.0.1:8000",
		Handler: mux,
	}

	// ListenAndServe only returns on failure; surface the reason instead of
	// exiting silently.
	if err := server.ListenAndServe(); err != nil {
		fmt.Println("ListenAndServe:", err)
	}
}
八、for循环
package main
import (
"fmt"
"time"
)
// main is the "wrong" half of the loop-variable demo: every goroutine's
// closure refers to the loop variable v itself rather than to a copy.
// With per-loop variable semantics (Go before 1.22) the goroutines can all
// print the last element; the next snippet shows the fix.
func main() {
str := []string{"I","am","Sergey"}
for _,v := range str{
// The closure captures v, not its current value.
go func() {
fmt.Println(v)
}()
}
// Give the goroutines time to run before the program exits.
time.Sleep(3 * time.Second)
}
package main
import (
"fmt"
"time"
)
// main is the corrected loop-variable demo: each goroutine receives its
// element as an argument, so it prints the value of its own iteration.
func main() {
	words := []string{"I", "am", "Sergey"}
	for i := range words {
		go func(word string) {
			fmt.Println(word)
		}(words[i])
	}
	// Give the goroutines time to run before the program exits.
	time.Sleep(3 * time.Second)
}
package main
import "fmt"
// main is the "wrong" half of the pointer demo: it stores the address of the
// loop variable value for every index. With per-loop variable semantics
// (Go before 1.22) all map entries point at the same variable; the next
// snippet shows the fix.
func main() {
slice := []int{0, 1, 2, 3}
myMap := make(map[int]*int)
for index , value := range slice {
// &value is the address of the loop variable, not of the slice element.
myMap[index] = &value
}
prtMap(myMap)
}
// prtMap prints every entry of myMap as "map[key]=value", dereferencing the
// stored pointers. Iteration order is not defined.
func prtMap(myMap map[int]*int) {
	for key := range myMap {
		fmt.Printf("map[%v]=%v\n", key, *myMap[key])
	}
}
package main
import "fmt"
// main is the corrected pointer demo: each element is copied into a variable
// of its own before its address is stored, so every map entry points at a
// distinct value.
func main() {
	slice := []int{0, 1, 2, 3}
	myMap := make(map[int]*int)
	for index := range slice {
		copied := slice[index]
		myMap[index] = &copied
	}
	prtMap(myMap)
}
// prtMap prints every entry of myMap as "map[key]=value", dereferencing the
// stored pointers. Iteration order is not defined.
func prtMap(myMap map[int]*int) {
	for key := range myMap {
		fmt.Printf("map[%v]=%v\n", key, *myMap[key])
	}
}
九、Stringer接口
package main
import "fmt"
// Person is a name/age pair that implements fmt.Stringer.
type Person struct {
	Name string
	Age  int
}

// String renders the person as "<Name> (<Age> years)".
func (p Person) String() string {
	return fmt.Sprint(p.Name, " (", p.Age, " years)")
}
// main prints two Person values; fmt.Println formats them through their
// String method.
func main() {
a := Person{"Arthur Dent", 42}
z := Person{"Zaphod Beeblebrox", 9001}
fmt.Println(a, z)
}
十、抽象类实现
// IPeople is the full contract of a person: name and age accessors plus Run.
type IPeople interface {
GetName() string
SetName(string)
GetAge() int
SetAge(int)
Run()
}
// AbstractPeople plays the role of an abstract base class: it implements the
// name and age accessors itself and embeds IPeople, so Run is only available
// through whatever value is stored in the embedded interface.
// NOTE(review): with the embedded IPeople left nil, calling Run would
// dereference a nil interface — confirm concrete types always provide it.
type AbstractPeople struct {
IPeople
name string
age int
}
// GetName returns the stored name.
func (a AbstractPeople) GetName() string {
return a.name
}
// SetName replaces the stored name; it needs the pointer receiver to take effect.
func (a *AbstractPeople) SetName(newName string) {
a.name = newName
}
// GetAge returns the stored age.
func (a AbstractPeople) GetAge() int {
return a.age
}
// SetAge replaces the stored age; it needs the pointer receiver to take effect.
func (a *AbstractPeople) SetAge(newAge int) {
a.age = newAge
}
// UsePeople is a sample consumer of IPeople: it prints the current name and
// age, renames the person to "Alice", increases the age by one and calls Run.
func UsePeople(p IPeople) {
	name := p.GetName()
	fmt.Println(name)
	age := p.GetAge()
	fmt.Println(age)

	p.SetName("Alice")
	// Read the age again at this point, exactly as a fresh getter call.
	older := p.GetAge() + 1
	p.SetAge(older)
	p.Run()
}
十一、接口
package main
import (
"fmt"
)
// DataWriter is anything that can write a piece of data and report an error.
type DataWriter interface {
WriteData(data interface{}) error
}
// file is an empty type whose pointer implements DataWriter.
type file struct{}

// WriteData simulates a write by printing the data; it always returns nil.
func (d *file) WriteData(data interface{}) error {
	fmt.Printf("WriteData: %v\n", data)
	return nil
}
func main() {
// 实例化file
f := new(file)
// 声明一个DataWriter的接口
var writer DataWriter
// 将接口赋值f,也就是*file类型
writer = f
// 使用DataWriter接口进行数据写入
writer.WriteData("data")
}
十二、多线程
package main
import (
	"fmt"
	"runtime"
	"sync"
)
var (
// cpunum is the number of logical CPUs minus one; main uses it both as the
// GOMAXPROCS value and as the number of worker goroutines.
cpunum = runtime.NumCPU()-1
)
// main starts a set of WgReadLogs workers, hands them two log file names over
// an unbuffered channel, waits until both were processed and then closes the
// channel so the workers stop.
func main() {
	ch := make(chan string)

	// cpunum is zero on a single-CPU machine; without at least one worker the
	// first send below would block forever.
	workers := cpunum
	if workers < 1 {
		workers = 1
	}
	runtime.GOMAXPROCS(workers)

	wg := sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		go WgReadLogs(ch, &wg)
	}

	// Two jobs are sent, so two Done calls are expected.
	wg.Add(2)
	ch <- "./health/stat1.rec"
	ch <- "./report/stat2.rec"
	wg.Wait()
	close(ch)
}
// WgReadLogs is the worker loop: it passes every name received from ch to
// ReadLogs, marks one unit of work as done on wg and returns once ch is closed.
func WgReadLogs(ch chan string, wg *sync.WaitGroup) {
	for logname := range ch {
		ReadLogs(logname)
		wg.Done()
	}
}
// ReadLogs is a placeholder: it only prints the log file name and does not
// open the file.
func ReadLogs(logname string){
fmt.Println(logname)
}
十三、websocket
package main
import (
"bytes"
"fmt"
"log"
"net/http"
"unsafe"
"golang.org/x/net/websocket"
)
// BytePtrToString copies the bytes starting at p up to, but not including,
// the first zero byte and returns them as a string. A nil pointer yields the
// empty string. The caller must guarantee that the memory is zero-terminated.
func BytePtrToString(p *byte) string {
	if p == nil {
		return ""
	}
	var buf bytes.Buffer
	for cur := p; *cur != 0; {
		buf.WriteByte(*cur)
		// Advance by one byte; the uintptr arithmetic stays inside a single
		// expression as the unsafe.Pointer conversion rules require.
		cur = (*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(cur)) + 1))
	}
	return buf.String()
}
// main registers the echo websocket handler on /runtime and serves it on
// port 8080, terminating the program when the server fails.
func main() {
	fmt.Println("websocket地址: ws://127.0.0.1:8080/runtime")
	http.Handle("/runtime", websocket.Handler(echo))

	err := http.ListenAndServe(":8080", nil)
	if err != nil {
		log.Fatal(err)
	}
}
// Test page: http://coolaf.com/tool/chattest
// An identifier starting with an upper-case letter (Echo) is exported; one
// starting with a lower-case letter (echo) is private to the package.

// echo is the websocket handler: it reads text messages from the connection
// and answers each one with the same text plus a server suffix, until a
// receive or send fails.
func echo(w *websocket.Conn) {
	for {
		// Only string messages are supported.
		var reply string
		if err := websocket.Message.Receive(w, &reply); err != nil {
			log.Println("websocket出现异常", err)
			break
		}
		fmt.Println("收到客户端消息:" + reply)

		msg := reply + ", 我是服务端"
		fmt.Println("发送客户端消息:" + msg)
		if err := websocket.Message.Send(w, msg); err != nil {
			log.Println("websocket出现异常", err)
			break
		}
	}
}
十四、协程池
package main
import (
"fmt"
"time"
)
// Task wraps one unit of business logic.
type Task struct {
	f func() error // the business function: no arguments, returns an error
}

// NewTask creates a Task bound to arg_f.
func NewTask(arg_f func() error) *Task {
	return &Task{f: arg_f}
}

// Execute runs the business function bound to the task; its error result is
// discarded.
func (task *Task) Execute() {
	_ = task.f()
}
// ------------------------------------------------

// Pool is a goroutine pool fed through EntryChannel.
type Pool struct {
	EntryChannel chan *Task // public entry point for new tasks
	JobsChannel  chan *Task // internal queue the workers read from
	workerNum    int        // maximum number of workers in the pool
}

// NewPool creates a pool with cap workers and unbuffered channels.
func NewPool(cap int) *Pool {
	return &Pool{
		EntryChannel: make(chan *Task),
		JobsChannel:  make(chan *Task),
		workerNum:    cap,
	}
}
// worker takes tasks from JobsChannel one after another, executes each and
// prints its own ID; it returns once JobsChannel is closed.
func (pool *Pool) worker(workID int) {
	for {
		task, open := <-pool.JobsChannel
		if !open {
			return
		}
		task.Execute()
		fmt.Println("work ID ", workID, " has executed")
	}
}
// run starts workerNum workers and then forwards every task from EntryChannel
// to JobsChannel. It blocks until EntryChannel is closed and then closes
// JobsChannel so the workers can finish.
func (pool *Pool) run() {
	// Start the configured number of workers.
	for i := 0; i < pool.workerNum; i++ {
		go pool.worker(i)
	}

	// Move tasks from the public entry to the internal queue; task priority
	// ordering could be added here.
	for task := range pool.EntryChannel {
		pool.JobsChannel <- task
	}

	// Without closing the queue the workers would stay blocked forever once
	// no more tasks can arrive.
	close(pool.JobsChannel)
}
// ------------------------------------------------

// main builds one task that prints the current time, feeds it endlessly into
// a pool of four workers and runs the pool.
func main() {
	// The task to execute.
	printNow := func() error {
		fmt.Println(time.Now())
		return nil
	}
	task := NewTask(printNow)

	// Create the pool.
	pool := NewPool(4)

	// Feed tasks from a separate goroutine so that run below is not blocked.
	feed := func() {
		for {
			pool.EntryChannel <- task
		}
	}
	go feed()

	// Start the pool.
	pool.run()
}
package main
import (
"fmt"
"sync"
"time"
)
// worker is run by every goroutine. It prints a start line, sleeps for one
// second to simulate an expensive task, prints a done line and signals wg.
// The WaitGroup has to be passed by pointer.
func worker(id int, wg *sync.WaitGroup) {
	// Tell the WaitGroup that this goroutine is finished once we return.
	defer wg.Done()

	fmt.Printf("Worker %d starting\n", id)
	time.Sleep(time.Second)
	fmt.Printf("Worker %d done\n", id)
}
// main starts five workers and waits until all of them have finished.
func main() {
	// wg tracks every goroutine started below.
	var wg sync.WaitGroup

	const workers = 5
	for id := 1; id <= workers; id++ {
		// Count the goroutine before it is started.
		wg.Add(1)
		go worker(id, &wg)
	}

	// Block until the counter is back at zero.
	wg.Wait()
}
package main
import (
"fmt"
"time"
)
// worker receives jobs from the jobs channel until it is closed. For each job
// it prints a progress line, sleeps one second to simulate expensive work and
// sends twice the job value on results.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		fmt.Println("worker", id, "processing job", job)
		time.Sleep(time.Second)
		results <- job * 2
	}
}
// main runs nine jobs through a pool of three workers and waits for all nine
// results.
func main() {
	const (
		workerCount = 3
		jobCount    = 9
	)

	// One channel carries the jobs, the other collects the results.
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// The workers block at first because no job has been sent yet.
	for w := 1; w <= workerCount; w++ {
		go worker(w, jobs, results)
	}

	// Send all jobs, then close the channel to signal that no more follow.
	for j := 1; j <= jobCount; j++ {
		jobs <- j
	}
	close(jobs)

	// Collect one result per job.
	for received := 0; received < jobCount; received++ {
		<-results
	}
}
package main
import (
"context"
"fmt"
"strconv"
"sync/atomic"
pool "github.com/jolestar/go-commons-pool"
)
// Example_simple borrows one object from an object pool with the default
// configuration, prints the string it carries and returns it to the pool.
// The factory numbers the objects it creates with an atomic counter.
func Example_simple() {
	type myPoolObject struct {
		s string
	}

	var created uint64
	newObject := func(context.Context) (interface{}, error) {
		serial := atomic.AddUint64(&created, 1)
		return &myPoolObject{s: strconv.FormatUint(serial, 10)}, nil
	}
	factory := pool.NewPooledObjectFactorySimple(newObject)

	ctx := context.Background()
	p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)

	obj, err := p.BorrowObject(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println(obj.(*myPoolObject).s)

	if err = p.ReturnObject(ctx, obj); err != nil {
		panic(err)
	}
	// Output: 1
}
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
// main creates a tunny worker pool with one worker per CPU and exposes it on
// /work at port 8080: the request body is the job payload and the job result
// is written back as the response.
func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Something CPU heavy with payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
			// Stop here: the error response has already been written.
			return
		}

		// Funnel this work into our pool. This call is synchronous and will
		// block until the job is completed.
		result := pool.Process(input)
		w.Write(result.([]byte))
	})

	// ListenAndServe only returns on failure; do not exit silently.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		panic(err)
	}
}
package main
import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
)
var (
// ErrInvalidPoolCap is returned by NewPool when the requested capacity is zero.
ErrInvalidPoolCap = errors.New("invalid pool cap")
// ErrPoolAlreadyClosed is returned by Put when the pool has already been stopped.
ErrPoolAlreadyClosed = errors.New("pool already closed")
)
const (
// RUNNING is the state of a pool that accepts tasks.
RUNNING = 1
// STOPED is the state of a pool that has been closed.
STOPED = 0
)
// Task is one unit of work: Handler is invoked with Params as its arguments.
type Task struct {
Handler func(v ...interface{})
Params []interface{}
}
// Pool is a task pool with a bounded number of worker goroutines.
type Pool struct {
// capacity bounds the number of workers and is the size of the task buffer.
capacity uint64
// runningWorkers counts the live workers; it is accessed atomically.
runningWorkers uint64
// state is RUNNING or STOPED.
state int64
// taskC is the buffered queue the workers read from.
taskC chan *Task
// PanicHandler, when set, receives the value recovered from a panicking worker.
PanicHandler func(interface{})
sync.Mutex
}
// NewPool creates a running pool whose worker limit and task buffer size are
// both capacity. A capacity of zero yields ErrInvalidPoolCap.
func NewPool(capacity uint64) (*Pool, error) {
	// capacity is unsigned, so zero is the only invalid value.
	if capacity == 0 {
		return nil, ErrInvalidPoolCap
	}
	p := new(Pool)
	p.capacity = capacity
	p.state = RUNNING
	p.taskC = make(chan *Task, capacity)
	return p, nil
}
// GetCap returns the capacity the pool was created with.
func (p *Pool) GetCap() uint64 {
return p.capacity
}
// GetRunningWorkers returns the current number of workers, read atomically.
func (p *Pool) GetRunningWorkers() uint64 {
return atomic.LoadUint64(&p.runningWorkers)
}
// incRunning atomically adds one to the worker count.
func (p *Pool) incRunning() {
atomic.AddUint64(&p.runningWorkers, 1)
}
// decRunning atomically subtracts one from the worker count; adding
// ^uint64(0) (all bits set) wraps around to a decrement.
func (p *Pool) decRunning() {
atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}
// Put submits a task to the pool. It returns ErrPoolAlreadyClosed when the
// pool has been stopped and nil once the task has been queued. A new worker
// is started while fewer than GetCap() workers are running. The call blocks
// while the task buffer is full.
func (p *Pool) Put(task *Task) error {
	// Hold the lock across the state check and the send, so the pool cannot be
	// stopped in between and the task dropped while nil is reported.
	p.Lock()
	defer p.Unlock()

	if p.state == STOPED {
		return ErrPoolAlreadyClosed
	}

	// Start another worker while below capacity.
	if p.GetRunningWorkers() < p.GetCap() {
		p.run()
	}

	p.taskC <- task
	return nil
}
// run registers and starts one worker goroutine. The worker executes tasks
// from taskC until the channel is closed. When it exits, normally or by a
// panic, the worker count is decremented; a recovered panic value is passed
// to PanicHandler when one is set and logged otherwise.
func (p *Pool) run() {
	p.incRunning()

	worker := func() {
		defer func() {
			p.decRunning()
			r := recover()
			if r == nil {
				return
			}
			if p.PanicHandler == nil {
				log.Printf("Worker panic: %s\n", r)
				return
			}
			p.PanicHandler(r)
		}()

		for task := range p.taskC {
			task.Handler(task.Params...)
		}
	}
	go worker()
}
// getState returns the pool state while holding the pool lock.
func (p *Pool) getState() int64 {
p.Lock()
defer p.Unlock()
return p.state
}
// setState replaces the pool state while holding the pool lock.
func (p *Pool) setState(state int64) {
p.Lock()
defer p.Unlock()
p.state = state
}
// close closes the task channel while holding the pool lock.
func (p *Pool) close() {
p.Lock()
defer p.Unlock()
close(p.taskC)
}
// Close shuts the pool down gracefully: it marks the pool as stopped so that
// Put rejects new tasks, waits until the task buffer has been drained and
// then closes the task channel. Calling Close on a stopped pool is a no-op.
func (p *Pool) Close() {
	// Test and set the state in one critical section; otherwise two concurrent
	// callers could both pass the check and close the channel twice.
	p.Lock()
	if p.state == STOPED {
		p.Unlock()
		return
	}
	p.state = STOPED // stop accepting tasks
	p.Unlock()

	// Wait until every queued task has been picked up.
	for len(p.taskC) > 0 {
		time.Sleep(time.Millisecond) // reduce CPU load
	}

	p.close()
}
//https://github.com/wazsmwazsm/mortar

// main pushes 1001 printing tasks through a pool of capacity 10, waits for
// all of them, closes the pool and shows that a later Put is rejected.
func main() {
	// Create a task pool with capacity 10.
	pool, err := NewPool(10)
	if err != nil {
		panic(err)
	}

	wg := new(sync.WaitGroup)

	// printTask prints its arguments and only then marks itself as done, so
	// that Wait cannot return before the output has been written.
	printTask := func(v ...interface{}) {
		defer wg.Done()
		fmt.Println(v)
	}

	// submit queues a task; when the pool rejects it the handler never runs,
	// so the WaitGroup is released here instead.
	submit := func(task *Task) {
		wg.Add(1)
		if err := pool.Put(task); err != nil {
			wg.Done()
			fmt.Println(err)
		}
	}

	for i := 0; i < 1000; i++ {
		// Create a task and set its arguments.
		task := &Task{Handler: printTask}
		task.Params = []interface{}{i, i * 2, "hello"}
		// Hand the task to the pool.
		submit(task)
	}

	// One more task; the arguments can also be set at creation time.
	submit(&Task{
		Handler: printTask,
		Params:  []interface{}{"hi!"},
	})

	wg.Wait()

	// Close the pool gracefully (queued tasks are consumed first).
	pool.Close()

	// Put returns ErrPoolAlreadyClosed once the pool has been closed.
	err = pool.Put(&Task{
		Handler: func(v ...interface{}) {},
	})
	if err != nil {
		fmt.Println(err) // print: pool already closed
	}
}
十五、go调用C语言
注意:
rpc(远程函数调用)提供client端通过网络调用远程server端的函数的服务。
rpc-server端需要提供较高的吞吐能力,支持较大的并发连接。
epoll监听多个连接fd,实现IO复用
1)epoll没有最大并发连接的限制,上限是最大可以打开文件的数目,一般远大于2048
2)epoll效率高,只管活跃的连接,而跟连接总数无关
3)使用共享内存,省去了内存拷贝
生产者&消费者模式对请求任务进行管理
一个IO-thread线程维护连接池中的连接,连接有3种状态:NO_USED、READY、BUSY。
1)NO_USED状态,fd位置空闲,连接位尚未使用
2)READY状态,已经与client端建立连接的fd,等待请求事件
3)BUSY状态,已经收到请求事件,将fd放入任务队列,等待work-thread处理
```go package main
/*
include
include
void say(){ printf(“hello world\n”); } */ import “C”
import ( “fmt” “net/http” “runtime” “syscall” “time”
"github.com/gin-gonic/gin"
)
func main() { runtime.LockOSThread() router := gin.Default() router.GET(“/ping”, func(c gin.Context) { fmt.Println(“worker”, syscall.Getegid()) c.JSON(200, gin.H{ “message”: “pong”, }) }) s := &http.Server{ Addr: “:18282”, Handler: router, ReadTimeout: 600 time.Second, WriteTimeout: 600 * time.Second, MaxHeaderBytes: 1 << 20, } fmt.Println(“listen:”, syscall.Getegid()) s.ListenAndServe() fmt.Println(“main”, C.say()) }
func testMain() { runtime.GOMAXPROCS(runtime.NumCPU()) ch1 := make(chan bool) ch2 := make(chan bool) fmt.Println(“main”, C.say()) go func() { runtime.LockOSThread() fmt.Println(“locked”, C.say()) go func() { fmt.Println(“locked child”, C.say()) ch1 <- true }() ch2 <- true }() <-ch1 <-ch2 }
<a name="AlQ0C"></a>
# 十六、go RPC
```go
package main
import (
"context"
"log"
"net"
"os"
"time"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/reflection"
)
// main prints the two results of f(1); serverMain and clientMain are not
// called from here.
func main() {
println(f(1))
}
const (
// port is the TCP address serverMain listens on.
port = ":50051"
// address is the server address clientMain dials.
address = "localhost:50051"
// defaultName is the name clientMain sends when no command-line argument is given.
defaultName = "world"
)
// server is the service object registered with the gRPC server.
type server struct{} // service object
// SayHello implements the Greeter service method: it replies with "Hello "
// followed by the name from the request.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
// serverMain listens on port, registers the Greeter service and the
// reflection service on a new gRPC server and serves until it fails.
func serverMain() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Create the server and attach the Greeter implementation.
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, new(server))

	// The reflection service is used by command-line tools and is unrelated
	// to the Greeter service itself.
	reflection.Register(s)

	err = s.Serve(lis)
	if err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// clientMain dials the server without transport security, calls SayHello with
// the first command-line argument (or defaultName) under a one-second timeout
// and logs the greeting.
func clientMain() {
	// Open the connection.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Pick the name to greet.
	name := defaultName
	if args := os.Args; len(args) > 1 {
		name = args[1]
	}

	// The call is bounded by a one-second context.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	request := &pb.HelloRequest{Name: name}
	reply, err := c.SayHello(ctx, request)
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", reply.Message)
}
// f is a language puzzle about the blank identifier. Its first result is
// named _ and its second __ (an ordinary identifier). The assignment below
// discards the value assigned to _ and stores x in __, so the bare return
// yields the zero value for the first result and x for the second.
func f(x int) (_, __ int) {
_, __ = x, x
return
}
十七、go调用tol物联网
package main
import (
"fmt"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Connection settings; the values are placeholders to be replaced with the
// real server name, account and password.
const broker = "tcp://你的服务器.mqtt.iot.gz.baidubce.com:1883"
const username = "你的服务器/client"
const password = "UJq**密码**msq"
const ClientID = "随意"
// onMessage is the message callback: it prints the topic and payload of every
// received message.
var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("[%s] -> %s\n", msg.Topic(), msg.Payload())
}
// wg lets main wait for the subscribe and publish goroutines.
var wg sync.WaitGroup
// client is the shared MQTT client created by mqttConnect.
var client mqtt.Client
// main connects to the MQTT server, starts the subscriber for topic "test1"
// and the periodic publisher, and waits for both goroutines.
func main() {
	// Connect to the MQTT server.
	mqttConnect()
	defer client.Disconnect(250) // disconnect on exit

	// Two goroutines are started below.
	wg.Add(2)
	go mqttSubScribe("test1")
	go testPublish()

	wg.Wait()
}
// mqttConnect builds the client options from the package constants, creates
// the shared client and connects it. It panics when the connection attempt
// fails or does not complete within sixty seconds.
func mqttConnect() {
	const connectTimeout = 60 * time.Second

	// Options.
	clinetOptions := mqtt.NewClientOptions().AddBroker(broker).SetUsername(username).SetPassword(password)
	clinetOptions.SetClientID(ClientID)
	clinetOptions.SetConnectTimeout(connectTimeout)

	// Connect.
	client = mqtt.NewClient(clinetOptions)
	token := client.Connect()

	// A timeout must count as a failure too; chaining the wait and the error
	// check with && skipped the error check whenever the wait timed out.
	if !token.WaitTimeout(connectTimeout) {
		panic("mqtt: connect timed out")
	}
	if err := token.Error(); err != nil {
		panic(err)
	}
}
// mqttSubScribe subscribes the shared client to topic with QoS 1 and
// onMessage as the callback, reports a failed subscription and signals wg.
func mqttSubScribe(topic string) {
	defer wg.Done()

	// Subscribe once; repeating the request in an endless loop only floods
	// the broker with identical subscriptions.
	token := client.Subscribe(topic, 1, onMessage)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("subscribe %s failed: %v\n", topic, err)
	}
}
// testPublish publishes "TEST" to topic "test1" with QoS 1 every three
// seconds, forever; the program receives its own messages through the
// subscription. Failed publishes are reported.
func testPublish() {
	defer wg.Done()
	for {
		token := client.Publish("test1", 1, false, "TEST")
		token.Wait()
		// Report failures instead of dropping them silently.
		if err := token.Error(); err != nil {
			fmt.Println("publish failed:", err)
		}
		time.Sleep(3 * time.Second)
	}
}
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// messagePubHandler prints the payload and topic of a received message.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
// connectHandler prints a notice; main installs it as the OnConnect callback.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
// connectLostHandler prints the error; main installs it as the
// OnConnectionLost callback.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
//https://cloud.emqx.cn/console/deployments/0?oper=new
// ClientOptions: sets broker, port, client id, user name, password and so on
// messagePubHandler: global handler for published MQTT messages
// connectHandler: callback for an established connection
// connectLostHandler: callback for a lost connection

// main connects to the public EMQX broker, subscribes to the test topic,
// publishes ten messages and disconnects.
func main() {
	const (
		broker = "broker.emqx.io"
		port   = 1883
	)
	brokerURL := fmt.Sprintf("tcp://%s:%d", broker, port)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(brokerURL)
	opts.SetClientID("go_mqtt_client")
	opts.SetUsername("emqx")
	opts.SetPassword("public")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	token := client.Connect()
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	sub(client)
	publish(client)

	client.Disconnect(250)
}
// publish sends ten numbered messages to "topic/test" with QoS 0, waiting for
// each publish to complete and pausing one second between messages.
func publish(client mqtt.Client) {
	const total = 10
	for i := 0; i < total; i++ {
		payload := fmt.Sprintf("Message %d", i)
		client.Publish("topic/test", 0, false, payload).Wait()
		time.Sleep(time.Second)
	}
}
// sub subscribes the client to "topic/test" with QoS 1 and no dedicated
// callback, then reports whether the subscription succeeded.
func sub(client mqtt.Client) {
	topic := "topic/test"
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	// Only claim success when the broker accepted the subscription.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s", topic)
}
// NewTlsConfig builds the TLS configuration for a connection that uses a
// client certificate. It loads the CA certificate from ca.pem and the client
// key pair from client-crt.pem / client-key.pem. A missing or unparsable CA
// file terminates the program; a key pair that cannot be loaded panics.
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ca.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	// AppendCertsFromPEM reports whether any certificate was parsed; an empty
	// pool would otherwise go unnoticed.
	if ok := certpool.AppendCertsFromPEM(ca); !ok {
		log.Fatalln("no certificate could be parsed from ca.pem")
	}

	// Import client certificate/key pair
	clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
	if err != nil {
		panic(err)
	}

	return &tls.Config{
		RootCAs:    certpool,
		ClientAuth: tls.NoClientCert,
		ClientCAs:  nil,
		// NOTE(review): InsecureSkipVerify disables verification of the
		// server certificate, which makes RootCAs ineffective — confirm this
		// is acceptable outside of testing.
		InsecureSkipVerify: true,
		Certificates:       []tls.Certificate{clientKeyPair},
	}
}
// NewTlsConfigs builds the TLS configuration for a connection without a
// client certificate. It loads the CA certificate from ca.pem; a missing or
// unparsable file terminates the program.
func NewTlsConfigs() *tls.Config {
	certpool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ca.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	// AppendCertsFromPEM reports whether any certificate was parsed; an empty
	// pool would otherwise go unnoticed.
	if ok := certpool.AppendCertsFromPEM(ca); !ok {
		log.Fatalln("no certificate could be parsed from ca.pem")
	}

	return &tls.Config{
		RootCAs:    certpool,
		ClientAuth: tls.NoClientCert,
		ClientCAs:  nil,
		// NOTE(review): InsecureSkipVerify disables verification of the
		// server certificate, which makes RootCAs ineffective — confirm this
		// is acceptable outside of testing.
		InsecureSkipVerify: true,
	}
}
十八、乐观锁 参考:https://github.com/crossoverJie/gorm-optimistic
Quick start
// BenchmarkUpdateWithOptimistic measures concurrent optimistic-lock updates
// of the row with id 1. Each iteration loads the row, adds 10 to its amount
// and saves it through UpdateWithOptimistic; the callback re-applies the
// increment to the model it is handed. The benchmark fails when the database
// cannot be opened; update errors are printed.
func BenchmarkUpdateWithOptimistic(b *testing.B) {
	dsn := "root:abc123@/test?charset=utf8&parseTime=True&loc=Local"
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		b.Fatal(err)
	}

	b.RunParallel(func(pb *testing.PB) {
		// The body must loop on pb.Next so that the iterations are actually
		// distributed over the parallel goroutines.
		for pb.Next() {
			var out Optimistic
			db.First(&out, Optimistic{Id: 1})
			out.Amount = out.Amount + 10

			// Keep the error local: the goroutines run concurrently and must
			// not write to a shared variable.
			err := UpdateWithOptimistic(db, &out, func(model Lock) Lock {
				bizModel := model.(*Optimistic)
				bizModel.Amount = bizModel.Amount + 10
				return bizModel
			}, 5, 0)
			if err != nil {
				fmt.Println(err)
			}
		}
	})
}
Model
// Optimistic is the sample model for optimistic locking; it is mapped to the
// table returned by TableName and carries its lock version in Version.
type Optimistic struct {
Id int64 `gorm:"column:id;primary_key;AUTO_INCREMENT" json:"id"`
UserId string `gorm:"column:user_id;default:0;NOT NULL" json:"user_id"` // user ID
Amount float32 `gorm:"column:amount;NOT NULL" json:"amount"` // amount
Version int64 `gorm:"column:version;default:0;NOT NULL" json:"version"` // version
}
// TableName returns the database table name, "t_optimistic".
func (o *Optimistic) TableName() string {
return "t_optimistic"
}
// GetVersion returns the current version.
func (o *Optimistic) GetVersion() int64 {
return o.Version
}
// SetVersion replaces the version.
func (o *Optimistic) SetVersion(version int64) {
o.Version = version
}
十九、并发条件下的有序输出
package main
import (
"fmt"
"sync"
"time"
)
// main has one producer send the numbers 1..100 over an unbuffered channel to
// three consumers that print them, and waits for all four goroutines.
func main() {
	q := make(chan int)
	wg := new(sync.WaitGroup)

	produce := func(out chan<- int) {
		defer wg.Done()
		defer close(out)
		for i := 1; i <= 100; i++ {
			out <- i
			// The pause throttles the producer; this approach is not reliable.
			time.Sleep(time.Microsecond)
		}
	}
	consume := func(in <-chan int) {
		defer wg.Done()
		for v := range in {
			fmt.Println(v)
		}
	}

	wg.Add(1)
	go produce(q)

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go consume(q)
	}

	wg.Wait()
}
// 利用WaitGroup实现安全退出,使生产者休眠来进行控制,但是如果消费者的操作是耗时是不可靠谱的,就会变得不可靠
package main
import (
"fmt"
"time"
)
// print receives from ch forever and prints every received value; it never
// returns.
func print(ch <-chan int) {
	for {
		value := <-ch
		fmt.Println(value)
	}
}
// main starts one printing goroutine per channel and distributes the numbers
// 1..100 over the three channels by their remainder modulo 3, pausing one
// microsecond after every send.
func main() {
	ch0 := make(chan int)
	ch1 := make(chan int)
	ch2 := make(chan int)
	go print(ch0)
	go print(ch1)
	go print(ch2)

	// The index of each channel equals the remainder it serves.
	targets := [3]chan int{ch0, ch1, ch2}
	for i := 1; i < 101; i++ {
		targets[i%3] <- i
		time.Sleep(time.Microsecond)
	}
}
// 利用for循环实现安全退出,同样是使生产者休眠来进行控制,但是如果消费者的操作是耗时是不可靠谱的,就会变得不可靠
package main
import (
"fmt"
"sync"
"time"
)
// main pre-fills a buffered channel with 1..100 and lets three goroutines
// print the values, each goroutine taking one value per wake-up on a shared
// condition variable. The ordering relies on the sleeps below.
func main() {
q := make(chan int,100)
// Fill the channel.
for i := 1; i <= 100; i++ {
q<-i
}
cond := sync.NewCond(&sync.Mutex{})
for i := 0; i < 3; i++ {
go func(c *sync.Cond, q <-chan int) {
for {
c.L.Lock()
// Wait releases the lock while blocked and re-acquires it on wake-up.
c.Wait()
if v, ok := <-q; ok {
fmt.Println(v)
c.L.Unlock()
} else {
c.L.Unlock()
return
}
}
}(cond, q)
}
time.Sleep(time.Second) // let all goroutines reach the waiting state
for i := 1; i <= 100; i++ {
cond.Signal() // wake up one waiting goroutine
time.Sleep(time.Microsecond)
}
// NOTE(review): no Signal follows the close, so goroutines blocked in Wait
// presumably never observe the closed channel and only end when main exits.
close(q)
time.Sleep(time.Second)
}
// 利用cond使所有goroutine进入阻塞状态直到被唤醒来实现有序输出,
// cond和其他锁不一样,临界资源被锁之后goroutine不会进行轮询,而是等待调度,减少资源开销
package main
import (
"fmt"
"sync"
"time"
)
// main has one producer send 1..100 over an unbuffered channel while three
// consumers take turns receiving and printing under a shared mutex. The
// program ends after a one-second sleep.
func main() {
	q := make(chan int)
	mu := new(sync.Mutex)

	// Producer: sends every number, then closes the channel.
	go func() {
		defer close(q)
		for i := 1; i <= 100; i++ {
			q <- i
		}
	}()

	// Consumer: receive and print happen inside one critical section.
	consume := func() {
		for {
			mu.Lock()
			v, ok := <-q
			if !ok {
				mu.Unlock()
				return
			}
			fmt.Println(v)
			mu.Unlock()
		}
	}
	for i := 0; i < 3; i++ {
		go consume()
	}

	time.Sleep(time.Second) // wait before exiting
}
// 使用互斥锁阻塞其他协程来实现抢占,使用休眠实现安全退出,
// 这种方案是可靠的,但最好能够使用WaitGroup来实现安全退出
package main
import (
"fmt"
"time"
)
// main lets three goroutines print the numbers 1..100 in order. A channel
// with a single slot serves as the lock around the shared counter and the
// print. The program ends after a one-second sleep.
func main() {
	a := 0
	fn := func() int {
		a++
		return a
	}

	// ch is a one-slot semaphore: sending acquires it, receiving releases it.
	// It is deliberately never closed: closing a channel while a goroutine is
	// blocked sending on it makes that goroutine panic.
	ch := make(chan int, 1)

	for i := 0; i < 3; i++ {
		go func() {
			for {
				ch <- 1
				n := fn()
				if n > 100 {
					// Release the slot before leaving so the other goroutines
					// can also finish instead of blocking forever.
					<-ch
					return
				}
				fmt.Println(n)
				<-ch
			}
		}()
	}

	time.Sleep(time.Second) // wait before exiting
}
// 使用管道来阻塞其他协程来实现抢占,使用休眠实现安全退出,
// 这种方案是可靠的,同理,最好能够使用WaitGroup来实现安全退出
go.mod
module datacenter
go 1.15
require (
github.com/Jeffail/tunny v0.0.0-20181108205650-4921fff29480
github.com/brianvoe/gofakeit/v6 v6.0.0
github.com/chanxuehong/wechat v0.0.0-20201110083048-0180211b69fd
github.com/coreos/go-semver v0.3.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.5.0
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.0.3
github.com/golang/protobuf v1.4.2
github.com/gotomicro/ego v0.3.7
github.com/jmoiron/sqlx v1.3.1
github.com/jolestar/go-commons-pool v2.0.0+incompatible
github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
github.com/nats-io/nats-server/v2 v2.1.7 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/pborman/uuid v1.2.1
github.com/qiniu/api.v7/v7 v7.7.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
github.com/tal-tech/go-zero v1.0.28
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.25.0
github.com/eclipse/paho.mqtt.golang v1.2.0
)