Apache Storm

Topology Permissions

By default, only the user who deploys the topology has access to admin operations such as rebalance, activate, deactivate, and kill.

The following properties can be set in the topology configuration:

Configuration Description
topology.users/groups This allows the users/groups specified to act as owners of the topology. This allows users to perform topology admin operations such as rebalance, activate, deactivate, and kill.
logs.users/groups This allows the users/groups specified to look at the logs of the topology.

The configurations above can also be set at the cluster level. They will be the default configuration if the topology does not override it.

There is one cluster level configuration that will enable a set of users to be admins for the whole Storm cluster.

Configuration Description
nimbus.admins These users will have super user permissions on all topologies deployed. They will be able to perform other admin operations (such as rebalance, activate, deactivate and kill), even if they are not the owners of the topology.

Test Spout

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

class MySpout implements IRichSpout {
  private int i = 0;
  private Map conf;
  private TopologyContext context;
  private SpoutOutputCollector collector;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.conf = conf;
    this.context = context;
    this.collector = collector;
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("key", "message"));
  }

  @Override
  public void close() {}

  @Override
  public void activate() {}

  @Override
  public void deactivate() {}

  @Override
  public void nextTuple() {
    if(this.i < 5) {
      this.collector.emit(new Values("key" + i, "val" + i), i);
      this.i++;
    }
  }

  @Override
  public void ack(Object o) {}

  @Override
  public void fail(Object o) {}

  @Override
  public Map<String, Object> getComponentConfiguration() {
    return null;
  }
}

results matching ""

    No results matching ""