Golang-websocket 案例學習

演示websocket在應用上的使用


golangWebsocket

處理html靜態檔案和websocket路由

package main
 
import (
	"log"
	"net/http"
)
 
func main() {
	setupApi()
	log.Fatalln(http.ListenAndServe(":8082", nil))
}
 
func setupApi() {
	manger := Manager{}
	http.Handle("/", http.FileServer(http.Dir("./front"))) //加載前端
	http.HandleFunc("/ws", manger.serveWs)
}
 
//go run *.go 運行所有文件

新增一個簡單的ws服務

// http status 101 代表正在切換協議
package main
 
// manager.go // 管理websocket
import (
	"github.com/gorilla/websocket"
	"log"
	"net/http"
)
 
// 管理websocket
var (
	websocketUpgrade = websocket.Upgrader{
		//確保用戶不會發送巨大封包
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
)
 
type Manager struct {
}
 
// 工廠模式
func NewManager() *Manager {
	return &Manager{}
}
 
func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
	log.Println("new connection")
	//upgrade regular http connection into websocket
	conn, err := websocketUpgrade.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	conn.Close()
}

新增基本客戶端

package main
 
import "github.com/gorilla/websocket"
 
//客戶端列表
type ClientList map[*Client]bool
 
//處理所有單用戶相關的內容
type Client struct {
	connection *websocket.Conn
	manager *Manager
}
 
func NewClient(conn *websocket.Conn,manager *Manager) *Client {
	return &Client{
		connection: conn,
		//之所以會使用manager是因為會將一些事情引導到manager進行處理,例如像其他用戶廣播
		manager: manager,
	}
}

修改Manager讓Manager能夠維護Client

type Manager struct {
	clients ClientList
	//會有很多人同時連接到API,使用互斥鎖保護
	sync.RWMutex
}

新增和移除

func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
	log.Println("new connection")
	//upgrade regular http connection into websocket
	conn, err := websocketUpgrade.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	client := NewClient(conn,m)
	m.addClient(client)
	//conn.Close()
}
 
//向管理起添加或刪除客戶端
func (m *Manager)addClient(client *Client) {
	m.Lock()
	//鎖上後當兩個人同時連接就不會同時修該Client list,Client list本質為map
	defer m.Unlock()
	m.clients[client] = true
}
 
func (m *Manager) removeClient(client *Client){
	m.Lock()
	defer m.Unlock()
	if _,ok := m.clients[client]; ok{
		client.connection.Close()
		delete(m.clients, client)
	}
}

讀取訊息

package main
 
import (
	"github.com/gorilla/websocket"
	"log"
)
 
// 客戶端列表,上線狀態
type ClientList map[*Client]bool
 
// 處理所有單用戶相關的內容
type Client struct {
	connection *websocket.Conn
	manager    *Manager
}
 
func NewClient(conn *websocket.Conn, manager *Manager) *Client {
	return &Client{
		connection: conn,
		//之所以會使用manager是因為會將一些事情引導到manager進行處理,例如像其他用戶廣播
		manager: manager,
	}
}
 
// 使用一個無緩衝的通道來防止連接同時獲得過多的請求
func (c *Client) readMessages() {
	//這邊使用defer是因為跳出for迴圈後再執行
	defer func() {
		//cleanup connection
		c.manager.removeClient(c)
	}()
 
	for {
		//payload為負載
		messageType, payload, err := c.connection.ReadMessage()
		//messageType在RFC中定義有幾種不同的消息類型讓你對數據,二進制進行ping/pong
		if err != nil {
			//連接意外關閉返回錯誤
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				//檢查異常是因為不希望正常斷開的時候也被當成錯誤紀錄
				log.Printf("error reading message %v", err)
				break
			}
		}
		log.Println(messageType)
		log.Println(payload)
	}
}
function sendMessage(){
                var newMessage = document.getElementById("message");
                if(newMessage !== null ){
                    // console.log(newMessage);
                    conn.send(newMessage.value);
                }
                return false;
            }

