Write lambda function

You will now copy the files used by the Lambda function into your Cloud9 IDE.

Execute the following commands from your Cloud9 terminal.

# Create the Lambda source folder and the (initially empty) files that the following steps fill in
mkdir -p ~/environment/ambSupplyChainConnector/src
touch ~/environment/ambSupplyChainConnector/src/index.js
touch ~/environment/ambSupplyChainConnector/connection-profile-template.yaml
touch ~/environment/ambSupplyChainConnector/publishLambda.sh
# Make the publish script executable
chmod 755 ~/environment/ambSupplyChainConnector/publishLambda.sh
Open the index.js source file in your IDE so that you can paste the code into it.

Paste the following code into the index.js file inside your ambSupplyChainConnector/src folder on your Cloud9 IDE. This code retrieves the necessary arguments from the AppSync caller, uses them to retrieve the user’s credentials from AWS Secrets Manager, and finally submits a query or transaction to the Fabric peer nodes, returning the result to AppSync.

'use strict';

let log = require('loglevel').getLogger('amb-supply-chain-connector');
log.setLevel('DEBUG');

const AWS = require('aws-sdk');
const FabricClient = require('fabric-client');
const path = require('path');
const fs = require("fs");


/**
 * Retrieves one of a user's Fabric credentials from AWS Secrets Manager.
 *
 * The secret is looked up under the id `amb/supplychain/<username>/<key>`.
 *
 * @param {string} key - Name of the secret inside the user's namespace
 *   (this file requests 'pk' and 'signcert').
 * @param {string} username - User whose secret is fetched.
 * @returns {Promise<string>} The `SecretString` when the response has one,
 *   otherwise the `SecretBinary` payload decoded to ASCII text.
 * @throws Rejects with the Secrets Manager error, or with the decoding error
 *   when the response cannot be turned into a string.
 */
async function getSecret(key, username) {
  log.debug('begin getSecret()');
  const client = new AWS.SecretsManager({region: 'us-east-1'});
  const secretId = `amb/supplychain/${username}/${key}`;
  log.debug(`retrieving secret ${secretId}`);
  return new Promise((resolve, reject) => {
    client.getSecretValue({SecretId: secretId}, (err, data) => {
      if (err) {
        return reject(err);
      }
      try {
        if ('SecretString' in data) {
          return resolve(data.SecretString);
        }
        // Buffer.from() replaces the deprecated (DEP0005) `new Buffer()` constructor.
        const buff = Buffer.from(data.SecretBinary, 'base64');
        return resolve(buff.toString('ascii'));
      } catch (decodeErr) {
        // Surface malformed responses as a rejection instead of throwing
        // from inside the SDK callback, where no caller could catch it.
        return reject(decodeErr);
      }
    });
  });
}


/**
 * Builds a Fabric client from the bundled connection profile and signs it in
 * as `username`, using credentials fetched through getSecret().
 *
 * The crypto key store is placed in the folder named by the CRYPTO_FOLDER
 * environment variable, or in /tmp when that is unset. The MSP id comes from
 * the MSP environment variable.
 *
 * @param {string} username - Identity whose private key ('pk') and signed
 *   certificate ('signcert') are loaded.
 * @returns {Promise<Object>} The configured client with its user context set.
 * @throws Rejects when a secret cannot be fetched or the client refuses the
 *   credentials.
 */
async function prepFabricClient(username) {
  log.debug('begin prepFabricClient()');
  const client = FabricClient.loadFromConfig(path.join(__dirname, './connection-profile.yaml'));
  const store_path = path.join(process.env.CRYPTO_FOLDER || '/tmp');
  const crypto_suite = FabricClient.newCryptoSuite();
  const crypto_store = FabricClient.newCryptoKeyStore({path: store_path});
  crypto_suite.setCryptoKeyStore(crypto_store);
  client.setCryptoSuite(crypto_suite);

  // The two secrets are independent, so fetch them concurrently.
  const [privatePEM, signedPEM] = await Promise.all([
    getSecret('pk', username),
    getSecret('signcert', username)
  ]);

  const fabricUser = await client.createUser({
    username,
    mspid: process.env.MSP,
    cryptoContent: {
      privateKeyPEM: privatePEM,
      signedCertPEM: signedPEM
    },
    skipPersistence: true
  });

  // setUserContext() returns a promise; await it so a failure rejects this
  // function instead of becoming an unhandled rejection.
  await client.setUserContext(fabricUser, true);
  return client;
}


