Java class name: com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp
Python class name: CatalogSinkBatchOp

Description

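CatalogSinkBatchOp writes the data of a batch operator to a table in a Catalog. A Catalog describes the properties and the location of a database; MySQL, Derby, SQLite, and Hive are supported.
Before use, download the corresponding plugin; see https://www.yuque.com/pinshu/alink_guide/czg4cx for details.
Setup takes three steps:
Step 1: define the Catalog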

| Database | Java API |
| --- | --- |
| Derby | DerbyCatalog(String catalogName, String defaultDatabase, String derbyVersion, String derbyPath) |
| MySQL | MySqlCatalog(String catalogName, String defaultDatabase, String mysqlVersion, String mysqlUrl, String port, String userName, String password) |
| SQLite | SqliteCatalog(String catalogName, String defaultDatabase, String sqliteVersion, String dbUrl) |
| Hive | HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, String hiveConfDir) |
| Hive | HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, FilePath hiveConfDir) |
| Hive | HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, String hiveConfDir, String kerberosPrincipal, String kerberosKeytab) |

Example:

```python
derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder + '/' + DERBY_DB)
```

Versions provided by each plugin:

- Hive: 2.3.4
- MySQL: 5.1.27
- Derby: 10.6.1.0
- SQLite: 3.19.3
- odps: 0.36.4-public

Step 2: define the CatalogObject

```python
dbName = "sqlite_db"
tableName = "table"
# CatalogObject takes the Catalog and an ObjectPath;
# ObjectPath takes the DB/Project name and the table name
catalogObject = CatalogObject(derby, ObjectPath(dbName, tableName))
```
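An ObjectPath names a table by two parts: the database (or, for odps, the project) and the table name. Assuming it follows the convention of Flink's `org.apache.flink.table.catalog.ObjectPath`, where the full name is `database.table`, the minimal pure-Python stand-in below shows how the two parts combine and split. `ToyObjectPath` is hypothetical and not part of Alink.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyObjectPath:
    """Hypothetical stand-in for ObjectPath: a (database, table) pair.

    Assumes the Flink-style convention that the full name is "database.table".
    """
    database_name: str
    object_name: str

    def __post_init__(self):
        # Simple sanity check: neither part may be empty
        if not self.database_name or not self.object_name:
            raise ValueError("database name and object name must be non-empty")

    def full_name(self) -> str:
        # Join the two parts with a dot, e.g. "sqlite_db.table"
        return f"{self.database_name}.{self.object_name}"

    @staticmethod
    def from_string(full_name: str) -> "ToyObjectPath":
        # Split "database.table" back into exactly two parts; reject anything else
        parts = full_name.split(".")
        if len(parts) != 2:
            raise ValueError(f"cannot parse object path: {full_name!r}")
        return ToyObjectPath(parts[0], parts[1])


path = ToyObjectPath("sqlite_db", "table")
print(path.full_name())  # sqlite_db.table
```

Because the full name uses a dot as the separator, a database or table name that itself contains a dot cannot round-trip through `from_string` in this sketch.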

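Step 3: define the Source and the Sink. Create a CatalogSinkBatchOp (to write) or a CatalogSourceBatchOp (to read) and pass it the CatalogObject with setCatalogObject; the code examples show both.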
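The three steps can be pictured with a toy, in-memory model. Everything in it (`ToyCatalog`, `ToyCatalogObject`, `toy_sink`, `toy_source`) is hypothetical and only mirrors the roles described above: the catalog holds the storage, the catalog object pairs it with a database/table path, and a sink and a source given the same catalog object address the same table. The toy sink simply overwrites the table; how the real CatalogSinkBatchOp treats an existing table is not covered by this page.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

Row = Tuple[object, ...]


@dataclass
class ToyCatalog:
    """Step 1 (hypothetical): storage holding tables keyed by (database, table)."""
    name: str
    default_database: str
    tables: Dict[Tuple[str, str], List[Row]] = field(default_factory=dict)


@dataclass
class ToyCatalogObject:
    """Step 2 (hypothetical): a catalog paired with a (database, table) path."""
    catalog: ToyCatalog
    database: str
    table: str

    def key(self) -> Tuple[str, str]:
        return (self.database, self.table)


def toy_sink(rows: List[Row], target: ToyCatalogObject) -> None:
    """Step 3, sink side: store the rows under the target's path (overwrites)."""
    target.catalog.tables[target.key()] = list(rows)


def toy_source(target: ToyCatalogObject) -> List[Row]:
    """Step 3, source side: read the rows stored under the same path."""
    return list(target.catalog.tables[target.key()])


catalog = ToyCatalog("derby_test_catalog", "derby_schema")
catalog_object = ToyCatalogObject(catalog, "test_catalog_source_sink", "test_catalog_source_sink3")
toy_sink([(1, "a"), (2, "b")], catalog_object)
print(toy_source(catalog_object))  # [(1, 'a'), (2, 'b')]
```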

Parameters

| Name | Chinese name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| catalogObject | catalog object | catalog object | String | ✓ | | |

Code Examples

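The following code is for illustration only; parts of it may need to be modified, or the environment configured, before it runs correctly.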

Derby

Python code

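```python
from pyalink.alink import *
import pandas as pd

useLocalEnv(1)

# Illustrative data to write; replace with your own BatchOperator
df = pd.DataFrame([[1, "a"], [2, "b"]])
source = BatchOperator.fromDataframe(df, schemaStr="id int, name string")

derbyFolder = "*"  # replace "*" with a local directory
DERBY_SCHEMA = "derby_schema"
DERBY_DB = "derby_db"
derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder + '/' + DERBY_DB)
catalogObject = CatalogObject(derby, ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"))

# Write the data to the Derby table
catalogSinkBatchOp = CatalogSinkBatchOp()\
    .setCatalogObject(catalogObject)
source.link(catalogSinkBatchOp)
BatchOperator.execute()

# Read the table back and print it
catalogSourceBatchOp = CatalogSourceBatchOp()\
    .setCatalogObject(catalogObject)
catalogSourceBatchOp.print()
```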

Java code

```java
String derbyFolder = "*";  // replace "*" with a local directory
String DERBY_SCHEMA = "derby_schema";
String DERBY_DB = "derby_db";
DerbyCatalog derby = new DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0",
    derbyFolder + "/" + DERBY_DB);
CatalogObject catalogObject = new CatalogObject(derby,
    new ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"));

// Write the data to the Derby table; `source` is an existing BatchOperator holding the data
CatalogSinkBatchOp catalogSinkBatchOp = new CatalogSinkBatchOp()
    .setCatalogObject(catalogObject);
source.link(catalogSinkBatchOp);
BatchOperator.execute();

// Read the table back; print() runs its own job, so no further execute() is needed
CatalogSourceBatchOp catalogSourceBatchOp = new CatalogSourceBatchOp()
    .setCatalogObject(catalogObject);
catalogSourceBatchOp.print();
```

Sqlite

Python code

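```python
from pyalink.alink import *
import pandas as pd

useLocalEnv(1)

# Illustrative data to write; replace with your own BatchOperator
df = pd.DataFrame([[1, "a"], [2, "b"]])
source = BatchOperator.fromDataframe(df, schemaStr="id int, name string")

sqliteFolder = "*"  # replace "*" with a local directory
SQLITE_SCHEMA = "sqlite_schema"
SQLITE_DB = "sqlite_db"
sqlite = SqliteCatalog("sqlite_test_catalog", None, "3.19.3", [sqliteFolder + '/' + SQLITE_DB])
catalogObject = CatalogObject(sqlite, ObjectPath(SQLITE_DB, "test_catalog_source_sink3"))

# Write the data to the SQLite table
catalogSinkBatchOp = CatalogSinkBatchOp()\
    .setCatalogObject(catalogObject)
source.link(catalogSinkBatchOp)
BatchOperator.execute()

# Read the table back and print it
catalogSourceBatchOp = CatalogSourceBatchOp()\
    .setCatalogObject(catalogObject)
catalogSourceBatchOp.print()
```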
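Assuming the SQLite catalog keeps its data in an ordinary SQLite file at `sqliteFolder + '/' + SQLITE_DB`, with the sink's output stored as a regular table named after the ObjectPath's table name (an assumption this page does not confirm), the result can be inspected without Alink. The sketch below uses only Python's built-in `sqlite3` module; the helpers `list_tables` and `count_rows` are hypothetical. To keep it self-contained, a temporary file pre-filled with two rows stands in for the sink's output.

```python
import os
import sqlite3
import tempfile
from contextlib import closing
from typing import List


def list_tables(db_path: str) -> List[str]:
    """Return the user tables in a SQLite file, skipping SQLite's internal tables."""
    with closing(sqlite3.connect(db_path)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]


def count_rows(db_path: str, table: str) -> int:
    """Count the rows of one table; the name is quoted so it is read as an identifier."""
    quoted = '"' + table.replace('"', '""') + '"'
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]


# Demo: a temporary file stands in for sqliteFolder + '/' + SQLITE_DB,
# pre-filled the way the sink is assumed to leave it.
demo_path = os.path.join(tempfile.mkdtemp(), "sqlite_db")
with closing(sqlite3.connect(demo_path)) as conn:
    conn.execute("CREATE TABLE test_catalog_source_sink3 (id INTEGER, name TEXT)")
    conn.executemany(
        "INSERT INTO test_catalog_source_sink3 VALUES (?, ?)", [(1, "a"), (2, "b")]
    )
    conn.commit()

print(list_tables(demo_path))  # ['test_catalog_source_sink3']
print(count_rows(demo_path, "test_catalog_source_sink3"))  # 2
```

Against a real run, point `list_tables` and `count_rows` at the file the SqliteCatalog was given instead of `demo_path`.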

Java code

```java
String sqliteFolder = "*";  // replace "*" with a local directory
String SQLITE_SCHEMA = "sqlite_schema";
String SQLITE_DB = "sqlite_db";
SqliteCatalog sqlite = new SqliteCatalog("sqlite_test_catalog", null, "3.19.3",
    sqliteFolder + "/" + SQLITE_DB);
CatalogObject catalogObject = new CatalogObject(sqlite,
    new ObjectPath(SQLITE_DB, "test_catalog_source_sink3"));

// Write the data to the SQLite table; `source` is an existing BatchOperator holding the data
CatalogSinkBatchOp catalogSinkBatchOp = new CatalogSinkBatchOp()
    .setCatalogObject(catalogObject);
source.link(catalogSinkBatchOp);
BatchOperator.execute();

// Read the table back; print() runs its own job, so no further execute() is needed
CatalogSourceBatchOp catalogSourceBatchOp = new CatalogSourceBatchOp()
    .setCatalogObject(catalogObject);
catalogSourceBatchOp.print();
```