Skip to content
This repository was archived by the owner on Nov 15, 2021. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
#329 alter the way we start the thread that handles each profiler ins…
…tance
  • Loading branch information
sawilde committed Sep 3, 2015
commit d9c20c80b4df1e6260ebc15833fac1d388b2cda1
54 changes: 37 additions & 17 deletions main/OpenCover.Framework/Manager/ProfilerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using log4net;
using OpenCover.Framework.Communication;
using OpenCover.Framework.Persistance;
using OpenCover.Framework.Utility;
Expand All @@ -38,6 +39,8 @@ public class ProfilerManager : IProfilerManager

private ConcurrentQueue<byte[]> _messageQueue;

private static readonly ILog DebugLogger = LogManager.GetLogger("DebugLogger");

/// <summary>
/// Create an instance of the profiler manager
/// </summary>
Expand All @@ -56,6 +59,11 @@ public ProfilerManager(ICommunicationManager communicationManager, IPersistance
_perfCounters = perfCounters;
}

/// <summary>
/// Start the target process
/// </summary>
/// <param name="process"></param>
/// <param name="servicePrincipal"></param>
public void RunProcess(Action<Action<StringDictionary>> process, string[] servicePrincipal)
{
var key = Guid.NewGuid().GetHashCode().ToString("X");
Expand Down Expand Up @@ -173,7 +181,14 @@ private void ProcessMessages(WaitHandle[] handles)
break;

case 1:
_communicationManager.HandleCommunicationBlock(_mcb, block => threadHandles.Add(StartProcessingThread(block)));
_communicationManager.HandleCommunicationBlock(_mcb,
block => Task.Factory.StartNew(() =>
{
lock (threadHandles)
{
threadHandles.Add(StartProcessingThread(block));
}
}));
break;
}
} while (_continueWait);
Expand All @@ -186,32 +201,36 @@ private void ProcessMessages(WaitHandle[] handles)
_messageQueue.Enqueue(data);
}

if (threadHandles.Any())
lock (threadHandles)
{
var tasks = threadHandles
.Select((e, index) => new {Pair = e, Block = index / NumHandlesPerBlock})
.GroupBy(g => g.Block)
.Select(g => g.Select(a => a.Pair).ToList())
.Select(g => Task.Factory.StartNew(() =>
{
g.Select(h => h.Item1).ToList().ForEach(h => h.Set());
WaitHandle.WaitAll(g.Select(h => h.Item2).ToArray<WaitHandle>(), new TimeSpan(0, 0, 20));
})).ToArray();
Task.WaitAll(tasks);

foreach(var threadHandle in threadHandles)
if (threadHandles.Any())
{
threadHandle.Item1.Dispose();
threadHandle.Item2.Dispose();
var tasks = threadHandles
.Select((e, index) => new {Pair = e, Block = index/NumHandlesPerBlock})
.GroupBy(g => g.Block)
.Select(g => g.Select(a => a.Pair).ToList())
.Select(g => Task.Factory.StartNew(() =>
{
g.Select(h => h.Item1).ToList().ForEach(h => h.Set());
WaitHandle.WaitAll(g.Select(h => h.Item2).ToArray<WaitHandle>(), new TimeSpan(0, 0, 20));
})).ToArray();
Task.WaitAll(tasks);

foreach (var threadHandle in threadHandles)
{
threadHandle.Item1.Dispose();
threadHandle.Item2.Dispose();
}
threadHandles.Clear();
}
threadHandles.Clear();
}

_messageQueue.Enqueue(new byte[0]);
}

private Tuple<EventWaitHandle, EventWaitHandle> StartProcessingThread(ManagedBufferBlock block)
{
DebugLogger.InfoFormat("Starting Process Block => {0}", block.BufferId);
var terminateThread = new ManualResetEvent(false);
var threadTerminated = new ManualResetEvent(false);

Expand All @@ -221,6 +240,7 @@ private Tuple<EventWaitHandle, EventWaitHandle> StartProcessingThread(ManagedBuf
threadActivated, threadTerminated));
threadActivated.WaitOne();
}
DebugLogger.InfoFormat("Started Process Block => {0}", block.BufferId);
return new Tuple<EventWaitHandle, EventWaitHandle>(terminateThread, threadTerminated);
}

Expand Down