/**
 * Returns a Fabric channel that has at least one peer and one orderer
 * attached, ready for queries and transactions.
 *
 * The channel name comes from CHANNEL_NAME (default 'mainchannel'). When the
 * client does not already know the channel, a new one is created. Missing
 * peers are added from the comma-separated PEER_ENDPOINTS variable and a
 * missing orderer from ORDERER_ENDPOINT; both use the bundled TLS chain file.
 *
 * @param {string} username - Identity used to build the underlying client.
 * @returns {Promise<Object>} The prepared channel.
 * @throws {Error} When peers or an orderer must be added but the matching
 *   environment variable is not set.
 */
async function prepChannel(username) {
  log.debug('begin prepChannel()');
  const client = await prepFabricClient(username);
  const channelName = process.env.CHANNEL_NAME || 'mainchannel';

  // Must be `let`: the channel is replaced below when the client does not
  // know it yet (reassigning a `const` here used to throw a TypeError).
  let channel = client.getChannel(channelName, false);
  if (!channel) {
    channel = client.newChannel(channelName);
  }

  const pemfile = fs.readFileSync(path.resolve(__dirname, './managedblockchain-tls-chain.pem'), 'utf8');

  const [firstPeer] = channel.getPeers();
  if (!firstPeer) {
    log.debug('prepChannel - found no peers');
    if (!process.env.PEER_ENDPOINTS) {
      throw new Error('prepChannel - channel has no peers and PEER_ENDPOINTS is not set');
    }
    for (const endpoint of process.env.PEER_ENDPOINTS.split(',')) {
      channel.addPeer(client.newPeer(endpoint, {pem: pemfile}));
    }
  }

  const [firstOrderer] = channel.getOrderers();
  if (!firstOrderer) {
    log.debug('prepChannel - found no orderer');
    if (!process.env.ORDERER_ENDPOINT) {
      throw new Error('prepChannel - channel has no orderer and ORDERER_ENDPOINT is not set');
    }
    channel.addOrderer(client.newOrderer(process.env.ORDERER_ENDPOINT, {pem: pemfile}));
  }
  return channel;
}


/**
 * Runs chaincode queries and transactions against a Fabric channel on behalf
 * of one user.
 *
 * Instances hold the chaincode request template, the channel, and the client
 * used to create transaction ids. Create instances with
 * `FabricConnector.build()`, which prepares the channel and client first.
 */
class FabricConnector {
  /**
   * @param {Object} options - Pre-built state; see build().
   * @param {string} options.username - Identity the connector acts as.
   * @param {Object} options.request - Chaincode request (chaincodeId, fcn, args).
   * @param {Object} options.channel - Prepared channel.
   * @param {Object} options.client - Prepared client.
   * @throws {Error} When called without options.
   */
  constructor(options) {
    log.debug('FabricConnector.constructor()');
    if (typeof options === 'undefined') {
      throw new Error('FabricConnector constructor should not be called directly--use FabricConnector.build() instead.');
    }
    ['username', 'request', 'channel', 'client'].forEach(varName => {
      this[varName] = options[varName];
    });
    this.transactionId = null;
    this.results = [];
    this.proposalResponses = [];
    this.eventHubs = [];
  }

  /**
   * Prepares a channel and a client for `options.username` and returns a
   * connector wired to them.
   *
   * @param {Object} [options]
   * @param {string} options.username - Identity to act as.
   * @param {Object} options.request - Chaincode request template.
   * @returns {Promise<FabricConnector>}
   */
  static async build(options = {}) {
    log.debug('FabricConnector.build()');
    const initParams = {
      username: options.username,
      request: options.request,
      channel: await prepChannel(options.username),
      client: await prepFabricClient(options.username)
    };
    return new FabricConnector(initParams);
  }

