This repository was archived by the owner on Nov 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 243
Expand file tree
/
Copy pathProfilerManager.cs
More file actions
330 lines (297 loc) · 13.1 KB
/
ProfilerManager.cs
File metadata and controls
330 lines (297 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
//
// OpenCover - S Wilde
//
// This source code is released under the MIT License; see the accompanying license file.
//
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using log4net;
using OpenCover.Framework.Communication;
using OpenCover.Framework.Persistance;
using OpenCover.Framework.Utility;
namespace OpenCover.Framework.Manager
{
/// <summary>
/// This is the core manager for integrating the host the target
/// application and the profiler
/// </summary>
/// <remarks>It probably does too much!</remarks>
public class ProfilerManager : IProfilerManager
{
const int MaxMsgSize = 65536;
private const int NumHandlesPerBlock = 32;
private const string ProfilerGuid = "{1542C21D-80C3-45E6-A56C-A9C1E4BEB7B8}";
private readonly ICommunicationManager _communicationManager;
private readonly IPersistance _persistance;
private readonly IMemoryManager _memoryManager;
private readonly ICommandLine _commandLine;
private readonly IPerfCounters _perfCounters;
private MemoryManager.ManagedCommunicationBlock _mcb;
private ConcurrentQueue<byte[]> _messageQueue;
private static readonly ILog DebugLogger = LogManager.GetLogger("DebugLogger");
/// <summary>
/// Create an instance of the profiler manager
/// </summary>
/// <param name="communicationManager"></param>
/// <param name="persistance"></param>
/// <param name="memoryManager"></param>
/// <param name="commandLine"></param>
/// <param name="perfCounters"></param>
public ProfilerManager(ICommunicationManager communicationManager, IPersistance persistance,
IMemoryManager memoryManager, ICommandLine commandLine, IPerfCounters perfCounters)
{
_communicationManager = communicationManager;
_persistance = persistance;
_memoryManager = memoryManager;
_commandLine = commandLine;
_perfCounters = perfCounters;
}
/// <summary>
/// Start the target process
/// </summary>
/// <param name="process"></param>
/// <param name="servicePrincipal"></param>
public void RunProcess(Action<Action<StringDictionary>> process, string[] servicePrincipal)
{
var key = Guid.NewGuid().GetHashCode().ToString("X");
string @namespace = servicePrincipal.Any() ? "Global" : "Local";
_memoryManager.Initialise(@namespace, key, servicePrincipal);
_messageQueue = new ConcurrentQueue<byte[]>();
using (_mcb = new MemoryManager.ManagedCommunicationBlock(@namespace, key, MaxMsgSize, -1, servicePrincipal))
using (var processMgmt = new AutoResetEvent(false))
using (var queueMgmt = new AutoResetEvent(false))
using (var environmentKeyRead = new AutoResetEvent(false))
{
var handles = new List<WaitHandle> { processMgmt, _mcb.ProfilerRequestsInformation };
ThreadPool.QueueUserWorkItem(
SetProfilerAttributes(process, key, @namespace, environmentKeyRead, processMgmt));
ThreadPool.QueueUserWorkItem(SaveVisitData(queueMgmt));
// wait for the environment key to be read
if (WaitHandle.WaitAny(new WaitHandle[] {environmentKeyRead}, new TimeSpan(0, 0, 0, 10)) != -1)
{
ProcessMessages(handles.ToArray());
queueMgmt.WaitOne();
}
}
}
private WaitCallback SetProfilerAttributes(Action<Action<StringDictionary>> process, string profilerKey,
string profilerNamespace, EventWaitHandle environmentKeyRead, EventWaitHandle processMgmt)
{
return state =>
{
try
{
process(dictionary =>
{
if (dictionary == null) return;
SetProfilerAttributesOnDictionary(profilerKey, profilerNamespace, dictionary);
environmentKeyRead.Set();
});
}
finally
{
processMgmt.Set();
}
};
}
private void SetProfilerAttributesOnDictionary(string profilerKey, string profilerNamespace, StringDictionary dictionary)
{
dictionary[@"OpenCover_Profiler_Key"] = profilerKey;
dictionary[@"OpenCover_Profiler_Namespace"] = profilerNamespace;
dictionary[@"OpenCover_Profiler_Threshold"] = _commandLine.Threshold.ToString(CultureInfo.InvariantCulture);
if (_commandLine.TraceByTest)
dictionary[@"OpenCover_Profiler_TraceByTest"] = "1";
dictionary["Cor_Profiler"] = ProfilerGuid;
dictionary["Cor_Enable_Profiling"] = "1";
dictionary["CoreClr_Profiler"] = ProfilerGuid;
dictionary["CoreClr_Enable_Profiling"] = "1";
dictionary["Cor_Profiler_Path"] = string.Empty;
dictionary["CorClr_Profiler_Path"] = string.Empty;
if (_commandLine.CommunicationTimeout > 0)
dictionary["OpenCover_Profiler_ShortWait"] = _commandLine.CommunicationTimeout.ToString();
switch (_commandLine.Registration)
{
case Registration.Path32:
string profilerPath32 = ProfilerRegistration.GetProfilerPath(false);
dictionary["Cor_Profiler_Path"] = profilerPath32;
dictionary["CorClr_Profiler_Path"] = profilerPath32;
break;
case Registration.Path64:
string profilerPath64 = ProfilerRegistration.GetProfilerPath(true);
dictionary["Cor_Profiler_Path"] = profilerPath64;
dictionary["CorClr_Profiler_Path"] = profilerPath64;
break;
}
}
private WaitCallback SaveVisitData(EventWaitHandle queueMgmt)
{
return state =>
{
while (true)
{
byte[] data;
while (!_messageQueue.TryDequeue(out data))
ThreadHelper.YieldOrSleep(100);
_perfCounters.CurrentMemoryQueueSize = _messageQueue.Count;
_perfCounters.IncrementBlocksReceived();
if (data.Length == 0)
{
_communicationManager.Complete();
queueMgmt.Set();
return;
}
_persistance.SaveVisitData(data);
}
};
}
/// <summary>
/// wait for how long
/// </summary>
internal static int BufferWaitCount = 30;
private bool _continueWait = true;
private void ProcessMessages(WaitHandle[] handles)
{
var threadHandles = new List<Tuple<EventWaitHandle, EventWaitHandle>>();
do
{
switch (WaitHandle.WaitAny(handles))
{
case 0:
_continueWait = false;
break;
case 1:
_communicationManager.HandleCommunicationBlock(_mcb,
block => Task.Factory.StartNew(() =>
{
lock (threadHandles)
{
threadHandles.Add(StartProcessingThread(block));
}
}));
break;
}
} while (_continueWait);
// we need to let the profilers dump the thread buffers over before they close - max 15s (ish)
var i = 0;
while (i < BufferWaitCount && _memoryManager.GetBlocks.Any(b => b.Active))
{
DebugLogger.InfoFormat("Waiting for {0} processes to close", _memoryManager.GetBlocks.Count(b => b.Active));
Thread.Sleep(500);
i++;
}
// grab anything left in the main buffers
foreach (var block in _memoryManager.GetBlocks.Where(b => b.Active).Select(b => b.MemoryBlock))
{
var data = new byte[block.BufferSize];
block.StreamAccessorResults.Seek(0, SeekOrigin.Begin);
block.StreamAccessorResults.Read(data, 0, block.BufferSize);
_messageQueue.Enqueue(data);
}
lock (threadHandles)
{
if (threadHandles.Any())
{
var tasks = threadHandles
.Select((e, index) => new {Pair = e, Block = index/NumHandlesPerBlock})
.GroupBy(g => g.Block)
.Select(g => g.Select(a => a.Pair).ToList())
.Select(g => Task.Factory.StartNew(() =>
{
ConsumeException(() =>
{
g.Select(h => h.Item1).ToList().ForEach(h => h.Set());
WaitHandle.WaitAll(g.Select(h => h.Item2).ToArray<WaitHandle>(), new TimeSpan(0, 0, 20));
});
})).ToArray();
Task.WaitAll(tasks);
foreach (var threadHandle in threadHandles)
{
var handle = threadHandle;
ConsumeException(() =>
{
handle.Item1.Dispose();
handle.Item2.Dispose();
});
}
threadHandles.Clear();
}
}
_messageQueue.Enqueue(new byte[0]);
}
// wrap exceptions when closing down
private static void ConsumeException(Action doSomething)
{
try
{
doSomething();
}
catch (Exception ex)
{
DebugLogger.Error("An unexpected exception was encountered but consumed.", ex);
}
}
private Tuple<EventWaitHandle, EventWaitHandle> StartProcessingThread(ManagedBufferBlock block)
{
DebugLogger.InfoFormat("Starting Process Block => {0}", block.BufferId);
var terminateThread = new ManualResetEvent(false);
var threadTerminated = new ManualResetEvent(false);
using (var threadActivated = new AutoResetEvent(false))
{
ThreadPool.QueueUserWorkItem(ProcessBlock(block, terminateThread,
threadActivated, threadTerminated));
threadActivated.WaitOne();
}
DebugLogger.InfoFormat("Started Process Block => {0}", block.BufferId);
return new Tuple<EventWaitHandle, EventWaitHandle>(terminateThread, threadTerminated);
}
private WaitCallback ProcessBlock(ManagedBufferBlock block,
WaitHandle terminateThread, EventWaitHandle threadActivated, EventWaitHandle threadTerminated)
{
return state =>
{
var processEvents = new []
{
block.CommunicationBlock.ProfilerRequestsInformation,
block.MemoryBlock.ProfilerHasResults,
terminateThread
};
threadActivated.Set();
while(block.Active)
{
switch (WaitHandle.WaitAny(processEvents))
{
case 0:
_communicationManager.HandleCommunicationBlock(block.CommunicationBlock, b => { });
break;
case 1:
var data = _communicationManager.HandleMemoryBlock(block.MemoryBlock);
// don't let the queue get too big as using too much memory causes
// problems i.e. the target process closes down but the host takes
// ages to shutdown; this is a compromise.
_messageQueue.Enqueue(data);
if (_messageQueue.Count > 400)
{
do
{
ThreadHelper.YieldOrSleep(100);
} while (_messageQueue.Count > 200);
}
break;
case 2:
threadTerminated.Set();
return;
}
}
threadTerminated.Set();
_memoryManager.RemoveDeactivatedBlocks();
};
}
}
}