Duplicate the SubAlarm before emitting it

This prevents Storm from throwing a ConcurrentModificationException if
the SubAlarm's state changes soon after the emit

Change-Id: Idc0de8a0ef6d13bce800e4e8a4e13e43cdf1c010
Closes-Bug: #1548999
This commit is contained in:
Craig Bryant 2016-02-24 08:40:50 -07:00
parent 698b9a5d2d
commit 4e333d5fe4
1 changed files with 18 additions and 2 deletions

View File

@ -219,8 +219,24 @@ public class MetricAggregationBolt extends BaseRichBolt {
private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) {
logger.debug("Alarm state changed for {}", subAlarmStats);
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats
.getSubAlarm()));
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), duplicate(subAlarmStats
.getSubAlarm())));
}
/**
* Create a copy of SubAlarm to prevent ConcurrentModificationExceptions thrown by Storm.
* The AlarmSubExpression is not immutable, but since it is only replaced in this Bolt,
* it can be reused
* @param original SubAlarm to be duplicated
* @return copy of original
*/
public SubAlarm duplicate(final SubAlarm original) {
final SubAlarm newSubAlarm =
new SubAlarm(original.getId(), original.getAlarmId(), new SubExpression(
original.getAlarmSubExpressionId(), original.getExpression()), original.getState());
newSubAlarm.setNoState(original.isNoState());
newSubAlarm.setSporadicMetric(original.isSporadicMetric());
return newSubAlarm;
}
/**