  /**
   * Evaluates the chaincode request without submitting a transaction.
   *
   * @param {Object} [requestUpdates] - Fields that override the stored
   *   request for this call only (the stored request is not modified).
   * @returns {Promise<string|undefined>} The last peer response as a string,
   *   or undefined when no responses came back.
   * @throws The first Error found among the peer responses, unchanged, or
   *   any error raised by the channel call.
   */
  async query(requestUpdates = {}) {
    const request = {...this.request, ...requestUpdates};
    try {
      const responses = await this.channel.queryByChaincode(request);
      // There can be more than one response when several peers were
      // targeted; the last one wins.
      let result;
      for (const response of responses) {
        if (response instanceof Error) {
          // Rethrow the peer's own error so its message and stack survive.
          throw response;
        }
        result = response.toString();
      }
      return result;
    } catch (err) {
      log.error('Query error: ' + err);
      throw err;
    }
  }

  /**
   * Creates a new transaction id and stores it, together with the channel's
   * peers as targets, on the stored request.
   */
  prepTransaction() {
    this.transactionId = this.client.newTransactionID();
    log.debug(`created transactionId = ${this.transactionId.getTransactionID()}`);
    this.request['txId'] = this.transactionId;
    this.request['targets'] = this.channel.getPeers();
  }

  /**
   * Sends the stored request as a transaction proposal and keeps the
   * proposal responses and the proposal for the ordering step.
   *
   * @returns {Promise<void>}
   */
  async proposeTransaction() {
    log.debug('calling channel.sendTransactionProposal()');
    this.results = await this.channel.sendTransactionProposal(this.request);
    this.responses = this.results[0];
    this.proposal = this.results[1];
  }

  /**
   * @returns {boolean} True when every proposal response carries status 200.
   */
  responsesAreValid() {
    return this.responses.every(e => e && e.response && e.response.status === 200);
  }

  /**
   * Registers for the commit event of the current transaction on one event
   * hub and connects the hub.
   *
   * @param {Object} hub - Channel event hub to listen on.
   * @returns {Promise<string>} Resolves when the transaction is reported
   *   'VALID'; rejects on any other code, on a hub error, or after 10 seconds
   *   without an event.
   */
  getInvokeEventPromise(hub) {
    const EVENT_HUB_TIMEOUT = 10000; // 10 seconds

    return new Promise((resolve, reject) => {
      log.debug('Setting up invoke promise');
      const eventTimeout = setTimeout(() => {
        const msg = `REQUEST_TIMEOUT: ${hub.getPeerAddr()}`;
        log.error(msg);
        // Reject explicitly so the caller never waits on a promise that
        // depends on the hub reporting its own shutdown.
        reject(new Error(msg));
        hub.disconnect();
      }, EVENT_HUB_TIMEOUT);

      const successFunc = (tx, code, blockNum) => {
        log.debug(`The invoke transaction has been committed on peer ${hub.getPeerAddr()}`);
        log.debug(`Transaction ${tx} has status of ${code} in block ${blockNum}`);
        clearTimeout(eventTimeout);
        if (code !== 'VALID') {
          const msg = `The invoke tx was invalid. The code was ${code}`;
          log.error(msg);
          return reject(new Error(msg));
        }
        const msg = `The invoke tx was valid.`;
        log.debug(msg);
        return resolve(msg);
      };

      const failureFunc = err => {
        clearTimeout(eventTimeout);
        log.error(err);
        reject(err);
      };

      hub.registerTxEvent(
        this.transactionId.getTransactionID(),
        successFunc,
        failureFunc,
        { unregister: true, disconnect: true }
      );
      hub.connect();
    });
  }

  /**
   * Submits the endorsed proposal to the ordering service.
   *
   * @returns {Promise<Object>} The result of channel.sendTransaction().
   */
  getOrdererRequestPromise() {
    const ordererRequest = {
      txId: this.transactionId,
      proposalResponses: this.responses,
      proposal: this.proposal
    };
    return this.channel.sendTransaction(ordererRequest);
  }

