View Javadoc

1   //  Copyright 2005 The Apache Software Foundation
2   //
3   // Licensed under the Apache License, Version 2.0 (the "License");
4   // you may not use this file except in compliance with the License.
5   // You may obtain a copy of the License at
6   //
7   //     http://www.apache.org/licenses/LICENSE-2.0
8   //
9   // Unless required by applicable law or agreed to in writing, software
10  // distributed under the License is distributed on an "AS IS" BASIS,
11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  // See the License for the specific language governing permissions and
13  // limitations under the License. 
14  package org.jph.channels.process;
15  
16  
17  import org.jph.channels.OutputChannel;
18  
19  import java.util.concurrent.TimeUnit;
20  
21  /***
22   * The StdProcessingOutputChannel is the default implementation of the
23   * ProcessingOutputChannel.
24   * <p/>
25   * The processing of messages is done on the Thread that offers/puts messages
26   * on this Channel and if the processing takes time, the Thread can`t do other
27   * tasks. If you don`t want this to happen, you could wrap this StdProcessingOutputChannel
28   * in a ActiveOutputChannel.
29   * <p/>
30   * Dealing with exceptions: if an exception occurs while processing the message,
31   * the exceptionHandler is called.
32   *
33   * @author Peter Veentjer.
34   */
35  public final class StdProcessingOutputChannel<Min,Mout> implements ProcessingOutputChannel<Min, Mout> {
36  
37  	private final OutputChannel<Mout> _target;
38  	private final Processor<Min, Mout> _processor;
39  	private final ProcessExceptionHandler<Min> _exceptionHandler;
40  
41  	/***
42  	 * Creates a new StdProcessingOutputChannel with a ThrowingProcessExceptionHandler.
43  	 *
44  	 * @param channel
45  	 * @param processor
46  	 * @throws NullPointerException if channel or processor is null.
47  	 */
48  	public StdProcessingOutputChannel(OutputChannel<Mout> channel, Processor<Min, Mout> processor) {
49  		this(channel, processor, new ThrowingProcessExceptionHandler<Min>());
50  	}
51  
52  	/***
53  	 * @param channel
54  	 * @param processor
55  	 * @param exceptionHandler if this value is null, a ThrowingProcessExceptionHandler is used.
56  	 * @throws NullPointerException if channel or processor is null.
57  	 */
58  	public StdProcessingOutputChannel(OutputChannel<Mout> channel, Processor<Min, Mout> processor, ProcessExceptionHandler<Min> exceptionHandler) {
59  		if (channel == null || processor == null) throw new NullPointerException();
60  
61  		_processor = processor;
62  		_target = channel;
63  		_exceptionHandler = exceptionHandler == null ? new ThrowingProcessExceptionHandler<Min>() : exceptionHandler;
64  	}
65  
66  	public Processor<Min, Mout> getMsgProcessor() {
67  		return _processor;
68  	}
69  
70  	public OutputChannel<Mout> getTarget() {
71  		return _target;
72  	}
73  
74  	public ProcessExceptionHandler<Min> getExceptionHandler() {
75  		return _exceptionHandler;
76  	}
77  
78  	public void put(Min msg) throws InterruptedException {
79  		if (msg == null) throw new NullPointerException();
80  
81  		Mout mout = _processor.process(msg);
82  		if (mout == null) {
83  			//if null is returned, the processor is responsible for dealing with the msg, but
84  			//the message is put succesfully.
85  			return;
86  		}
87  
88  		_target.put(mout);
89  	}
90  
91  	public boolean offer(Min msg, long timeout, TimeUnit unit) throws InterruptedException {
92  		if (msg == null || unit == null) throw new NullPointerException();
93  
94  		long timeoutNs = unit.toNanos(timeout);
95  		long startNs = System.nanoTime();
96  
97  		Mout mout = _processor.process(msg);
98  		if (mout == null) {
99  			//if null is returned, the processor is responsible for dealing with the msg, but
100 			//the message is offered succesfully to this channel.
101 			return true;
102 		}
103 
104 		//The message has to be put on the target channel.
105 
106 		long endNs = System.nanoTime();
107 		long elapsedNs = endNs - startNs;
108 		timeoutNs -= elapsedNs;
109 
110 		return _target.offer(mout, timeoutNs, TimeUnit.NANOSECONDS);
111 	}
112 }