1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.jph.channels.router;
15
16 import org.jph.channels.DeadLetterOutputChannel;
17 import org.jph.channels.MessageCloner;
18 import org.jph.channels.OutputChannel;
19 import org.jph.collections.predicate.Predicate;
20 import org.jph.collections.predicate.TruePredicate;
21 import org.jph.concurrent.unmodifiable.UnmodifiableLists;
22
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27
28 /***
29 * De StdRoutingOutputChannel is the default implementation of the RoutingOutputChannel.
30 * <p/>
31 * this implementation is not complete:
32 * problems:
33 * <li>
34 * <ul>what to do if stopAfterFirst is false and not all receivers accept
35 * the message, but some did. Should the message be send to the failOutputChannel?
36 * <li>at the moment the failOutputChannel is never used.
37 * </li>
38 *
39 * @author Peter Veentjer.
40 */
41 public final class StdRoutingOutputChannel<M> implements RoutingOutputChannel<M> {
42
43
44 private List<OutRouterEntry<M>> _entryList;
45
46 private final OutputChannel<M> _failOutputChannel;
47
48
49 private final boolean _stopAfterFirst;
50
51 private static <M> List<OutRouterEntry<M>> makeList(OutRouterEntry<M> entry) {
52 if (entry == null) throw new NullPointerException();
53 List<OutRouterEntry<M>> list = new ArrayList<OutRouterEntry<M>>();
54 list.add(entry);
55 return list;
56 }
57
58 public StdRoutingOutputChannel(OutRouterEntry<M> entry, boolean stopAfterFirst) {
59 this(makeList(entry), stopAfterFirst);
60 }
61
62 public StdRoutingOutputChannel(List<OutRouterEntry<M>> entryList, boolean stopAfterFirst) {
63 this(entryList, new DeadLetterOutputChannel<M>(), stopAfterFirst);
64 }
65
66
67 public StdRoutingOutputChannel(List<OutRouterEntry<M>> entryList, OutputChannel<M> failOutputChannel,
68 boolean stopAfterFirst) {
69 if (entryList == null || failOutputChannel == null) throw new NullPointerException();
70 _stopAfterFirst = stopAfterFirst;
71 _entryList = entryList;
72 _failOutputChannel = failOutputChannel;
73 }
74
75
76 /***
77 * Returns the OutputChannel where all messages are send to that can`t
78 * be delivered.
79 */
80 public OutputChannel<M> getFailOutputChannel() {
81 return _failOutputChannel;
82 }
83
84 public boolean isStopAfterFirst() {
85 return _stopAfterFirst;
86 }
87
88 public MessageCloner<M> getMessageCloner() {
89 return null;
90 }
91
92 public void add(OutRouterEntry<M> entry) {
93 if (entry == null)
94 throw new NullPointerException("entry can`t be null");
95
96 synchronized (this) {
97 _entryList = UnmodifiableLists.add(_entryList, entry);
98 }
99 }
100
101 public void subscribe(OutputChannel<M> outputChannel) {
102 subscribe(null, outputChannel);
103 }
104
105 public void subscribe(Predicate predicate, OutputChannel<M> outputChannel) {
106 if (outputChannel == null) throw new NullPointerException();
107
108 OutRouterEntry<M> entryOut = new OutRouterEntry<M>(
109 outputChannel,
110 predicate == null ? TruePredicate.INSTANCE : predicate);
111 add(entryOut);
112 }
113
114 public Iterator<OutRouterEntry<M>> entries() {
115 return _entryList.iterator();
116 }
117
118 public void put(M msg) throws InterruptedException {
119 if (msg == null) throw new NullPointerException();
120
121 for (OutRouterEntry<M> entryOut : _entryList) {
122 if (entryOut.getPredicate().evaluate(msg)) {
123 entryOut.getChannel().put(msg);
124 if (_stopAfterFirst)
125 return;
126 }
127 }
128 }
129
130 public boolean offer(M msg, long timeout, TimeUnit unit) throws InterruptedException {
131 if (msg == null || unit == null) throw new NullPointerException();
132
133 long timeoutNs = unit.convert(timeout, TimeUnit.NANOSECONDS);
134 long startNs = System.nanoTime();
135
136 for (OutRouterEntry<M> entryOut : _entryList) {
137 boolean accepted = entryOut.getPredicate().evaluate(msg);
138 if (accepted) {
139
140 boolean offered = entryOut.getChannel().offer(msg, timeoutNs, TimeUnit.NANOSECONDS);
141 if (offered && _stopAfterFirst)
142 return true;
143
144
145 long endNs = System.nanoTime();
146 long elapsedNs = endNs - startNs;
147 timeoutNs -= -elapsedNs;
148 timeoutNs = timeoutNs < 0 ? 0 : timeoutNs;
149 startNs = endNs;
150 }
151 }
152
153
154 return false;
155 }
156 }