POM

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>aliyun-log</artifactId>
  4. <version>0.6.38</version>
  5. </dependency>

Java

  1. import com.aliyun.openservices.log.Client;
  2. import com.aliyun.openservices.log.common.Consts.CursorMode;
  3. import com.aliyun.openservices.log.common.*;
  4. import com.aliyun.openservices.log.exception.LogException;
  5. import com.aliyun.openservices.log.request.*;
  6. import com.aliyun.openservices.log.response.BatchGetLogResponse;
  7. import com.aliyun.openservices.log.response.GetCursorResponse;
  8. import com.aliyun.openservices.log.response.GetHistogramsResponse;
  9. import com.aliyun.openservices.log.response.GetLogsResponse;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. public class SdkSample {
  13. public static void main(String[] args) throws LogException, InterruptedException {
  14. // 选择与上面步骤创建 project 所属区域匹配的Endpoint
  15. String endpoint = "<log_service_endpoint>";
  16. // 使用您的阿里云访问密钥 AccessKeyId
  17. String accessKeyId = "<your_access_key_id>";
  18. // 使用您的阿里云访问密钥AccessKeySecret
  19. String accessKeySecret = "<your_access_key_secret>";
  20. // 上面步骤创建的项目名称
  21. String project = "<project_name>";
  22. // 上面步骤创建的日志库名称
  23. String logstore = "<logstore_name>";
  24. // 构建一个客户端实例
  25. Client client = new Client(endpoint, accessKeyId, accessKeySecret);
  26. // 列出当前 project 下的所有日志库名称
  27. int offset = 0;
  28. int size = 100;
  29. String logstoreSubName = "";
  30. ListLogStoresRequest req1 = new ListLogStoresRequest(project, offset, size, logstoreSubName);
  31. List<String> logstores = client.ListLogStores(req1).GetLogstores();
  32. System.out.println("ListLogs:" + logstores.toString() + "\n");
  33. // 写入日志
  34. String topic = "";
  35. String source = "";
  36. // 连续发送 10 个数据包,每个数据包有 10 条日志
  37. for (int i = 0; i < 10; i++) {
  38. List<LogItem> logGroup = new ArrayList<LogItem>();
  39. for (int j = 0; j < 10; j++) {
  40. LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
  41. logItem.PushBack("index" + String.valueOf(j), String.valueOf(i * 10 + j));
  42. logGroup.add(logItem);
  43. }
  44. PutLogsRequest req2 = new PutLogsRequest(project, logstore, topic, source, logGroup);
  45. client.PutLogs(req2);
  46. /*
  47. * 发送的时候也可以指定将数据发送至有一个特定的 shard,只要设置 shard 的 hashkey,则数据会写入包含该
  48. * hashkey 的 range 所对应的 shard,具体 API 参考以下接口: public PutLogsResponse
  49. * PutLogs( String project, String logstore, String topic,
  50. * List<LogItem> logItems, String source, String shardHash // 根据
  51. * hashkey 确定写入 shard,hashkey 可以是 MD5(ip) 或 MD5(id) 等 );
  52. */
  53. }
  54. // 把 0 号 shard 中,最近 1 分钟写入的数据都读取出来。
  55. int shardId = 0;
  56. long curTimeInSec = System.currentTimeMillis() / 1000;
  57. GetCursorResponse cursorRes = client.GetCursor(project, logstore, shardId, curTimeInSec - 60);
  58. String beginCursor = cursorRes.GetCursor();
  59. cursorRes = client.GetCursor(project, logstore, shardId, CursorMode.END);
  60. String endCursor = cursorRes.GetCursor();
  61. String curCursor = beginCursor;
  62. while (!curCursor.equals(endCursor)) {
  63. // 每次读取两个LogGroup
  64. int logGroupCount = 2;
  65. BatchGetLogResponse logDataRes = client.BatchGetLog(project, logstore, shardId, logGroupCount, curCursor,
  66. endCursor);
  67. // 读取LogGroup的List
  68. List<LogGroupData> logGroups = logDataRes.GetLogGroups();
  69. for (LogGroupData logGroup : logGroups) {
  70. FastLogGroup flg = logGroup.GetFastLogGroup();
  71. System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
  72. flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
  73. System.out.println("Tags");
  74. for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
  75. FastLogTag logtag = flg.getLogTags(tagIdx);
  76. System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
  77. }
  78. for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
  79. FastLog log = flg.getLogs(lIdx);
  80. System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
  81. for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
  82. FastLogContent content = log.getContents(cIdx);
  83. System.out.println(content.getKey() + "\t:\t" + content.getValue());
  84. }
  85. }
  86. }
  87. String nextCursor = logDataRes.GetNextCursor();
  88. System.out.println("The Next cursor:" + nextCursor);
  89. curCursor = nextCursor;
  90. }
  91. // !!!重要提示 : 只有打开索引功能,才能调用以下接口 !!!
  92. // 等待 1 分钟让日志可查询
  93. try {
  94. Thread.sleep(60 * 1000);
  95. } catch (InterruptedException e) {
  96. e.printStackTrace();
  97. }
  98. // 查询日志分布情况
  99. String query = "<此处为需要查询的关键词,如果查询全部内容设置为空字符串即可>";
  100. int from = (int) (System.currentTimeMillis() / 1000 - 300);
  101. int to = (int) (System.currentTimeMillis() / 1000);
  102. GetHistogramsResponse res3 = null;
  103. while (true) {
  104. GetHistogramsRequest req3 = new GetHistogramsRequest(project, logstore, topic, query, from, to);
  105. res3 = client.GetHistograms(req3);
  106. // IsCompleted() 返回
  107. if (res3 != null && res3.IsCompleted())
  108. // 如果返回true,表示查询结果是准确的,如果返回false,则重复查询
  109. {
  110. break;
  111. }
  112. Thread.sleep(200);
  113. }
  114. System.out.println("Total count of logs is " + res3.GetTotalCount());
  115. for (Histogram ht : res3.GetHistograms()) {
  116. System.out.printf("from %d, to %d, count %d.\n", ht.GetFrom(), ht.GetTo(), ht.GetCount());
  117. }
  118. // 查询日志数据
  119. long totalLogLines = res3.GetTotalCount();
  120. int logOffset = 0;
  121. //logLine 最大值为100,每次获取100行数据。若需要读取更多数据,请使用offset翻页。
  122. // offset和lines只对关键字查询有效,若使用SQL查询,则无效。在SQL查询中返回更多数据,请使用limit语法。
  123. int logLine = 10;
  124. while (logOffset <= totalLogLines) {
  125. GetLogsResponse res4 = null;
  126. // 对于每个 log offset,一次读取 10 行 log,如果读取失败,最多重复读取 3 次。
  127. for (int retryTime = 0; retryTime < 3; retryTime++) {
  128. GetLogsRequest req4 = new GetLogsRequest(project, logstore, from, to, topic, query, logOffset,
  129. logLine, false);
  130. res4 = client.GetLogs(req4);
  131. if (res4 != null && res4.IsCompleted()) {
  132. break;
  133. }
  134. Thread.sleep(200);
  135. }
  136. assert res4 != null;
  137. System.out.println("Read log count:" + res4.GetCount());
  138. logOffset += logLine;
  139. }
  140. //打开分析功能,只有打开分析功能,才能使用SQL 功能。 可以在控制台开通分析功能,也可以使用SDK开启分析功能
  141. IndexKeys indexKeys = new IndexKeys();
  142. ArrayList<String> tokens = new ArrayList<String>();
  143. tokens.add(",");
  144. tokens.add(".");
  145. tokens.add("#");
  146. IndexKey keyContent = new IndexKey(tokens, false, "text");
  147. indexKeys.AddKey("index0", keyContent);
  148. keyContent = new IndexKey(new ArrayList<String>(), false, "long");
  149. indexKeys.AddKey("index1", keyContent);
  150. keyContent = new IndexKey(new ArrayList<String>(), false, "double");
  151. indexKeys.AddKey("index2", keyContent);
  152. IndexLine indexLine = new IndexLine(new ArrayList<String>(), false);
  153. Index index = new Index(7, indexKeys, indexLine);
  154. CreateIndexRequest createIndexRequest = new CreateIndexRequest(project, logstore, index);
  155. client.CreateIndex(createIndexRequest);
  156. //使用分析功能
  157. GetLogsRequest req4 = new GetLogsRequest(project, logstore, from, to, "", " index0:value | select avg(index1) as v1,sum(index2) as v2, index0 group by index0");
  158. GetLogsResponse res4 = client.GetLogs(req4);
  159. if (res4 != null && res4.IsCompleted()) {
  160. for (QueriedLog log : res4.GetLogs()) {
  161. LogItem item = log.GetLogItem();
  162. for (LogContent content : item.GetLogContents()) {
  163. System.out.print(content.GetKey() + ":" + content.GetValue());
  164. }
  165. System.out.println();
  166. }
  167. }
  168. }
  169. }

然后呢

  • 在后台可以在后台看到和读取数据。

参考资料