使用Event判斷消息類型並取代原先的發送和接受

        <script>
            let selectedChat = "general";
            class Event {
                //更好的控制用戶發送的訊息
                constructor(type ,payload) {
                    this.type = type;
                    this.payload = payload;
                }
            }
            function routeEvent(event){
                if(event.type === undefined){
                    alert("no type field in the event");
                }
                switch (event.type){
                    case "new_message":
                        console.log("new Message");
                        break;
                    default:
                        alert("not supported this type");
                        break;
                }
            }
 
            function sendEvent(eventName,payload){
                const event = new Event(eventName,payload);
                conn.send(JSON.stringify(event))
            }
 
            function changeChatRoom (){
                let newChat = document.getElementById("chatroom");
                if(newChat !== null && newChat.value !== selectedChat){
                    console.log(newChat);
                }
                return false;
            }
            function sendMessage(){
                let newMessage = document.getElementById("message");
                if(newMessage !== null ){
                    // console.log(newMessage);
                    // conn.send(newMessage.value);
                    sendEvent("send_message", newMessage)
                }
                return false;
            }
 
            window.onload = function(){
                document.getElementById("chatroom-selection").onsubmit = changeChatRoom;
                document.getElementById("chatroom-message").onsubmit = sendMessage;
 
                if(window["WebSocket"]){
                    console.log("support websocket")
                    //connect to websocket
                    conn = new WebSocket("ws://"+document.location.host+"/ws")
                    conn.onmessage = function(e){
                        // console.log(e)
                        const eventData = JSON.parse(e.data);
                        const event = Object.assign(new Event, eventData);
                        routeEvent(event)
                    }
                }else{
                    alert("not support websocket")
                }
            }
        </script>
// event.go
package main
 
import "encoding/json"
 
type Event struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
	//使用RawMessage(原始格式)是因為我們希望使用者可以發送任何類型
}
 
type EventHandler func(event Event, c *Client) error
 
const (
	EventSendMessage = "send_message"
)
 
type SendMessageEvent struct {
	Message string `json:"message"`
	From    string `json:"from"`
}
 
//將這些儲存在管理器,確保管理器方便處理
type Manager struct {
	clients ClientList
	//會有很多人同時連接到API,使用互斥鎖保護
	sync.RWMutex
	//將type當作key並允許我們獲取事件處理程序
	handlers map[string]EventHandler
}
 
// 工廠模式
func NewManager() *Manager {
	m := &Manager{
		clients:  make(ClientList),
		handlers: make(map[string]EventHandler),
	}
	m.setupEventHandlers()
	return m
}
 
func (m *Manager) setupEventHandlers() {
	m.handlers[EventSendMessage] = SendMessage
}
 
func SendMessage(event Event, c *Client) error {
	fmt.Println(event)
	return nil
}
 
func (m *Manager) routeEvent(event Event, c *Client) error {
	//檢查事件類型是否是處理程序的一部分,處理程序是一個使用事件類型作為key的map
	//因此每當我們收到類型設置為發送消息,都會觸發發送消息
	if handler, ok := m.handlers[event.Type]; ok {
		if err := handler(event, c); err != nil {
			return err
		}
		return nil
	} else {
		return errors.New("there is no such event type")
	}
}
func (c *Client) readMessages() {
	//這邊使用defer是因為跳出for迴圈後再執行
	defer func() {
		//cleanup connection from client List,幫助我們清理未使用的客戶端
		c.manager.removeClient(c)
	}()
 
	for {
		//payload為負載,類行為byte
		_, payload, err := c.connection.ReadMessage()
		//messageType在RFC中定義有幾種不同的消息類型讓你對數據,二進制進行ping/pong
		if err != nil {
			//連接意外關閉返回錯誤
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				//檢查異常是因為不希望正常斷開的時候也被當成錯誤紀錄
				log.Printf("error reading message %v", err)
				break
			}
		}
 
		//測試寫入egress,每read一個message都會發送給其他所有客戶
		//for wsclient := range c.manager.clients {
		//	wsclient.egress <- payload
		//}
		//
		//log.Println(messageType)
		//log.Println(string(payload)) ->使用Event取代
		var request Event
		if err := json.Unmarshal(payload, &request); err != nil {
			log.Printf("error unmarshalling :%v", err)
			break
		}
		if err := c.manager.routeEvent(request, c); err != nil {
			log.Println("error handling message", err)
		}
	}
}
 
