Hey there, Gophers! 👋
Ever found yourself needing a lightweight, in-memory queue in Go that's both powerful and easy to use? Well, that's exactly what we're going to explore today with GoFrame's gqueue component!
What's gqueue and Why Should You Care? 🤔
First off, let me paint a picture. You're building a service that needs to:
- Process tasks asynchronously
- Handle traffic spikes gracefully
- Manage background jobs efficiently
This is where gqueue shines! It's an in-memory queue implementation that comes bundled with GoFrame, offering a solid balance of simplicity and power.
Quick Start: Your First gqueue 🌱
Let's dive right in with a simple example:
```go
package main

import (
	"fmt"

	"github.com/gogf/gf/v2/container/gqueue"
)

func main() {
	// Create a queue that can hold 10 items
	q := gqueue.New(10)

	// Push some data
	q.Push("Hello, gqueue!")

	// Pop blocks until an item is available; it returns nil
	// only once the queue has been closed
	if value := q.Pop(); value != nil {
		fmt.Printf("Got: %v\n", value)
	}
}
```
Simple, right? But wait, there's so much more we can do! 🎨
The Cool Features You'll Love ✨
Here's what makes gqueue stand out from plain channels or a hand-rolled list-plus-mutex queue:
- Thread-Safe by Default - concurrent Push/Pop with no mutex bookkeeping on your side
- Flexible Capacity - give it a fixed limit, or call gqueue.New() with no argument for a dynamically growing queue
- Batch-Friendly - it's easy to build batch processing on top (we'll do exactly that below)
- Timeout Support - the queue exposes its read channel as q.C, so you can select with a timeout and never get stuck waiting (see the sketch just below)
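To make that last point concrete, here's a minimal sketch of a Pop with a timeout, relying on the queue's exported read channel q.C:

```go
// popWithTimeout reads one item from q, giving up after d.
// It selects on q.C, the queue's exported read channel.
func popWithTimeout(q *gqueue.Queue, d time.Duration) (interface{}, bool) {
	select {
	case v := <-q.C:
		return v, true
	case <-time.After(d):
		return nil, false // nothing arrived in time
	}
}
```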
Real-World Example: Building a Task Processor 🛠️
Let's build something more practical - a task processing system that you might actually use in production:
```go
package main

import (
	"fmt"
	"time"

	"github.com/gogf/gf/v2/container/gqueue"
)

type Task struct {
	ID      string
	Payload interface{}
}

func NewTaskProcessor() {
	// Create a queue with decent capacity
	q := gqueue.New(1000)

	// Start the worker
	go func() {
		for {
			// Pop blocks until a task arrives and returns nil
			// once the queue is closed, our signal to stop
			data := q.Pop()
			if data == nil {
				return
			}
			if task, ok := data.(Task); ok {
				// Process the task
				fmt.Printf("Processing task: %s\n", task.ID)
				// Your processing logic here
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()

	// Add some tasks
	for i := 0; i < 5; i++ {
		q.Push(Task{
			ID:      fmt.Sprintf("task-%d", i),
			Payload: fmt.Sprintf("data-%d", i),
		})
	}
}
```
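Wiring it into a program can be as simple as this sketch (a real service would block on a signal handler rather than sleeping, as we'll see later):

```go
func main() {
	NewTaskProcessor()
	// Give the worker a moment to drain the queue before the
	// process exits
	time.Sleep(time.Second)
}
```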
More Real-World Examples 🌟
Building a Rate Limiter
Here's how you can build a simple rate limiter using gqueue:
```go
type RateLimiter struct {
	q      *gqueue.Queue
	rate   int
	window time.Duration
}

func NewRateLimiter(rate int, window time.Duration) *RateLimiter {
	rl := &RateLimiter{
		q:      gqueue.New(rate * 2), // Buffer for bursts
		rate:   rate,
		window: window,
	}
	// Periodically evict expired timestamps
	go rl.cleanup()
	return rl
}

// evictExpired drops timestamps older than the window. Pop blocks on
// an empty queue, so we guard with Len() first; that check is slightly
// racy under heavy concurrency, but it keeps the example simple.
func (rl *RateLimiter) evictExpired(now time.Time) {
	for rl.q.Len() > 0 {
		ts := rl.q.Pop()
		if ts == nil {
			return
		}
		if now.Sub(ts.(time.Time)) <= rl.window {
			// The oldest timestamp is still within the window, so
			// everything behind it is too. Put it back and stop
			// (it re-enqueues at the tail, which is fine for a
			// coarse limiter).
			rl.q.Push(ts)
			return
		}
		// Expired: drop it and keep scanning
	}
}

func (rl *RateLimiter) Allow() bool {
	now := time.Now()
	rl.evictExpired(now)
	// Admit the request only if we're under the rate
	if rl.q.Len() < int64(rl.rate) {
		rl.q.Push(now)
		return true
	}
	return false
}

func (rl *RateLimiter) cleanup() {
	ticker := time.NewTicker(rl.window / 2)
	for range ticker.C {
		rl.evictExpired(time.Now())
	}
}
```
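Using it might look like this (a sketch, assuming the types above):

```go
rl := NewRateLimiter(100, time.Second) // at most 100 requests per second
if rl.Allow() {
	// handle the request
} else {
	// shed load: reject, delay, or return 429 to the caller
}
```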
Event Processing Pipeline
Here's an example of building an event processing pipeline with retry logic:
```go
type Event struct {
	ID          string
	Data        interface{}
	Attempts    int
	LastError   error
	LastAttempt time.Time
}

type Pipeline struct {
	mainQueue   *gqueue.Queue
	retryQueue  *gqueue.Queue
	maxAttempts int
}

func NewPipeline(maxAttempts int) *Pipeline {
	p := &Pipeline{
		mainQueue:   gqueue.New(1000),
		retryQueue:  gqueue.New(1000),
		maxAttempts: maxAttempts,
	}
	// Start retry handler
	go p.handleRetries()
	// Start main processor
	go p.processEvents()
	return p
}

func (p *Pipeline) handleRetries() {
	ticker := time.NewTicker(5 * time.Second)
	for range ticker.C {
		// Pop blocks until a retry candidate is available
		event := p.retryQueue.Pop()
		if event == nil {
			return // queue closed
		}
		e := event.(Event)
		// Exponential backoff: wait 2^Attempts seconds between tries
		if time.Since(e.LastAttempt) > time.Second*time.Duration(1<<e.Attempts) {
			p.mainQueue.Push(e)
		} else {
			// Not ready yet; put it back for a later tick
			p.retryQueue.Push(e)
		}
	}
}

func (p *Pipeline) processEvents() {
	for {
		data := p.mainQueue.Pop()
		if data == nil {
			return // queue closed
		}
		event := data.(Event)
		if err := p.processEvent(event); err != nil {
			event.Attempts++
			event.LastError = err
			event.LastAttempt = time.Now()
			if event.Attempts < p.maxAttempts {
				p.retryQueue.Push(event)
			} else {
				// Out of retries: hand off to dead-letter handling
				p.handleFatalError(event)
			}
		}
	}
}

func (p *Pipeline) processEvent(e Event) error {
	// Your processing logic here
	return nil
}

func (p *Pipeline) handleFatalError(e Event) {
	// Log, alert, or persist the event somewhere for inspection
}
```
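Kicking the pipeline off is then just a couple of lines (a sketch):

```go
p := NewPipeline(3) // give each event up to 3 attempts
p.mainQueue.Push(Event{ID: "evt-1", Data: "payload"})
```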
Batch Processing with Timeouts
Here's a more sophisticated batch processing implementation:
```go
type BatchProcessor struct {
	q         *gqueue.Queue
	batchSize int
	timeout   time.Duration
	processor func([]interface{}) error
}

func NewBatchProcessor(batchSize int, timeout time.Duration, processor func([]interface{}) error) *BatchProcessor {
	return &BatchProcessor{
		q:         gqueue.New(batchSize * 10),
		batchSize: batchSize,
		timeout:   timeout,
		processor: processor,
	}
}

func (bp *BatchProcessor) Start(ctx context.Context) {
	batch := make([]interface{}, 0, bp.batchSize)
	timer := time.NewTimer(bp.timeout)
	defer timer.Stop()
	for {
		// Select on the queue's channel rather than calling the
		// blocking Pop: a blocked Pop would starve the timeout
		// and cancellation cases below.
		select {
		case <-ctx.Done():
			// Flush whatever we have before exiting
			if len(batch) > 0 {
				bp.processor(batch)
			}
			return
		case <-timer.C:
			// Timeout: flush a partial batch so items don't go stale
			if len(batch) > 0 {
				bp.processor(batch)
				batch = make([]interface{}, 0, bp.batchSize)
			}
			timer.Reset(bp.timeout)
		case item := <-bp.q.C:
			if item == nil {
				return // queue closed
			}
			batch = append(batch, item)
			if len(batch) >= bp.batchSize {
				bp.processor(batch)
				batch = make([]interface{}, 0, bp.batchSize)
				timer.Reset(bp.timeout)
			}
		}
	}
}
```
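And a rough wiring example (a sketch; since these snippets live in one package, we can push straight onto bp.q):

```go
bp := NewBatchProcessor(50, 2*time.Second, func(batch []interface{}) error {
	fmt.Printf("flushing %d items\n", len(batch))
	return nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go bp.Start(ctx)

for i := 0; i < 200; i++ {
	bp.q.Push(i)
}
```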
Pro Tips from the Trenches 💡
After using gqueue in several production systems, here are some tips I've learned:
1. Size Your Queue Right
```go
// For small services (< 1,000 req/s)
q := gqueue.New(1000)

// For medium services (1,000-5,000 req/s)
q := gqueue.New(5000)

// For high-load services (5,000+ req/s)
q := gqueue.New(10000)
```
2. Implement Graceful Shutdown
```go
func main() {
	q := gqueue.New(1000)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start worker
	go func() {
		for {
			select {
			case <-ctx.Done():
				// Clean up and exit
				return
			case data := <-q.C:
				// Selecting on q.C instead of calling the blocking
				// Pop lets cancellation interrupt the wait
				if data != nil {
					// Process data
				}
			}
		}
	}()
}
```
3. Handle Backpressure
```go
// pushWithBackpressure rejects new items once the queue holds more
// than 80% of the capacity it was created with. We pass the capacity
// in explicitly, since we know what we gave gqueue.New().
func pushWithBackpressure(q *gqueue.Queue, capacity int64, data interface{}) error {
	if q.Len() > capacity*8/10 {
		// Queue is getting full, take action
		return fmt.Errorf("queue is at high capacity")
	}
	q.Push(data)
	return nil
}
```
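On the producer side, that could look like this sketch:

```go
const capacity = 1000
q := gqueue.New(capacity)
if err := pushWithBackpressure(q, capacity, "job-42"); err != nil {
	// shed load: drop the item, retry later, or push back on the client
	log.Printf("push rejected: %v", err)
}
```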
Troubleshooting Guide and Common Pitfalls ⚠️
1. Memory Leaks
Symptom: Increasing memory usage over time
Common Causes:
- Forgotten goroutines still processing queue items
- Large items not being released from memory
Solution:
```go
func preventMemoryLeaks() {
	q := gqueue.New(1000)
	// Proper cleanup with context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for {
			select {
			case <-ctx.Done():
				// Drain remaining items so they can be collected
				for q.Len() > 0 {
					_ = q.Pop()
				}
				return
			case item := <-q.C:
				if item != nil {
					// Process item, and avoid stashing references
					// to it in long-lived structures once done
				}
			}
		}
	}()
}
```
2. Queue Capacity Issues
Symptom: Application becomes slow or unresponsive
Common Causes:
- Queue filling up faster than it's being processed
- No backpressure mechanism
Solution:
```go
func handleBackpressure() {
	const capacity = 1000
	q := gqueue.New(capacity)

	// Monitor queue depth; we track the configured capacity
	// ourselves, since we know what we passed to New()
	go func() {
		ticker := time.NewTicker(time.Second)
		for range ticker.C {
			if q.Len() > int64(float64(capacity)*0.8) {
				// Alert on high usage
				log.Printf("Queue at %d%% capacity",
					int(float64(q.Len())/float64(capacity)*100))
				// Take action (e.g., slow down producers)
				throttleProducers()
			}
		}
	}()
}

func throttleProducers() {
	// Implement throttling logic
}
```
3. Deadlocks and Hanging
Symptom: Workers stop processing
Common Causes:
- Infinite loops in processing logic
- Missing error handling
Solution:
```go
func preventDeadlocks() {
	q := gqueue.New(1000)

	// Bound how long any single item may take to process
	go func() {
		for {
			item := q.Pop()
			if item == nil {
				return // queue closed
			}
			// Buffered, so the worker can finish even after we've
			// given up waiting (avoids leaking a blocked goroutine)
			done := make(chan bool, 1)
			go func() {
				processWithTimeout(item)
				done <- true
			}()
			// Timeout if processing takes too long
			select {
			case <-done:
				// Processing completed normally
			case <-time.After(5 * time.Second):
				// Handle timeout
				log.Println("Processing timeout")
			}
		}
	}()
}

func processWithTimeout(item interface{}) {
	// Your processing logic here
}
```
4. Data Loss During Shutdown
Symptom: Items disappear when application stops
Common Causes:
- Improper shutdown handling
- Not waiting for queue to empty
Solution:
```go
func gracefulShutdown() {
	q := gqueue.New(1000)
	ctx, cancel := context.WithCancel(context.Background())
	_ = ctx // hand this ctx to your producers so they stop pushing

	// Graceful shutdown handler
	go func() {
		sigChan := make(chan os.Signal, 1)
		signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
		<-sigChan

		// Cancel the context so producers stop enqueueing new items
		cancel()

		// Wait for consumers to drain the queue
		for q.Len() > 0 {
			time.Sleep(100 * time.Millisecond)
		}

		// Now safe to exit
		os.Exit(0)
	}()
}
```
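The cancel() call above only helps if producers actually watch the context; a cooperating producer loop might look like this sketch (produce is a hypothetical helper):

```go
func produce(ctx context.Context, q *gqueue.Queue) {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return // shutdown started: stop pushing
		default:
			q.Push(i)
			time.Sleep(10 * time.Millisecond)
		}
	}
}
```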
5. Performance Degradation
Symptom: Processing becomes slower over time
Common Causes:
- Too many goroutines
- Inefficient batch processing
- Memory pressure
Solution:
```go
func optimizePerformance() {
	q := gqueue.New(1000)

	// A fixed-size worker pool: long-lived workers instead of
	// spawning a fresh goroutine per item
	const workerCount = 5
	for i := 0; i < workerCount; i++ {
		go func() {
			for {
				item := q.Pop()
				if item == nil {
					return // queue closed
				}
				// Process with proper error handling
				if err := processItem(item); err != nil {
					log.Printf("Error processing item: %v", err)
				}
			}
		}()
	}
}

func processItem(item interface{}) error {
	// Your processing logic here
	return nil
}
```
Performance Tips 🚄
Here's a quick batch processing pattern that can significantly boost performance:
```go
func batchProcess(q *gqueue.Queue, batchSize int) {
	batch := make([]interface{}, 0, batchSize)
	// Collect items. Pop blocks, so this waits until a full batch
	// has arrived (or the queue is closed).
	for i := 0; i < batchSize; i++ {
		item := q.Pop()
		if item == nil {
			break // queue closed
		}
		batch = append(batch, item)
	}
	// Process batch
	if len(batch) > 0 {
		processBatch(batch)
	}
}

func processBatch(batch []interface{}) {
	// Your batch logic here
}
```
When to Use gqueue (And When Not To) 🤔
Perfect for:
- ✅ Background task processing
- ✅ Message buffering
- ✅ Event handling
- ✅ Rate limiting
Maybe not for:
- ❌ Persistent storage needs
- ❌ Distributed systems (use Kafka/RabbitMQ instead)
- ❌ Critical transaction processing
Let's Wrap It Up! 🎁
gqueue is a fantastic tool when you need a lightweight, in-memory queue in Go. It's perfect for those scenarios where a full-blown message queue system would be overkill, but plain channels aren't quite enough.
Your Turn! 🎯
Now I'd love to hear from you:
- Have you used gqueue in your projects?
- What other queue implementations have you tried in Go?
- Any cool patterns or tips to share?
Drop your thoughts in the comments below! And if you found this useful, don't forget to give it a ❤️
P.S. If you want to dive deeper into GoFrame, check out my other articles in the series! 📚