  /**
   * Submits the transaction for ordering and waits for the commit event from
   * every event hub of the organization.
   *
   * @returns {Promise<Array>} One entry per event hub, followed by the
   *   ordering service result as the last entry.
   */
  async awaitConfirmation() {
    this.eventHubs = this.channel.getChannelEventHubsForOrg();
    const promises = this.eventHubs.map(hub => this.getInvokeEventPromise(hub));
    promises.push(this.getOrdererRequestPromise());
    return Promise.all(promises);
  }

  /**
   * Logs the commit result reported by each event hub.
   *
   * @param {Array} results - Event hub results, in the same order as
   *   `this.eventHubs` (the ordering result must already be removed).
   */
  reportResults(results) {
    results.forEach((hubResult, i) => {
      const hub = this.eventHubs[i];
      log.debug(`Event result for event hub: ${hub.getPeerAddr()}: ${hubResult.toString()}`);
    });
  }

  /**
   * Runs the full transaction flow: propose, check endorsements, order, and
   * wait for the commit events.
   *
   * @returns {Promise<string|null>} The chaincode payload as a string when
   *   the ordering service reports 'SUCCESS'; null when the proposal or the
   *   ordering step failed (the failure is logged).
   * @throws Any error raised while proposing, ordering, or waiting for
   *   commit events.
   */
  async invoke() {
    let toReturn = null;
    try {
      this.prepTransaction();
      await this.proposeTransaction();
      if (this.responsesAreValid()) {
        const status = this.responses[0].response.status;
        const payload = this.responses[0].response.payload;
        log.info('Successfully sent proposal and received response: ' +
                  `status - ${status}, message - "${payload}"`);

        // wait for the channel-based event hub to tell us
        // that the commit was good or bad on each peer in our organization
        const results = await this.awaitConfirmation();
        const response = results.pop(); // ordering service result is last in the results
        if (response.status === 'SUCCESS') {
          toReturn = payload.toString();
          log.debug(`Successfully sent transaction to the ordering service. Payload = ${payload}`);
        } else {
          const msg = `Failed to order the transaction. Error code: ${response.status}`;
          log.error(msg);
        }
        this.reportResults(results);
      } else {
        // Report the response that actually failed, which is not always the first one.
        const r = this.responses.find(e => !(e && e.response && e.response.status === 200)) ||
                  this.responses[0];
        log.error(`Failed proposal response. Status code: ${r.status}. Message: ${r.message}. Stack: ${r.stack}.`);
      }
    } catch (error) {
      log.error(`Error during invoke: ${error.toString()}`);
      throw error;
    }
    return toReturn;
  }
}


/**
 * Looks up a single product on the ledger.
 *
 * @param {Object} event - AppSync event; `event.identity.username` selects
 *   the Fabric identity used for the query.
 * @param {string} product_id - Product id; queried under the ledger key
 *   `product_<product_id>`.
 * @returns {Promise<Object>} The parsed ledger record with `id` set to
 *   `product_id`.
 */
async function getProduct(event, product_id) {
  log.debug(`getProduct(${product_id})`);
  const chaincodeId = process.env.CHAINCODE_ID || 'supplychaincc';
  const ledgerKey = `product_${product_id}`;
  const fabric = await FabricConnector.build({
    username: event.identity.username,
    request: {chaincodeId, fcn: 'query', args: [ledgerKey]}
  });
  const rawRecord = await fabric.query();
  const record = JSON.parse(rawRecord);
  return {...record, id: product_id};
}


/**
 * Lists every product on the ledger.
 *
 * First queries the 'productIDs' key for the list of ledger keys, then
 * queries each key concurrently.
 *
 * @param {Object} event - AppSync event; `event.identity.username` selects
 *   the Fabric identity used for the queries.
 * @returns {Promise<Object[]>} One parsed record per ledger key, each with
 *   `id` set to the part of the key after its first underscore (the whole key
 *   when it has no underscore).
 */
