소스 검색

AMBARI-21234. Ambari rack awareness for Kafka. (Ambud Sharma via stoader)

Ambud Sharma 8 년 전
부모
커밋
f22256e73a

+ 1 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metainfo.xml

@@ -104,6 +104,7 @@
                 </osSpecific>
             </osSpecifics>
             <restartRequiredAfterChange>true</restartRequiredAfterChange>
+            <restartRequiredAfterRackChange>true</restartRequiredAfterRackChange>
         </service>
     </services>
 </metainfo>

+ 10 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py

@@ -103,6 +103,16 @@ def kafka(upgrade_type=None):
 
     kafka_data_dir = kafka_server_config['log.dirs']
     kafka_data_dirs = filter(None, kafka_data_dir.split(","))
+
+    rack="/default-rack"
+    i=0
+    if len(params.all_racks) > 0:
+     for host in params.all_hosts:
+      if host == params.hostname:
+        rack=params.all_racks[i]
+        break
+      i=i+1
+
     Directory(kafka_data_dirs,
               mode=0755,
               cd_access='a',

+ 3 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py

@@ -105,6 +105,9 @@ zookeeper_hosts.sort()
 secure_acls = default("/configurations/kafka-broker/zookeeper.set.acl", False)
 kafka_security_migrator = os.path.join(kafka_home, "bin", "zookeeper-security-migration.sh")
 
+all_hosts = default("/clusterHostInfo/all_hosts", [])
+all_racks = default("/clusterHostInfo/all_racks", [])
+
 #Kafka log4j
 kafka_log_maxfilesize = default('/configurations/kafka-log4j/kafka_log_maxfilesize',256)
 kafka_log_maxbackupindex = default('/configurations/kafka-log4j/kafka_log_maxbackupindex',20)

+ 1 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.10.0/metainfo.xml

@@ -22,6 +22,7 @@
       <name>KAFKA</name>
       <extends>common-services/KAFKA/0.9.0</extends>
       <version>0.10.0</version>
+      <restartRequiredAfterRackChange>true</restartRequiredAfterRackChange>
     </service>
   </services>
 </metainfo>

+ 12 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py

@@ -103,6 +103,18 @@ def kafka(upgrade_type=None):
 
     kafka_data_dir = kafka_server_config['log.dirs']
     kafka_data_dirs = filter(None, kafka_data_dir.split(","))
+
+    rack="/default-rack"
+    i=0
+    if len(params.all_racks) > 0:
+     for host in params.all_hosts:
+      if host == params.hostname:
+        rack=params.all_racks[i]
+        break
+      i=i+1
+
+    kafka_server_config['broker.rack']=rack
+
     Directory(kafka_data_dirs,
               mode=0755,
               cd_access='a',

+ 3 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py

@@ -105,6 +105,9 @@ zookeeper_hosts.sort()
 secure_acls = default("/configurations/kafka-broker/zookeeper.set.acl", False)
 kafka_security_migrator = os.path.join(kafka_home, "bin", "zookeeper-security-migration.sh")
 
+all_hosts = default("/clusterHostInfo/all_hosts", [])
+all_racks = default("/clusterHostInfo/all_racks", [])
+
 #Kafka log4j
 kafka_log_maxfilesize = default('/configurations/kafka-log4j/kafka_log_maxfilesize',256)
 kafka_log_maxbackupindex = default('/configurations/kafka-log4j/kafka_log_maxbackupindex',20)