func (c *Client) writeMessages() {
	defer func() {
		c.manager.removeClient(c)
	}()
 
	for {
		select {
		case message, ok := <-c.egress:
			if !ok {
				if err := c.connection.WriteMessage(websocket.CloseMessage, nil); err != nil {
					log.Println("connection closed :", err)
				}
				return //return 後會觸發破壞迴圈觸發defer
			}
 
			data, err := json.Marshal(message)
			if err != nil {
				log.Println(err)
				break
			}
 
			if err := c.connection.WriteMessage(websocket.TextMessage, data); err != nil {
				log.Printf("failed to send message %v:", err)
			}
			log.Println("message sent")
		}
	}
}

使用event原因

WebSocket是一種基於TCP的全雙工通信協議,它提供雙向實時、持久的連接,使得客戶端和服務器可以通過WebSocket進行雙向通信。
在WebSocket中,消息的發送和接收是異步的,這就需要在服務器端使用事件來區分不同類型的消息。
通常情況下,WebSocket 會定義一些預定的消息類型,比如聊天消息、命令消息等。當服務器接收到消息時,通過事件判斷消息的類型,然後根據不同的類型執行相應的邏輯。這可以是發送聊天消息消息給其他連接的客戶端,執行一些特定的操作,或者觸發一些事件。

作用地方

在編寫不同的聊天室時,通常會涉及多種類型的消息
例如用戶發送的聊天消息、系統通知消息、用戶加入或離開聊天室的消息等。這些消息可能具有不同的格式和含義,因此需要使用事件來判斷消息的類型,並根據類型執行相應的邏輯。

心跳機制

  • 會向服務端發送ping,目的是為了確保另一端的連接存在
  • 因為websocket依舊走在http協議上,如果閒置過久的話會被中斷,因此會使用ping/pong保持空連接
  • ping/pong屬於客戶端
  • ping給服務端後,前端要pong回應,因為RFC告訴我們ping和pong應該自動觸發,現在瀏覽器都會默認為自動
var (
	pongWait     = 10 * time.Second    //發送ping後pong的最多等待時間
	pingInterval = (pongWait * 9) / 10 //ping每次發送的煎個,如果滿足條件,該職必須低於Pong wait
)
 
//發送PING
ticker := time.NewTicker(pingInterval)
case <-ticker.C:
log.Println("ping")
//Send ping to client
//必須為指定類型,否則前端無法處理
if err := c.connection.WriteMessage(websocket.PingMessage, []byte(``)); err != nil {
log.Println("write message error", err)
return
}
//ping給服務端後,前端要pong回應,因為RFC告訴我們ping和pong應該自動觸發
}
 
//接受PONG
//當我們接受到pong以前能夠等待的時間
if err := c.connection.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Println(err)
return
}
//觸發pong時處理的handler,每收到pong就會觸發
c.connection.SetPongHandler(c.pongHandler)
 
func (c *Client) pongHandler(pongMessage string) error {
log.Println("pong")
//接受到pong以後要重置的時間
return c.connection.SetReadDeadline(time.Now().Add(pongWait))
}

巨型幀(Jumbo frames)

  • 設置read message的最大大小
	//Jumbo frames
	c.connection.SetReadLimit(512)

Cross Origin

//Cross Origin
func checkOrigin(r *http.Request)bool{
	//true將會連接,false關閉連接
	origin := r.Header.Get("Origin")
	switch origin {
	case "http://localhost:8082":
		return true
	default:
		return false
	}
}
 