async function getAllProducts(event) {
  log.debug('getProducts()');
  const connector = await FabricConnector.build({
    username: event.identity.username,
    request: {
      chaincodeId: process.env.CHAINCODE_ID || 'supplychaincc',
      fcn: 'query',
      args: ['productIDs']
    }
  });
  const product_ids = JSON.parse(await connector.query());
  log.debug(`product_ids = ${JSON.stringify(product_ids)}, type = ${typeof product_ids}`);
  return Promise.all(product_ids.map(async ledgerKey => {
    const parsedProduct = JSON.parse(await connector.query({args: [ledgerKey]}));
    // Strip only the prefix up to the first underscore. Splitting on every
    // underscore would truncate ids that contain one ('product_sku_9' -> 'sku').
    const separator = ledgerKey.indexOf('_');
    const product_id = separator === -1 ? ledgerKey : ledgerKey.slice(separator + 1);
    return {...parsedProduct, id: product_id};
  }));
}


/**
 * Submits an `updateProductState` transaction for one product.
 *
 * @param {Object} event - AppSync event. `event.identity.username` selects
 *   the Fabric identity; `event.arguments.id` and
 *   `event.arguments.transition` are passed to the chaincode in that order.
 * @returns {Promise<Object>} The parsed transaction payload with `id` set to
 *   `event.arguments.id`.
 */
async function updateProductState(event) {
  log.debug('updateProductState()');
  const username = event.identity.username;
  const chaincodeId = process.env.CHAINCODE_ID || 'supplychaincc';
  const chaincodeArgs = [event.arguments.id, event.arguments.transition];
  const fabric = await FabricConnector.build({
    username,
    request: {chaincodeId, fcn: 'updateProductState', args: chaincodeArgs}
  });
  const payload = await fabric.invoke();
  const updated = JSON.parse(payload);
  return {...updated, id: event.arguments.id};
}


/**
 * Submits a `createProduct` transaction.
 *
 * @param {Object} event - AppSync event. `event.identity.username` selects
 *   the Fabric identity; `event.arguments.id` is the only chaincode argument.
 * @returns {Promise<Object>} The parsed transaction payload with `id` set to
 *   `event.arguments.id`.
 */
async function createProduct(event) {
  log.debug('createProduct()');
  const username = event.identity.username;
  const chaincodeId = process.env.CHAINCODE_ID || 'supplychaincc';
  const chaincodeArgs = [event.arguments.id];
  const fabric = await FabricConnector.build({
    username,
    request: {chaincodeId, fcn: 'createProduct', args: chaincodeArgs}
  });
  const payload = await fabric.invoke();
  const created = JSON.parse(payload);
  return {...created, id: event.arguments.id};
}


/**
 * Routes an AppSync event to the function that implements its GraphQL field.
 *
 * Routing uses `event.info.parentTypeName` ('Query' or 'Mutation') and
 * `event.info.fieldName`.
 *
 * @param {Object} event - AppSync event.
 * @returns {Promise<*>} The result of the matching function, or an empty
 *   object when no route matches.
 */
async function dispatchRequest(event) {
  // Maps are used so that only the listed names match (no inherited keys).
  const routes = new Map([
    ['Query', new Map([
      ['product', () => getProduct(event, event.arguments.id)],
      ['products', () => getAllProducts(event)]
    ])],
    ['Mutation', new Map([
      ['updateProductState', () => updateProductState(event)],
      ['createProduct', () => createProduct(event)]
    ])]
  ]);

  const fieldsForType = routes.get(event.info.parentTypeName);
  const resolver = fieldsForType ? fieldsForType.get(event.info.fieldName) : undefined;
  const response = resolver ? await resolver() : {};

  log.debug(`Response from dispatchRequest was ${JSON.stringify(response)}`);
  return response;
}


exports.handler = async (event, context) => {
  log.debug(`ambSupplyChainConnector called, event = ${JSON.stringify(event)}, context = ${JSON.stringify(context)}`);
  const response = await dispatchRequest(event);
  log.debug(`response sent back from lambda was ${JSON.stringify(response)}.`);
  return response;
};

Paste the following into your connection-profile-template.yaml file in your Cloud9 IDE. This file is used by the Lambda function to determine how to connect to the Fabric blockchain network.

# Fabric connection profile template for the supply chain network.
# Tokens of the form %NAME% are placeholders: publishLambda.sh copies this
# template to src/connection-profile.yaml and replaces each token with the
# value of the environment variable of the same name using sed.
name: SupplyChain
x-type: hlfv1
description: Supply Chain Network
version: "1.0"

