gomog/manual/TCP_PROTOCOL.md

14 KiB
Raw Permalink Blame History

Gomog TCP 协议参考文档

版本: v1.0.0-alpha
最后更新: 2026-03-14
协议: MongoDB Wire Protocol (兼容)


📖 目录

  1. 概述
  2. 消息格式
  3. 操作码
  4. OP_MSG 协议
  5. 使用示例
  6. 客户端开发
  7. 错误处理

概述

什么是 TCP 协议?

Gomog 的 TCP 协议实现了 MongoDB Wire Protocol 的兼容层,允许任何 MongoDB 客户端直接连接到 Gomog 服务器,无需修改代码。

协议特点

  • MongoDB 兼容: 支持标准 MongoDB 线协议
  • 双向通信: 基于 TCP 的长连接
  • 高效传输: 二进制协议,低开销
  • 并发安全: 支持多客户端并发连接
  • 会话管理: 自动管理连接状态

默认配置

配置项 默认值 说明
监听地址 :27017 MongoDB 默认端口
最大连接数 无限制 可配置
读超时 可配置
写超时 可配置

消息格式

消息头Message Header

所有 MongoDB Wire Protocol 消息都以 16 字节的头部开始:

 0                   4                   8           12  13 14 15
+-------------------+-------------------+-----------+--+--+--+--+
|        Length     |    RequestID      |ResponseTo |Op|Op|Op|Op|
+-------------------+-------------------+-----------+--+--+--+--+
字段 大小 类型 说明
Length 4 bytes int32 消息总长度(包括头部)
RequestID 4 bytes int32 请求 ID由客户端生成
ResponseTo 4 bytes int32 响应到的请求 ID服务器响应时使用
OpCode 4 bytes int32 操作码

字节序

所有多字节字段都使用小端字节序Little-Endian


操作码

支持的操作码

操作码 名称 说明
1 OP_REPLY 服务器响应
4 OP_UPDATE 更新文档
8 OP_INSERT 插入文档
2004 OP_QUERY 查询文档
2006 OP_GETMORE 获取更多结果
2007 OP_DELETE 删除文档
2013 OP_MSG 扩展协议消息(主要使用)

OP_MSG推荐

OP_MSG 是 MongoDB 3.6+ 引入的扩展协议,支持所有数据库操作。


OP_MSG 协议

消息格式

 0               4                             16
+---------------+------------------------------+
|    OpCode     |  Flags                       |
+---------------+------------------------------+
|  Checksum    |  Sections...                  |
+--------------+-------------------------------+

标志位Flags

标志 说明
checksumPresent 0x00000001 存在校验和
moreToCome 0x00000002 更多消息到来

Section 格式

每个 Section 包含:

+-------------+------------------+
| Kind (1B)   | Data...          |
+-------------+------------------+

Body Section (Kind = 0)

包含主要的命令文档。

Document Sequence Section (Kind = 1)

包含文档序列。

常用命令

insert 命令

{
  "insert": "collection_name",
  "documents": [
    {"_id": 1, "name": "Alice"},
    {"_id": 2, "name": "Bob"}
  ],
  "ordered": true
}

find 命令

{
  "find": "collection_name",
  "filter": {"age": {"$gt": 25}},
  "projection": {"name": 1},
  "limit": 10
}

update 命令

{
  "update": "collection_name",
  "updates": [
    {
      "q": {"name": "Alice"},
      "u": {"$set": {"age": 31}}
    }
  ]
}

delete 命令

{
  "delete": "collection_name",
  "deletes": [
    {
      "q": {"name": "Bob"},
      "limit": 1
    }
  ]
}

aggregate 命令

{
  "aggregate": "collection_name",
  "pipeline": [
    {"$match": {"status": "active"}},
    {"$group": {"_id": "$category", "count": {"$sum": 1}}}
  ]
}

使用示例

使用 MongoDB Shell

# 连接到 Gomog
mongosh --host localhost --port 27017

# 切换到数据库
use testdb

# 插入文档
db.users.insertOne({name: "Alice", age: 30})

# 批量插入
db.users.insertMany([
  {name: "Bob", age: 25},
  {name: "Charlie", age: 35}
])

# 查询
db.users.find({age: {$gt: 28}})

# 聚合
db.orders.aggregate([
  {$match: {status: "completed"}},
  {$group: {_id: "$customerId", total: {$sum: "$amount"}}}
])

