|
19 | 19 | public class Agent extends ARef { |
20 | 20 | volatile Object state; |
21 | 21 | AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); |
22 | | -AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY); |
23 | 22 |
|
24 | | -volatile ISeq errors = null; |
| 23 | + volatile ISeq errors = null; |
25 | 24 |
|
26 | 25 | final public static ExecutorService pooledExecutor = |
27 | 26 | Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); |
@@ -68,11 +67,8 @@ static void doRun(Action action){ |
68 | 67 | try |
69 | 68 | { |
70 | 69 | changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); |
71 | | - for(Object o : action.agent.watchers.get()) |
72 | | - { |
73 | | - Map.Entry e = (Map.Entry) o; |
74 | | - ((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed)); |
75 | | - } |
| 70 | + if(changed) |
| 71 | + action.agent.notifyWatches(); |
76 | 72 | } |
77 | 73 | catch(Exception e) |
78 | 74 | { |
@@ -143,10 +139,10 @@ public void clearErrors(){ |
143 | 139 | errors = null; |
144 | 140 | } |
145 | 141 |
|
146 | | -public Object dispatch(IFn fn, ISeq args, boolean solo) throws Exception{ |
| 142 | +public Object dispatch(IFn fn, ISeq args, boolean solo) { |
147 | 143 | if(errors != null) |
148 | 144 | { |
149 | | - throw new Exception("Agent has errors", (Exception) RT.first(errors)); |
| 145 | + throw new RuntimeException("Agent has errors", (Exception) RT.first(errors)); |
150 | 146 | } |
151 | 147 | Action action = new Action(this, fn, args, solo); |
152 | 148 | dispatchAction(action); |
@@ -183,31 +179,7 @@ public int getQueueCount(){ |
183 | 179 | return q.get().count(); |
184 | 180 | } |
185 | 181 |
|
186 | | -public Agent addWatch(Object watcher, IFn callback){ |
187 | | - boolean added = false; |
188 | | - IPersistentMap prior = null; |
189 | | - while(!added) |
190 | | - { |
191 | | - prior = watchers.get(); |
192 | | - added = watchers.compareAndSet(prior, prior.assoc(watcher,callback)); |
193 | | - } |
194 | | - |
195 | | - return this; |
196 | | -} |
197 | | - |
198 | | -public Agent removeWatch(Object watcher) throws Exception{ |
199 | | - boolean removed = false; |
200 | | - IPersistentMap prior = null; |
201 | | - while(!removed) |
202 | | - { |
203 | | - prior = watchers.get(); |
204 | | - removed = watchers.compareAndSet(prior, prior.without(watcher)); |
205 | | - } |
206 | | - |
207 | | - return this; |
208 | | -} |
209 | | - |
210 | | -static public int releasePendingSends(){ |
| 182 | + static public int releasePendingSends(){ |
211 | 183 | IPersistentVector sends = nested.get(); |
212 | 184 | if(sends == null) |
213 | 185 | return 0; |
|
0 commit comments