# 'mainchannel' is also the channel name index.js falls back to when the
# CHANNEL_NAME environment variable is unset.
channels:
  mainchannel:
    orderers:
      - AMBSupplyChainOrderer
    peers:
      peer1:
        endorsingPeer: true
        chaincodeQuery: true
        ledgerQuery: true
        eventSource: true

organizations:
  %MEMBER_NAME%:
    mspid: %MEMBERID%
    peers:
      - peer1
    certificateAuthorities:
      - %MEMBER_NAME%CA

orderers:
  AMBSupplyChainOrderer:
    url: grpcs://%ORDERER%
    grpcOptions:
      ssl-target-name-override: %ORDERERNOPORT%
    tlsCACerts:
      # publishLambda.sh copies this file into the src folder before zipping
      path: managedblockchain-tls-chain.pem

peers:
  peer1:
    url: grpcs://%PEER1ENDPOINT%
    eventUrl: grpcs://%PEER1EVENTENDPOINT%
    grpcOptions:
      ssl-target-name-override: %PEER1ENDPOINTNOPORT%
    tlsCACerts:
      path: managedblockchain-tls-chain.pem

certificateAuthorities:
  %MEMBER_NAME%CA:
    url: https://%CASERVICEENDPOINT%
    httpOptions:
      # NOTE(review): presumably disables TLS certificate verification for
      # requests to the CA -- confirm this is intended outside a workshop.
      verify: false
    tlsCACerts:
      path: managedblockchain-tls-chain.pem
    caName: %MEMBERID%

Paste the following shell script into publishLambda.sh. This file is used to generate the connection profile from the Fabric environment settings and deploy the Lambda code.

#!/usr/bin/env bash
#
# Generates src/connection-profile.yaml from the template, zips the src
# folder, and creates the ambSupplyChainConnector Lambda function (or updates
# its code when the function already exists).

# Presumably defines the MEMBER_NAME, MEMBERID, ORDERER, PEER*, and
# CASERVICEENDPOINT variables used below -- verify against your .bash_profile.
source $HOME/.bash_profile

export SRCDIR=$HOME/environment/ambSupplyChainConnector/src
# The certificate chain is copied from the current working directory, so the
# script must be started from the directory that contains it.
cp managedblockchain-tls-chain.pem $SRCDIR
cd $SRCDIR

# substitute env vars in connection profile
cp ../connection-profile-template.yaml connection-profile.yaml
sed -i "s|%MEMBER_NAME%|$MEMBER_NAME|g" $SRCDIR/connection-profile.yaml
sed -i "s|%MEMBERID%|$MEMBERID|g" $SRCDIR/connection-profile.yaml
sed -i "s|%ORDERER%|$ORDERER|g" $SRCDIR/connection-profile.yaml
sed -i "s|%ORDERERNOPORT%|$ORDERERNOPORT|g" $SRCDIR/connection-profile.yaml
sed -i "s|%PEER1ENDPOINT%|$PEER1ENDPOINT|g" $SRCDIR/connection-profile.yaml
sed -i "s|%PEER1EVENTENDPOINT%|$PEER1EVENTENDPOINT|g" $SRCDIR/connection-profile.yaml
sed -i "s|%PEER1ENDPOINTNOPORT%|$PEER1ENDPOINTNOPORT|g" $SRCDIR/connection-profile.yaml
# The template in this tutorial contains no %PEER2...% tokens, so the next
# three substitutions only take effect if a second peer is added to it.
sed -i "s|%PEER2ENDPOINT%|$PEER2ENDPOINT|g" $SRCDIR/connection-profile.yaml
sed -i "s|%PEER2EVENTENDPOINT%|$PEER2EVENTENDPOINT|g" $SRCDIR/connection-profile.yaml
sed -i "s|%PEER2ENDPOINTNOPORT%|$PEER2ENDPOINTNOPORT|g" $SRCDIR/connection-profile.yaml
sed -i "s|%CASERVICEENDPOINT%|$CASERVICEENDPOINT|g" $SRCDIR/connection-profile.yaml

