最近在分析一个需求,需要开发一个采集器的调度框架,实现采集器的注册,离线以及采集任务分配(负载均衡)。
采集器用于登录到网络设备上采集数据,部分运营商考虑到设备性能问题,会限制同时只能有一个用户登录设备查询数据。那么在此限制下,分配采集任务时需要保证:
- 对于同一设备的任务始终都落在同一个采集器上去执行,才能保证同一时刻对于同一设备不会有多个采集器采集。
- 而且,需要保证在某个采集器失效离线时,之前落在该采集器上的设备列表需要均匀分布到剩下的采集器上去,不至于造成某一个采集器负载过大。
到这里,实现方案已经呼之欲出,这不就是解决分布式缓存问题的套路么 — 一致性哈希算法,可以参考这篇文章进行了解 《一致性哈希算法及其在分布式系统中的应用》
0x01 接口定义
首先,对几个关键角色进行接口抽象
网络设备
1 2 3 4 5 6 7 8 9 10
|
public interface Device {
String getIp(); }
|
采集器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public interface Collector {
String getIp();
void setIp(String ip);
Map<String, Object> collect(Device device, List<String> commands); }
|
集群
这里把采集器的调度框架抽象成集群,并且使用泛型来定义集群接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
public interface Cluster<T> {
void register(T t);
void offline(T t);
T choose(String ip); }
|
0x02 算法实现
Hash算法选择
在选择设备对应的采集器时,需要对设备的IP进行hash计算。由于设备的IP前缀基本一致,使用默认的字符串hash算法会导致计算出来的hash值不够离散,只能落在hash环上很小的一段区间。因此需要重新选择一种hash算法,保证字符串hash的离散性。这里使用FNV1_32_HASH算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
private static int rehash(String str) { final int p = 16777619; int hash = (int) 2166136261L; for (int i = 0; i < str.length(); i++) { hash = (hash ^ str.charAt(i)) * p; } hash += hash << 13; hash ^= hash >> 7; hash += hash << 3; hash ^= hash >> 17; hash += hash << 5;
if (hash < 0) { hash = Math.abs(hash); } return hash; }
|
具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
public class CollectorCluster implements Cluster<Collector> {
private static final int VIRTUAL_NODE_NUMBER = 320;
@GuardedBy("clusterLock") private Map<Collector, int[]> registeredServers = new HashMap<>();
@GuardedBy("clusterLock") private TreeMap<Integer, Collector> hashRingMap = new TreeMap<>();
private ReadWriteLock clusterLock = new ReentrantReadWriteLock();
@Override public void register(Collector collector) { System.out.println("add server " + collector.toString());
final Lock lock = clusterLock.writeLock(); lock.lock(); try { int[] nodesHash = new int[VIRTUAL_NODE_NUMBER]; for (int i = 0; i < VIRTUAL_NODE_NUMBER; i++) { int hash = CollectorCluster.rehash(collector.getIp() + ":" + i); nodesHash[i] = hash; hashRingMap.put(hash, collector); } registeredServers.put(collector, nodesHash); } finally { lock.unlock(); } }
@Override public void offline(Collector collector) { System.out.println("delete server " + collector.toString());
final Lock lock = clusterLock.writeLock(); lock.lock(); try { for (int hash : registeredServers.get(collector)) { hashRingMap.remove(hash); } registeredServers.remove(collector); } finally { lock.unlock(); } }
@Override public Collector choose(String deviceIp) { final int hash = rehash(deviceIp);
final Lock lock = clusterLock.readLock(); lock.lock(); try { Map.Entry<Integer, Collector> entry = hashRingMap.floorEntry(hash); Collector collector = entry == null ? hashRingMap.lastEntry().getValue() : entry.getValue();
System.out.println(deviceIp + " --> " + collector); return collector; } finally { lock.unlock(); } } }
|
0x03 测试
首先实现一个具体的采集器类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
public class DefaultCollector implements Collector { private String ip;
DefaultCollector(String ip) { this.ip = ip; }
@Override public String getIp() { return ip; }
@Override public void setIp(String ip) { this.ip = ip; }
@Override public Map<String, Object> collect(Device ne, List<String> commands) { return new HashMap<>(commands.size()); }
@Override public String toString() { return "DefaultCollector{" + "ip='" + ip + '\'' + '}'; }
@Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } DefaultCollector that = (DefaultCollector) o; return Objects.equals(ip, that.ip); }
@Override public int hashCode() {
return CollectorCluster.rehash(ip); } }
|
然后直接在CollectorCluster类中增加一个main函数来测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void main(String[] args) { CollectorCluster cluster = new CollectorCluster();
List.of("192.168.0.1", "192.168.0.2", "192.168.0.3", "192.168.0.4", "192.168.0.5") .stream() .map(DefaultCollector::new) .forEach(cluster::register);
String ipPrefix = "136.10.1."; Stream.iterate(1, i -> i + 1).limit(20).map(i -> ipPrefix + i).forEach(cluster::choose);
System.out.println("============"); cluster.offline(new DefaultCollector("192.168.0.5")); Stream.iterate(1, i -> i + 1).limit(20).map(i -> ipPrefix + i).forEach(cluster::choose); }
|