Hey there, Gophers! 👋

Ever found yourself needing a lightweight, in-memory queue in Go that's both powerful and easy to use? Well, that's exactly what we're going to explore today with GoFrame's gqueue component!

What's gqueue and Why Should You Care? 🤔

First off, let me paint a picture. You're building a service that needs to:

  • Process tasks asynchronously
  • Handle traffic spikes gracefully
  • Manage background jobs efficiently

This is where gqueue shines! It's an in-memory queue implementation that comes bundled with GoFrame, offering a perfect balance of simplicity and power.

Quick Start: Your First gqueue 🌱

Let's dive right in with a simple example:

package main

import (
    "github.com/gogf/gf/v2/container/gqueue"
    "fmt"
)

func main() {
    // Create a queue that can hold 10 items
    q := gqueue.New(10)

    // Push some data
    q.Push("Hello, gqueue!")

    // Get the data back
    if value := q.Pop(); value != nil {
        fmt.Printf("Got: %v\n", value)
    }
}

Simple, right? But wait, there's so much more we can do! 🎨

The Cool Features You'll Love ✨

Here's what makes gqueue stand out from regular channels or standard library queues:

  1. Thread-Safe by Default - No more mutex headaches!
  2. Flexible Capacity - Starts small, grows as needed
  3. Batch Operations - Process multiple items efficiently
  4. Timeout Support - Never get stuck waiting

Real-World Example: Building a Task Processor 🛠️

Let's build something more practical - a task processing system that you might actually use in production:

package main

import (
    "github.com/gogf/gf/v2/container/gqueue"
    "context"
    "fmt"
    "time"
)

type Task struct {
    ID      string
    Payload interface{}
}

func NewTaskProcessor() {
    // Create a queue with decent capacity
    q := gqueue.New(1000)

    // Start the worker
    go func() {
        for {
            // Try to get a task
            if data := q.Pop(); data != nil {
                if task, ok := data.(Task); ok {
                    // Process the task
                    fmt.Printf("Processing task: %s\n", task.ID)
                    // Your processing logic here
                    time.Sleep(100 * time.Millisecond)
                }
            }
        }
    }()

    // Add some tasks
    for i := 0; i < 5; i++ {
        q.Push(Task{
            ID:      fmt.Sprintf("task-%d", i),
            Payload: fmt.Sprintf("data-%d", i),
        })
    }
}

More Real-World Examples 🌟

Building a Rate Limiter

Here's how you can build a simple rate limiter using gqueue:

type RateLimiter struct {
    q       *gqueue.Queue
    rate    int
    window  time.Duration
}

func NewRateLimiter(rate int, window time.Duration) *RateLimiter {
    rl := &RateLimiter{
        q:      gqueue.New(rate * 2), // Buffer for bursts
        rate:   rate,
        window: window,
    }

    // Clean up old timestamps
    go rl.cleanup()
    return rl
}

func (rl *RateLimiter) Allow() bool {
    now := time.Now()

    // Remove expired timestamps
    for {
        if ts := rl.q.Pop(); ts != nil {
            if now.Sub(ts.(time.Time)) <= rl.window {
                // Put it back if still within window
                rl.q.Push(ts)
                break
            }
        } else {
            break
        }
    }

    // Check if we can add new request
    if rl.q.Len() < int64(rl.rate) {
        rl.q.Push(now)
        return true
    }
    return false
}

func (rl *RateLimiter) cleanup() {
    ticker := time.NewTicker(rl.window / 2)
    for range ticker.C {
        now := time.Now()
        for {
            if ts := rl.q.Pop(); ts != nil {
                if now.Sub(ts.(time.Time)) <= rl.window {
                    rl.q.Push(ts)
                    break
                }
            } else {
                break
            }
        }
    }
}

Event Processing Pipeline

Here's an example of building an event processing pipeline with retry logic:

type Event struct {
    ID        string
    Data      interface{}
    Attempts  int
    LastError error
}

type Pipeline struct {
    mainQueue   *gqueue.Queue
    retryQueue  *gqueue.Queue
    maxAttempts int
}