// 管理websocket
var (
	websocketUpgrade = websocket.Upgrader{
		//cross
		CheckOrigin: checkOrigin,
		//確保用戶不會發送巨大封包
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
)

Auth

  • 流程:用戶使用HTTP連接,返回ODP或一次性密碼,然後將其返回給URL中的websocket端點,如果有效就連接
 function login(){
                let formData = {
                    username:document.getElementById("username").value,
                    password:document.getElementById("password").value,
                };
                fetch("login",{
                    method:"POST",
                    body:JSON.stringify(formData),
                    mode:"cors"
                }).then((response) => {
                    if(response.ok){
                        return response.json();
                    }else{
                        throw 'unauthorized';
                    }
                }).then((data)=>{
                    //通過auth
                    connectWebsocket(data.otp);
                }).catch((e)=>{alert(e)});
                return false;
            }
 
            function connectWebsocket(otp){
                if(window["WebSocket"]){
                    console.log("support websocket");
                    //connect to websocket
                    let conn = new WebSocket("ws://"+document.location.host+"/ws?otp="+otp);
                    //連接到websocket時觸發
                    conn.onopen = function (e){
                        document.getElementById("connection-header").innerText = "Connect to websocket : True";
                    }
                    conn.onclose = function (e){
                        document.getElementById("connection-header").innerText = "Connect to websocket : True";
                        //reconnection
                    }
                    conn.onmessage = function(e){
                        // console.log(e)
                        const eventData = JSON.parse(e.data);
                        const event = Object.assign(new Event, eventData);
                        routeEvent(event);
                    }
                }else{
                    alert("not support websocket");
                }
            }
 
            window.onload = function(){
                document.getElementById("chatroom-selection").onsubmit = changeChatRoom;
                document.getElementById("chatroom-message").onsubmit = sendMessage;
                document.getElementById("login-form").onsubmit = login;
            }
//otp.go
package main
 
import (
	"context"
	"github.com/google/uuid"
	"time"
)
 
type OTP struct {
	Key     string    `json:"key"`
	Created time.Time `json:"created"`
}
 
// 這個map將包含一次性密碼,刪除太舊的一次性密碼
type RetentionMap map[string]OTP
 
// 接受一個上下文和保留時間
func NewRetentionMap(ctx context.Context, retentiontime time.Duration) RetentionMap {
	rm := make(RetentionMap)
	go rm.Retention(ctx, retentiontime) //開一個協程不斷檢查
	return rm
}
 
func (rm RetentionMap) NewOTP() OTP {
	o := OTP{
		Key:     uuid.NewString(),
		Created: time.Now(),
	}
	rm[o.Key] = o
	return o
}
 
func (rm RetentionMap) VerifyOTP(otp string) bool {
	if _, ok := rm[otp]; !ok {
		return false // otp is not valid
	}
	delete(rm, otp) //刪除一次性密碼
	return true
}
 
func (rm RetentionMap) Retention(ctx context.Context, retentionPeriod time.Duration) {
	//每次重新檢查的頻率
	ticker := time.NewTicker(400 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
			for _, otp := range rm {
				//過期時間比現在早,這密碼無效
				if otp.Created.Add(retentionPeriod).Before(time.Now()) {
					delete(rm, otp.Key)
				}
			}
			//上下文關閉
		case <-ctx.Done():
			return
		}
	}
}
 
//manager.go
 
type Manager struct {
	clients ClientList
	//會有很多人同時連接到API,使用互斥鎖保護
	sync.RWMutex
 
	otps RetentionMap
 
	//將type當作key並允許我們獲取事件處理程序
	handlers map[string]EventHandler
}
 
// 工廠模式
func NewManager(ctx context.Context) *Manager {
	m := &Manager{
		clients:  make(ClientList),
		otps:     NewRetentionMap(ctx, 5*time.Second),
		handlers: make(map[string]EventHandler),
	}
	m.setupEventHandlers()
	return m
}
 
