Transactional Message Queue: A Lightweight Solution Based on SQLite

What is a Message Queue?

A message queue is an asynchronous communication mechanism used to pass messages in distributed systems. It works like a "post office" where senders put messages into the queue and receivers take them out for processing. This pattern offers several benefits:

  1. Decoupling: Producers and consumers don't need to communicate directly
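  2. Peak Shaving: The queue buffers sudden traffic spikes so consumers can work at a steady rate
  3. Asynchrony: Senders don't wait for processing to finish, which improves response time
  4. Reliability: Persisted messages survive restarts, which helps prevent data loss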
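
To make the decoupling and buffering points concrete, here is a minimal in-memory sketch built on a buffered Go channel. It is a toy model only: the Queue type and its methods are hypothetical names chosen for illustration, nothing is persisted, and it is not how NSQite works internally.

```go
package main

import "sync"

// Queue is a toy in-memory message queue backed by a buffered channel.
// (Hypothetical type for illustration; messages are not persisted.)
type Queue struct {
	ch chan string
}

// NewQueue creates a queue that can buffer up to capacity messages.
func NewQueue(capacity int) *Queue {
	return &Queue{ch: make(chan string, capacity)}
}

// Publish enqueues a message. It blocks only when the buffer is full,
// so the producer never talks to a consumer directly.
func (q *Queue) Publish(msg string) { q.ch <- msg }

// Close signals that no more messages will be published.
func (q *Queue) Close() { close(q.ch) }

// Consume starts the given number of worker goroutines, calls handle for
// each message, and returns once the queue is closed and drained.
func (q *Queue) Consume(workers int, handle func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range q.ch {
				handle(msg)
			}
		}()
	}
	wg.Wait()
}
```

A producer calls Publish and moves on, while Consume drains the buffer at its own pace. With more than one worker the handler runs concurrently, so it must be safe for concurrent use.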

What is a Transactional Message Queue?

The core idea of a transactional message queue is to make message publishing atomic with the business operation. With a conventional message queue, the message is sent outside the database transaction, so the two can diverge: a message sent before the transaction rolls back describes a change that never happened, and a message is lost if the program crashes after the transaction commits but before the message is sent. A transactional message queue addresses these problems through:

  1. Atomicity: The message and the business operation are written in the same transaction, so they either both succeed or both fail
  2. Rollback Mechanism: If the transaction fails, the message is rolled back with it and is never delivered
  3. Durability: Once the transaction commits, the message is persisted and survives a program crash
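
The atomicity and rollback behavior can be modeled in a few lines. The sketch below is an in-memory stand-in, not a real database and not NSQite's implementation: Store, Tx, and ErrBusiness are hypothetical names, and "commit" simply means copying buffered writes into the store.

```go
package main

import "errors"

// ErrBusiness is a hypothetical error representing a failed business operation.
var ErrBusiness = errors.New("business operation failed")

// Store is a toy stand-in for a database that holds both business rows
// and queued messages, so a single commit can cover both.
type Store struct {
	Rows     []string
	Messages []string
}

// Tx buffers writes until the transaction function returns.
type Tx struct {
	rows     []string
	messages []string
}

// InsertRow records a business write inside the transaction.
func (tx *Tx) InsertRow(row string) { tx.rows = append(tx.rows, row) }

// Publish records a message inside the same transaction.
func (tx *Tx) Publish(msg string) { tx.messages = append(tx.messages, msg) }

// Transaction applies the buffered rows and messages together when fn
// returns nil. If fn returns an error, nothing is applied.
func (s *Store) Transaction(fn func(tx *Tx) error) error {
	tx := &Tx{}
	if err := fn(tx); err != nil {
		return err // rollback: buffered rows and messages are discarded
	}
	s.Rows = append(s.Rows, tx.rows...)
	s.Messages = append(s.Messages, tx.messages...)
	return nil
}
```

Because the row and the message travel in the same Tx, there is no state in which one is visible without the other. A real implementation gets the same guarantee, plus durability, from the database's own transaction.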

Practical Example: Implementing User Behavior Analysis with NSQite

Let's consider a content platform that needs to record user behavior data (views, likes, favorites, etc.) for later analysis. This behavior data has the following characteristics:

  1. Large volume but not time-critical
  2. Can be retried if processing fails
  3. Doesn't affect core business processes
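
Since this data can be retried on failure, a consumer usually wraps its handler in a bounded retry loop. The following is a minimal sketch of that idea under simplifying assumptions: ProcessWithRetry and ErrTransient are hypothetical names, retries happen immediately with no delay, and the attempt count lives in memory rather than in the queue.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTransient is a hypothetical sentinel a handler can return for a
// failure that is worth retrying.
var ErrTransient = errors.New("transient failure")

// ProcessWithRetry calls handle until it succeeds or maxAttempts is reached.
// It returns the number of attempts made and nil on success, or the last
// error wrapped with context once the attempts are exhausted.
func ProcessWithRetry(body []byte, maxAttempts int, handle func([]byte) error) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = handle(body); err == nil {
			return attempt, nil
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}
```

A production queue would typically wait between attempts and store the attempt count alongside the message so that retries survive a restart.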

Using a message queue, we can:

  1. Return success immediately when user behavior occurs
  2. Put behavior data into the message queue
  3. Process data asynchronously (data cleaning, statistical analysis, etc.)

Here's a code example using NSQite:

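package main

import (
    "encoding/json"
    "log"
    "time"

    "github.com/ixugo/nsqite"
    "gorm.io/driver/sqlite" // assumed GORM SQLite driver; the original does not name one
    "gorm.io/gorm"
)

// Define message handler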
type UserActionHandler struct{}

func (h *UserActionHandler) HandleMessage(message *nsqite.Message) error {
    var action struct {
        UserID    string `json:"user_id"`
        Action    string `json:"action"`
        ContentID string `json:"content_id"`
        Timestamp string `json:"timestamp"`
    }
    if err := json.Unmarshal(message.Body, &action); err != nil {
        return err
    }
    // Data cleaning and statistical analysis
    return analyzeUserAction(action)
}

func main() {
    db, err := gorm.Open(sqlite.Open("user_actions.db"), &gorm.Config{})
    if err != nil {
        log.Fatal(err)
    }
    // Set GORM database connection
    nsqite.SetGorm(db)

    const topic = "user_actions"
    // Create producer
    p := nsqite.NewProducer()
    // Create consumer with max retry attempts of 5
    c := nsqite.NewConsumer(topic, "consumer1", nsqite.WithConsumerMaxAttempts(5))
    // Add 5 concurrent handlers
    c.AddConcurrentHandlers(&UserActionHandler{}, 5)

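    // Publish the message inside the same transaction as the business operation
    err = db.Transaction(func(tx *gorm.DB) error {
        // Business operation (doSomeBusiness is application code, not shown)
        if err := doSomeBusiness(tx); err != nil {
            return err
        }
        // Build and publish the message
        action := map[string]interface{}{
            "user_id":    "123",
            "action":     "view",
            "content_id": "456",
            "timestamp":  time.Now().Format(time.RFC3339),
        }
        body, err := json.Marshal(action)
        if err != nil {
            return err
        }
        return p.PublishTx(tx, topic, body)
    })
    if err != nil {
        log.Printf("transaction failed, message not published: %v", err)
    }

    // Give the consumer time to process the message before the demo exits
    time.Sleep(5 * time.Second)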
}
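
The example above builds the payload from a map when publishing and decodes it into an anonymous struct in the handler. One way to keep the two sides in sync is a shared payload type. This is only a sketch of that design choice, not something NSQite requires: UserAction, EncodeAction, and DecodeAction are hypothetical names, and the fields mirror the JSON keys used above.

```go
package main

import "encoding/json"

// UserAction mirrors the JSON fields used in the example above.
// (Hypothetical shared type; not part of NSQite.)
type UserAction struct {
	UserID    string `json:"user_id"`
	Action    string `json:"action"`
	ContentID string `json:"content_id"`
	Timestamp string `json:"timestamp"` // RFC 3339 string, as in the example
}

// EncodeAction produces the message body for publishing.
func EncodeAction(a UserAction) ([]byte, error) {
	return json.Marshal(a)
}

// DecodeAction parses a message body back into a UserAction.
func DecodeAction(body []byte) (UserAction, error) {
	var a UserAction
	err := json.Unmarshal(body, &a)
	return a, err
}
```

With a shared type, a renamed or mistyped field is caught at compile time on both the publishing and the handling side, instead of silently decoding to an empty value.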

Why Choose NSQite?

NSQite is a message queue implementation based on SQLite with the following features:

  1. Lightweight: Built on SQLite, with no additional dependencies
  2. High Performance: Can process millions of messages per second in a single-machine environment
  3. Easy Integration: Supports GORM and integrates with existing projects
  4. Reliability: Supports transactional messages, ensuring data consistency

Use Cases:

  1. New Projects: Avoids deploying a complex message queue system early on
  2. Existing SQLite Projects: No additional dependencies required
  3. Single-Machine Applications: Keeps things simple and avoids distributed complexity
  4. Resource-Constrained Environments: SQLite's small footprint keeps overhead low

Project Repository

NSQite is an open-source project available on GitHub:
https://github.com/ixugo/nsqite

If you find this project helpful, you can:

  1. Star the project
  2. Submit issues for feedback
  3. Contribute code through PRs