func NewPipeline(maxAttempts int) *Pipeline {
    p := &Pipeline{
        mainQueue:   gqueue.New(1000),
        retryQueue:  gqueue.New(1000),
        maxAttempts: maxAttempts,
    }

    // Start retry handler
    go p.handleRetries()
    // Start main processor
    go p.processEvents()

    return p
}

func (p *Pipeline) handleRetries() {
    ticker := time.NewTicker(5 * time.Second)
    for range ticker.C {
        if event := p.retryQueue.Pop(); event != nil {
            e := event.(Event)
            // Exponential backoff
            if time.Since(e.LastAttempt) > time.Second*time.Duration(1<<e.Attempts) {
                p.mainQueue.Push(e)
            } else {
                p.retryQueue.Push(e)
            }
        }
    }
}

func (p *Pipeline) processEvents() {
    for {
        if data := p.mainQueue.Pop(); data != nil {
            event := data.(Event)
            if err := p.processEvent(event); err != nil {
                event.Attempts++
                event.LastError = err
                event.LastAttempt = time.Now()

                if event.Attempts < p.maxAttempts {
                    p.retryQueue.Push(event)
                } else {
                    // Handle fatal error
                    p.handleFatalError(event)
                }
            }
        }
    }
}

Batch Processing with Timeouts

Here's a more sophisticated batch processing implementation:

type BatchProcessor struct {
    q          *gqueue.Queue
    batchSize  int
    timeout    time.Duration
    processor  func([]interface{}) error
}

func NewBatchProcessor(batchSize int, timeout time.Duration, processor func([]interface{}) error) *BatchProcessor {
    return &BatchProcessor{
        q:         gqueue.New(batchSize * 10),
        batchSize: batchSize,
        timeout:   timeout,
        processor: processor,
    }
}

func (bp *BatchProcessor) Start(ctx context.Context) {
    batch := make([]interface{}, 0, bp.batchSize)
    timer := time.NewTimer(bp.timeout)

    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                bp.processor(batch)
            }
            return

        case <-timer.C:
            if len(batch) > 0 {
                bp.processor(batch)
                batch = make([]interface{}, 0, bp.batchSize)
            }
            timer.Reset(bp.timeout)

        default:
            if item := bp.q.Pop(); item != nil {
                batch = append(batch, item)
                if len(batch) >= bp.batchSize {
                    bp.processor(batch)
                    batch = make([]interface{}, 0, bp.batchSize)
                    timer.Reset(bp.timeout)
                }
            }
        }
    }
}

Pro Tips from the Trenches 💡

After using gqueue in several production systems, here are some tips I've learned:

1. Size Your Queue Right

// For small services (< 1000 req/s)
q := gqueue.New(1000)

// For medium services (1000-5000 req/s)
q := gqueue.New(5000)

// For high-load services (5000+ req/s)
q := gqueue.New(10000)

2. Implement Graceful Shutdown

func main() {
    q := gqueue.New(1000)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start worker
    go func() {
        for {
            select {
            case <-ctx.Done():
                // Clean up and exit
                return
            default:
                if data := q.Pop(); data != nil {
                    // Process data
                }
            }
        }
    }()
}

3. Handle Backpressure

func pushWithBackpressure(q *gqueue.Queue, data interface{}) error {
    if q.Len() > int64(q.Cap()*0.8) {
        // Queue is getting full, take action
        return fmt.Errorf("queue is at high capacity")
    }
    q.Push(data)
    return nil
}

Troubleshooting Guide and Common Pitfalls ⚠️

1. Memory Leaks

Symptom: Increasing memory usage over time
Common Causes:

  • Forgotten goroutines still processing queue items
  • Large items not being released from memory

Solution:

func preventMemoryLeaks() {
    q := gqueue.New(1000)

    // Proper cleanup with context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        for {
            select {
            case <-ctx.Done():
                // Clean up remaining items
                for q.Len() > 0 {
                    _ = q.Pop()
                }
                return
            default:
                if item := q.Pop(); item != nil {
                    // Process item
                    item = nil // Help GC
                }
            }
        }
    }()
}

2. Queue Capacity Issues

