title: NiFi.java 源码解读
date: 2020-05-21
categories:

  • Apache NIFI
    tags:
  • Apache NIFI
    author: 张诚
    location: BeiJing
    publish: true
    sticky:

在RunNiFi.java源码解读中有提到,最终RunNiFi进程在主程序中启动了新的进程NiFi,并循环监听NIFI进程的状态,直到NIFI进程不在运行,RunNiFi主程序才结束。

以下便是NIFI进程的入口类,从main方法开始即可,关键地方有注释。(自己跟着源码逻辑读更好)

  1. package org.apache.nifi;
  2. public class NiFi {
  3. private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class);
  4. private static final String KEY_FILE_FLAG = "-K";
  5. private final NiFiServer nifiServer;
  6. private final BootstrapListener bootstrapListener;
  7. // RunNiFi进程的Socket监听端口 进程间通信
  8. public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
  9. //标记 是否关闭
  10. private volatile boolean shutdown = false;
  11. // nifi.properties
  12. public NiFi(final NiFiProperties properties)
  13. throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
  14. this(properties, ClassLoader.getSystemClassLoader());
  15. }
  16. public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
  17. throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
  18. // 整个Java进程只能有一个krb5.conf,所以在启动期间全局设置它,这样处理器和Kerberos身份验证代码就不必设置它
  19. final File kerberosConfigFile = properties.getKerberosConfigurationFile();
  20. if (kerberosConfigFile != null) {
  21. final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
  22. LOGGER.info("Setting java.security.krb5.conf to {}", new Object[]{kerberosConfigFilePath});
  23. System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
  24. }
  25. setDefaultUncaughtExceptionHandler();
  26. // 注册 shutdown hook
  27. addShutdownHook();
  28. //RunNiFi 启动NIFI时设置 RunNIFI进程 的Socket的监听端口 NIFI进程将本进程的Socket监听端口和pid传 给RunNIFI ,RunNIFI便可以传达指令
  29. final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
  30. if (bootstrapPort != null) {
  31. try {
  32. final int port = Integer.parseInt(bootstrapPort);
  33. if (port < 1 || port > 65535) {
  34. throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
  35. }
  36. bootstrapListener = new BootstrapListener(this, port);
  37. bootstrapListener.start();
  38. } catch (final NumberFormatException nfe) {
  39. throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
  40. }
  41. } else {
  42. LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
  43. bootstrapListener = null;
  44. }
  45. //删除web工作目录——如果应用程序没有成功启动,web应用程序目录可能处于无效状态。
  46. //当这种情况发生时,jetty将不会尝试重新将war解压缩到目录中。
  47. //通过删除工作目录,我们可以确信它将尝试在每次应用程序启动时提取war。
  48. // nifi.web.jetty.working.directory= 默认值:./work/jetty
  49. File webWorkingDir = properties.getWebWorkingDirectory();
  50. FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true);
  51. FileUtils.deleteFile(webWorkingDir, LOGGER, 3);
  52. //确定我们运行的机器是否有时间问题。
  53. detectTimingIssues();
  54. // 重定向JUL日志事件
  55. initLogging();
  56. // 这里可以看另一篇讲解 【NIFI nar包加载机制源码解读】
  57. // nifi.nar.library.directory=./lib 获取lib bundle
  58. final Bundle systemBundle = SystemBundle.create(properties);
  59. // 期间过滤了非nar包的文件 解压nar到 /work/nar/framework /work/nar/extendings /work/docs/components
  60. // 解压doc file
  61. // 读取 各个JarFile META-INF/services/org.apache.nifi.processor.Processor META-INF/services/org.apache.nifi.reporting.ReportingTask META-INF/services/org.apache.nifi.controller.ControllerService
  62. // 返回的extensionMapping有三个 Map 分别存了Processor ControllerService ReportingTask
  63. final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
  64. // 获取 extensions classloaders (单例)
  65. NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();
  66. //初始化 为所有的nar包创建唯一 的类加载器
  67. //
  68. narClassLoaders.init(rootClassLoader,
  69. properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
  70. // load the framework classloader
  71. final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
  72. if (frameworkClassLoader == null) {
  73. throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
  74. }
  75. //此集合是有序的 首先是framework 其次其余nar包按 依赖,被依赖在先 ,Bundle中有此nar包各种信息以及nar的类加载器
  76. final Set<Bundle> narBundles = narClassLoaders.getBundles();
  77. // frameworkClassLoader类加载器加载framework bundle(nifi-framework-nar)
  78. Thread.currentThread().setContextClassLoader(frameworkClassLoader);
  79. // 顾名思义 其中启用 了Jetty
  80. Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
  81. Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class);
  82. final long startTime = System.nanoTime();
  83. nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles);
  84. nifiServer.setExtensionMapping(extensionMapping);
  85. nifiServer.setBundles(systemBundle, narBundles);
  86. if (shutdown) {
  87. LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
  88. } else {
  89. //余下的启动就交给JettyServer了
  90. nifiServer.start();
  91. if (bootstrapListener != null) {
  92. bootstrapListener.sendStartedStatus(true);
  93. }
  94. final long duration = System.nanoTime() - startTime;
  95. LOGGER.info("Controller initialization took " + duration + " nanoseconds "
  96. + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds).");
  97. }
  98. }
  99. protected void setDefaultUncaughtExceptionHandler() {
  100. Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
  101. @Override
  102. public void uncaughtException(final Thread t, final Throwable e) {
  103. LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
  104. LOGGER.error("", e);
  105. }
  106. });
  107. }
  108. protected void addShutdownHook() {
  109. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  110. @Override
  111. public void run() {
  112. // shutdown the jetty server
  113. shutdownHook();
  114. }
  115. }));
  116. }
  117. protected void initLogging() {
  118. SLF4JBridgeHandler.removeHandlersForRootLogger();
  119. SLF4JBridgeHandler.install();
  120. }
  121. private static ClassLoader createBootstrapClassLoader() {
  122. //获取lib文件夹中的文件列表
  123. final List<URL> urls = new ArrayList<>();
  124. try {
  125. Files.list(Paths.get("lib/bootstrap")).forEach(p -> {
  126. try {
  127. urls.add(p.toUri().toURL());
  128. } catch (final MalformedURLException mef) {
  129. LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef);
  130. }
  131. });
  132. } catch (IOException ioe) {
  133. LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe);
  134. }
  135. //创建bootstrap classloader
  136. return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
  137. }
  138. protected void shutdownHook() {
  139. try {
  140. shutdown();
  141. } catch (final Throwable t) {
  142. LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
  143. }
  144. }
  145. protected void shutdown() {
  146. this.shutdown = true;
  147. LOGGER.info("Initiating shutdown of Jetty web server...");
  148. if (nifiServer != null) {
  149. nifiServer.stop();
  150. }
  151. if (bootstrapListener != null) {
  152. bootstrapListener.stop();
  153. }
  154. LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
  155. }
  156. /**
  157. * 确定我们运行的机器是否有时间问题。
  158. */
  159. private void detectTimingIssues() {
  160. final int minRequiredOccurrences = 25;
  161. final int maxOccurrencesOutOfRange = 15;
  162. final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
  163. final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
  164. private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
  165. @Override
  166. public Thread newThread(final Runnable r) {
  167. final Thread t = defaultFactory.newThread(r);
  168. t.setDaemon(true);
  169. t.setName("Detect Timing Issues");
  170. return t;
  171. }
  172. });
  173. final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
  174. final AtomicInteger occurrences = new AtomicInteger(0);
  175. final Runnable command = new Runnable() {
  176. @Override
  177. public void run() {
  178. final long curMillis = System.currentTimeMillis();
  179. final long difference = curMillis - lastTriggerMillis.get();
  180. final long millisOff = Math.abs(difference - 2000L);
  181. occurrences.incrementAndGet();
  182. if (millisOff > 500L) {
  183. occurrencesOutOfRange.incrementAndGet();
  184. }
  185. lastTriggerMillis.set(curMillis);
  186. }
  187. };
  188. final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS);
  189. final TimerTask timerTask = new TimerTask() {
  190. @Override
  191. public void run() {
  192. future.cancel(true);
  193. service.shutdownNow();
  194. if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
  195. LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
  196. + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
  197. }
  198. }
  199. };
  200. final Timer timer = new Timer(true);
  201. timer.schedule(timerTask, 60000L);
  202. }
  203. /**
  204. * 应用程序的主要入口点。
  205. */
  206. public static void main(String[] args) {
  207. LOGGER.info("Launching NiFi...");
  208. try {
  209. NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
  210. new NiFi(properties);
  211. } catch (final Throwable t) {
  212. LOGGER.error("Failure to launch NiFi due to " + t, t);
  213. }
  214. }
  215. protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) {
  216. final ClassLoader bootstrap = createBootstrapClassLoader();
  217. NiFiProperties properties = initializeProperties(args, bootstrap);
  218. properties.validate();
  219. return properties;
  220. }
  221. private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
  222. // Try to get key
  223. // If key doesn't exist, instantiate without
  224. // Load properties
  225. // If properties are protected and key missing, throw RuntimeException
  226. final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
  227. final String key;
  228. try {
  229. key = loadFormattedKey(args);
  230. // The key might be empty or null when it is passed to the loader
  231. } catch (IllegalArgumentException e) {
  232. final String msg = "The bootstrap process did not provide a valid key";
  233. throw new IllegalArgumentException(msg, e);
  234. }
  235. Thread.currentThread().setContextClassLoader(boostrapLoader);
  236. try {
  237. final Class<?> propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader);
  238. final Method withKeyMethod = propsLoaderClass.getMethod("withKey", String.class);
  239. final Object loaderInstance = withKeyMethod.invoke(null, key);
  240. final Method getMethod = propsLoaderClass.getMethod("get");
  241. final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance);
  242. LOGGER.info("Loaded {} properties", properties.size());
  243. return properties;
  244. } catch (InvocationTargetException wrappedException) {
  245. final String msg = "There was an issue decrypting protected properties";
  246. throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause());
  247. } catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) {
  248. final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue";
  249. throw new IllegalArgumentException(msg, reex);
  250. } catch (final RuntimeException e) {
  251. final String msg = "There was an issue decrypting protected properties";
  252. throw new IllegalArgumentException(msg, e);
  253. } finally {
  254. Thread.currentThread().setContextClassLoader(contextClassLoader);
  255. }
  256. }
  257. private static String loadFormattedKey(String[] args) {
  258. String key = null;
  259. List<String> parsedArgs = parseArgs(args);
  260. // Check if args contain protection key
  261. if (parsedArgs.contains(KEY_FILE_FLAG)) {
  262. key = getKeyFromKeyFileAndPrune(parsedArgs);
  263. // Format the key (check hex validity and remove spaces)
  264. key = formatHexKey(key);
  265. }
  266. if (null == key) {
  267. return "";
  268. } else if (!isHexKeyValid(key)) {
  269. throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length");
  270. } else {
  271. return key;
  272. }
  273. }
  274. private static String getKeyFromKeyFileAndPrune(List<String> parsedArgs) {
  275. String key = null;
  276. LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag");
  277. int i = parsedArgs.indexOf(KEY_FILE_FLAG);
  278. if (parsedArgs.size() <= i + 1) {
  279. LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG);
  280. throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key");
  281. }
  282. try {
  283. String passwordfile_path = parsedArgs.get(i + 1);
  284. // Slurp in the contents of the file:
  285. byte[] encoded = Files.readAllBytes(Paths.get(passwordfile_path));
  286. key = new String(encoded,StandardCharsets.UTF_8);
  287. if (0 == key.length())
  288. throw new IllegalArgumentException("Key in keyfile " + passwordfile_path + " yielded an empty key");
  289. LOGGER.info("Now overwriting file in "+passwordfile_path);
  290. // Overwrite the contents of the file (to avoid littering file system
  291. // unlinked with key material):
  292. File password_file = new File(passwordfile_path);
  293. FileWriter overwriter = new FileWriter(password_file,false);
  294. // Construe a random pad:
  295. Random r = new Random();
  296. StringBuffer sb = new StringBuffer();
  297. // Note on correctness: this pad is longer, but equally sufficient.
  298. while(sb.length() < encoded.length){
  299. sb.append(Integer.toHexString(r.nextInt()));
  300. }
  301. String pad = sb.toString();
  302. LOGGER.info("Overwriting key material with pad: "+pad);
  303. overwriter.write(pad);
  304. overwriter.close();
  305. LOGGER.info("Removing/unlinking file: "+passwordfile_path);
  306. password_file.delete();
  307. } catch (IOException e) {
  308. LOGGER.error("Caught IOException while retrieving the "+KEY_FILE_FLAG+"-passed keyfile; aborting: "+e.toString());
  309. System.exit(1);
  310. }
  311. LOGGER.info("Read property protection key from key file provided by bootstrap process");
  312. return key;
  313. }
  314. private static List<String> parseArgs(String[] args) {
  315. List<String> parsedArgs = new ArrayList<>(Arrays.asList(args));
  316. for (int i = 0; i < parsedArgs.size(); i++) {
  317. if (parsedArgs.get(i).startsWith(KEY_FILE_FLAG + " ")) {
  318. String[] split = parsedArgs.get(i).split(" ", 2);
  319. parsedArgs.set(i, split[0]);
  320. parsedArgs.add(i + 1, split[1]);
  321. break;
  322. }
  323. }
  324. return parsedArgs;
  325. }
  326. private static String formatHexKey(String input) {
  327. if (input == null || input.trim().isEmpty()) {
  328. return "";
  329. }
  330. return input.replaceAll("[^0-9a-fA-F]", "").toLowerCase();
  331. }
  332. private static boolean isHexKeyValid(String key) {
  333. if (key == null || key.trim().isEmpty()) {
  334. return false;
  335. }
  336. // Key length is in "nibbles" (i.e. one hex char = 4 bits)
  337. return Arrays.asList(128, 196, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$");
  338. }
  339. }

公众号

关注公众号 得到第一手文章/文档更新推送。

NiFi源码 - 图1