func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
	//驗證OTP是否有效
	otp := r.URL.Query().Get("otp")
	if otp == "" {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	if !m.otps.VerifyOTP(otp) {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	log.Println("new connection")
	//upgrade regular http connection into websocket
	conn, err := websocketUpgrade.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	client := NewClient(conn, m)
	m.addClient(client)
	// Start client processing
	go client.readMessages()
	go client.writeMessages()
}
 
 
func (m *Manager) loginHandler(w http.ResponseWriter, r *http.Request) {
	type userLoginRequest struct {
		Username string `json:"username"`
		Password string `json:"password"`
	}
	var req userLoginRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	//帳密驗證須依照自身所需進行更改,這邊只是事例
	if req.Username == "test" && req.Password == "test" {
		type Response struct {
			OTP string `json:"otp"`
		}
		otp := m.otps.NewOTP()
		resp := Response{
			OTP: otp.Key,
		}
		data, err := json.Marshal(resp)
		if err != nil {
			log.Println(err)
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write(data)
		return
	}
	w.WriteHeader(http.StatusUnauthorized)
}
 
main.go
func setupApi() {
	ctx := context.Background()
	manger := NewManager(ctx)
	http.Handle("/", http.FileServer(http.Dir("./front"))) //加載前端
	http.HandleFunc("/ws", manger.serveWs)
	http.HandleFunc("/login", manger.loginHandler)
}
 

新增廣播和群組聊天->最終完成品

front/index.html

 
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Go_Websocket</title>
</head>
<body>
    <div class="center">
        <h1>Chat</h1>
        <h3 id="connection-header">Connect to websocket: false</h3>
        <form id="chatroom-selection">
            <label for="chatroom" id="chat-header">ChatRoom:</label>
            <input type="text" id="chatroom" name="chatroom"><br><br>
            <input type="submit" value="Change Chatroom">
        </form>
        <br><br>
        <!--            record message from websovket-->
        <textarea class="messagearea" id="chatmessage" readonly name="chatmessages" cols="30" rows="10" placeholder="welocome to chatroom"></textarea>
        <br>
        <form id="chatroom-message">
            <label for="chatmessage">MESSAGE</label>
            <input type="text" id="message" name="message">
            <br><br>
            <input type="submit"  value="sendmessage">
        </form>
 
        <div style="border: 3px solid black;margin-top: 30px">
            <form id="login-form">
                <label for="username">Username</label>
                <input type="text" name="username" id="username">
                <label for="password">Password</label>
                <input type="password" name="password" id="password">
                <br><br>
                <input type="submit" value="Login">
            </form>
        </div>
 
        <script>
            let selectedChat = "general";
            class Event {
                //更好的控制用戶發送的訊息
                constructor(type ,payload) {
                    this.type = type;
                    this.payload = payload;
                }
            }
 
            class SendMessageEvent {
                constructor(message ,from) {
                    this.message = message;
                    this.from = from;
                }
            }
 
            class NewMessageEvent {
                constructor(message ,from,sent) {
                    this.message = message;
                    this.from = from;
                    this.sent = sent;
                }
            }
 
            class ChangeChatRoomEvent {
                constructor(name) {
                    this.name = name;
                }
            }
 
            function changeChatRoom(){
                var newchat = document.getElementById("chatroom")
                //不希望垃圾郵件更改到同一個聊天室,只能到新的
                if(newchat !== null && newchat.value !== selectedChat){
                    selectedChat = newchat.value;
                    header = document.getElementById("chat-header").innerHTML = "Currently in chatroom :" + selectedChat;
                    let changeEvent = new ChangeChatRoomEvent(selectedChat)
                    sendEvent("change_room",changeEvent)
                    textarea = document.getElementById("chatmessage");
                    textarea.innerHTML = `You changed new chatroom ${selectedChat}`;
                }
                //如果沒有return false將會重定向
                return false;
            }
 
            function routeEvent(event){
                if(event.type === undefined){
                    alert("no type field in the event");
                }
                switch (event.type){
                    case "new_message":
                        // console.log("new Message");
                        const messageEvent = Object.assign(new NewMessageEvent,event.payload)
                        appendChatMessage(messageEvent)
                        break;
                    default:
                        alert("not supported this type");
                        break;
                }
            }
 
            function appendChatMessage(messageEvent){
                let date = new Date(messageEvent.sent);
                const formattedMessage = `${date.toLocaleString()} : ${messageEvent.message}`;
                textarea = document.getElementById("chatmessage");
                textarea.innerHTML = textarea.innerHTML + "\n" + formattedMessage;
                textarea.scrollTop = textarea.scrollHeight;
            }
 
            function sendEvent(eventName,payload){
                const event = new Event(eventName,payload);
                conn.send(JSON.stringify(event))
            }
 
            function sendMessage(){
                let newMessage = document.getElementById("message");
                if(newMessage !== null ){
                    // console.log(newMessage);
                    // conn.send(newMessage.value);
                    //peter到時候需案自行更改
                    let outgoingEvent = new SendMessageEvent(newMessage.value,"test")
                    sendEvent("send_message",outgoingEvent);
                }
                return false;
            }
 
            function login(){
                let formData = {
                    username:document.getElementById("username").value,
                    password:document.getElementById("password").value,
                };
                fetch("login",{
                    method:"POST",
                    body:JSON.stringify(formData),
                    mode:"cors"
                }).then((response) => {
                    if(response.ok){
                        return response.json();
                    }else{
                        throw 'unauthorized';
                    }
                }).then((data)=>{
                    //通過auth
                    connectWebsocket(data.otp);
                }).catch((e)=>{alert(e)});
                return false;
            }
 
            function connectWebsocket(otp){
                if(window["WebSocket"]){
                    console.log("support websocket");
                    //connect to websocket
                    conn = new WebSocket("ws://"+document.location.host+"/ws?otp="+otp);
                    //連接到websocket時觸發
                    conn.onopen = function (e){
                        document.getElementById("connection-header").innerText = "Connect to websocket : True";
                    }
                    conn.onclose = function (e){
                        document.getElementById("connection-header").innerText = "Connect to websocket : True";
                        //reconnection
                    }
                    conn.onmessage = function(e){
                        // console.log(e)
                        const eventData = JSON.parse(e.data);
                        const event = Object.assign(new Event, eventData);
                        routeEvent(event);
                    }
                }else{
                    alert("not support websocket");
                }
            }
 
            window.onload = function(){
                document.getElementById("chatroom-selection").onsubmit = changeChatRoom;
                document.getElementById("chatroom-message").onsubmit = sendMessage;
                document.getElementById("login-form").onsubmit = login;
            }
        </script>
    </div>
</body>
</html>

client.go

package main
 
import (
	"encoding/json"
	"github.com/gorilla/websocket"
	"log"
	"time"
)
 
var (
	pongWait     = 10 * time.Second    //發送ping後pong的最多等待時間
	pingInterval = (pongWait * 9) / 10 //ping每次發送的煎個,如果滿足條件,該職必須低於Pong wait
)
 
// ClientList 客戶端列表,上線狀態
type ClientList map[*Client]bool
 
// Client 處理所有單用戶相關的內容
type Client struct {
	connection *websocket.Conn
	manager    *Manager
	chatroom   string
	//egress 避免客戶端併發權限,使用一個無緩衝的通道來防止連接同時獲得過多的請求
	//egress chan []byte
	egress chan Event
}
 
func NewClient(conn *websocket.Conn, manager *Manager) *Client {
	return &Client{
		connection: conn,
		//之所以會使用manager是因為會將一些事情引導到manager進行處理,例如像其他用戶廣播
		manager: manager,
		egress:  make(chan Event),
	}
}
 
func (c *Client) readMessages() {
	//這邊使用defer是因為跳出for迴圈後再執行
	defer func() {
		//cleanup connection from client List,幫助我們清理未使用的客戶端
		c.manager.removeClient(c)
	}()
 
	//Jumbo frames
	c.connection.SetReadLimit(512)
 
	//當我們接受到pong以前能夠等待的時間
	if err := c.connection.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		log.Println(err)
		return
	}
	//觸發pong時處理的handler,每收到pong就會觸發
	c.connection.SetPongHandler(c.pongHandler)
	for {
		//payload為負載,類行為byte
		_, payload, err := c.connection.ReadMessage()
		//messageType在RFC中定義有幾種不同的消息類型讓你對數據,二進制進行ping/pong
		if err != nil {
			//連接意外關閉返回錯誤
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				//檢查異常是因為不希望正常斷開的時候也被當成錯誤紀錄
				log.Printf("error reading message %v", err)
				break
			}
		}
 
		//測試寫入egress,每read一個message都會發送給其他所有客戶
		//for wsclient := range c.manager.clients {
		//	wsclient.egress <- payload
		//}
		//
		//log.Println(messageType)
		//log.Println(string(payload)) ->使用Event取代
		var request Event
		if err := json.Unmarshal(payload, &request); err != nil {
			log.Printf("error unmarshalling :%v", err)
			break
		}
		if err := c.manager.routeEvent(request, c); err != nil {
			log.Println("error handling message", err)
		}
	}
}
 
