POM
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.38</version>
</dependency>
Java
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.*;
import com.aliyun.openservices.log.response.BatchGetLogResponse;
import com.aliyun.openservices.log.response.GetCursorResponse;
import com.aliyun.openservices.log.response.GetHistogramsResponse;
import com.aliyun.openservices.log.response.GetLogsResponse;
import java.util.ArrayList;
import java.util.List;
public class SdkSample {
public static void main(String[] args) throws LogException, InterruptedException {
// 选择与上面步骤创建 project 所属区域匹配的Endpoint
String endpoint = "<log_service_endpoint>";
// 使用您的阿里云访问密钥 AccessKeyId
String accessKeyId = "<your_access_key_id>";
// 使用您的阿里云访问密钥AccessKeySecret
String accessKeySecret = "<your_access_key_secret>";
// 上面步骤创建的项目名称
String project = "<project_name>";
// 上面步骤创建的日志库名称
String logstore = "<logstore_name>";
// 构建一个客户端实例
Client client = new Client(endpoint, accessKeyId, accessKeySecret);
// 列出当前 project 下的所有日志库名称
int offset = 0;
int size = 100;
String logstoreSubName = "";
ListLogStoresRequest req1 = new ListLogStoresRequest(project, offset, size, logstoreSubName);
List<String> logstores = client.ListLogStores(req1).GetLogstores();
System.out.println("ListLogs:" + logstores.toString() + "\n");
// 写入日志
String topic = "";
String source = "";
// 连续发送 10 个数据包,每个数据包有 10 条日志
for (int i = 0; i < 10; i++) {
List<LogItem> logGroup = new ArrayList<LogItem>();
for (int j = 0; j < 10; j++) {
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("index" + String.valueOf(j), String.valueOf(i * 10 + j));
logGroup.add(logItem);
}
PutLogsRequest req2 = new PutLogsRequest(project, logstore, topic, source, logGroup);
client.PutLogs(req2);
/*
* 发送的时候也可以指定将数据发送至有一个特定的 shard,只要设置 shard 的 hashkey,则数据会写入包含该
* hashkey 的 range 所对应的 shard,具体 API 参考以下接口: public PutLogsResponse
* PutLogs( String project, String logstore, String topic,
* List<LogItem> logItems, String source, String shardHash // 根据
* hashkey 确定写入 shard,hashkey 可以是 MD5(ip) 或 MD5(id) 等 );
*/
}
// 把 0 号 shard 中,最近 1 分钟写入的数据都读取出来。
int shardId = 0;
long curTimeInSec = System.currentTimeMillis() / 1000;
GetCursorResponse cursorRes = client.GetCursor(project, logstore, shardId, curTimeInSec - 60);
String beginCursor = cursorRes.GetCursor();
cursorRes = client.GetCursor(project, logstore, shardId, CursorMode.END);
String endCursor = cursorRes.GetCursor();
String curCursor = beginCursor;
while (!curCursor.equals(endCursor)) {
// 每次读取两个LogGroup
int logGroupCount = 2;
BatchGetLogResponse logDataRes = client.BatchGetLog(project, logstore, shardId, logGroupCount, curCursor,
endCursor);
// 读取LogGroup的List
List<LogGroupData> logGroups = logDataRes.GetLogGroups();
for (LogGroupData logGroup : logGroups) {
FastLogGroup flg = logGroup.GetFastLogGroup();
System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
System.out.println("Tags");
for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
FastLogTag logtag = flg.getLogTags(tagIdx);
System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
}
for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
FastLog log = flg.getLogs(lIdx);
System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
FastLogContent content = log.getContents(cIdx);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
String nextCursor = logDataRes.GetNextCursor();
System.out.println("The Next cursor:" + nextCursor);
curCursor = nextCursor;
}
// !!!重要提示 : 只有打开索引功能,才能调用以下接口 !!!
// 等待 1 分钟让日志可查询
try {
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 查询日志分布情况
String query = "<此处为需要查询的关键词,如果查询全部内容设置为空字符串即可>";
int from = (int) (System.currentTimeMillis() / 1000 - 300);
int to = (int) (System.currentTimeMillis() / 1000);
GetHistogramsResponse res3 = null;
while (true) {
GetHistogramsRequest req3 = new GetHistogramsRequest(project, logstore, topic, query, from, to);
res3 = client.GetHistograms(req3);
// IsCompleted() 返回
if (res3 != null && res3.IsCompleted())
// 如果返回true,表示查询结果是准确的,如果返回false,则重复查询
{
break;
}
Thread.sleep(200);
}
System.out.println("Total count of logs is " + res3.GetTotalCount());
for (Histogram ht : res3.GetHistograms()) {
System.out.printf("from %d, to %d, count %d.\n", ht.GetFrom(), ht.GetTo(), ht.GetCount());
}
// 查询日志数据
long totalLogLines = res3.GetTotalCount();
int logOffset = 0;
//logLine 最大值为100,每次获取100行数据。若需要读取更多数据,请使用offset翻页。
// offset和lines只对关键字查询有效,若使用SQL查询,则无效。在SQL查询中返回更多数据,请使用limit语法。
int logLine = 10;
while (logOffset <= totalLogLines) {
GetLogsResponse res4 = null;
// 对于每个 log offset,一次读取 10 行 log,如果读取失败,最多重复读取 3 次。
for (int retryTime = 0; retryTime < 3; retryTime++) {
GetLogsRequest req4 = new GetLogsRequest(project, logstore, from, to, topic, query, logOffset,
logLine, false);
res4 = client.GetLogs(req4);
if (res4 != null && res4.IsCompleted()) {
break;
}
Thread.sleep(200);
}
assert res4 != null;
System.out.println("Read log count:" + res4.GetCount());
logOffset += logLine;
}
//打开分析功能,只有打开分析功能,才能使用SQL 功能。 可以在控制台开通分析功能,也可以使用SDK开启分析功能
IndexKeys indexKeys = new IndexKeys();
ArrayList<String> tokens = new ArrayList<String>();
tokens.add(",");
tokens.add(".");
tokens.add("#");
IndexKey keyContent = new IndexKey(tokens, false, "text");
indexKeys.AddKey("index0", keyContent);
keyContent = new IndexKey(new ArrayList<String>(), false, "long");
indexKeys.AddKey("index1", keyContent);
keyContent = new IndexKey(new ArrayList<String>(), false, "double");
indexKeys.AddKey("index2", keyContent);
IndexLine indexLine = new IndexLine(new ArrayList<String>(), false);
Index index = new Index(7, indexKeys, indexLine);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(project, logstore, index);
client.CreateIndex(createIndexRequest);
//使用分析功能
GetLogsRequest req4 = new GetLogsRequest(project, logstore, from, to, "", " index0:value | select avg(index1) as v1,sum(index2) as v2, index0 group by index0");
GetLogsResponse res4 = client.GetLogs(req4);
if (res4 != null && res4.IsCompleted()) {
for (QueriedLog log : res4.GetLogs()) {
LogItem item = log.GetLogItem();
for (LogContent content : item.GetLogContents()) {
System.out.print(content.GetKey() + ":" + content.GetValue());
}
System.out.println();
}
}
}
}
然后呢
参考资料