CN is a multi-module Java project. Modules expose their services to one another through interfaces, and the module relationships are recorded in pom.xml and can be inspected with mvn dependency:tre…

1. Overview

• Protocol parsing is the process of dispatching protocol data objects to the concrete execution logic; its entry point is in …

… physical plan optimization, five steps in all. Optimization produces a physical execution plan, which is passed to the executor. The optimizer uses …

After the executor receives the physical execution plan, it first determines the execution mode from the plan type, including …

2. In depth

3. Summary

The code of the CN Server layer lives mainly in the polardbx-server module, and the main function is in TddlLauncher.

1. Creating the CobarServer object

2. Loading parameters

    … -> CobarConfig.initCobarConfig() -> S…
    String conf = System.getProperty("server.conf", "classpath:perties");

3. Reading metadata from MetaDB and initializing the instance-level system components

    TddlLauncher.main() -> CobarServer.new() -> CobarCon… -> …Config.initCobarConfig() -> ServerLoader.load() -> Se…
    MetaDbDataSource.initMetaDbDataSo…
    SchemaChangeManager.ge…

polardbx-gms\src\main\resources\ddl\ holds the table definitions of the system tables, and uses …

    polardbx.meta.table.d1.t1

… and the corresponding listener is registered, so that when the inst_config table changes, a callback is invoked …

7) StorageHaManager

4. Creating the thread pools

5. CobarServer.init

Path: TddlLauncher.main() -> …

2) GmsClusterLoader.loadPolarDbXCluster

3) warmup

6. Initializing the network layer

    processors = new NIOProcessor[system.getProcesso…
    processors[i] = new NIOProces…
    }

    public NIOAcceptor(String name, int … FrontendConnectionFactory factory, boolea…
        this.serverChannel = ServerSocket…
        this.serverChannel.…
    }

NIOAcceptor is also a thread, and it handles connection-establishment requests. Once a connection has been established, …

7. Conclusion

• [Module] polardbx-cdc-assemble
• [Module] polardbx-cdc-canal
• [Module] polardbx-cdc-daemon
• [Module] polardbx-cdc-dumper
• [Module] polardbx-cdc-format
• [Module] polardbx-cdc-meta
  … the basic support for reshaping. In addition, this module maintains the SQL script definitions of the CDC system tables.
• [Module] polardbx-cdc-storage
• [Module] polardbx-cdc-task
• [Module] polardbx-cdc-transfer

• binlog_system_config
• binlog_task_config
• binlog_node_info
• binlog_dumper_info
• binlog_task_info
• binlog_logic_meta_history: … information.
• binlog_phy_ddl_history
• binlog_oss_record
• binlog_polarx_command
• binlog_schedule_history
• binlog_storage_history
• binlog_env_config_history
• binlog_schema_history

    polardbx.instance.id
    mem_size
    metaDb_url
    metaDb_username
    metaDbPasswd
    polarx_url
    polarx_username
    polarx_password
    dnPasswordKey

    create database transfer_test;
    CREATE TABLE `transfer_test`.`accounts` (
      `id` int(11) NOT NULL,
      `balance` int(11) NOT NULL,
      `gmt_created` datetime not null,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 dbpartition by hash(`id`) tbpartition by hash(`id`) tbpartitions 2;
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (1,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (2,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (3,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (4,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (5,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (6,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (7,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (8,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (9,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (10,100,now());

    docker run -itd --name mysql_3309 -p 3309:3306 -e MYSQL_ROOT_PASSWORD=root mysql

Log in to the Docker instance:

    docker exec -it mysql_3309 bash

Edit /etc/mysql/f:

a. Add the following settings to turn off GTID (the polardbx-cdc global binlog does not support GTID yet):

    gtid_mode=OFF
    enforce_gtid_consistency=OFF

b. Change the server id so that it does not clash with the primary:

    server_id=2

Restart the Docker instance:

    docker restart mysql_3309

    stop slave;
    reset slave;
    CHANGE MASTER TO
      MASTER_HOST='xxx',
      MASTER_USER='xxx',
      MASTER_PASSWORD='xxx',
      MASTER_PORT=xxx,
      MASTER_LOG_FILE='binlog.000001',
      MASTER_LOG_POS=4,
      MASTER_CONNECT_RETRY=100;
    start slave;

… after the test program, the following SQL can be used to verify that the data on both sides is exactly the same:

    SELECT … ISNULL(balance)))) AS UNSIGNED)) AS checksum FROM accou…

… this logic is in the NIOAcceptor constructor, and each CN process starts only one …

… encapsulates the logic of the optimization and execution parts, with Planner#plan as the optimizer entry point, …

IV. Parser

    SELECT * FROM t1 WHERE id > 1;

    SELECT (keyword), * (identifier), FROM (keyword), t1 (identifier), WHERE (keyword), id (identifier), > (gt), 1 (literal_int), ; (semicolon)

… plan management.

VI. Validator

… some rewrites of the AST, used mainly to hide different syntactic structures that carry the same semantics. Then …

VIII. PlanEnumerator

… does not replace the original subtree; instead, the newly generated execution plan is saved in a RelSubset, and later, from …

X. PostPlanner

    SELECT * FROM r JOIN t ON … = … WHERE r.id = 0
    AND t.id = 1;

ExecutorHelper#execute selects the execution path according to the execution mode determined by the optimizer. Taking … below as an example: in the Cursor path, the handler corresponding to each operator in the execution plan is looked up first; the code is located in …

    # A simple sharded table (database and table partitioning) in PolarDB-X: sbtest
    CREATE TABLE `sbtest` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `k` int(11) NOT NULL DEFAULT '0',
      `c` char(120) NOT NULL DEFAULT '',
      `pad` char(60) NOT NULL DEFAULT '',
      PRIMARY KEY (`id`)
    ) dbpartition by hash(`id`) tbpartition by hash(`id`) tbpartitions 2;

    insert into sbtest(id) values(100);

After PolarDB-X receives this statement as a string, it starts executing the SQL, as can be seen in …

    ExecutionPlan plan = Planner.getInstance().plan(sql, executionContext);

    SqlNode validatedNode = converter.validate(ast);

    ...
    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
    validateNamespace(targetNamespace, unknownType);
    ...
    final SqlNode source = insert.getSource();
    if (source instanceof SqlSelect) {
        final SqlSelect sqlSelect = (SqlSelect) source;
        validateSelect(sqlSelect, targetRowType);
    } else {
        final SqlValidatorScope scope = scopes.get(source);
        validateQuery(source, scope, targetRowType);
    }
    ...

    RelNode relNode = converter.toRel(validatedNode, plannerContext);

    ...
    final SqlToRelConverter sqlToRelConverter = new TddlSqlToRelConverter(...);
    RelRoot root = sqlToRelConverter.convertQuery(validatedNode, false, true);
    ...
    RelNode relNode = super.convertInsert(call);
    if (relNode instanceof TableModify) {
        ...
    }

    ToDrdsRelVisitor toDrdsRelVisitor = new ToDrdsRelVisitor(validatedNode, plannerContext);
    RelNode drdsRelNode = relNode.accept(toDrdsRelVisitor);

    if ((other instanceof LogicalTableModify)) {
        ...
        if (operation == TableModify.Operation.INSERT || ...) {
            LogicalInsert logicalInsert = new LogicalInsert(modify);
            ...
        }
    }

    private RelNode sqlRewriteAndPlanEnumerate(RelNode input, PlannerContext plannerContext) {
        CalcitePlanOptimizerTrace.getOptimizerTracer().get().addSnapshot("Start", input, plannerContext);
        // RBO optimization
        RelNode logicalOutput = optimizeBySqlWriter(input, plannerContext);
        CalcitePlanOptimizerTrace.getOptimizerTracer().get()
            .addSnapshot("PlanEnumerate", logicalOutput, plannerContext);
        // CBO optimization
        RelNode bestPlan = optimizeByPlanEnumerator(logicalOutput, plannerContext);
        // finally we should clear the planner to release memory
        bestPlan.getCluster().getPlanner().clear();
        bestPlan.getCluster().invalidateMetadataQuery();
        return bestPlan;
    }

    public enum ExecutionStrategy {
        /**
         * For each row, exists only one target partition.
         * Pushdown origin statement, with function call not pushable (like sequence call) replaced by RexCallParam.
         * Typical for single table and partitioned table without gsi.
         */
        PUSHDOWN,
        /**
         * For each row, might exists more than one target partition.
         * Pushdown origin statement, with nondeterministic function call replaced by RexCallParam.
         * Typical for broadcast table.
         */
        DETERMINISTIC_PUSHDOWN,
        /**
         * For each row, might exists more than one target partition, and data in different target partitions might be different.
         * Select then execute, with all function call replaced by RexCallParam.
         * Typical for table with gsi or table are doing scale out.
         */
        LOGICAL;
    }

    BuildFinalPlanVisitor visitor = new BuildFinalPlanVisitor(executionPlan.getAst(), plannerContext);
    executionPlan = executionPlan.copy(executionPlan.getPlan().accept(visitor));

    ResultCursor resultCursor = executor.execute(plan, executionContext);

    ...
    MyPhyTableModifyCursor modifyCursor =
        (MyPhyTableModifyCursor) repo.getCursorFactory().repoCursor(executionContext, logicalPlan);
    ...
    affectRows = modifyCursor.batchUpdate();
    ...

    public int[] batchUpdate() {
        try {
            return handler.executeUpdate(this.plan);
        } catch (SQLException e) {
            throw GeneralUtil.nestedException(e);
        }
    }

    public Cursor handle(RelNode logicalPlan, ExecutionContext executionContext) {
        ...
        LogicalInsert logicalInsert = (LogicalInsert) logicalPlan;
        ...
        if (!logicalInsert.isSourceSelect()) {
            affectRows = doExecute(logicalInsert, executionContext, handlerParams);
        } else {
            affectRows = selectForInsert(logicalInsert, executionContext, handlerParams);
        }
        ...
    }

The execution method is chosen according to whether the source is a SELECT statement; the actual execution takes place in …

    ...
    final InsertWriter primaryWriter = logicalInsert.getPrimaryInsertWriter();
    List<RelNode> inputs = primaryWriter.getInput(executionContext);
    ...
    // If there are GSIs, generate the physical execution plans for the GSI tables
    final List<InsertWriter> gsiWriters = logicalInsert.getGsiInsertWriters();
    gsiWriters.stream().map(gsiWriter -> gsiWriter.getInput(executionContext))...;
    ...
    final int totalAffectRows = executePhysicalPlan(allPhyPlan, executionContext, schemaName, isBroadcast);
    ...

Here dbIndex is the physical database name, tableNames holds the physical table names, and param holds this …

    public int[] executeUpdate(BaseQueryOperation phyTableModify) throws SQLException {
        ...
        Pair<String, Map<Integer, ParameterContext>> dbIndexAndParam =
            phyTableModify.getDbIndexAndParam(
                executionContext.getParams() == null ? null : executionContext.getParams().getCurrentParameter(),
                executionContext);
        ...
        connection = getPhyConnection(transaction, rw, groupName);
        ...
        // Build the SQL string from the parameters
        String sql = buildSql(sqlAndParam.sql, executionContext);
        ...
        // Create a PreparedStatement on the connection
        ps = prepareStatement(sql, connection, executionContext, isInsert, false);
        ...
        // Set the parameters
        ParameterMethod.setParameters(ps, sqlAndParam.param);
        ...
        // Execute
        affectRow = ((PreparedStatement) ps).executeUpdate();
        ...
    }

Deadlock detection is a feature of the transaction module, and the deadlock detection task is attached to the transaction manager …

    if (!hasLeadership()) {
        return;
    }
    // Get all global transaction information
    final TrxLookupSet lookupSet = fetchTransInfo();
    final long beforeTimeMillis = System.currentTimeMillis() - 1000L;
    final long beforeTxid = IdGenerator.assembleId(beforeTimeMillis, 0, 0);
    for (ITransaction tran : transactions) {
        if (!tran.isDistributed()) {
            continue;
        }
        // Do deadlock detection only for transactions that take longer than 1s.
        if (tran.getId() >= beforeTxid) {
            continue;
        }
        // Get information from this tran.
        ......
    }
    // Get all group data sources, and group by DN's ID (host:port)
    final Map<String, List<TGroupDataSource>> instId2GroupList = ExecUtils.getInstId2GroupList(allSchemas);
    final DiGraph<TrxLookupSet.Transaction> graph = new DiGraph<>();
    for (List<TGroupDataSource> groupDataSources : instId2GroupList.values()) {
        if (CollectionUtils.isNotEmpty(groupDataSources)) {
            // Since all data sources are in the same DN, any data source is ok.
            final TGroupDataSource groupDataSource = groupDataSources.get(0);
            // Get all group names in this DN.
            final Set<String> groupNames =
                groupDataSources.stream().map(TGroupDataSource::getDbGroupKey).collect(Collectors.toSet());
            // Fetch lock-wait information for this DN,
            // and update the lookup set and the graph with the information.
            fetchLockWaits(groupDataSource, groupNames, lookupSet, graph);
        }
    }
    graph.detect().ifPresent((cycle) -> {
        DeadlockParser.parseGlobalDeadlock(cycle);
        killByFrontendConnId(cycle.get(0));
    });

    private void doCancel() throws SQLException {
        // futureCancelErrorCode is used in the error check later on;
        // a kill caused by a deadlock always carries the error code ERR_TRANS_DEADLOCK
        futureCancelErrorCode = this.errorCode;
        // Kill the SQL running on all physical connections
        if (conn != null) {
            conn.kill();
        }
        // f is the task that is executing the logical SQL
        Future f = executingFuture;
        if (f != null) {
            f.cancel(true);
        }
    }

… when the error code is ERR_TRANS_DEADLOCK, the current transaction is rolled back and the client is sent the error message "Deadlock found when trying to get lock; try restarting transaction".

    // Handle deadlock error.
    if (isDeadLockException(t)) {
        // Prevent this transaction from committing.
        this.conn.getTrx().setCrucialError(ERR_TRANS_DEADLOCK);
        // Rollback this trx.
        try {
            innerRollback();
        } catch (SQLException exception) {
            logger.warn("rollback failed when deadlock found", exception);
        }
    }

… a streamlined, customized Reactor framework implemented … This part of the code is adapted from the … in polardbx-sql …

… the timed tasks related to the private protocol; that is the job of XConnectionManager, …

… is in XConnection. The JDBC compatibility layer provides classes including DataSource, Connection, …

    public class GalaxyTest {
        public final static String SERVER_IP = "";
        public final static int SERVER_PORT = 31306;
        public final static String SERVER_USR = "root";
        public final static String SERVER_PSW = "root";
        private final static String DATABASE = "test";

        static XDataSource dataSource =
            new XDataSource(SERVER_IP, SERVER_PORT, SERVER_USR, SERVER_PSW, DATABASE, null);

        public static XConnection getConn() throws Exception {
            return (XConnection) dataSource.getConnection();
        }

        public static List<List<Object>> getResult(XResult result) throws Exception {
            return getResult(result, false);
        }

        public static List<List<Object>> getResult(XResult result, boolean stringOrBytes) throws Exception {
            final List<PolarxResultset.ColumnMetaData> metaData = result.getMetaData();
            final List<List<Object>> ret = new ArrayList<>();
            while (result.next() != null) {
                final List<ByteString> data = result.current().getRow();
                assert metaData.size() == data.size();
                final List<Object> row = new ArrayList<>();
                for (int i = 0; i < metaData.size(); ++i) {
                    final Pair<Object, byte[]> pair = XResultUtil.resultToObject(
                        metaData.get(i), data.get(i), true, result.getSession().getDefaultTimezone());
                    final Object obj = stringOrBytes
                        ? (pair.getKey() instanceof byte[] || null == pair.getValue()
                            ? pair.getKey() : new String(pair.getValue()))
                        : pair.getKey();
                    row.add(obj);
                }
                ret.add(row);
            }
            return ret;
        }

        private void show(XResult result) throws Exception {
            List<PolarxResultset.ColumnMetaData> metaData = result.getMetaData();
            for (PolarxResultset.ColumnMetaData meta : metaData) {
                System.out.print(meta.getName().toStringUtf8() + "\t");
            }
            System.out.println();
            final List<List<Object>> objs = getResult(result);
            for (List<Object> list : objs) {
                for (Object obj : list) {
                    System.out.print(obj + "\t");
                }
                System.out.println();
            }
            System.out.println("" + result.getRowsAffected() + " rows affected.");
        }

        @Ignore
        @Test
        public void playground() throws Exception {
            try (XConnection conn = getConn()) {
                conn.setStreamMode(true);
                final XResult result = conn.execQuery("select 1");
                show(result);
            }
        }
    }

First, XDataSource uses the stored (IP, port, user name) triple to look up …

In the scenario of this code, the data source has only just been created and the background timed task has not run yet, so …

… we call execQuery on XConnection directly; this function is equivalent to directly creating a …

execQuery first records various pieces of call information for statistics, and then enters the key …

After a series of variable settings and the lazy DB setting, we construct a … used to send the actual request …

… the result and caches it in rows, whereas in the streaming execution used by the test code above, … the result.

II. The Task component

    private void consume(TxnMessage message, MessageType processType) throws IOException, InterruptedException {
        ...
        switch (processType) {
        case BEGIN:
            ...
            break;
        case DATA:
            ...
            break;
        case END:
            ...
            break;
        case TAG:
            currentToken = message.getTxnTag().getTxnMergedToken();
            if (currentToken.getType() == TxnType.META_DDL) {
                ...
            } else if (currentToken.getType() == TxnType.META_DDL_PRIVATE) {
                ...
            } else if (currentToken.getType() == TxnType.META_SCALE) {
                ...
            } else if (currentToken.getType() == TxnType.META_HEARTBEAT) {
                ...
            } else if (currentToken.getType() == TxnType.META_CONFIG_ENV_CHANGE) {
                ...
            }
            break;
        default:
            throw new PolardbxException("invalid message type for logfile generator: " + processType);
        }
    }
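The consume method above dispatches on the message type (BEGIN, DATA, END, TAG). As a rough illustration of that dispatch pattern only, here is a minimal sketch. MessageKind, SketchMessage and SketchLogConsumer are hypothetical names invented for this example and are not classes from polardbx-cdc. The buffering behaviour (rows collected between BEGIN and END, a TAG written on its own outside a transaction) is an assumption made to keep the sketch small, not a description of the real log file generator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical message kinds, mirroring the BEGIN / DATA / END / TAG cases in the text.
enum MessageKind { BEGIN, DATA, END, TAG }

// Hypothetical stand-in for a transaction message; not the real TxnMessage class.
final class SketchMessage {
    final MessageKind kind;
    final String payload;

    SketchMessage(MessageKind kind, String payload) {
        this.kind = kind;
        this.payload = payload;
    }
}

// Buffers the rows of one transaction between BEGIN and END and "writes" them on END.
final class SketchLogConsumer {
    private final List<String> written = new ArrayList<>();
    private List<String> pending = null; // non-null while a transaction is open

    void consume(SketchMessage m) {
        switch (m.kind) {
        case BEGIN:
            if (pending != null) {
                throw new IllegalStateException("nested BEGIN");
            }
            pending = new ArrayList<>();
            break;
        case DATA:
            requireOpen();
            pending.add(m.payload);
            break;
        case END:
            requireOpen();
            written.addAll(pending);
            pending = null;
            break;
        case TAG:
            // Assumption of this sketch: a tag is a standalone marker outside any transaction.
            if (pending != null) {
                throw new IllegalStateException("TAG inside an open transaction");
            }
            written.add("TAG:" + m.payload);
            break;
        default:
            throw new IllegalArgumentException("invalid message type: " + m.kind);
        }
    }

    private void requireOpen() {
        if (pending == null) {
            throw new IllegalStateException("no open transaction");
        }
    }

    // Returns a copy of everything written so far, in order.
    List<String> written() {
        return new ArrayList<>(written);
    }
}
```

Feeding the consumer BEGIN, DATA("row1"), END and then TAG("heartbeat") leaves written() holding the row followed by the tag entry; nothing is visible until END arrives.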
… we focus on the execution flow of DDL in the executor. Before reading this article, you may first read … related to …

• After parsing, a logical DDL statement enters the optimizer, which performs only a simple type conversion and generates …

    public Cursor handle(RelNode logicalPlan, ExecutionContext executionContext) {
        BaseDdlOperation logicalDdlPlan = (BaseDdlOperation) logicalPlan;
        initDdlContext(logicalDdlPlan, executionContext);
        // Validate the plan first and then return immediately if needed.
        boolean returnImmediately = validatePlan(logicalDdlPlan, executionContext);
        .....
        setPartitionDbIndexAndPhyTable(logicalDdlPlan);
        // Build a specific DDL job by subclass that override buildDdlJob
        DdlJob ddlJob = returnImmediately ?
            new TransientDdlJob() :
            // @override
            buildDdlJob(logicalDdlPlan, executionContext);
        // Validate the DDL job before request.
        // @override
        validateJob(logicalDdlPlan, ddlJob, executionContext);
        // Handle the client DDL request on the worker side.
        handleDdlRequest(ddlJob, executionContext);
        .....
        return buildResultCursor(logicalDdlPlan, executionContext);
    }

    ## SQL statements used in this article ##
    create database db1 mode = "auto";
    use db1;
    create table t1(x int, y int);
    ## The DDL statement examined in detail in this article
    alter table t1 add global index `g_i_y` (`y`) COVERING (`x`) partition by hash(`y`);

    private ResultCursor executeQuery(ByteString sql, ExecutionContext executionContext,
                                      AtomicBoolean trxPolicyModified) {
        // Get all meta version before optimization
        final long[] metaVersions = MdlContext.snapshotMetaVersions();
        // Planner
        ExecutionPlan plan = Planner.getInstance().plan(sql, executionContext);
        ...
        // for requireMDL transaction
        if (requireMdl && enableMdl) {
            if (!isClosed()) {
                // Acquire meta data lock for each statement modifies table data
                acquireTransactionalMdl(sql.toString(), plan, executionContext);
            }
            ...
            // update Plan if metaVersion changed, which indicate meta updated
        }
        ...
        ResultCursor resultCursor = executor.execute(plan, executionContext);
        ...
        return resultCursor;
    }

    public class CreatePartitionGsiJobFactory extends CreateGsiJobFactory {
        @Override
        protected void excludeResources(Set<String> resources) {
            super.excludeResources(resources);
            // meta data lock in MetaDB
            resources.add(concatWithDot(schemaName, primaryTableName)); // db1.t1
            resources.add(concatWithDot(schemaName, indexTableName));   // db1.g_i_y
            ...
        }

        @Override
        protected ExecutableDdlJob doCreate() {
            ...
            if (needOnlineSchemaChange) {
                bringUpGsi = GsiTaskFactory.addGlobalIndexTasks(
                    schemaName,
                    primaryTableName,
                    indexTableName,
                    stayAtDeleteOnly,
                    stayAtWriteOnly,
                    stayAtBackFill
                );
            }
            ...
            List<DdlTask> taskList = new ArrayList<>();
            // 1. validate
            taskList.add(validateTask);
            // 2. create gsi table
            // 2.1 insert tablePartition meta for gsi table
            taskList.add(createTableAddTablesPartitionInfoMetaTask);
            // 2.2 create gsi physical table
            CreateGsiPhyDdlTask createGsiPhyDdlTask =
                new CreateGsiPhyDdlTask(schemaName, primaryTableName, indexTableName, physicalPlanData);
            taskList.add(createGsiPhyDdlTask);
            // 2.3 insert tables meta for gsi table
            taskList.add(addTablesMetaTask);
            taskList.add(showTableMetaTask);
            // 3.
            // 3.1 insert indexes meta for primary table
            taskList.add(addIndexMetaTask);
            // 3.2 gsi status: CREATING -> DELETE_ONLY -> WRITE_ONLY -> WRITE_REORG -> PUBLIC
            taskList.addAll(bringUpGsi);
            // last tableSyncTask
            DdlTask tableSyncTask = new TableSyncTask(schemaName, indexTableName);
            taskList.add(tableSyncTask);
            final ExecutableDdlJob4CreatePartitionGsi result = new ExecutableDdlJob4CreatePartitionGsi();
            result.addSequentialTasks(taskList);
            ....
            return result;
        }
        ...
    }

    public class CreateTableAddTablesMetaTask extends BaseGmsTask {
        @Override
        public void executeImpl(Connection metaDbConnection, ExecutionContext executionContext) {
            PhyInfoSchemaContext phyInfoSchemaContext = TableMetaChanger.buildPhyInfoSchemaContext(
                schemaName, logicalTableName, dbIndex, phyTableName, sequenceBean, tablesExtRecord,
                partitioned, ifNotExists, sqlKind, executionContext);
            FailPoint.injectRandomExceptionFromHint(executionContext);
            FailPoint.injectRandomSuspendFromHint(executionContext);
            TableMetaChanger.addTableMeta(metaDbConnection, phyInfoSchemaContext);
        }

        @Override
        public void rollbackImpl(Connection metaDbConnection, ExecutionContext executionContext) {
            TableMetaChanger.removeTableMeta(metaDbConnection, schemaName, logicalTableName, false, executionContext);
        }

        @Override
        protected void onRollbackSuccess(ExecutionContext executionContext) {
            TableMetaChanger.afterRemovingTableMeta(schemaName, logicalTableName);
        }
    }

• addTableMetaTask
• TableSyncTask

    …executor/src/main/java/com/alibaba/polardbx/execu…
    com.alibaba.polardbx.executor.ddl.job.task.fact…

    public static List<DdlTask> addGlobalIndexTasks(String schemaName,
                                                    String primaryTableName,
                                                    String indexName,
                                                    boolean stayAtDeleteOnly,
                                                    boolean stayAtWriteOnly,
                                                    boolean stayAtBackFill) {
        ....
        DdlTask writeOnlyTask = new GsiUpdateIndexStatusTask(
            schemaName,
            primaryTableName,
            indexName,
            IndexStatus.DELETE_ONLY,
            IndexStatus.WRITE_ONLY
        ).onExceptionTryRecoveryThenRollback();
        ....
        taskList.add(deleteOnlyTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        ....
        taskList.add(writeOnlyTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        ...
        taskList.add(new LogicalTableBackFillTask(schemaName, primaryTableName, indexName));
        ...
        taskList.add(writeReorgTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        taskList.add(publicTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        return taskList;
    }

    com.alibaba.polardbx.gms.metadb.table.IndexStatus

    // OptimizeLogicalInsertRule.java
    private LogicalInsert handlePushdown(LogicalInsert origin, boolean deterministicPushdown, ExecutionContext ec) {
        ...
        // other writers
        final List<InsertWriter> gsiInsertWriters = new ArrayList<>();
        IntStream.range(0, gsiMetas.size()).forEach(i -> {
            final TableMeta gsiMeta = gsiMetas.get(i);
            final RelOptTable gsiTable =
                catalog.getTableForMember(ImmutableList.of(schema, gsiMeta.getTableName()));
            final List<Integer> gsiValuePermute = gsiColumnMappings.get(i);
            final boolean isGsiBroadcast = TableTopologyUtil.isBroadcast(gsiMeta);
            final boolean isGsiSingle = TableTopologyUtil.isSingle(gsiMeta);
            // different write strategy for corresponding table type.
            gsiInsertWriters.add(WriterFactory.createInsertOrReplaceWriter(
                newInsert, gsiTable, sourceRowType, gsiValuePermute, gsiMeta, gsiKeywords,
                null, isReplace, isGsiBroadcast, isGsiSingle, isValueSource, ec));
        });
        ...
    }

    // LogicalInsertWriter.java
    protected int executeInsert(LogicalInsert logicalInsert, ExecutionContext executionContext,
                                HandlerParams handlerParams) {
        ...
        final List<InsertWriter> gsiWriters = logicalInsert.getGsiInsertWriters();
        gsiWriters.stream()
            .map(gsiWriter -> gsiWriter.getInput(executionContext))
            .filter(w -> !w.isEmpty())
            .forEach(w -> {
                writableGsiCount.incrementAndGet();
                allPhyPlan.addAll(w);
            });
    }

    // IndexStatus.java
    ...
    public static final EnumSet<IndexStatus> WRITABLE = EnumSet.of(WRITE_ONLY, WRITE_REORG, PUBLIC, DROP_WRITE_ONLY);

    public boolean isWritable() {
        return WRITABLE.contains(this);
    }
    ...

• expireSchemaManager(t1, g_i1, v0): eliminates the old-version metadata, and … the new-version metadata …

    com.alibaba.polardbx.executor.gms.GmsTableMet…

    public void toNewVersion(String tableName, boolean preemptive, Long initWait, Long interval, TimeUnit timeUnit) {
        synchronized (OptimizerContext.getContext(schemaName)) {
            GmsTableMetaManager oldSchemaManager =
                (GmsTableMetaManager) OptimizerContext.getContext(schemaName).getLatestSchemaManager();
            TableMeta currentMeta = oldSchemaManager.getTableWithNull(tableName);
            long version = -1;
            .... // Query the current metadata version in MetaDB and assign it to version
            // 1. loadSchema
            SchemaManager newSchemaManager = new GmsTableMetaManager(oldSchemaManager, tableName, rule);
            newSchemaManager.init();
            OptimizerContext.getContext(schemaName).setSchemaManager(newSchemaManager);
            // 2. mdl(v0).writeLock
            final MdlContext context;
            if (preemptive) {
                context = MdlManager.addContext(schemaName, initWait, interval, timeUnit);
            } else {
                context = MdlManager.addContext(schemaName, false);
            }
            MdlTicket ticket = context.acquireLock(new MdlRequest(1L,
                MdlKey.getTableKeyWithLowerTableName(schemaName, currentMeta.getDigest()),
                MdlType.MDL_EXCLUSIVE,
                MdlDuration.MDL_TRANSACTION));
            // 3. expireSchemaManager(t1, g_i1, v0)
            oldSchemaManager.expire();
            .... // Invalidate the PlanCache entries that use the old-version metadata.
            context.releaseLock(1L, ticket);
        }
    }
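The GSI status progression named in this section (CREATING -> DELETE_ONLY -> WRITE_ONLY -> WRITE_REORG -> PUBLIC) and the WRITABLE check can be pictured with a small enum. The sketch below is a simplified illustration, not the real IndexStatus class. GsiStatusSketch and its next() method are hypothetical, and the DROP_WRITE_ONLY status that appears in the real WRITABLE set is deliberately left out because this sketch covers only the creation path.

```java
import java.util.EnumSet;

// Hypothetical, simplified model of the status progression during GSI creation.
enum GsiStatusSketch {
    CREATING, DELETE_ONLY, WRITE_ONLY, WRITE_REORG, PUBLIC;

    // Statuses in which writes are also routed to the index table
    // (the creation-path subset of the WRITABLE set quoted in the text).
    private static final EnumSet<GsiStatusSketch> WRITABLE =
        EnumSet.of(WRITE_ONLY, WRITE_REORG, PUBLIC);

    boolean isWritable() {
        return WRITABLE.contains(this);
    }

    // Next status in the progression; PUBLIC is terminal.
    GsiStatusSketch next() {
        if (this == PUBLIC) {
            throw new IllegalStateException("PUBLIC is the final status");
        }
        return values()[ordinal() + 1];
    }
}
```

In the task list shown earlier, every status change is followed by a TableSyncTask, so a loop calling next() would correspond to one status update plus one sync per step. The sketch also makes the DELETE_ONLY stage explicit: the index exists but isWritable() is still false for it.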
2. DDL Task

3. Worker and Leader

    com.alibaba.polardbx.executor.handler.ddl.Lo…
    com.alibaba.polardbx.executor.ddl.newengine.…
    com.alibaba.polardbx.executor.ddl.newengine.DdlEngin…

    public class DdlEngineDagExecutor {

        public static void restoreAndRun(String schemaName, Long jobId, ExecutionContext executionContext) {
            boolean restoreSuccess = DdlEngineDagExecutorMap.restore(schemaName, jobId, executionContext);
            DdlEngineDagExecutor dag = DdlEngineDagExecutorMap.get(schemaName, jobId);
            dag.run();
        }

        private void run() {
            // Start the job state machine.
            if (ddlContext.getState() == DdlState.QUEUED) {
                onQueued();
            }
            if (ddlContext.getState() == DdlState.RUNNING) {
                onRunning();
            }
            if (ddlContext.getState() == DdlState.ROLLBACK_RUNNING) {
                onRollingBack();
            }
            // Handle the terminated states.
            switch (ddlContext.getState()) {
            case ROLLBACK_PAUSED:
            case PAUSED:
                onTerminated();
                break;
            case ROLLBACK_COMPLETED:
            case COMPLETED:
                onFinished();
                break;
            default:
                break;
            }
        }
    }

    com.alibaba.polardbx.executor.ddl.newengine.…

… queues.

    private void onRunning() {
        while (true) {
            if (hasFailureOnState(DdlState.RUNNING)) {
                if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {
                    LOGGER.info(String.format("JobId:[%s], all tasks stopped", ddlContext.getJobId()));
                    return;
                } else {
                    continue;
                }
            }
            if (executingTaskScheduler.isAllTaskDone()) {
                updateDdlState(DdlState.RUNNING, DdlState.COMPLETED);
                return;
            }
            if (executingTaskScheduler.hasMoreExecutable()) {
                // fetch & execute next batch
                submitDdlTask(executingTaskScheduler.pollBatch(), true, executingTaskScheduler);
                continue;
            }
            // get some rest
            sleep(50L);
        }
    }

    private void onRollingBack() {
        if (…
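The onRunning loop above keeps asking a task scheduler three questions: are all tasks done, are more tasks executable, and what is the next batch. A minimal single-threaded sketch of a scheduler answering them for a task DAG follows. DagSchedulerSketch is a hypothetical class written for this illustration. It borrows the method names isAllTaskDone, hasMoreExecutable and pollBatch from the excerpt, but its internals are assumptions and it has none of the failure handling, persistence or concurrency of the real DDL engine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical DAG scheduler: a task becomes executable once all its prerequisites are done.
final class DagSchedulerSketch {
    // task name -> names of the tasks it depends on (insertion order kept for determinism)
    private final Map<String, Set<String>> prerequisites = new LinkedHashMap<>();
    private final Set<String> done = new HashSet<>();
    private final Set<String> polled = new HashSet<>();

    void addTask(String task, String... dependsOn) {
        prerequisites.put(task, new HashSet<>(Arrays.asList(dependsOn)));
    }

    boolean isAllTaskDone() {
        return done.size() == prerequisites.size();
    }

    boolean hasMoreExecutable() {
        return !executable().isEmpty();
    }

    // Hands out every task that is ready and has not been handed out before.
    List<String> pollBatch() {
        List<String> batch = executable();
        polled.addAll(batch);
        return batch;
    }

    void markDone(String task) {
        done.add(task);
    }

    private List<String> executable() {
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : prerequisites.entrySet()) {
            if (!polled.contains(e.getKey()) && done.containsAll(e.getValue())) {
                ready.add(e.getKey());
            }
        }
        return ready;
    }

    // Runs all tasks synchronously and returns the order in which they were executed.
    List<String> runAll() {
        List<String> order = new ArrayList<>();
        while (!isAllTaskDone()) {
            if (!hasMoreExecutable()) {
                throw new IllegalStateException("cycle or unknown prerequisite");
            }
            for (String task : pollBatch()) {
                order.add(task); // "execute" the task
                markDone(task);
            }
        }
        return order;
    }
}
```

A purely sequential job, such as one built with addSequentialTasks, corresponds to a chain in which each task depends on the previous one, so every batch contains exactly one task. Tasks with no dependency between them would come out of pollBatch together.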