Migrate Data from MySQL to DynamoDB

Migrate Data from MySQL to DynamoDB

Directly writes to the DynamoDB

https://github.com/audienceproject/spark-dynamodb

I was thinking this should work, but it does not work at reading

%spark.dep

// Load the MySQL JDBC driver and both Spark-DynamoDB connectors into the
// Zeppelin interpreter before running the read/write paragraphs below.
z.load("mysql:mysql-connector-java:5.1.47")

z.load("com.github.traviscrawford:spark-dynamodb:0.0.13")

z.load("com.audienceproject:spark-dynamodb_2.11:0.4.1")

This reading does not work

// Read the DynamoDB table via the audienceproject connector.
// NOTE(review): the post reports this read did not work in this setup.
import com.audienceproject.spark.dynamodb.implicits._

val accountDF = spark.read.option("region", "us-west-1").dynamodb("account-int-accounts")

accountDF.printSchema()

accountDF.show(2)

This reading works

// Read the DynamoDB table via the traviscrawford connector
// (region first, then table name) — this read works.
import com.github.traviscrawford.spark.dynamodb._

val accountDF = sqlContext.read.dynamodb("us-west-1", "account-int-accounts")

accountDF.printSchema()

accountDF.show(1)

This is working for writing data, but I do not think it works well with the capacity

%spark.dep

// Dependencies: JDBC driver, both DynamoDB connectors, and Guava
// (pinned to 14.0.1 to satisfy the connectors' classpath).
z.load("mysql:mysql-connector-java:5.1.47")

z.load("com.github.traviscrawford:spark-dynamodb:0.0.13")

z.load("com.audienceproject:spark-dynamodb_2.11:0.4.1")

z.load("com.google.guava:guava:14.0.1")

// Read the source table with the traviscrawford connector...
import com.github.traviscrawford.spark.dynamodb._

val accountDF = sqlContext.read.dynamodb("us-west-1", "account-int-accounts")

accountDF.printSchema()

accountDF.show(1)

// ...then write the same DataFrame out with the audienceproject connector.
import com.audienceproject.spark.dynamodb.implicits._

accountDF.write.option("region", "us-west-1").dynamodb("account-int-accounts2")

The Read Works as Well

// Verify the write by reading the table back with the audienceproject connector.
import com.audienceproject.spark.dynamodb.implicits._

val dynamoDF = spark.read.option("region", "us-west-1").dynamodb("account-int-accounts")

dynamoDF.printSchema()

dynamoDF.show(5)

DynamoDB Format and AWS Command

https://github.com/lmammino/json-dynamo-putrequest

First of all, prepare the JSON file on the server; usually I will download that

> hdfs dfs -get hdfs://localhost:9000/mysqltodynamodb/account2 ./account2

Find the JSON file account2.json

Install NodeJS if it is not on the system

> sudo apt install nodejs

> sudo apt install npm

> node --version && npm --version

v8.10.0

3.5.2

Install the software

> sudo npm install --global json-dynamo-putrequest

Check installation

> json-dynamo-putrequest --help

> json-dynamo-putrequest --version

1.0.0

Command

> json-dynamo-putrequest account-int-accounts2 --output account-dynamo.json < account2.json

Error: Input data needs to be an array

Add [ and ], replace } with },, then try the data again.

> json-dynamo-putrequest account-int-accounts2 --output account-dynamo.json < account2.json

Output saved in /home/ubuntu/data/account-dynamo.json

File is ready as account-dynamo.json

https://github.com/lmammino/json-dynamo-putrequest

Then follow the documents to import data into the table

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/SampleData.LoadData.html

Create a table from the console, execute the import command

> aws dynamodb batch-write-item --request-items file:///home/ubuntu/data/account-dynamo.json

at 'requestItems' failed to satisfy constraint: Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1]

Haha, in the docs, all the samples are less than 25 items.

Directly writing NodeJS to parse the JSON file and do the import works

"use strict";

