@@ -10,56 +10,69 @@ import (
1010
1111var _ core.Worker = (* Ring )(nil )
1212
13- // Ring for simple queue using buffer channel
13+ // Ring represents a simple queue using a buffer channel.
1414type Ring struct {
1515 sync.Mutex
16- taskQueue []core.TaskMessage
17- runFunc func (context.Context , core.TaskMessage ) error
18- capacity int
19- count int
20- head int
21- tail int
22- exit chan struct {}
23- logger Logger
24- stopOnce sync.Once
25- stopFlag int32
16+ taskQueue []core.TaskMessage // taskQueue holds the tasks in the ring buffer.
17+ runFunc func (context.Context , core.TaskMessage ) error // runFunc is the function responsible for processing tasks.
18+ capacity int // capacity is the maximum number of tasks the queue can hold.
19+ count int // count is the current number of tasks in the queue.
20+ head int // head is the index of the first task in the queue.
21+ tail int // tail is the index where the next task will be added.
22+ exit chan struct {} // exit is used to signal when the queue is shutting down.
23+ logger Logger // logger is used for logging messages.
24+ stopOnce sync.Once // stopOnce ensures the shutdown process only runs once.
25+ stopFlag int32 // stopFlag indicates whether the queue is shutting down.
2626}
2727
28- // Run to execute new task
28+ // Run executes a new task using the provided context and task message.
29+ // It calls the runFunc function, which is responsible for processing the task.
30+ // The context allows for cancellation and timeout control of the task execution.
2931func (s * Ring ) Run (ctx context.Context , task core.TaskMessage ) error {
3032 return s .runFunc (ctx , task )
3133}
3234
33- // Shutdown the worker
35+ // Shutdown gracefully shuts down the worker.
36+ // It sets the stopFlag to indicate that the queue is shutting down and prevents new tasks from being added.
37+ // If the queue is already shut down, it returns ErrQueueShutdown.
38+ // It waits for all tasks to be processed before completing the shutdown.
3439func (s * Ring ) Shutdown () error {
40+ // Attempt to set the stopFlag from 0 to 1. If it fails, the queue is already shut down.
3541 if ! atomic .CompareAndSwapInt32 (& s .stopFlag , 0 , 1 ) {
3642 return ErrQueueShutdown
3743 }
3844
45+ // Ensure the shutdown process only runs once.
3946 s .stopOnce .Do (func () {
4047 s .Lock ()
4148 count := s .count
4249 s .Unlock ()
50+ // If there are tasks in the queue, wait for them to be processed.
4351 if count > 0 {
4452 <- s .exit
4553 }
4654 })
4755 return nil
4856}
4957
50- // Queue send task to the buffer channel
58+ // Queue adds a task to the ring buffer queue.
59+ // It returns an error if the queue is shut down or has reached its maximum capacity.
5160func (s * Ring ) Queue (task core.TaskMessage ) error { //nolint:stylecheck
61+ // Check if the queue is shut down
5262 if atomic .LoadInt32 (& s .stopFlag ) == 1 {
5363 return ErrQueueShutdown
5464 }
65+ // Check if the queue has reached its maximum capacity
5566 if s .capacity > 0 && s .count >= s .capacity {
5667 return ErrMaxCapacity
5768 }
5869
5970 s .Lock ()
71+ // Resize the queue if necessary
6072 if s .count == len (s .taskQueue ) {
6173 s .resize (s .count * 2 )
6274 }
75+ // Add the task to the queue
6376 s .taskQueue [s .tail ] = task
6477 s .tail = (s .tail + 1 ) % len (s .taskQueue )
6578 s .count ++
@@ -68,7 +81,15 @@ func (s *Ring) Queue(task core.TaskMessage) error { //nolint:stylecheck
6881 return nil
6982}
7083
71- // Request a new task from channel
84+ // Request retrieves the next task message from the ring queue.
85+ // If the queue has been stopped and is empty, it signals the exit channel
86+ // and returns an error indicating the queue has been closed.
87+ // If the queue is empty but not stopped, it returns an error indicating
88+ // there are no tasks in the queue.
89+ // If a task is successfully retrieved, it is removed from the queue,
90+ // and the queue may be resized if it is less than half full.
91+ // Returns the task message and nil on success, or an error if the queue
92+ // is empty or has been closed.
7293func (s * Ring ) Request () (core.TaskMessage , error ) {
7394 if atomic .LoadInt32 (& s .stopFlag ) == 1 && s .count == 0 {
7495 select {
@@ -95,6 +116,15 @@ func (s *Ring) Request() (core.TaskMessage, error) {
95116 return data , nil
96117}
97118
119+ // resize adjusts the size of the ring buffer to the specified capacity n.
120+ // It reallocates the underlying slice to the new size and copies the existing
121+ // elements to the new slice in the correct order. The head and tail pointers
122+ // are updated accordingly to maintain the correct order of elements in the
123+ // resized buffer.
124+ //
125+ // Parameters:
126+ //
127+ // n - the new capacity of the ring buffer.
98128func (q * Ring ) resize (n int ) {
99129 nodes := make ([]core.TaskMessage , n )
100130 if q .head < q .tail {
@@ -109,7 +139,18 @@ func (q *Ring) resize(n int) {
109139 q .taskQueue = nodes
110140}
111141
112- // NewRing for create new Ring instance
142+ // NewRing creates a new Ring instance with the provided options.
143+ // It initializes the task queue with a default size of 2, sets the capacity
144+ // based on the provided options, and configures the logger and run function.
145+ // The function returns a pointer to the newly created Ring instance.
146+ //
147+ // Parameters:
148+ //
149+ // opts - A variadic list of Option functions to configure the Ring instance.
150+ //
151+ // Returns:
152+ //
153+ // *Ring - A pointer to the newly created Ring instance.
113154func NewRing (opts ... Option ) * Ring {
114155 o := NewOptions (opts ... )
115156 w := & Ring {
0 commit comments