# 更新
db.users.updateOne(
  {name: "Alice"},
  {$set: {age: 31}}
)

# 删除
db.users.deleteOne({name: "Bob"})

使用 Node.js MongoDB Driver

const { MongoClient } = require('mongodb');

async function main() {
  // 连接到 Gomog
  const client = new MongoClient('mongodb://localhost:27017');
  
  try {
    await client.connect();
    console.log('Connected to Gomog!');
    
    const db = client.db('testdb');
    const users = db.collection('users');
    
    // 插入
    await users.insertOne({name: 'Alice', age: 30});
    
    // 查询
    const result = await users.findOne({name: 'Alice'});
    console.log(result);
    
    // 更新
    await users.updateOne(
      {name: 'Alice'},
      {$set: {age: 31}}
    );
    
    // 聚合
    const aggResult = await users.aggregate([
      {$group: {_id: null, avgAge: {$avg: '$age'}}}
    ]).toArray();
    console.log(aggResult);
    
  } finally {
    await client.close();
  }
}

main().catch(console.error);

使用 Python PyMongo

from pymongo import MongoClient

# 连接到 Gomog
client = MongoClient('mongodb://localhost:27017/')
db = client.testdb
users = db.users

# 插入
user = {"name": "Alice", "age": 30}
result = users.insert_one(user)
print(f"Inserted ID: {result.inserted_id}")

# 批量插入
users.insert_many([
    {"name": "Bob", "age": 25},
    {"name": "Charlie", "age": 35}
])

# 查询
for user in users.find({"age": {"$gt": 28}}):
    print(user)

# 更新
result = users.update_one(
    {"name": "Alice"},
    {"$set": {"age": 31}}
)
print(f"Modified: {result.modified_count}")

# 聚合
pipeline = [
    {"$group": {"_id": None, "avgAge": {"$avg": "$age"}}}
]
result = list(users.aggregate(pipeline))
print(result)

使用 Go MongoDB Driver

package main

import (
    "context"
    "fmt"
    "log"
    "time"
    
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // 连接到 Gomog
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
    if err != nil {
        log.Fatal(err)
    }
    defer client.Disconnect(ctx)
    
    db := client.Database("testdb")
    users := db.Collection("users")
    
    // 插入
    _, err = users.InsertOne(ctx, bson.M{"name": "Alice", "age": 30})
    if err != nil {
        log.Fatal(err)
    }
    
    // 查询
    var result bson.M
    err = users.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Found: %+v\n", result)
    
    // 更新
    _, err = users.UpdateOne(
        ctx,
        bson.M{"name": "Alice"},
        bson.M{"$set": bson.M{"age": 31}},
    )
    if err != nil {
        log.Fatal(err)
    }
    
    // 聚合
    pipeline := mongo.Pipeline{
        {{"$group", bson.D{{"_id", nil}, {"avgAge", bson.D{{"$avg", "$age"}}}}}},
    }
    cursor, err := users.Aggregate(ctx, pipeline)
    if err != nil {
        log.Fatal(err)
    }
    defer cursor.Close(ctx)
    
    var results []bson.M
    if err = cursor.All(ctx, &results); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Aggregation: %+v\n", results)
}

客户端开发

实现 OP_MSG 解析

package tcp

import (
    "encoding/binary"
    "io"
)

// ReadMessageHeader 读取消息头
func ReadMessageHeader(r io.Reader) (*MessageHeader, error) {
    header := &MessageHeader{}
    
    // 读取 Length
    if err := binary.Read(r, binary.LittleEndian, &header.Length); err != nil {
        return nil, err
    }
    
    // 读取 RequestID
    if err := binary.Read(r, binary.LittleEndian, &header.RequestID); err != nil {
        return nil, err
    }
    
    // 读取 ResponseTo
    if err := binary.Read(r, binary.LittleEndian, &header.ResponseTo); err != nil {
        return nil, err
    }
    
    // 读取 OpCode
    if err := binary.Read(r, binary.LittleEndian, &header.OpCode); err != nil {
        return nil, err
    }
    
    return header, nil
}