Symptom: Application becomes slow or unresponsive
Common Causes:

  • Queue filling up faster than it's being processed
  • No backpressure mechanism

Solution:

func handleBackpressure() {
    q := gqueue.New(1000)

    // Monitor queue capacity
    go func() {
        ticker := time.NewTicker(time.Second)
        for range ticker.C {
            if q.Len() > int64(float64(q.Cap())*0.8) {
                // Alert on high usage
                log.Printf("Queue at %d%% capacity", 
                    int(float64(q.Len())/float64(q.Cap())*100))

                // Take action (e.g., slow down producers)
                throttleProducers()
            }
        }
    }()
}

func throttleProducers() {
    // Implement throttling logic
}

3. Deadlocks and Hanging

Symptom: Workers stop processing
Common Causes:

  • Infinite loops in processing logic
  • Missing error handling

Solution:

func preventDeadlocks() {
    q := gqueue.New(1000)

    // Use timeouts
    go func() {
        for {
            done := make(chan bool)
            go func() {
                if item := q.Pop(); item != nil {
                    processWithTimeout(item)
                }
                done <- true
            }()

            // Timeout if processing takes too long
            select {
            case <-done:
                // Processing completed normally
            case <-time.After(5 * time.Second):
                // Handle timeout
                log.Println("Processing timeout")
            }
        }
    }()
}

func processWithTimeout(item interface{}) {
    // Your processing logic here
}

4. Data Loss During Shutdown

Symptom: Items disappear when application stops
Common Causes:

  • Improper shutdown handling
  • Not waiting for queue to empty

Solution:

func gracefulShutdown() {
    q := gqueue.New(1000)
    ctx, cancel := context.WithCancel(context.Background())

    // Graceful shutdown handler
    go func() {
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
        <-sigChan

        // Cancel context to stop new items
        cancel()

        // Wait for queue to empty
        for q.Len() > 0 {
            time.Sleep(100 * time.Millisecond)
        }

        // Now safe to exit
        os.Exit(0)
    }()
}

5. Performance Degradation

Symptom: Processing becomes slower over time
Common Causes:

  • Too many goroutines
  • Inefficient batch processing
  • Memory pressure

Solution:

func optimizePerformance() {
    q := gqueue.New(1000)

    // Use worker pool
    const workerCount = 5
    sem := make(chan struct{}, workerCount)

    go func() {
        for {
            sem <- struct{}{} // Limit concurrent workers
            go func() {
                defer func() { <-sem }()

                if item := q.Pop(); item != nil {
                    // Process with proper error handling
                    if err := processItem(item); err != nil {
                        log.Printf("Error processing item: %v", err)
                    }
                }
            }()
        }
    }()
}

Performance Tips 🚄

Here's a quick batch processing pattern that can significantly boost performance:

func batchProcess(q *gqueue.Queue, batchSize int) {
    batch := make([]interface{}, 0, batchSize)

    // Collect items
    for i := 0; i < batchSize; i++ {
        if item := q.Pop(); item != nil {
            batch = append(batch, item)
        }
    }

    // Process batch
    if len(batch) > 0 {
        processBatch(batch)
    }
}

When to Use gqueue (And When Not To) 🤔

Perfect for:

  • ✅ Background task processing
  • ✅ Message buffering
  • ✅ Event handling
  • ✅ Rate limiting

Maybe not for:

  • ❌ Persistent storage needs
  • ❌ Distributed systems (use Kafka/RabbitMQ instead)
  • ❌ Critical transaction processing

Let's Wrap It Up! 🎁

gqueue is a fantastic tool when you need a lightweight, in-memory queue in Go. It's perfect for those scenarios where a full-blown message queue system would be overkill, but plain channels aren't quite enough.

Your Turn! 🎯

Now I'd love to hear from you:

  • Have you used gqueue in your projects?
  • What other queue implementations have you tried in Go?
  • Any cool patterns or tips to share?

Drop your thoughts in the comments below! And if you found this useful, don't forget to give it a ❤️


P.S. If you want to dive deeper into GoFrame, check out my other articles in the series! 📚