# bundle src files
zip -r ~/lambda.zip *

# create or update lambda
# Look up the IAM role and the Lambda layer (ARN and latest version) by name.
export LAMBDA_ROLE_ARN=$(aws iam list-roles | jq -r ".[][] | select(.RoleName == \"LambdaRoleForAmbSupplyChainSecretAccess\") | .Arn")
export LAYER_ARN=$(aws lambda list-layers | jq -r ".Layers[] | select(.LayerName == \"fabric-client-layer\") | .LayerArn")
export LAYER_VERSION=$(aws lambda list-layers | jq -r ".Layers[] | select(.LayerName == \"fabric-client-layer\") | .LatestMatchingVersion.Version")
# Read this instance's network settings from the instance metadata endpoint.
# NOTE(review): assumes the macs/ listing returns a single interface -- confirm.
export INTERFACE=$(curl --silent http://169.254.169.254/latest/meta-data/network/interfaces/macs/)
export SUBNETID=$(curl --silent http://169.254.169.254/latest/meta-data/network/interfaces/macs/${INTERFACE}/subnet-id)
export VPCID=$(curl --silent http://169.254.169.254/latest/meta-data/network/interfaces/macs/${INTERFACE}/vpc-id)
export SECURITY_GROUPS=$(curl --silent http://169.254.169.254/latest/meta-data/network/interfaces/macs/${INTERFACE}/security-group-ids)
export GROUPID=$(aws ec2 describe-security-groups --group-ids $SECURITY_GROUPS --filter "Name=group-name, Values=HFClientAndEndpoint" --query "SecurityGroups[0].GroupId" --output text)
export DEFAULT_GROUP_ID=$(aws ec2 describe-security-groups --filter "Name=group-name, Values=default" --query "SecurityGroups[0].GroupId" --output text)

# The exit status of get-function decides between update and create.
aws lambda get-function --function-name ambSupplyChainConnector > /dev/null 2>&1
if [ $? -eq 0 ]; then
  echo "Function exists--updating..."
  # Only the code is replaced here; VPC, environment, layer, and other
  # configuration set at creation time are left unchanged.
  aws lambda update-function-code \
    --function-name ambSupplyChainConnector \
    --zip-file fileb://$HOME/lambda.zip > /dev/null
else
  echo "Function doesn't exist--creating..."
  # The environment below does not set CHANNEL_NAME, CHAINCODE_ID,
  # PEER_ENDPOINTS, ORDERER_ENDPOINT, or CRYPTO_FOLDER, which index.js also
  # reads; index.js has built-in defaults for CHANNEL_NAME, CHAINCODE_ID, and
  # CRYPTO_FOLDER only.
  aws lambda create-function \
    --function-name ambSupplyChainConnector \
    --vpc-config "SubnetIds=$SUBNETID,SecurityGroupIds=$GROUPID,$DEFAULT_GROUP_ID" \
    --handler index.handler \
    --runtime "nodejs12.x" \
    --role "$LAMBDA_ROLE_ARN" \
    --timeout 30 \
    --memory-size 512 \
    --publish \
    --environment "Variables={CASERVICEENDPOINT=$CASERVICEENDPOINT,PEER1ENDPOINT=$PEER1ENDPOINT,ORDERER=$ORDERER,MEMBER_NAME=$MEMBER_NAME,MSP=$MEMBERID}" \
    --layers "$LAYER_ARN:$LAYER_VERSION" \
    --zip-file fileb://$HOME/lambda.zip > /dev/null
fi

Finally, build the Lambda function by pasting the following into the Cloud9 terminal.

# Switch to the home directory first: publishLambda.sh copies
# managedblockchain-tls-chain.pem from the current working directory.
cd
bash ~/environment/ambSupplyChainConnector/publishLambda.sh

The output from those commands should look like this:

carl:~/environment/ambSupplyChainConnector/fabric-client-layer $ ./publishLambda.sh
updating: connection-profile.yaml (deflated 69%)
updating: index.js (deflated 70%)
updating: managedblockchain-tls-chain.pem (deflated 34%)
Function doesn't exist--creating...