1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.jph.channels.process;
15
16
17 import org.jph.channels.OutputChannel;
18
19 import java.util.concurrent.TimeUnit;
20
21 /***
22 * The StdProcessingOutputChannel is the default implementation of the
23 * ProcessingOutputChannel.
24 * <p/>
25 * The processing of messages is done on the Thread that offers/puts messages
26 * on this Channel and if the processing takes time, the Thread can`t do other
27 * tasks. If you don`t want this to happen, you could wrap this StdProcessingOutputChannel
28 * in a ActiveOutputChannel.
29 * <p/>
30 * Dealing with exceptions: if an exception occurs while processing the message,
31 * the exceptionHandler is called.
32 *
33 * @author Peter Veentjer.
34 */
35 public final class StdProcessingOutputChannel<Min,Mout> implements ProcessingOutputChannel<Min, Mout> {
36
37 private final OutputChannel<Mout> _target;
38 private final Processor<Min, Mout> _processor;
39 private final ProcessExceptionHandler<Min> _exceptionHandler;
40
41 /***
42 * Creates a new StdProcessingOutputChannel with a ThrowingProcessExceptionHandler.
43 *
44 * @param channel
45 * @param processor
46 * @throws NullPointerException if channel or processor is null.
47 */
48 public StdProcessingOutputChannel(OutputChannel<Mout> channel, Processor<Min, Mout> processor) {
49 this(channel, processor, new ThrowingProcessExceptionHandler<Min>());
50 }
51
52 /***
53 * @param channel
54 * @param processor
55 * @param exceptionHandler if this value is null, a ThrowingProcessExceptionHandler is used.
56 * @throws NullPointerException if channel or processor is null.
57 */
58 public StdProcessingOutputChannel(OutputChannel<Mout> channel, Processor<Min, Mout> processor, ProcessExceptionHandler<Min> exceptionHandler) {
59 if (channel == null || processor == null) throw new NullPointerException();
60
61 _processor = processor;
62 _target = channel;
63 _exceptionHandler = exceptionHandler == null ? new ThrowingProcessExceptionHandler<Min>() : exceptionHandler;
64 }
65
66 public Processor<Min, Mout> getMsgProcessor() {
67 return _processor;
68 }
69
70 public OutputChannel<Mout> getTarget() {
71 return _target;
72 }
73
74 public ProcessExceptionHandler<Min> getExceptionHandler() {
75 return _exceptionHandler;
76 }
77
78 public void put(Min msg) throws InterruptedException {
79 if (msg == null) throw new NullPointerException();
80
81 Mout mout = _processor.process(msg);
82 if (mout == null) {
83
84
85 return;
86 }
87
88 _target.put(mout);
89 }
90
91 public boolean offer(Min msg, long timeout, TimeUnit unit) throws InterruptedException {
92 if (msg == null || unit == null) throw new NullPointerException();
93
94 long timeoutNs = unit.toNanos(timeout);
95 long startNs = System.nanoTime();
96
97 Mout mout = _processor.process(msg);
98 if (mout == null) {
99
100
101 return true;
102 }
103
104
105
106 long endNs = System.nanoTime();
107 long elapsedNs = endNs - startNs;
108 timeoutNs -= elapsedNs;
109
110 return _target.offer(mout, timeoutNs, TimeUnit.NANOSECONDS);
111 }
112 }