23
23
* THE SOFTWARE.
24
24
*/
25
25
package com .iluwatar .logaggregation ;
26
+ import java .util .ArrayList ;
27
+ import java .util .List ;
26
28
import java .util .concurrent .BlockingQueue ;
29
+ import java .util .concurrent .CountDownLatch ;
30
+ import java .util .concurrent .Executors ;
27
31
import java .util .concurrent .LinkedBlockingQueue ;
28
32
import java .util .concurrent .ScheduledExecutorService ;
29
- import java .util .concurrent .Executors ;
30
33
import java .util .concurrent .TimeUnit ;
31
34
import java .util .concurrent .atomic .AtomicInteger ;
32
- import java .util .concurrent .CountDownLatch ;
33
- import java .util .ArrayList ;
34
- import java .util .List ;
35
35
import lombok .extern .slf4j .Slf4j ;
36
36
/**
37
37
* Responsible for collecting and buffering logs from different services. Once the logs reach a
@@ -45,47 +45,51 @@ public class LogAggregator {
45
45
private static final int BUFFER_THRESHOLD = 3 ;
46
46
private static final int FLUSH_INTERVAL_SECONDS = 5 ;
47
47
private static final int SHUTDOWN_TIMEOUT_SECONDS = 10 ;
48
-
48
+
49
49
private final CentralLogStore centralLogStore ;
50
50
private final BlockingQueue <LogEntry > buffer = new LinkedBlockingQueue <>();
51
51
private final LogLevel minLogLevel ;
52
52
private final ScheduledExecutorService scheduledExecutor = Executors .newScheduledThreadPool (1 );
53
53
private final AtomicInteger logCount = new AtomicInteger (0 );
54
54
private final CountDownLatch shutdownLatch = new CountDownLatch (1 );
55
55
private volatile boolean running = true ;
56
+
56
57
/**
57
58
* constructor of LogAggregator.
58
59
*
59
60
* @param centralLogStore central log store implement
60
61
* @param minLogLevel min log level to store log
61
62
*/
62
63
public LogAggregator (CentralLogStore centralLogStore , LogLevel minLogLevel ) {
63
- this .centralLogStore = centralLogStore ;
64
+ this .centralLogStore = centralLogStore ;
64
65
this .minLogLevel = minLogLevel ;
65
66
startPeriodicFlusher ();
66
-
67
+
67
68
// Add shutdown hook for graceful termination
68
- Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
69
- try {
70
- stop ();
71
- } catch (InterruptedException e ) {
72
- LOGGER .warn ("Shutdown interrupted" , e );
73
- Thread .currentThread ().interrupt ();
74
- }
75
- }));
69
+ Runtime .getRuntime ()
70
+ .addShutdownHook (
71
+ new Thread (
72
+ () -> {
73
+ try {
74
+ stop ();
75
+ } catch (InterruptedException e ) {
76
+ LOGGER .warn ("Shutdown interrupted" , e );
77
+ Thread .currentThread ().interrupt ();
78
+ }
79
+ }));
76
80
}
77
81
78
82
/**
79
83
* Collects a given log entry, and filters it by the defined log level.
80
84
*
81
85
* @param logEntry The log entry to collect.
82
86
*/
83
- public void collectLog (LogEntry logEntry ) {
87
+ public void collectLog (LogEntry logEntry ) {
84
88
if (!running ) {
85
89
LOGGER .warn ("LogAggregator is shutting down. Skipping log entry." );
86
90
return ;
87
91
}
88
-
92
+
89
93
if (logEntry .getLevel () == null || minLogLevel == null ) {
90
94
LOGGER .warn ("Log level or threshold level is null. Skipping." );
91
95
return ;
@@ -115,19 +119,19 @@ public void collectLog(LogEntry logEntry) {
115
119
*
116
120
* @throws InterruptedException If any thread has interrupted the current thread.
117
121
*/
118
- public void stop () throws InterruptedException {
122
+ public void stop () throws InterruptedException {
119
123
LOGGER .info ("Stopping LogAggregator..." );
120
124
running = false ;
121
-
125
+
122
126
// Shutdown the scheduler gracefully
123
127
scheduledExecutor .shutdown ();
124
-
128
+
125
129
try {
126
130
// Wait for scheduled tasks to complete
127
131
if (!scheduledExecutor .awaitTermination (SHUTDOWN_TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
128
132
LOGGER .warn ("Scheduler did not terminate gracefully, forcing shutdown" );
129
133
scheduledExecutor .shutdownNow ();
130
-
134
+
131
135
// Wait a bit more for tasks to respond to interruption
132
136
if (!scheduledExecutor .awaitTermination (2 , TimeUnit .SECONDS )) {
133
137
LOGGER .error ("Scheduler did not terminate after forced shutdown" );
@@ -141,57 +145,53 @@ public void stop() throws InterruptedException {
141
145
}
142
146
}
143
147
144
-
145
-
146
148
/**
147
- * Waits for the LogAggregator to complete shutdown.
148
- * Useful for testing or controlled shutdown scenarios.
149
+ * Waits for the LogAggregator to complete shutdown. Useful for testing or controlled shutdown
150
+ * scenarios.
149
151
*
150
152
* @throws InterruptedException If any thread has interrupted the current thread.
151
153
*/
152
- public void awaitShutdown () throws InterruptedException {
154
+ public void awaitShutdown () throws InterruptedException {
153
155
shutdownLatch .await ();
154
156
}
155
157
156
-
157
158
private void flushBuffer () {
158
159
if (!running && buffer .isEmpty ()) {
159
160
return ;
160
161
}
161
-
162
+
162
163
try {
163
164
List <LogEntry > batch = new ArrayList <>();
164
165
int drained = 0 ;
165
-
166
+
166
167
// Drain up to a reasonable batch size for efficiency
167
168
LogEntry logEntry ;
168
169
while ((logEntry = buffer .poll ()) != null && drained < 100 ) {
169
170
batch .add (logEntry );
170
171
drained ++;
171
172
}
172
-
173
+
173
174
if (!batch .isEmpty ()) {
174
175
LOGGER .debug ("Flushing {} log entries to central store" , batch .size ());
175
-
176
+
176
177
// Process the batch
177
178
for (LogEntry entry : batch ) {
178
179
centralLogStore .storeLog (entry );
179
180
logCount .decrementAndGet ();
180
181
}
181
-
182
+
182
183
LOGGER .debug ("Successfully flushed {} log entries" , batch .size ());
183
184
}
184
185
} catch (Exception e ) {
185
186
LOGGER .error ("Error occurred while flushing buffer" , e );
186
187
}
187
188
}
188
189
189
-
190
190
/**
191
- * Starts the periodic buffer flusher using ScheduledExecutorService.
192
- * This eliminates the busy-waiting loop with Thread.sleep().
191
+ * Starts the periodic buffer flusher using ScheduledExecutorService. This eliminates the
192
+ * busy-waiting loop with Thread.sleep().
193
193
*/
194
- private void startPeriodicFlusher () {
194
+ private void startPeriodicFlusher () {
195
195
scheduledExecutor .scheduleAtFixedRate (
196
196
() -> {
197
197
if (running ) {
@@ -202,16 +202,16 @@ private void startPeriodicFlusher() {
202
202
}
203
203
}
204
204
},
205
- FLUSH_INTERVAL_SECONDS , // Initial delay
206
- FLUSH_INTERVAL_SECONDS , // Period
207
- TimeUnit .SECONDS
208
- );
209
-
210
- LOGGER . info ( "Periodic log flusher started with interval of {} seconds" , FLUSH_INTERVAL_SECONDS );
205
+ FLUSH_INTERVAL_SECONDS , // Initial delay
206
+ FLUSH_INTERVAL_SECONDS , // Period
207
+ TimeUnit .SECONDS );
208
+
209
+ LOGGER . info (
210
+ "Periodic log flusher started with interval of {} seconds" , FLUSH_INTERVAL_SECONDS );
211
211
}
212
+
212
213
/**
213
- * Gets the current number of buffered log entries.
214
- * Useful for monitoring and testing.
214
+ * Gets the current number of buffered log entries. Useful for monitoring and testing.
215
215
*
216
216
* @return Current buffer size
217
217
*/
@@ -220,8 +220,7 @@ public int getBufferSize() {
220
220
}
221
221
222
222
/**
223
- * Gets the current log count.
224
- * Useful for monitoring and testing.
223
+ * Gets the current log count. Useful for monitoring and testing.
225
224
*
226
225
* @return Current log count
227
226
*/
0 commit comments