Java class name: com.alibaba.alink.operator.stream.regression.GlmPredictStreamOp
Python class name: GlmPredictStreamOp

Overview

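GLM (Generalized Linear Model), also called generalized linear regression, is a widely used family of statistical models; many common models, such as linear, logistic, and Poisson regression, are special cases of it.
A GLM generalizes the linear regression model and can describe a nonlinear relationship between the response and the predictors. The response variable follows a normal, binomial, Poisson, gamma, or inverse Gaussian distribution, and the link function f defines the relationship between the mean μ of the response and the linear combination of the predictors.
The GLM functionality comprises GLM training, GLM prediction (batch and stream), and GLM evaluation; training uses an iterative least-squares method.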
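To make the iterative least-squares idea concrete, here is a minimal sketch of textbook iteratively reweighted least squares (IRLS) for one feature plus an intercept, with a Poisson family and a log link. It is not Alink's implementation: the function name `irls_poisson_log` is hypothetical, and regularization, offsets, and sample weights are left out.

```python
import math


def irls_poisson_log(xs, ys, n_iter=25, tol=1e-10):
    """Fit log(mu) = b0 + b1 * x for a Poisson response by textbook IRLS."""
    # Start from the observed responses, then map them through the log link.
    mu = [max(y, 1e-8) for y in ys]
    eta = [math.log(m) for m in mu]
    b0 = b1 = 0.0
    for _ in range(n_iter):
        # Working response and weights for the log link with Poisson variance.
        z = [e + (y - m) / m for e, y, m in zip(eta, ys, mu)]
        w = mu
        # Weighted least squares for two parameters, solved in closed form.
        sw = sum(w)
        sx = sum(wi * x for wi, x in zip(w, xs))
        sxx = sum(wi * x * x for wi, x in zip(w, xs))
        sz = sum(wi * zi for wi, zi in zip(w, z))
        sxz = sum(wi * x * zi for wi, x, zi in zip(w, xs, z))
        det = sw * sxx - sx * sx
        new_b0 = (sxx * sz - sx * sxz) / det
        new_b1 = (sw * sxz - sx * sz) / det
        converged = abs(new_b0 - b0) + abs(new_b1 - b1) < tol
        b0, b1 = new_b0, new_b1
        # Refresh the linear predictor and the mean for the next pass.
        eta = [b0 + b1 * x for x in xs]
        mu = [math.exp(e) for e in eta]
        if converged:
            break
    return b0, b1


# Noise-free synthetic data generated from log(mu) = 0.5 + 0.2 * x.
xs = [0.0, 1.0, 2.0, 3.0, 4.0]
ys = [math.exp(0.5 + 0.2 * x) for x in xs]
b0, b1 = irls_poisson_log(xs, ys)
print(b0, b1)
```

Each pass solves a weighted least-squares problem on a linearized response, which is why the method is described as iterative least squares.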

Algorithm usage

| Distribution | Link function | Corresponding algorithm |
| --- | --- | --- |
| Binomial | Logit | Logistic regression |
| Multinomial | Logit | Softmax |
| Gaussian | Identity | Linear regression |
| Poisson | Log | Poisson regression |
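The link functions named in the table can be written out as plain functions. The sketch below pairs each link with its inverse, which maps a linear predictor back to the mean of the response. The `LINKS` dictionary and `mean_from_linear_predictor` are illustrative names, not part of the Alink API.

```python
import math

# Each entry holds (link, inverse link) for one row of the table above.
LINKS = {
    "identity": (lambda mu: mu, lambda eta: eta),
    "log": (math.log, math.exp),
    "logit": (
        lambda mu: math.log(mu / (1.0 - mu)),
        lambda eta: 1.0 / (1.0 + math.exp(-eta)),
    ),
}


def mean_from_linear_predictor(link_name, eta):
    """Apply the inverse link to a linear predictor value."""
    _, inverse = LINKS[link_name]
    return inverse(eta)


# A linear predictor of 0 corresponds to a probability of 0.5 under the logit link.
print(mean_from_linear_predictor("logit", 0.0))
```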

References

[1] https://en.wikipedia.org/wiki/Generalized_linear_model

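Parameters

| Name | Display name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| predictionCol | Prediction result column name | Name of the prediction result column | String | ✓ | | |
| linkPredResultCol | Link function result column name | Name of the column that holds the link function result | String | | | null |
| modelFilePath | Model file path | Path of the model file | String | | | null |
| reservedCols | Reserved column names | Columns reserved by the algorithm | String[] | | | null |
| numThreads | Number of component threads | Number of threads used by the component | Integer | | | 1 |
| modelStreamFilePath | Model stream file path | File path of the model stream | String | | | null |
| modelStreamScanInterval | Model path scan interval | Interval between scans of the model path, in seconds | Integer | | | 10 |
| modelStreamStartTime | Model stream start time | Start time of the model stream. By default, reading starts from the current moment. Use the yyyy-mm-dd hh:mm:ss.fffffffff format; see Timestamp.valueOf(String s) | String | | | null |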
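A string in the format described for modelStreamStartTime can be produced from a Python `datetime`. This is a small sketch with a hypothetical helper name, `to_model_stream_start_time`. It emits six fractional digits, which `java.sql.Timestamp.valueOf` accepts because the fractional part is optional and may have up to nine digits.

```python
from datetime import datetime


def to_model_stream_start_time(dt):
    """Format a datetime as 'yyyy-mm-dd hh:mm:ss.ffffff'."""
    return dt.strftime("%Y-%m-%d %H:%M:%S.%f")


# Example value to pass as the modelStreamStartTime parameter.
start_time = to_model_stream_start_time(datetime(2021, 6, 1, 8, 30, 0, 123000))
print(start_time)
```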

Code examples

Python code

    from pyalink.alink import *
    import pandas as pd

    useLocalEnv(1)

    # Sample data: label u, features lot1 and lot2, plus offset and weights columns.
    df = pd.DataFrame([
        [1.6094, 118.0000, 69.0000, 1.0000, 2.0000],
        [2.3026, 58.0000, 35.0000, 1.0000, 2.0000],
        [2.7081, 42.0000, 26.0000, 1.0000, 2.0000],
        [2.9957, 35.0000, 21.0000, 1.0000, 2.0000],
        [3.4012, 27.0000, 18.0000, 1.0000, 2.0000],
        [3.6889, 25.0000, 16.0000, 1.0000, 2.0000],
        [4.0943, 21.0000, 13.0000, 1.0000, 2.0000],
        [4.3820, 19.0000, 12.0000, 1.0000, 2.0000],
        [4.6052, 18.0000, 12.0000, 1.0000, 2.0000]
    ])

    schema = 'u double, lot1 double, lot2 double, offset double, weights double'
    source = BatchOperator.fromDataframe(df, schemaStr=schema)

    featureColNames = ["lot1", "lot2"]
    labelColName = "u"

    # Train a gamma GLM with a log link.
    train = GlmTrainBatchOp()\
        .setFamily("gamma")\
        .setLink("Log")\
        .setRegParam(0.3)\
        .setMaxIter(5)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(labelColName)
    source.link(train)

    # Batch prediction: the model is the first input, the data the second.
    predict = GlmPredictBatchOp()\
        .setPredictionCol("pred")
    predict.linkFrom(train, source)
    predict.print()

    # Evaluation, configured with the same settings as training.
    evaluation = GlmEvaluationBatchOp()\
        .setFamily("gamma")\
        .setLink("Log")\
        .setRegParam(0.3)\
        .setMaxIter(5)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(labelColName)
    evaluation.linkFrom(train, source)
    evaluation.print()

    # Stream prediction: the batch-trained model is passed to the constructor.
    source_stream = StreamOperator.fromDataframe(df, schemaStr=schema)
    predict_stream = GlmPredictStreamOp(train)\
        .setPredictionCol("pred")
    predict_stream.linkFrom(source_stream)
    predict_stream.print()
    StreamOperator.execute()

Java code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.batch.BatchOperator;
    import com.alibaba.alink.operator.batch.regression.GlmEvaluationBatchOp;
    import com.alibaba.alink.operator.batch.regression.GlmPredictBatchOp;
    import com.alibaba.alink.operator.batch.regression.GlmTrainBatchOp;
    import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.regression.GlmPredictStreamOp;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.List;

    public class GlmPredictStreamOpTest {
        @Test
        public void testGlmPredictStreamOp() throws Exception {
            // Sample data: label u, features lot1 and lot2, plus offset and weights columns.
            List<Row> df = Arrays.asList(
                Row.of(1.6094, 118.0000, 69.0000, 1.0000, 2.0000),
                Row.of(2.3026, 58.0000, 35.0000, 1.0000, 2.0000),
                Row.of(2.7081, 42.0000, 26.0000, 1.0000, 2.0000),
                Row.of(2.9957, 35.0000, 21.0000, 1.0000, 2.0000),
                Row.of(3.4012, 27.0000, 18.0000, 1.0000, 2.0000),
                Row.of(3.6889, 25.0000, 16.0000, 1.0000, 2.0000),
                Row.of(4.0943, 21.0000, 13.0000, 1.0000, 2.0000),
                Row.of(4.3820, 19.0000, 12.0000, 1.0000, 2.0000),
                Row.of(4.6052, 18.0000, 12.0000, 1.0000, 2.0000)
            );
            String schema = "u double, lot1 double, lot2 double, offset double, weights double";
            BatchOperator<?> source = new MemSourceBatchOp(df, schema);
            String[] featureColNames = new String[] {"lot1", "lot2"};
            String labelColName = "u";

            // Train a gamma GLM with a log link.
            BatchOperator<?> train = new GlmTrainBatchOp()
                .setFamily("gamma")
                .setLink("Log")
                .setRegParam(0.3)
                .setMaxIter(5)
                .setFeatureCols(featureColNames)
                .setLabelCol(labelColName);
            source.link(train);

            // Batch prediction: the model is the first input, the data the second.
            BatchOperator<?> predict = new GlmPredictBatchOp()
                .setPredictionCol("pred");
            predict.linkFrom(train, source);
            predict.print();

            // Evaluation, configured with the same settings as training.
            BatchOperator<?> eval = new GlmEvaluationBatchOp()
                .setFamily("gamma")
                .setLink("Log")
                .setRegParam(0.3)
                .setMaxIter(5)
                .setFeatureCols(featureColNames)
                .setLabelCol(labelColName);
            eval.linkFrom(train, source);
            eval.print();

            // Stream prediction: the batch-trained model is passed to the constructor.
            StreamOperator<?> sourceStream = new MemSourceStreamOp(df, schema);
            StreamOperator<?> predictStream = new GlmPredictStreamOp(train)
                .setPredictionCol("pred");
            predictStream.linkFrom(sourceStream);
            predictStream.print();
            StreamOperator.execute();
        }
    }

Results

Batch prediction results

| u | lot1 | lot2 | offset | weights | pred |
| --- | --- | --- | --- | --- | --- |
| 1.6094 | 118.0000 | 69.0000 | 1.0000 | 2.0000 | 1.4601 |
| 2.3026 | 58.0000 | 35.0000 | 1.0000 | 2.0000 | 2.6396 |
| 2.7081 | 42.0000 | 26.0000 | 1.0000 | 2.0000 | 3.0847 |
| 2.9957 | 35.0000 | 21.0000 | 1.0000 | 2.0000 | 3.4135 |
| 3.4012 | 27.0000 | 18.0000 | 1.0000 | 2.0000 | 3.5215 |
| 3.6889 | 25.0000 | 16.0000 | 1.0000 | 2.0000 | 3.6901 |
| 4.0943 | 21.0000 | 13.0000 | 1.0000 | 2.0000 | 3.9275 |
| 4.3820 | 19.0000 | 12.0000 | 1.0000 | 2.0000 | 3.9891 |
| 4.6052 | 18.0000 | 12.0000 | 1.0000 | 2.0000 | 3.9581 |

Evaluation results

{"rank":3,"degreeOfFreedom":6,"residualDegreeOfFreeDom":6,"residualDegreeOfFreedomNull":8,"aic":9702.088569686532,"dispersion":0.016006720896643168,"deviance":0.0963859019919082,"nullDeviance":0.8493577599031797,"coefficients":[0.007797743508544201,-0.031175844426488047],"intercept":1.609524324733497,"coefficientStandardErrors":[0.030385113783605693,0.05301723001060941,0.10937960484661188],"tValues":[0.25663038697427815,-0.5880323136506637,14.715031444761644],"pValues":[0.8060371545112608,0.5779564640150403,6.188226474801439E-6]}
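The evaluation summary can be cross-checked with a few lines of Python. The sketch below copies a subset of the fields from the output above and recomputes each t-value as the estimate divided by its standard error. It assumes the standard-error and t-value arrays list the two feature coefficients first and the intercept last, an ordering inferred from the numbers and not stated by the source.

```python
import json

# Subset of the evaluation output above, with plain ASCII quotes.
EVAL_JSON = (
    '{"coefficients":[0.007797743508544201,-0.031175844426488047],'
    '"intercept":1.609524324733497,'
    '"coefficientStandardErrors":[0.030385113783605693,0.05301723001060941,0.10937960484661188],'
    '"tValues":[0.25663038697427815,-0.5880323136506637,14.715031444761644]}'
)

summary = json.loads(EVAL_JSON)

# Assumed ordering: feature coefficients first, intercept last.
estimates = summary["coefficients"] + [summary["intercept"]]
recomputed_t = [
    estimate / std_err
    for estimate, std_err in zip(estimates, summary["coefficientStandardErrors"])
]

for reported, recomputed in zip(summary["tValues"], recomputed_t):
    print(reported, recomputed)
```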

Stream prediction results

| u | lot1 | lot2 | offset | weights | pred |
| --- | --- | --- | --- | --- | --- |
| 2.7081 | 42.0000 | 26.0000 | 1.0000 | 2.0000 | 3.0847 |
| 2.9957 | 35.0000 | 21.0000 | 1.0000 | 2.0000 | 3.4135 |
| 1.6094 | 118.0000 | 69.0000 | 1.0000 | 2.0000 | 1.4601 |
| 4.0943 | 21.0000 | 13.0000 | 1.0000 | 2.0000 | 3.9275 |
| 4.3820 | 19.0000 | 12.0000 | 1.0000 | 2.0000 | 3.9891 |
| 3.4012 | 27.0000 | 18.0000 | 1.0000 | 2.0000 | 3.5215 |
| 2.3026 | 58.0000 | 35.0000 | 1.0000 | 2.0000 | 2.6396 |
| 3.6889 | 25.0000 | 16.0000 | 1.0000 | 2.0000 | 3.6901 |
| 4.6052 | 18.0000 | 12.0000 | 1.0000 | 2.0000 | 3.9581 |
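The pred column can be reproduced by hand from the coefficients and intercept reported in the evaluation results. The sketch below assumes the prediction is the inverse log link (the exponential) applied to the linear predictor. That assumption matches the listed values to four decimal places, but the function name `predict_log_link` is illustrative and this is not Alink's prediction code.

```python
import math

# Coefficients for lot1 and lot2, and the intercept, from the evaluation results.
COEFFICIENTS = [0.007797743508544201, -0.031175844426488047]
INTERCEPT = 1.609524324733497


def predict_log_link(lot1, lot2):
    """Inverse log link applied to the linear predictor."""
    eta = INTERCEPT + COEFFICIENTS[0] * lot1 + COEFFICIENTS[1] * lot2
    return math.exp(eta)


# (lot1, lot2, pred) taken from three rows of the prediction tables.
rows = [(118.0, 69.0, 1.4601), (58.0, 35.0, 2.6396), (18.0, 12.0, 3.9581)]
for lot1, lot2, listed in rows:
    print(lot1, lot2, listed, round(predict_log_link(lot1, lot2), 4))
```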