I needed to write a TCP server that handles Protocol Buffers data sent over a raw TCP stream. I settled on the high-performance gnet framework early on, but gnet's example library has no codec for parsing a raw Protocol Buffers stream, so I had to write one myself...
Protocol Analysis
The data arriving over the TCP stream consists of serialized Protocol Buffers messages, each preceded by a header that carries the length of the packet, like this:
[ Header ][ Data ][ Header ][ Data ][ Header ][ Data ][ Header ][ Data ][ Header ][ Data ]
Calling func DecodeVarint(b []byte) (uint64, int) from the official Go proto library on the start of a packet yields two values: the length of the packet body, and the number of bytes taken up by the header that encodes that length. DecodeVarint returns (0, 0) if the bytes do not hold a valid varint, for example when the header has not fully arrived yet.
Since nothing else marks where one packet ends and the next begins, the length header is the only way to split the stream into packets.
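To make the framing concrete, here is a minimal sketch that splits a byte slice holding several packets, without gnet. It uses the standard library's encoding/binary Uvarint as a stand-in for proto.DecodeVarint: both read the same unsigned base-128 varint, but Uvarint reports a truncated header as n == 0 and an overflow as n < 0, whereas DecodeVarint returns (0, 0) on any parse error. The names splitFrame and errIncomplete are hypothetical, used only for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errIncomplete is a hypothetical sentinel meaning "wait for more bytes".
var errIncomplete = errors.New("incomplete frame")

// splitFrame reads one varint-length-prefixed frame from the front of buf.
// It returns the frame body and the total number of bytes the frame occupies
// (header + body), or errIncomplete if buf does not yet hold a whole frame.
func splitFrame(buf []byte) (body []byte, consumed int, err error) {
	length, n := binary.Uvarint(buf)
	switch {
	case n < 0:
		return nil, 0, errors.New("length header overflows uint64")
	case n == 0:
		return nil, 0, errIncomplete // the header itself is truncated
	case uint64(len(buf)-n) < length:
		return nil, 0, errIncomplete // the body has not fully arrived
	}
	total := n + int(length)
	return buf[n:total], total, nil
}

func main() {
	// Build a stream of three packets, including an empty one.
	var stream []byte
	var hdr [binary.MaxVarintLen64]byte // a varint header is at most 10 bytes
	for _, msg := range []string{"hi", "", "proto"} {
		n := binary.PutUvarint(hdr[:], uint64(len(msg)))
		stream = append(stream, hdr[:n]...)
		stream = append(stream, msg...)
	}

	// Peel packets off the front of the stream using only their headers.
	for len(stream) > 0 {
		body, consumed, err := splitFrame(stream)
		if err != nil {
			fmt.Println("stopped:", err)
			break
		}
		fmt.Printf("%q\n", body)
		stream = stream[consumed:]
	}
}
```

An empty message is a valid packet, encoded as the single header byte 0x00, so a decoder should treat "header not complete yet" and "body length is zero" as two different cases.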
Decoder
```go
import (
	"context"

	"github.com/golang/protobuf/proto"
	"github.com/panjf2000/gnet"
)

// Store relevant information within the connection
type DataStruct struct {
	fullLength   int    // length of the packet body, taken from the header
	lenNumLength int    // number of bytes the varint header occupies
	fullData     []byte // bytes of the current packet collected so far
}

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		// No usable context: close the connection instead of continuing with a nil ctx
		return nil, c.Close()
	}
	// Retrieve the codec storage struct for this connection from the context
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		return nil, c.Close()
	}
	// Read everything currently buffered (Read does not move gnet's read pointer)
	buf := c.Read()
	// Check whether we have started reading a packet
	if len(r.fullData) == 0 {
		// Get the body length and header length from the header
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(buf)
		r.fullLength = int(fullLength)
		// DecodeVarint returns (0, 0) while the header is incomplete. Checking
		// lenNumLength rather than fullLength keeps empty messages decodable.
		if r.lenNumLength == 0 {
			return nil, nil
		}
	}
	// Number of bytes of this packet already stored in the struct
	fullDataLong := len(r.fullData)
	// Append everything read to fullData
	r.fullData = append(r.fullData, buf...)
	// Check whether the whole packet (header + body) has arrived
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		// Consume only this packet's bytes; any following data stays buffered
		c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)
		// Extract the packet body
		res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]
		// Clear the connection's buffer
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	// Packet incomplete: consume what was read and wait for more data
	c.ShiftN(len(buf))
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}
```
The decoder above currently runs without issues, but the version below is more memory-efficient. The two differ mainly in which read function they call: the first calls Read, which returns everything in gnet's ring buffer, and copies all of it into fullData even when it already includes the start of the next packet; the second first reads only the header with ReadN, gets the body length from it, and then calls ReadN again to extract exactly one packet.
```go
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		// No usable context: close the connection instead of continuing with a nil ctx
		return nil, c.Close()
	}
	// Retrieve the codec storage struct for this connection from the context
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		return nil, c.Close()
	}
	if len(r.fullData) == 0 {
		// Peek at up to 10 bytes, the longest possible varint; ReadN does not consume them
		_, header := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(header)
		r.fullLength = int(fullLength)
		if r.lenNumLength == 0 {
			// Header not complete yet
			return nil, nil
		}
	}
	fullDataLong := len(r.fullData)
	// Read only the bytes still missing from this packet (the header included on the first pass)
	n, buf := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, buf...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength:]
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}
```
The decoders also keep the body length parsed from the header, along with the partially received packet, in the connection's context, so the storage struct has to be put into the context when gnet fires the connection-opened event:
```go
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	ctx := context.WithValue(context.Background(), "codec", DataStruct{})
	c.SetContext(ctx)
	return
}
```
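Stripped of gnet, both decoders follow the same pattern: keep a per-connection buffer for the current packet until its header and body are complete, then hand out the body. The sketch below shows that pattern on its own, with a hypothetical assembler type fed by simulated TCP reads that split packets at arbitrary points; as before, binary.Uvarint stands in for proto.DecodeVarint. Unlike the gnet decoders, it buffers every received byte itself instead of leaving unconsumed data in gnet's ring buffer.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// assembler is a hypothetical per-connection buffer, playing the role that
// DataStruct plays in the gnet decoders.
type assembler struct {
	pending []byte // received bytes not yet returned as part of a frame
}

// Feed appends one read's worth of bytes and returns every frame it completed.
func (a *assembler) Feed(chunk []byte) [][]byte {
	a.pending = append(a.pending, chunk...)
	var frames [][]byte
	for {
		length, n := binary.Uvarint(a.pending)
		// n == 0: header truncated; n < 0: corrupt header (a real server
		// would close the connection); otherwise the body may still be short.
		if n <= 0 || uint64(len(a.pending)-n) < length {
			return frames
		}
		end := n + int(length)
		frame := make([]byte, length)
		copy(frame, a.pending[n:end]) // copy so the frame does not alias pending
		frames = append(frames, frame)
		a.pending = a.pending[end:]
	}
}

func main() {
	// Two packets, "hello" and "gnet", delivered in 3-byte reads.
	stream := []byte{5, 'h', 'e', 'l', 'l', 'o', 4, 'g', 'n', 'e', 't'}
	var a assembler
	for i := 0; i < len(stream); i += 3 {
		end := i + 3
		if end > len(stream) {
			end = len(stream)
		}
		for _, f := range a.Feed(stream[i:end]) {
			fmt.Printf("read ending at byte %d completed %q\n", end, f)
		}
	}
}
```

A packet may complete in the middle of a read, and one read may complete several packets, which is why Feed loops until the remaining bytes no longer hold a whole frame.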
Encoder
The encoder is very simple: call the EncodeVarint function from the proto library to generate the length header, place it in front of the packet body, and send the result to the client.
```go
func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
	buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
	return buf, nil
}
```
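A round trip is a quick way to check that the encoder and decoder agree. This sketch again uses binary.PutUvarint and binary.Uvarint in place of proto.EncodeVarint and proto.DecodeVarint (same wire format). encodeFrame is a hypothetical helper that sizes its output once, so the header and body are copied into a single allocation instead of appending the body to a separately allocated header slice.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeFrame prefixes body with its varint-encoded length.
func encodeFrame(body []byte) []byte {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(body)))
	out := make([]byte, 0, n+len(body)) // one allocation for header + body
	out = append(out, hdr[:n]...)
	return append(out, body...)
}

func main() {
	frame := encodeFrame([]byte("pong"))
	fmt.Println(frame) // [4 112 111 110 103]

	// Decode the header back and check that it describes the body.
	length, n := binary.Uvarint(frame)
	fmt.Println(length, n, string(frame[n:])) // 4 1 pong
}
```

Bodies of 128 bytes or more get a multi-byte header; a 300-byte body, for example, is prefixed with the two bytes 0xAC 0x02.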
Update on 2021-11-09
I had overlooked a serious performance problem in storing the intermediate state in the context. The standard library's context.WithValue does not modify the context it is given; it returns a new child context that wraps it. Because every Decode call wrapped the previous context again, the chain of contexts grew with each decode, and every layer kept the DataStruct from that call alive, which leaked memory.
It took me several days, and fixes for several other potential memory leaks, before I realized this (bald.jpg).
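The growth is easy to observe. The sketch below replays the original update pattern (read the state, modify it, wrap it again with context.WithValue) and counts the layers through the context's String() output, which names each WithValue layer. Counting that debug string is a demo trick, not a stable API. codecKey, simulateDecodes and depth are hypothetical names; the unexported key type replaces the original string key "codec" only to follow the usual Go convention for context keys.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// DataStruct mirrors the per-connection state used by the decoder.
type DataStruct struct {
	fullLength   int
	lenNumLength int
	fullData     []byte
}

// codecKey is a hypothetical unexported context key type.
type codecKey struct{}

// simulateDecodes replays the original pattern: each "Decode" call reads the
// state from ctx and stores the updated copy with context.WithValue.
func simulateDecodes(calls int) context.Context {
	ctx := context.WithValue(context.Background(), codecKey{}, DataStruct{})
	for i := 0; i < calls; i++ {
		r := ctx.Value(codecKey{}).(DataStruct)
		r.fullData = append(r.fullData, byte(i))
		// WithValue wraps ctx instead of replacing the old value, so the
		// previous layer (and its DataStruct) stays reachable.
		ctx = context.WithValue(ctx, codecKey{}, r)
	}
	return ctx
}

// depth counts the WithValue layers named in the context's debug string.
func depth(ctx context.Context) int {
	return strings.Count(fmt.Sprint(ctx), "WithValue")
}

func main() {
	for _, calls := range []int{1, 10, 100} {
		fmt.Println(calls, "decodes ->", depth(simulateDecodes(calls)), "layers")
	}
}
```

The number of layers grows by one per call and never shrinks for the lifetime of the connection; Value lookups also walk the chain, so they slow down as it grows.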
Then I looked at how gnet.Conn implements the Context() method and found that it simply keeps whatever value we pass to SetContext and hands it back unchanged; none of the features of context.Context are needed. So a simple fix is to store the DataStruct directly, which appears to have resolved the memory leak. The code is as follows:
```go
func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// Retrieve the codec storage struct for this connection from the context
	r, ok := c.Context().(DataStruct)
	if !ok {
		// State missing: close the connection instead of decoding with empty state
		return nil, c.Close()
	}
	if len(r.fullData) == 0 {
		// Peek at up to 10 bytes, the longest possible varint; ReadN does not consume them
		_, header := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(header)
		r.fullLength = int(fullLength)
		if r.lenNumLength == 0 {
			// Header not complete yet
			return nil, nil
		}
	}
	fullDataLong := len(r.fullData)
	n, buf := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, buf...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength:]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	// Store the updated struct directly; no context.WithValue wrapping
	c.SetContext(r)
	return nil, nil
}

func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	// Start each connection with empty decoding state
	c.SetContext(DataStruct{})
	return
}
```
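Because Context() just hands back whatever was stored, one variation on this fix (not what the code above does) is to store a *DataStruct instead. Decode can then update the state in place, which avoids re-storing and copying the struct on every call. The sketch uses a hypothetical fakeConn type standing in for the Context/SetContext pair on gnet.Conn so that it runs without gnet; appendChunk is likewise illustrative.

```go
package main

import "fmt"

// DataStruct mirrors the per-connection state used by the decoder.
type DataStruct struct {
	fullLength   int
	lenNumLength int
	fullData     []byte
}

// fakeConn is a hypothetical stand-in for the Context/SetContext pair on gnet.Conn.
type fakeConn struct{ ctx interface{} }

func (c *fakeConn) Context() interface{}       { return c.ctx }
func (c *fakeConn) SetContext(ctx interface{}) { c.ctx = ctx }

// appendChunk records a chunk in the connection's state. Because the state is
// a pointer, the change is visible through Context() without calling SetContext.
func appendChunk(c *fakeConn, chunk []byte) error {
	r, ok := c.Context().(*DataStruct)
	if !ok {
		return fmt.Errorf("unexpected connection context %T", c.Context())
	}
	r.fullData = append(r.fullData, chunk...)
	return nil
}

func main() {
	c := &fakeConn{}
	c.SetContext(&DataStruct{}) // what OnOpened would do in this variation
	_ = appendChunk(c, []byte("ab"))
	_ = appendChunk(c, []byte("cd"))
	fmt.Println(string(c.Context().(*DataStruct).fullData)) // abcd
}
```

The trade-off is that the state is now shared mutable memory, so it must only be touched from the connection's own event loop, which is where gnet calls Decode.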
Update on 2021-12-24
gnet recently released v1.6.x, which changes how the codec's return values are handled, so the code needs to be modified.
The main change is in gnet's eventloop_unix.go: commit d1ca7f3 changed the condition for passing decoded data to React from "the returned packet is not nil" to "the returned error is nil". After upgrading, Decode must therefore return a non-nil error whenever a packet is still incomplete, which is what the ContinueRead sentinel below is for.
```go
// ContinueRead tells the read loop that the packet is incomplete and it
// should stop decoding until more data arrives.
var ContinueRead = errors.New("continue read")

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// Retrieve the codec storage struct for this connection from the context
	r, ok := c.Context().(DataStruct)
	if !ok {
		// State missing: close the connection and return a non-nil error so React is not called
		_ = c.Close()
		return nil, errors.New("codec: connection state missing")
	}
	if len(r.fullData) == 0 {
		// Peek at up to 10 bytes, the longest possible varint; ReadN does not consume them
		_, header := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(header)
		r.fullLength = int(fullLength)
		if r.lenNumLength == 0 {
			// Header not complete yet
			return nil, ContinueRead
		}
	}
	fullDataLong := len(r.fullData)
	n, buf := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, buf...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength:]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	c.SetContext(r)
	return nil, ContinueRead
}
```
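To see why the sentinel error matters after the upgrade, the sketch below models the loop described above: keep calling the decoder while it returns a nil error, hand each result to a React-like callback, and stop at the first error. This is a simplified model based on the commit description, not gnet's actual event-loop code; decode and drain are hypothetical, and binary.Uvarint again stands in for proto.DecodeVarint.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// ContinueRead plays the same role as the sentinel in the decoder above.
var ContinueRead = errors.New("continue read")

// decode pops one varint-prefixed frame from *in, or returns ContinueRead
// when the header or body has not fully arrived.
func decode(in *[]byte) ([]byte, error) {
	length, n := binary.Uvarint(*in)
	switch {
	case n < 0:
		return nil, errors.New("corrupt length header")
	case n == 0 || uint64(len(*in)-n) < length:
		return nil, ContinueRead
	}
	end := n + int(length)
	frame := (*in)[n:end]
	*in = (*in)[end:]
	return frame, nil
}

// drain models a read loop that calls react for every nil-error result and
// stops decoding at the first error.
func drain(in *[]byte, react func([]byte)) error {
	for {
		frame, err := decode(in)
		if err != nil {
			if errors.Is(err, ContinueRead) {
				return nil // wait for the next read event
			}
			return err // a real error: the caller would close the connection
		}
		react(frame)
	}
}

func main() {
	// One complete packet followed by the first bytes of the next one.
	buf := []byte{2, 'o', 'k', 3, 'a'}
	err := drain(&buf, func(frame []byte) { fmt.Printf("React got %q\n", frame) })
	fmt.Println("err:", err, "bytes left:", len(buf)) // err: <nil> bytes left: 2
}
```

In this model, if decode returned (nil, nil) for an incomplete packet, drain would call react with a nil frame and then loop forever on the same bytes; returning ContinueRead is what ends the loop until more data arrives.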