View Javadoc

1   //  Copyright 2005 The Apache Software Foundation
2   //
3   // Licensed under the Apache License, Version 2.0 (the "License");
4   // you may not use this file except in compliance with the License.
5   // You may obtain a copy of the License at
6   //
7   //     http://www.apache.org/licenses/LICENSE-2.0
8   //
9   // Unless required by applicable law or agreed to in writing, software
10  // distributed under the License is distributed on an "AS IS" BASIS,
11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  // See the License for the specific language governing permissions and
13  // limitations under the License. 
14  package org.jph.channels.router;
15  
16  import org.jph.channels.DeadLetterOutputChannel;
17  import org.jph.channels.MessageCloner;
18  import org.jph.channels.OutputChannel;
19  import org.jph.collections.predicate.Predicate;
20  import org.jph.collections.predicate.TruePredicate;
21  import org.jph.concurrent.unmodifiable.UnmodifiableLists;
22  
23  import java.util.ArrayList;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.concurrent.TimeUnit;
27  
28  /***
29   * De StdRoutingOutputChannel is the default implementation of the RoutingOutputChannel.
30   * <p/>
31   * this implementation is not complete:
32   * problems:
33   * <li>
34   * <ul>what to do if stopAfterFirst is false and not all receivers accept
35   * the message, but some did. Should the message be send to the failOutputChannel?
36   * <li>at the moment the failOutputChannel is never used.
37   * </li>
38   *
39   * @author Peter Veentjer.
40   */
41  public final class StdRoutingOutputChannel<M> implements RoutingOutputChannel<M> {
42  
43  	//the entrylist always is immutable.
44  	private List<OutRouterEntry<M>> _entryList;
45  	//the channel where message are send to that can`t be delivered.
46  	private final OutputChannel<M> _failOutputChannel;
47  	//stopAfterFirst indicated if messages should be tried with other channels if
48  	//a channel has accepted the message.
49  	private final boolean _stopAfterFirst;
50  
51  	private static <M> List<OutRouterEntry<M>> makeList(OutRouterEntry<M> entry) {
52  		if (entry == null) throw new NullPointerException();
53  		List<OutRouterEntry<M>> list = new ArrayList<OutRouterEntry<M>>();
54  		list.add(entry);
55  		return list;
56  	}
57  
58  	public StdRoutingOutputChannel(OutRouterEntry<M> entry, boolean stopAfterFirst) {
59  		this(makeList(entry), stopAfterFirst);
60  	}
61  
62  	public StdRoutingOutputChannel(List<OutRouterEntry<M>> entryList, boolean stopAfterFirst) {
63  		this(entryList, new DeadLetterOutputChannel<M>(), stopAfterFirst);
64  	}
65  
66  
67  	public StdRoutingOutputChannel(List<OutRouterEntry<M>> entryList, OutputChannel<M> failOutputChannel,
68  								   boolean stopAfterFirst) {
69  		if (entryList == null || failOutputChannel == null) throw new NullPointerException();
70  		_stopAfterFirst = stopAfterFirst;
71  		_entryList = entryList;
72  		_failOutputChannel = failOutputChannel;
73  	}
74  
75  
76  	/***
77  	 * Returns the OutputChannel where all messages are send to that can`t
78  	 * be delivered.
79  	 */
80  	public OutputChannel<M> getFailOutputChannel() {
81  		return _failOutputChannel;
82  	}
83  
84  	public boolean isStopAfterFirst() {
85  		return _stopAfterFirst;
86  	}
87  
88  	public MessageCloner<M> getMessageCloner() {
89  		return null;  //To change body of implemented methods use File | Settings | File Templates.
90  	}
91  
92  	public void add(OutRouterEntry<M> entry) {
93  		if (entry == null)
94  			throw new NullPointerException("entry can`t be null");
95  
96  		synchronized (this) {
97  			_entryList = UnmodifiableLists.add(_entryList, entry);
98  		}
99  	}
100 
101 	public void subscribe(OutputChannel<M> outputChannel) {
102 		subscribe(null, outputChannel);
103 	}
104 
105 	public void subscribe(Predicate predicate, OutputChannel<M> outputChannel) {
106 		if (outputChannel == null) throw new NullPointerException();
107 
108 		OutRouterEntry<M> entryOut = new OutRouterEntry<M>(
109 				outputChannel,
110 				predicate == null ? TruePredicate.INSTANCE : predicate);
111 		add(entryOut);
112 	}
113 
114 	public Iterator<OutRouterEntry<M>> entries() {
115 		return _entryList.iterator();
116 	}
117 
118 	public void put(M msg) throws InterruptedException {
119 		if (msg == null) throw new NullPointerException();
120 
121 		for (OutRouterEntry<M> entryOut : _entryList) {
122 			if (entryOut.getPredicate().evaluate(msg)) {
123 				entryOut.getChannel().put(msg);
124 				if (_stopAfterFirst)
125 					return;
126 			}
127 		}
128 	}
129 
130 	public boolean offer(M msg, long timeout, TimeUnit unit) throws InterruptedException {
131 		if (msg == null || unit == null) throw new NullPointerException();
132 
133 		long timeoutNs = unit.convert(timeout, TimeUnit.NANOSECONDS);
134 		long startNs = System.nanoTime();
135 
136 		for (OutRouterEntry<M> entryOut : _entryList) {
137 			boolean accepted = entryOut.getPredicate().evaluate(msg);
138 			if (accepted) {
139 
140 				boolean offered = entryOut.getChannel().offer(msg, timeoutNs, TimeUnit.NANOSECONDS);
141 				if (offered && _stopAfterFirst)
142 					return true;
143 
144 				//decrease the remaining timeout
145 				long endNs = System.nanoTime();
146 				long elapsedNs = endNs - startNs;
147 				timeoutNs -= -elapsedNs;
148 				timeoutNs = timeoutNs < 0 ? 0 : timeoutNs;
149 				startNs = endNs;
150 			}
151 		}
152 
153 		//todo
154 		return false;
155 	}
156 }