// How to run:
//   node dynamodb-scripts/import-devices-to-dynamodb.js {ENV} ./css_devices_only_csv.txt
// eg: node dynamodb-scripts/import-devices-to-dynamodb.js int ./css_devices_only_csv.txt
//
// Reads a CSV export of device records, normalizes each row, and batch-writes
// the rows into the DynamoDB table lifesize_device-{env}-devicePairingInfo.

var importDevicesToDynamo;

// Make the AWS SDK pick up region/profile from ~/.aws/config.
process.env.AWS_SDK_LOAD_CONFIG = true;

(function (importDevicesToDynamo) {
  const fs = require('fs');
  const babyparse = require("babyparse");
  const AWS = require("aws-sdk");
  const log4js = require('log4js');
  const logger = log4js.getLogger();
  const sleep = require('sleep');

  const env = process.argv[2]; // Must be int, stage or prod
  const csvFilePath = process.argv[3];

  // babyparse (Papa Parse fork) CSV options; header:true yields one object
  // per row keyed by column name.
  const config = {
    delimiter: ',',
    newline: "",
    quoteChar: '"',
    header: true,
    dynamicTyping: false,
    preview: 0,
    encoding: "utf8",
    worker: false,
    comments: false,
    skipEmptyLines: true
  };

  let tableName = `lifesize_device-${env}-devicePairingInfo`;
  let dynamoDbUrl = "";

  // Validate parameters.
  if (!env) {
    console.log("\nMust pass in environment for 1st argument. Must be one of 'int', 'stage' or 'prod'");
    console.log("\nUsage - node dynamodb-scripts/import-devices-to-dynamodb.js {env} {csv path/file}");
    console.log("\nExample - node dynamodb-scripts/import-devices-to-dynamodb.js int ./css_devices_only_csv.txt\n");
    process.exit(1);
  }

  if (!csvFilePath) {
    console.log("\nMust pass in csvFilePath for 2nd argument.");
    console.log("\nUsage - node dynamodb-scripts/import-devices-to-dynamodb.js {env} {csv path/file}");
    console.log("\nExample - node dynamodb-scripts/import-devices-to-dynamodb.js int ./css_devices_only_csv.txt\n");
    process.exit(2);
  }

  console.log("Env = " + env);
  console.log("File to import = " + csvFilePath);

  // config.encoding ("utf8") makes readFileSync return a string, which is
  // what babyparse.parse expects.
  let content = fs.readFileSync(csvFilePath, config);
  let parsed = babyparse.parse(content, config);
  // Deep-copy the parsed rows so normalization below never aliases parser state.
  let rows = JSON.parse(JSON.stringify(parsed.data));

  console.log("Row count = " + Object.keys(rows).length);

  // For the batch size of 10, we need to temporarily change the write
  // capacity units to 50 in DynamoDB for the appropriate table, then reset
  // to default when the script is finished.
  let size = 10;

  console.log("dynamoDb URL = " + dynamoDbUrl);
  console.log("table name = " + tableName);

  var credentials = new AWS.SharedIniFileCredentials();
  AWS.config.credentials = credentials;

  const dynamoDb = new AWS.DynamoDB.DocumentClient();

  let uniqueSerialNumbers = [];

  for (let i = 0; i < rows.length; i += size) {
    // Slice the rows into batches of `size` items (DynamoDB batch-write
    // accepts at most 25 put requests per call).
    let smallarray = rows.slice(i, i + size);

    console.log("i = " + i + " serialNumber = " + smallarray[0].serialNumber);

    let batchItems = smallarray.map(function (item) {
      try {
        const serialNumber = item.serialNumber;
        if (uniqueSerialNumbers.includes(serialNumber)) {
          // Duplicate serial number: drop the row (batch writes reject
          // duplicate keys within one request).
          return null;
        } else {
          uniqueSerialNumbers.push(serialNumber);
        }

        // Replace empty string values with null. DynamoDB doesn't allow
        // empty strings, will throw error on request.
        for (let items in item) {
          let value = item[items];
          if (value === undefined || value === "") {
            item[items] = null;
          }
          if (items == "enabled") {
            // CSV exports booleans as "t"/"f"; convert to real booleans.
            if (value === "f") {
              item[items] = false;
            } else if (value === "t") {
              item[items] = true;
            }
          }
        }

        // Fields that must not carry over from the source system.
        item.adminAccountUUID = null;
        item.sessionID = null;
        item.pairingCodeCreateTime = null;

        if (item.systemName === null) {
          item.systemName = item.userExtension.toString();
        }

        if (item.pairingstatus === 'DEFAULT') {
          item.pairingstatus = "COMPLETE";
        }

        // NOTE(review): 'plaform' looks like a typo for 'platform' — confirm
        // against the actual CSV column names before renaming it.
        if (item.plaform === 'GRAPHITE') {
          item.deviceUUID = item.serialNumber;
        }

        if (item.userExtension && !item.extension) {
          item.extension = item.userExtension.toString();
          console.log(`++++++++++++++++++++++++++++++++++++`);
        }

        let params = {
          PutRequest: { Item: JSON.parse(JSON.stringify(item)) }
        };
        console.log("params = " + JSON.stringify(params, null, 2));
        return params;
      }
      catch (error) {
        // A bad row is logged and dropped; map() yields undefined for it.
        console.log("**** ERROR processing file: " + error);
      }
    }).filter((obj) =>
      // != null drops both the explicit nulls (duplicates) and the
      // undefined results from the catch path above; the original
      // `!== null` let undefined entries through into the batch.
      obj != null
    );

    if (batchItems.length === 0) {
      console.log("System filter all the duplicate data, nothing left");
      continue;
    }

    let batchRequestParams = '{"RequestItems":{"' + tableName + '":' + JSON.stringify(batchItems) + '},"ReturnConsumedCapacity":"TOTAL","ReturnItemCollectionMetrics":"SIZE"}';

    console.log("batchRequestParams ============================================================"); // + batchRequestParams);

    // TODO(review): this fires every batch without awaiting; the msleep in
    // the .then only delays the callback, not the next loop iteration, so it
    // does not throttle write throughput as the capacity comment intends.
    callDynamo(batchRequestParams).then(function (data) {
      sleep.msleep(100);
    }).catch(console.error);
  }

  // Send one serialized batch-write request to DynamoDB, retrying once after
  // a short pause; resolves with the batchWrite response, rejects on the
  // second failure.
  // TODO(review): a "successful" batchWrite can still return UnprocessedItems,
  // which this function does not resubmit.
  function callDynamo(batchRequestParams) {
    return new Promise(function (resolve, reject) {
      dynamoDb.batchWrite(JSON.parse(batchRequestParams), function (err, data) {
        try {
          if (err) {
            logger.error(`Error - ${err} = Trying again:`);
            sleep.msleep(100);
            dynamoDb.batchWrite(JSON.parse(batchRequestParams), function (err, data) {
              try {
                if (err) {
                  logger.error("Unable to add item a 2nd time, Error: ", err);
                  return reject(err);
                }
                else {
                  logger.debug("2nd PutItem succeeded");
                  resolve(data);
                }
              }
              catch (error) {
                console.log("error calling DynamoDB - " + error);
                // Fix: reject with the caught error; the original rejected
                // with the callback's `err`, which is null on this path.
                return reject(error);
              }
            });
          }
          else {
            logger.debug("PutItem succeeded");
            resolve(data);
          }
        }
        catch (error) {
          console.log("error calling DynamoDB - " + error);
          // Fix: reject with the caught error, not the (null) callback `err`.
          return reject(error);
        }
      });
    });
  }
})(importDevicesToDynamo || (importDevicesToDynamo = {}));

References:

https://github.com/audienceproject/spark-dynamodb

https://stackoverflow.com/questions/37444607/writing-from-spark-to-dynamodb

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/SampleData.LoadData.html

https://github.com/lmammino/json-dynamo-putrequest

相关推荐