func (c *Client) writeMessages() {
	defer func() {
		c.manager.removeClient(c)
	}()
	//計時器
	ticker := time.NewTicker(pingInterval)
	for {
		select {
		case message, ok := <-c.egress:
			if !ok {
				if err := c.connection.WriteMessage(websocket.CloseMessage, nil); err != nil {
					log.Println("connection closed :", err)
				}
				return //return 後會觸發破壞迴圈觸發defer
			}
 
			data, err := json.Marshal(message)
			if err != nil {
				log.Println(err)
				break
			}
 
			if err := c.connection.WriteMessage(websocket.TextMessage, data); err != nil {
				log.Printf("failed to send message %v:", err)
			}
			log.Println("message sent")
		case <-ticker.C:
			log.Println("ping")
			//Send ping to client
			//必須為指定類型,否則前端無法處理
			if err := c.connection.WriteMessage(websocket.PingMessage, []byte(``)); err != nil {
				log.Println("write message error", err)
				return
			}
			//ping給服務端後,前端要pong回應,因為RFC告訴我們ping和pong應該自動觸發
		}
	}
}
 
func (c *Client) pongHandler(pongMessage string) error {
	log.Println("pong")
	//接受到pong以後要重置的時間
	return c.connection.SetReadDeadline(time.Now().Add(pongWait))
}
 