// WriteMessage 写入消息
func WriteMessage(w io.Writer, requestID uint32, responseTo uint32, opCode uint32, payload []byte) error {
    length := uint32(16 + len(payload))
    
    // 写入 Length
    if err := binary.Write(w, binary.LittleEndian, length); err != nil {
        return err
    }
    
    // 写入 RequestID
    if err := binary.Write(w, binary.LittleEndian, requestID); err != nil {
        return err
    }
    
    // 写入 ResponseTo
    if err := binary.Write(w, binary.LittleEndian, responseTo); err != nil {
        return err
    }
    
    // 写入 OpCode
    if err := binary.Write(w, binary.LittleEndian, opCode); err != nil {
        return err
    }
    
    // 写入 Payload
    if _, err := w.Write(payload); err != nil {
        return err
    }
    
    return nil
}

处理 OP_MSG

func handleOPMsg(header *MessageHeader, body []byte) ([]byte, error) {
    // 解析 Section
    kind := body[0]
    documentData := body[1:]
    
    // 解析 BSON 文档
    var doc bson.Raw
    if err := bson.Unmarshal(documentData, &doc); err != nil {
        return nil, err
    }
    
    // 获取命令
    command := doc.Lookup("insert").StringValue()
    if command == "" {
        command = doc.Lookup("find").StringValue()
    }
    
    // 执行相应操作
    switch {
    case doc.Lookup("insert").Valid():
        return handleInsert(doc)
    case doc.Lookup("find").Valid():
        return handleFind(doc)
    case doc.Lookup("update").Valid():
        return handleUpdate(doc)
    case doc.Lookup("delete").Valid():
        return handleDelete(doc)
    case doc.Lookup("aggregate").Valid():
        return handleAggregate(doc)
    default:
        return nil, errors.New("unknown command")
    }
}

错误处理

错误响应格式

{
  "ok": 0,
  "errmsg": "错误描述",
  "code": 错误码,
  "codeName": "错误名称"
}

常见错误

连接错误

E11000 duplicate key error
NoSuchKey: key not found
NamespaceNotFound: collection does not exist

语法错误

BadValue: invalid parameter type
FailedToParse: unable to parse query
Location40352: unsupported operator

错误处理示例

try {
  await db.users.insertOne({_id: 1, name: "Alice"});
  await db.users.insertOne({_id: 1, name: "Bob"}); // 重复键错误
} catch (error) {
  console.error(error.code);    // 11000
  console.error(error.message); // duplicate key error
}

性能优化

1. 使用批量操作

// ❌ 不推荐:逐条插入
for (let i = 0; i < 1000; i++) {
  await db.users.insertOne({id: i});
}

// ✅ 推荐:批量插入
const docs = Array.from({length: 1000}, (_, i) => ({id: i}));
await db.users.insertMany(docs);

2. 使用投影

// ❌ 不推荐:返回所有字段
db.users.find({status: "active"});

// ✅ 推荐:只返回需要的字段
db.users.find(
  {status: "active"},
  {projection: {name: 1, email: 1}}
);

3. 使用索引

// 为常用查询创建索引
db.users.createIndex({email: 1});
db.users.createIndex({status: 1, createdAt: -1});

4. 限制结果集

// 总是使用 limit
db.users.find({}).limit(100);

监控与调试

查看当前连接

// 在 Gomog 服务器日志中查看
tail -f /var/log/gomog/gomog.log | grep "connection"

慢查询分析

# 配置慢查询阈值
log:
  level: "debug"
  slow_query_threshold: "100ms"

启用详细日志

server:
  mode: "dev"  # 开发模式会输出详细日志

最佳实践

1. 连接池管理

const client = new MongoClient(uri, {
  maxPoolSize: 10,
  minPoolSize: 5,
  maxIdleTimeMS: 30000
});

2. 重试逻辑

const session = client.startSession();
await session.withTransaction(async () => {
  // 事务操作
}, {
  maxCommitTimeMS: 10000,
  readConcern: {level: 'local'},
  writeConcern: {w: 'majority'}
});

3. 超时设置

const cursor = db.collection.find({}, {
  timeout: true
}).maxTimeMS(5000);

附录

A. 数据类型映射

BSON 类型 JavaScript Go Python
String String string str
Int32 Number int32 int
Int64 BigInt int64 int
Double Number float64 float
Boolean Boolean bool bool
ObjectId ObjectId primitive.ObjectID ObjectId
Date Date time.Time datetime
Array Array []interface{} list
Object Object map[string]interface{} dict
Null null nil None

B. 保留字段

以下字段名在 Gomog 中有特殊含义:

  • _id: 文档唯一标识符
  • $*: 操作符前缀
  • $$: 系统变量前缀

C. 相关链接


维护者: Gomog Team
许可证: MIT