1. <?php/** * 到ES库 操作 API 分别有插入 、查询、聚合 * Created by PhpStorm. * User: BfireLai * Date: 2016/7/20 * Time: 9:11 */class o_es{ /*@var _host*/ protected $_host = ''; /*@var _port*/ protected $_port = ''; /*@var _index*/ protected $_index = ''; /*@var _type*/ protected $_type = ''; /*@var _root*/ protected $_root = ''; /*@var _curl*/ protected $_curl; /*@var _logdir */ protected $_logdir = array(); /*@var _logdir*/ protected $_logflag = 0; /*@var _params*/ protected $_params = array(); /* 程序初始化*/ public function __construct() { //网站根目录 if (!defined('ROOT')) { define('ROOT', dirname(__FILE__)); } //导入 ES 与 log_Info 配置 $all_Config = array( 'es_Conn' => array(//配置ES连接服务 // '_index' => 'filebeat-' . date('Y.m.d', time()),//log_test '_index' => 'test-' . date('Y.m.d', time()),//log_test '_type' => 'eslog',//log_test 'host' => '192.168.56.101',//192.168.1.95 'port' => '9200',//9300 ), 'logDir' => '/data/wwwroot/cms/data/'.date('Ym').'/url_http/',//记录ES LOG 日志目录位置 'logFlag' => 1,//log 是否开启打日志,默认为0,并且只记错误日志 ); //导入 cur 配置 require_once(ROOT . '/Curl.php'); $curl = new Curl(); //库与表 $index = $all_Config['es_Conn']['_index']; $type = $all_Config['es_Conn']['_type']; $host = $all_Config['es_Conn']['host']; $port = $all_Config['es_Conn']['port']; //init $this->setHost($host); $this->setPort($port); $this->setIndex($index); $this->setType($type);//默认值 $this->setRoot(ROOT); $this->setCurl($curl); $this->setLogdir($all_Config['logDir']); $this->setLogflag($all_Config['logFlag']); }///////////////////////////类属性方法 开始///////////////////////////////// /** * Sets specific curl values (updates and keeps default values) * @param string $curl Params * @return $this */ public function setCurl($curl) { $this->_curl = $curl; return $this; } /** * Sets specific host values (updates and keeps default values) * @param string $host Params * @return $this */ public function setHost($host) { $this->_host = $host; return $this; } /** * Sets specific port values (updates and keeps default values) * @param string $port Params * @return $this */ public function setPort($port) { $this->_port = $port; return $this; } /** * Sets specific index values (updates and keeps default values) * @param string $index Params * @return $this */ public function setIndex($index) { $this->_index = $index; return $this; } /** * Sets specific type values (updates and keeps default values) * @param string $type Params * @return $this */ public function setType($type) { $this->_type = $type; return $this; } /** * Sets specific root values (updates and keeps default values) * @param string $root Params * @return $this */ public function setRoot($root) { $this->_root = $root; return $this; } /** * Sets specific logdir values (updates and keeps default values) * @param string $logdir Params * @return $this */ public function setLogdir($logdir) { $this->_logdir = $logdir; return $this; } /** * Sets specific logflag values (updates and keeps default values) * @param string $logflag Params * @return $this */ public function setLogflag($logflag) { $this->_logflag = $logflag; return $this; } /** * Set request param in post or get * @param $params * @return $this */ public function setParams($params) { $this->_params = $params; return $this; }///////////////////////////类属性方法 结束///////////////////////////////// /** * @功能 此方法是返回处理后的 JSON字符串 * @param int $code 1表示正常返回 404 请求不合规则 * @param string $msg * @param string $data */ public function msg($code = 0, $msg = '') { $code = (int)$code; switch ($code){ case 10001: $msg = '您所请求缺少参数!'; break; case 10002: $msg = '您所请求的参数格式错误!'; break; default :break; } $res['ok'] = $code?0:1; $res['msg'] = (string)$msg?:''; echo json_encode($res); exit(); } /** * @功能 对应API 文档说明 */ public function doc() { $docArr['title'] = 'LogApi document'; $docArr['doc']['addApi'] = array(); $docArr['doc']['searchApi'] =array(); $addApi[]['name'] = '<h5>1、请求地址</h5>'; $addApi[]['val'] = '<p>http://vm.boyaa.com/logtools/LogApi.php</p>'; $addApi[]['name'] = '<h5>2、请求方式</h5>'; $addApi[]['val'] = '<p>GET/POST</p>'; $addApi[]['name'] = '<h5>3、请求参数</h5>'; $addApi[]['val'] = '<p><p>act&nbsp;=>string&nbsp;&nbsp; <font color="red">【必填】</font>//表示请求动作<br> data =>jsonString <font color="red">【必填】</font>//要插入的JSON 字符串数据,支持单条与多条<br></p>'; $addApi[]['name'] = '<h5>4、允许插入data JSON 字符串 数据字段如下 :(以下字段参数都为选填)</h5>'; $addApi[]['val'] = '<p>_index =>string; //ES 中的_index 不填写则是默认读配置<br> _type&nbsp;=>string; //ES 中的_type 不填写则是默认读配置<br> server =>string; //服务名<br> sid&nbsp;&nbsp;=>string; //服务ID<br> file &nbsp;=>string; //采集的日志文件名<br> logtime =>int; &nbsp;//log时间戳 不填写默认系统时间<br> lv &nbsp;&nbsp;=>string; //日志级别:ERROR, WARN, INFO, DEBUG, TRACE 等 兼容大小写 <b>默认info 级别</b><br> txt&nbsp;&nbsp;=>string; //日志内容<br> api&nbsp;&nbsp;=>string; //调用的API<br> ip &nbsp;&nbsp;=>string; //调用者ip<br> uid&nbsp;&nbsp;=>string; //用户id<br> <h6>01、单条格式</h6> <span>[{}]<br>eg:http://vm.boyaa.com/logtools/LogApi.php?act=add&data=[{"server":"servertest","sid":"1","lv":"INFO","lvn":20000,"txt":"bfire test","api":"api","ip":"192.168.56.101","uid":"2298"}]</span>; <h6>02、多条格式</h6> <span>[{},{},{}]<br>eg:http://vm.boyaa.com/logtools/LogApi.php?act=add&data=[{"txt":"test1"},{"txt":"test2"},{"txt":"test3"}]</span>; </p>'; $addApi[]['name'] = '<h5>5、返回结果</h5>'; $addApi[]['val'] = 'succ'; //赋值 $docArr['doc']['addApi'] = $addApi; //输出HTML foreach ($docArr as $k=>$v){ if($k = 'title' && !is_array($v)){ print_r( '<h1>'.$v.'</h1><hr>'); } if($k = 'doc' && $v){ if(is_array($v)){ foreach ($v as $key=>$val){ print '<h3>'.$key.'</h3><hr>'; if(is_array($val)){ foreach ($val as $key2=>$val2){ if(@$val2['name']){ print '<h5>'.$val2['name'].'</h5>'; } if(@$val2['val']){ print '<p>'.$val2['val'].'</p>'; } } } } } } } } /** * @功能 添加LOG 日志入ES库的方法 act 固定为add ,data 则为 json 字符串 * @param $params */ public function add($params) { if(empty($params['data']) || 'add' != $params['act'] ){ $this->msg(10001); } // 因在GET data 时已经进行了 addslashes 转换 现要转换回来 $params['data'] = stripslashes($params['data']); //去除UTF-8 中的BOM 防止JSON解释出为空 $params['data'] = trim($params['data'], "\xEF\xBB\xBF"); $dataarr = json_decode($params['data'], true); $jsondata = json_encode($dataarr); //没有数据则直接返回 $countlogarr = count($params); if($jsondata =='null' || $countlogarr == 0){ $this->msg(10002); } $data =array(); foreach ($dataarr as $key=>$val){ $_id = '';//编号 $_index=''; $_type=''; $server = '';//项目编号 服务 $sid = '';//服务名字 $file = '';//采集的日志文件名 $ts = date("Y-m-d H:i:s");//log时间日期 $tsn = '';//log时间戳 $lv = 'INFO';//日志级别:TRACE,DEBUG等 $lvn = 20000;//日志级别的数字 $txt = '';//日志内容 $api = '';//调用的API $ip = '';//调用者ip $uid = '';//用户id if(@$val['_index']){ $_index = $val['_index']; } if(@$val['_type']){ $_type = $val['_type']; } if(@$val['file']){ $file = $val['file']; } if(@$val['server']){ $server = $val['server']; } if(@$val['sid']){ $sid = $val['sid']; } if(@$val['logtime']){ $logtime = (int)$val['logtime']; } if(@$val['lv']){ //lv lvn 取值 $lvt = $val['lv']; $lvt = strtoupper($lvt); $loglevelArr = array('ERROR', 'WARN', 'INFO', 'DEBUG', 'TRACE'); if (in_array($lvt, $loglevelArr, TRUE)) { $level = $this->getLoglevel($lvt); $lv = $level['lv']; $lvn = (int)$level['lvn']; } } if(@$val['txt']){ $txt = $val['txt']; } if(@$val['api']){ $api = $val['api']; } if(@$val['ip']){ $ip = $val['ip']; } if(@$val['uid']){ $uid = $val['uid']; } //处理 $logtime = $logtime ?: time(); $time = time(); $_id = ($key+1).'-'.$time.'-'.$logtime; $ts = date('Y-m-d H:i:s', $logtime); $tsn = (int)$logtime; $data[] = array( // '@timestamp'=>gmdate('Y-m-d\TH:i:s\Z',time()), '@timestamp' => str_replace('+00:00', '.000Z', gmdate('c', $tsn + 3600 * 8)), 'file' => $file, //采集的日志文件名 'server' => $server, //服务 'sid' => $sid, //服务ID 'ts' => $ts, //log时间日期 'tsn' => $tsn, //log时间戳 'lv' => $lv, //日志级别:TRACE,DEBUG等 'lvn' => $lvn, //日志级别的数字 'txt' => $txt, //日志内容 'api' => $api, //调用的API 'ip' => $ip, //调用者ip 'uid' => $uid, //用户id '_index' => $_index,//_index '_type' => $_type, //_type '_id' => $_id, //$_id ); } //处理BULK $jsonStr =''; foreach ($data as $k=>$v){ if($v['_index'] || $v['_type'] || $v['_id']){ $tmpindex=array(); if($v['_index']){ $tmpindex['_index'] = $v['_index']; } if($v['_type']){ $tmpindex['_type'] = $v['_type']; } if($v['_id']){ $tmpindex['_id'] = $v['_id']; } $jsonStr .= '{"index":'.json_encode($tmpindex).'}'."\n"; }else{ $jsonStr .= '{"index":{}}'."\n"; } //unset _index _type unset($v['_index'],$v['_type'],$v['_id']); $jsonStr .= json_encode($v)."\n"; } //单条数据导入 //$url = $this->_type . '/' . $_id . '/'; //批量导入 $url = $this->_type . '/_bulk?pretty'; $this->curlToEs($url, $jsonStr,'add'); $args = $this->_curl->getErrNo(); //统一返回消息 $this->msg($args,$this->_curl->getError()); } /** * @功能 所请求到ES 库中查询 包括条件查询,分布,时间段查询,高亮 查询 * @param array $where * @return mixed */ public function serach($where=array()){ //查询信息设置 //multi_match 精确查询 $query['query']['match_all'] = (object)array(); $should = array();//条件查询 $highlight = array();//高亮 $range = array();//日志时间范围查询 $params = array();//参数 if(is_array($where['query']) && $where['query']){ $timeRange['tsn'] = array(); foreach ($where['query'] as $key=>$val){ if('startTime'== $val['name'] || 'endTime'== $val['name']){ //gt: > 大于 lt: < 小于 gte: >= 大于或等于 lte: <= 小于或等于 if('startTime'== $val['name']){ $range['gte'] = (int)$val['val'];//大于等于 } if('endTime'== $val['name']){ $range['lte'] = (int)$val['val'];//小于等于 } }else{ $should[$key]['match'] = array($val['name']=>$val['val']); //$highlight[$val['name']] = (object)array(); 默认 //每个字段都可以设置高亮显示的字符片fragment_size段大小(默认为100),以及返回的最大片段数number_of_fragments(默认为5) $highlight[$val['name']] = array("fragment_size" =>99999999, "number_of_fragments"=>1); } } } //包含查询 if($should || $highlight || $range){ unset( $query['query']['match_all']); //条件查询 if($should){ $query['query']['bool']['should'] = $should; } //高亮设置 if($highlight){ $query['highlight']['pre_tags'] = array(0=>'<font color="red">');//开始标签 $query['highlight']['post_tags'] = array(0=>'</font>');//结束标签 $query['highlight']['fields'] = $highlight; } //日志时间范围查询 if($range){ //此处固定查询的是时间戳 $query['query']['bool']['must'][]['range']['tsn'] = (object) $range; } } //排序设置 // "sort": { "date": { "order": "desc" }} $sort = array(); if(is_array($where['sort']) && $where['sort']){ foreach ($where['sort'] as $key=>$val){ //log时间戳 if('ts' == $val['name']){ $val['name'] = 'tsn'; } //日志级别的数字 info 20000 error 50000 if('lv' == $val['name']){ $val['name'] = 'lvn'; } $sort[$key][$val['name']] = array("order"=>$val['val']); } } if($sort){ $params['sort'] = $sort; } //合并查询数组 $params = array_merge($params, $query); $page = (int)$where['page']?:1;//第几页 $pagesize = (int)$where['pagesize']?:15;//每页大小 $offset = $pagesize*($page - 1); //偏移量$offset=$pagesize*($page - 1); $params['size'] = $pagesize;//每页大小 $params['from'] = $offset;//偏移量 //$params['timeout'] = '10s';//请求过期时间 10ms 或 1s //post 提交一定要 先转换JSON String ,GET 则不用 可直接用 $params $jsonStr = json_encode($params); //$url = $this->_type . '/_search'; $url = '/_search'; //$url = 'http://' . $this->_host . ':' . $this->_port . '/' . $this->_index . '/' . $url; //$output = $this->_curl->post($url,$jsonStr); $output = $this->curlToEs($url, $jsonStr,'search'); // 获取结果后处理 $outputArr = json_decode($output,true); $res['took'] = (int)$outputArr['took']?:0; $res['num'] = (int)$outputArr['hits']['total']?:0; if($outputArr['hits']['hits']){ $tmpArr =array(); foreach ($outputArr['hits']['hits'] as &$val){ //高亮处理 if(isset($val['highlight'])){ foreach ($val['highlight'] as $k=>$v){ //默认有多个返回所以只取第一个,因为上面fragment_size 已经设置非常大 $val['_source'][$k] = $v[0]; } } $tmpArr[] = $val['_source']; } $res['loop'] = $tmpArr; } return $res; } /** * @功能:通过CURL 方式把数据POST 到ES 系统中 * @参数: * $url=> POST 地址 * $jsonStr=> 数据的JSON串 * $return => 返回结果集, */ function curlToEs($url, $jsonStr, $intype = '') {//URL 正式地址 $url = 'http://' . $this->_host . ':' . $this->_port . '/' . $this->_index . '/' . $url; $output = $this->_curl->post($url, $jsonStr);//$output = $this->_curl->get($url);//记录日志 $param['intype'] = $intype; $param['txt'] = $output; $param['curlUrl'] = $url; $param['curlParam'] = $jsonStr; $args = $this->_curl->getErrNo(); $this->rewritetolog($param, $args); return $output; } /* ** * 写文件 * @param string $file 文件路径 * @param string $str 写入内容 * @param char $mode 写入模式 */ function writeFile($file, $str, $mode = 'w') { $oldmask = @umask(0); $fp = @fopen($file, $mode); @flock($fp, 3); if (!$fp) { Return false; } else { @fwrite($fp, $str); @fclose($fp); @umask($oldmask); Return true; } } /** * @功能:日志记录 * @参数: * @param $param Array() * $param['intype'] => 要记录日志的类型, * $param['txt'] => 返回结果集, * $args => 返回结果状态,0表示成功,非0表示失败 * $logger => 是否要记录日志 * @返回:若转换成功返回true,否则返回false或直接跳出 * 显示行号 __LINE__ */ function rewritetolog($param = array(), $args = '', $logger = true) {//$log['_id'] = '';//编号//$log['proj'] = '';//项目编号 $log['sid'] = '';//服务名字 $log['file'] = '';//采集的日志文件名 $log['iid'] = '';//实例号 $log['seq'] = 0;//日志收集的序列号//$log['ts'] = '';//时间戳//$log['tsn'] = '';//时间戳的毫秒数字 $log['lv'] = 'ERROR';//日志级别:TRACE,DEBUG等 $log['lvn'] = 40000;//日志级别的数字 $log['th'] = '';//线程名 $log['cl'] = '';//类名或文件名 $log['m'] = '';//方法名 $log['ln'] = 0;//行号 $log['bsid'] = '';//业务会话id $log['esid'] = '';//总线会话id $log['txt'] = '';//日志内容 $log['ex'] = '';//异常Exception日志的堆栈信息 $log['ag'] = '';//user agent $log['ip'] = '';//调用者ip $log['uid'] = '';//用户id $time = time(); $ts = date('Y-m-d H:i:s', $time); $tsn = (int)($time * 1000); $log['proj'] = 'log2es';//项目编号 $log['ts'] = $ts;//时间戳 $log['tsn'] = $tsn;//时间戳的毫秒数字 $logStr = ''; if ($logger) { $status = 'INFO'; $flag = $this->_logflag; $curl = $this->_curl; $t['time'] = (string)date("Y-m-d H:i:s"); $t['name'] = $param['intype'] ? (string)$param['intype'] : ''; $t['txt'] = $param['txt'] ? $param['txt'] : ''; $t['curlUrl'] = @$param['curlUrl'] ? (string)$param['curlUrl'] : ''; $t['curlParam'] = @$param['curlParam'] ? $param['curlParam'] : ''; $t['curlInfo'] = $curl->getInfo(); //$curl->getInfo(); $t['curlStatus'] = $curl->getStatus();//$curl->getStatus(); $t['curlError'] = $curl->getError(); //$curl->getError(); $t['curlErrNo'] = $curl->getErrNo(); //$curl->getErrNo(); if ($flag) { if ($args) { $status = 'ERROR'; } $t['status'] = (string)$status;//处理日志记录 $level = $this->getLoglevel($status);//日志级别 $log['lv'] = $level['lv'];//日志级别:TRACE,DEBUG等 $log['lvn'] = $level['lvn'];//日志级别的数字 $log['txt'] = $t;//日志内容//print_r($log);//exit;//$_SERVER $logStr = json_encode($log); $logStr = $logStr . ''; $root = $this->_logdir; $fileName = $root . '/eslog_'; $fileName = $fileName . date('Y-m-d') . ".log"; $this->writeFile($fileName, $logStr, 'a'); } else { if ($args) { $status = 'ERROR';//} $t['status'] = (string)$status;//处理日志记录 $level = $this->getLoglevel($status);//日志级别 $log['lv'] = $level['lv'];//日志级别:TRACE,DEBUG等 $log['lvn'] = $level['lvn'];//日志级别的数字 $log['txt'] = $t;//日志内容//$_SERVER $logStr = json_encode($log); $logStr = $logStr . ''; $root = $this->_logdir; $fileName = $root . '/eslog_'; $fileName = $fileName . date('Y-m-d') . ".log"; $this->writeFile($fileName, $logStr, 'a'); } } } } /* ** * 日志级别 * @param string $lv 日志级别字符串 * @return int 返回数字 */ function getLoglevel($lv) { $lv = strtoupper($lv); $loglevel = array( 'ERROR' => '40000',//40000 'WARN' => '30000',//30000 'INFO' => '20000',//20000 'DEBUG' => '10000',//10000 'TRACE' => '5000',//5000 ); if ($loglevel[$lv]) { $level['lv'] = $lv; $level['lvn'] = $loglevel[$lv]; } else { $level['lv'] = 'ERROR'; $level['lvn'] = 40000; } return $level; }}