event.go

 
package main
 
import (
	"encoding/json"
	"fmt"
	"time"
)
 
type Event struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
	//使用RawMessage(原始格式)是因為我們希望使用者可以發送任何類型
}
 
type EventHandler func(event Event, c *Client) error
 
const (
	EventSendMessage = "send_message"
	EventNewMessage  = "new_message"
	EventChangeRoom  = "change_room"
)
 
type SendMessageEvent struct {
	Message string `json:"message"`
	From    string `json:"from"`
}
 
type NewMessageEvent struct {
	SendMessageEvent
	Sent time.Time `json:"sent"`
}
type ChangeRoomEvent struct {
	Name string `json:"name"`
}
 
func ChangeRoomHandler(event Event, c *Client) error {
	var chatRoomEvent ChangeRoomEvent
	if err := json.Unmarshal(event.Payload, &chatRoomEvent); err != nil {
		return fmt.Errorf("bad payload in request : %v", err)
	}
	c.chatroom = chatRoomEvent.Name
	return nil
}
func SendMessage(event Event, c *Client) error {
	//fmt.Println(event)
	var chatevent SendMessageEvent
	if err := json.Unmarshal(event.Payload, &chatevent); err != nil {
		return fmt.Errorf("bad payload in request : %v", err)
	}
	var broadMessage NewMessageEvent
	broadMessage.Sent = time.Now()
	broadMessage.Message = chatevent.Message
	broadMessage.From = chatevent.From
 
	data, err := json.Marshal(broadMessage)
	if err != nil {
		return fmt.Errorf("failed to marshal broadcast message : %v", err)
	}
	var outgoing Event
	outgoing.Type = EventNewMessage
	outgoing.Payload = data
	for clients := range c.manager.clients {
		if clients.chatroom == c.chatroom {
			clients.egress <- outgoing
		}
	}
	return nil
}
 
//將這些儲存在管理器,確保管理器方便處理
 

manager.go

// http status 101 代表正在切換協議
package main
 
// manager.go // 管理websocket
import (
	"context"
	"encoding/json"
	"errors"
	"github.com/gorilla/websocket"
	"log"
	"net/http"
	"sync"
	"time"
)
 
