Query QLDB With API Gateway and Lambda (Nodejs)
- 6 minutes read - 1112 wordsBackground
In a previous blog post I wrote about creating a ledger with a table and index in Amazon QLDB. This post expands on that by providing a simple API to create a new vehicle record, and get an existing vehicle record. The code all exists as infrastructure as code in this Github Repo.
Serverless Framework
The code uses the Serverless Framework to set up all the resources. The configuration for the create vehicle
function is shown below:
functions:
createVehicle:
name: create-vehicle-${self:provider.stage}
handler: functions/create-vehicle.handler
events:
- http:
path: /vehicle
method: post
iamRoleStatements:
- Effect: Allow
Action:
- qldb:SendCommand
Resource: arn:aws:qldb:#{AWS::Region}:#{AWS::AccountId}:ledger/qldb-simple-demo-${self:provider.stage}
environment:
LEDGER_NAME: qldb-simple-demo-${self:provider.stage}
There are a number of things to point out here:
- This automatically configures an API Gateway endpoint. The create vehicle function is run using a lambda proxy integration whenever an HTTP POST is made to the /vehicle endpoint.
- I am using a seperate IAM role for each function as a best practice for least privilege. This role needs to be able to call the
qldb:SendCommand
action to issue a command on the ledger, otherwise you will end up with the following exception:
AccessDeniedException: User: xxx is not authorized to perform: qldb:SendCommand on resource: xxx
- There is a LEDGER_NAME lambda environment variable set up
Create Vehicle
The first goal was to insert a new vehicle record into a table in QLDB. We already have a Vehicle table configured with an index on the VRN attribute.
As a basic example, I want the VRN attribute to be unique. QLDB automatically assigns each document that gets inserted a unique document id that appears in the metadata section. There is no way to mark an attribute as being unique and enforce it through the ledger, in the same way you might using a primary key
in AWS RDS or a partition key
in DynamoDB.
Instead, the way to guarantee uniqueness of an attribute is done client side. This means carrying out a select first and ensuring the record doesn’t exist, before inserting the new document. If two competing transactions carry out the select, only one of them will succeed in committing the transaction, and the other will fail under Optimistic Concurrency Control.
I created the following function to carry out the select first, and return the number of records found:
async function checkVRNUnique(txn, vrn) {
const query = `SELECT VRN FROM Vehicle AS v WHERE v.VRN = ?`;
const vrnWriter = createQldbWriter();
writeValueAsIon(vrn, vrnWriter);
let recordsReturned;
await txn.executeInline(query, [vrnWriter]).then((result) => {
recordsReturned = result.getResultList().length;
});
return recordsReturned;
}
The function above gets called first when an attempt is made to create a new record. This is all handled in the createVehicle function shown below:
const createVehicle = async (vrn, make, model, colour ) => {
const VEHICLE = [{"VRN": vrn, "Make": make, "Model": model, "Colour": colour }];
let session;
let result;
let responseMessage;
try {
session = await createQldbSession();
await session.executeLambda(async (txn) => {
const recordsReturned = await checkVRNUnique(txn, vrn);
log(`Number of records found for ${vrn} is ${recordsReturned}`);
if (recordsReturned === 0) {
result = await insertNewVehicleRecord(txn, VEHICLE);
prettyPrintResultList(result.getResultList());
responseMessage = `New vehicle record with VRN ${vrn} created`;
} else {
responseMessage = `Vehicle record with VRN ${vrn} already exists. No new record created`;
}
}, () => log("Retrying due to OCC conflict..."));
} catch (e) {
error(`Unable to create vehicle record: ${e}`);
} finally {
closeQldbSession(session);
}
return responseMessage;
}
Note that this is a fairly basic example.
The first step is to create a QldbSession
. The session is created using the PooledQldbDriver
. This is recommended as it provides session pooling capabilities and convenience methods for handling Optimistic Concurrency Control retries. The PooledQldbDriver
itself takes in the QLDB ledger name to connect to, as well as options for configuring the low level client.
The executeLambda
function call implicitly starts a transaction, excutes the lambda, and commits the transaction, whilst retrying up to the retry limit in some exception scenarios like an OCC conflict.
Get Vehicle
The final goal was to be retrieve the current details for a given record, and the main code for this function is shown below:
const getVehicle = async (vrn) => {
let result;
let responseMessage;
let session;
try {
session = await createQldbSession();
await session.executeLambda(async (txn) => {
result = await getVehicleByVRN(txn, vrn);
const resultList = result.getResultList();
if (resultList.length === 0) {
responseMessage = `No vehicle found: ${vrn}.`;
throw new Error(`No vehicle found: ${vrn}.`);
} else if (resultList.length > 1) {
responseMessage = `More than one vehicle found: ${vrn}.`;
throw new Error(`More than one vehicle found: ${vrn}.`);
} else {
const writer = makeTextWriter();
resultList.forEach((reader) => {
writer.writeValues(reader);
});
writer.close();
let ionReader = makeReader(decodeUtf8(writer.getBytes()));
ionReader.next();
ionReader.stepIn(); // Step into the list.
let stringBuilder = '{';
let FIRST_LOOP = true;
while (ionReader.next() != null) {
if (FIRST_LOOP) {
stringBuilder += `'${ionReader.fieldName()}':'${ionReader.stringValue()}'`;
FIRST_LOOP = false;
} else {
stringBuilder += `, '${ionReader.fieldName()}':'${ionReader.stringValue()}'`;
}
}
stringBuilder += '}';
responseMessage = stringBuilder.toString();
}
}, () => log("Retrying due to OCC conflict..."));
} catch (e) {
log(`Error retrieving documents: ${e}`);
} finally {
closeQldbSession(session);
}
return responseMessage;
}
There are a number of important points to note from this:
- The query to the driver returns a
Result
class, that represents the fully buffered set of results returned from QLDB. ThegetResultList()
method returns the list of results of the statement execution. Each record returned is aReader
object that wraps the Ion values. This means the size of the array shows how many records where returned. - The data returned in the value stream with text encoding was a sequence of Unicode code points with a UTF-8 encoding. There are issues down-converting from Ion to JSON as Ion has a richer type system. The Ion Java library provides support for rewriting from Ion to JSON, but this does not exist in the Node library - see Down Converting to JSON.
- I used two steps to format a response message. The first step was to create an
IonTextWriter
, iterate through theReader
object, and write out all values. The second step was to get the UTF-8 encoded buffer and decode it. I then stepped into the structure and retrieved each field name and value.
Observations
There is still a lot more to do with this demo, to make it fit for purpose. This includes request and response schema validation, use of mapping templates to use HTTP status codes in the API Gateway, and so on. I also want to look further into the best approach for picking out multiple values returned from a request to QLDB, and see if I can build out some better helper methods. However, for anyone looking to get going with API Gateway, Lambda and QLDB using Nodejs, I’m hoping this will help.