// 管理websocket
var (
	websocketUpgrade = websocket.Upgrader{
		//cross
		CheckOrigin: checkOrigin,
		//確保用戶不會發送巨大封包
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
)
 
type Manager struct {
	clients ClientList
	//會有很多人同時連接到API,使用互斥鎖保護
	sync.RWMutex
 
	otps RetentionMap
 
	//將type當作key並允許我們獲取事件處理程序
	handlers map[string]EventHandler
}
 
// 工廠模式
func NewManager(ctx context.Context) *Manager {
	m := &Manager{
		clients:  make(ClientList),
		otps:     NewRetentionMap(ctx, 5*time.Second),
		handlers: make(map[string]EventHandler),
	}
	m.setupEventHandlers()
	return m
}
 
func (m *Manager) setupEventHandlers() {
	m.handlers[EventSendMessage] = SendMessage
	m.handlers[EventChangeRoom] = ChangeRoomHandler
}
 
func (m *Manager) routeEvent(event Event, c *Client) error {
	//檢查事件類型是否是處理程序的一部分,處理程序是一個使用事件類型作為key的map
	//因此每當我們收到類型設置為發送消息,都會觸發發送消息
	if handler, ok := m.handlers[event.Type]; ok {
		if err := handler(event, c); err != nil {
			return err
		}
		return nil
	} else {
		return errors.New("there is no such event type")
	}
}
 
func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
	//驗證OTP是否有效
	otp := r.URL.Query().Get("otp")
	if otp == "" {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	if !m.otps.VerifyOTP(otp) {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	log.Println("new connection")
	//upgrade regular http connection into websocket
	conn, err := websocketUpgrade.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	client := NewClient(conn, m)
	m.addClient(client)
	// Start client processing
	go client.readMessages()
	go client.writeMessages()
}
 
func (m *Manager) loginHandler(w http.ResponseWriter, r *http.Request) {
	type userLoginRequest struct {
		Username string `json:"username"`
		Password string `json:"password"`
	}
	var req userLoginRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	//form
	if req.Username == "test" && req.Password == "test" {
		type Response struct {
			OTP string `json:"otp"`
		}
		otp := m.otps.NewOTP()
		resp := Response{
			OTP: otp.Key,
		}
		data, err := json.Marshal(resp)
		if err != nil {
			log.Println(err)
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write(data)
		return
	}
	w.WriteHeader(http.StatusUnauthorized)
}
 
// 向管理起添加或刪除客戶端
func (m *Manager) addClient(client *Client) {
	m.Lock()
	//鎖上後當兩個人同時連接就不會同時修該Client list,Client list本質為map
	defer m.Unlock()
	m.clients[client] = true
}
 
func (m *Manager) removeClient(client *Client) {
	m.Lock()
	defer m.Unlock()
	if _, ok := m.clients[client]; ok {
		client.connection.Close()
		delete(m.clients, client)
	}
}
 
// Cross Origin
func checkOrigin(r *http.Request) bool {
	//true將會連接,false關閉連接
	origin := r.Header.Get("Origin")
	switch origin {
	case "http://localhost:8082":
		return true
	default:
		return false
	}
}
 

otp.go

package main
 
import (
	"context"
	"github.com/google/uuid"
	"time"
)
 
type OTP struct {
	Key     string    `json:"key"`
	Created time.Time `json:"created"`
}
 
// 這個map將包含一次性密碼,刪除太舊的一次性密碼
type RetentionMap map[string]OTP
 
// 接受一個上下文和保留時間
func NewRetentionMap(ctx context.Context, retentiontime time.Duration) RetentionMap {
	rm := make(RetentionMap)
	go rm.Retention(ctx, retentiontime) //開一個協程不斷檢查
	return rm
}
 
func (rm RetentionMap) NewOTP() OTP {
	o := OTP{
		Key:     uuid.NewString(),
		Created: time.Now(),
	}
	rm[o.Key] = o
	return o
}
 
func (rm RetentionMap) VerifyOTP(otp string) bool {
	if _, ok := rm[otp]; !ok {
		return false // otp is not valid
	}
	delete(rm, otp) //刪除一次性密碼
	return true
}
 
func (rm RetentionMap) Retention(ctx context.Context, retentionPeriod time.Duration) {
	//每次重新檢查的頻率
	ticker := time.NewTicker(400 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
			for _, otp := range rm {
				//過期時間比現在早,這密碼無效
				if otp.Created.Add(retentionPeriod).Before(time.Now()) {
					delete(rm, otp.Key)
				}
			}
			//上下文關閉
		case <-ctx.Done():
			return
		}
	}
}
 

main.go

package main
 
import (
	"context"
	"log"
	"net/http"
)
 
func main() {
	setupApi()
	log.Fatalln(http.ListenAndServe(":8082", nil))
}
 
func setupApi() {
	ctx := context.Background()
	manger := NewManager(ctx)
	http.Handle("/", http.FileServer(http.Dir("./front"))) //加載前端
	http.HandleFunc("/ws", manger.serveWs)
	http.HandleFunc("/login", manger.loginHandler)
}
 
//go run *.go 運行所有文件