LineRate and Redis pub/sub

Using the pre-installed Redis server on a LineRate proxy, we can use pub/sub to push new configuration options and modify the layer 7 data path in real time.

Background

Each LineRate proxy has multiple data forwarding path processors, and each of these processors runs an instance of the Node.js scripting engine. When a new HTTP request has to be processed, one of these script engines will process the request. Which script engine actually handles the request is not deterministic. A traditional approach to updating a script's configuration might be to embed an onRequest function in your script to handle an HTTP POST with new configuration options. However, given that only a single script engine will process this request, only the script running on that engine will 'see' the new options. Scripts running on any other engines will continue to use the old options. Here we show you an easy way to push new options to all running processes.
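To make the problem concrete, here is a minimal simulation, not LineRate code. Plain objects stand in for script engines, and the hypothetical updateViaRequest helper mimics a POST that lands on a single, unpredictably chosen engine. The engine count of four is an arbitrary assumption.

```javascript
// Each object stands in for one script engine with its own copy of the options.
function makeEngines(count) {
  var engines = [];
  for (var i = 0; i < count; i++) {
    engines.push({ id: i, options: { load: '0' } });
  }
  return engines;
}

// An HTTP POST is handled by exactly one engine, chosen unpredictably.
function updateViaRequest(engines, newOptions) {
  var picked = engines[Math.floor(Math.random() * engines.length)];
  picked.options = newOptions;
  return picked.id;
}

// Count the engines that are currently using the given load value.
function countWithLoad(engines, load) {
  return engines.filter(function (engine) {
    return engine.options.load === load;
  }).length;
}

var engines = makeEngines(4);
updateViaRequest(engines, { load: '25' });

// Only one of the four engines has the new value; the rest are stale.
console.log(countWithLoad(engines, '25') + ' of ' + engines.length + ' engines updated');
```

Whichever engine is picked, the other three keep running with the old options, which is the gap that pub/sub closes.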

How to

If you're not already familiar with Node.js and the LineRate scripting engine, you should check out the LineRate Scripting Developer's Guide.

Let's jump into the Redis details. Redis is already installed and running on your LineRate proxy, so you can begin to use it immediately.

If you're not familiar with the pub/sub concept, it's pretty straightforward. A publisher publishes a message on a particular channel, and any subscribers that are subscribed to that same channel will receive the message. In this example, the message will be in JSON format. Note that if your subscriber is not listening on the channel when a message is published, that subscriber will never see the message. To cover this case, I've added some additional code that stores the published options in Redis. Any time a subscriber connects, it checks for these values in Redis and uses them if they exist. This way, every subscriber always uses the most recently published options.
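The pattern described above can be sketched without a Redis server. In this illustration, a Node.js EventEmitter stands in for the 'options' channel and a plain object stands in for Redis key storage. The publishOptions and createSubscriber helpers are hypothetical names used only for this sketch; the real scripts use the redis module instead.

```javascript
// In-memory stand-in for Redis pub/sub plus a key/value store.
// This is NOT the redis module; it only illustrates the pattern.
var EventEmitter = require('events').EventEmitter;

var channel = new EventEmitter(); // plays the role of the 'options' channel
var store = {};                   // plays the role of Redis key storage

// Publisher: persist the message, then broadcast it.
function publishOptions(opts) {
  var message = JSON.stringify(opts);
  store.config_options = message;   // late subscribers read this
  channel.emit('message', message); // current subscribers receive this
}

// Subscriber: load any stored options first, then listen for updates.
function createSubscriber(name) {
  var subscriber = { name: name, opts: null };
  if (store.config_options !== undefined) {
    subscriber.opts = JSON.parse(store.config_options);
  }
  channel.on('message', function (message) {
    subscriber.opts = JSON.parse(message);
  });
  return subscriber;
}

var early = createSubscriber('early'); // listening before the publish
publishOptions({ load: '25' });
var late = createSubscriber('late');   // connects after the publish

console.log(early.name + ': ' + JSON.stringify(early.opts));
console.log(late.name + ': ' + JSON.stringify(late.opts));
```

Without the stored copy, the late subscriber would have no options until the next publish. With it, both subscribers end up with the same values.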

Here is the code that will be added to the main script to handle the Redis subscribe function:

// load the redis module
var redis = require("redis");

// create the subscriber redis client
var sub = redis.createClient(); 

// listen to and process 'message' events 
sub.on('message', function(channel, message) {
    // ...

    // get 'message' and store in 'opts'
    var opts = JSON.parse(message); 

    // ...
});

// subscribe to the 'options' channel
sub.subscribe('options');

We can now insert these code pieces into a larger script that uses the options published in the options message. For example, you might want to use LineRate for its ability to do HTTP traffic replication. You could publish an option called "load" that controls what percentage of requests get replicated. All script engines will 'consume' this message, update the load variable, and immediately start replicating only the percentage of requests that you specified. It just so happens we have some code for this: see the sampled_traffic_replication.js section below for a fully commented script that does sampled traffic replication using parameters received from Redis.
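To see how a "load" percentage turns into a replication decision, here is a small standalone sketch of the arithmetic used by the replication script: the percentage becomes a pick interval N, and every Nth request is replicated. The helper names loadToPickInterval and countReplicated are hypothetical, and treating non-numeric or negative input as 0 is an assumption of this sketch.

```javascript
// Convert a load percentage (0-100) into a pick interval.
// 0 means "replicate nothing"; otherwise every Nth request is replicated.
function loadToPickInterval(load) {
  var pct = parseInt(load, 10);
  if (isNaN(pct) || pct <= 0) {
    return 0; // assumption: treat bad or negative input as "no replication"
  }
  return Math.round(1 / (pct / 100));
}

// Count how many of `total` requests would be replicated.
function countReplicated(total, pickInterval) {
  var replicated = 0;
  for (var reqCount = 1; reqCount <= total; reqCount++) {
    if (pickInterval && reqCount % pickInterval === 0) {
      replicated++;
    }
  }
  return replicated;
}

['0', '25', '30', '75', '100'].forEach(function (load) {
  var interval = loadToPickInterval(load);
  console.log('load ' + load + ' -> interval ' + interval +
              ', replicated ' + countReplicated(100, interval) + ' of 100');
});
```

Because the interval is rounded to a whole number, the effective rate is only approximate. In this sketch a load of 30 replicates every 3rd request (about 33%), and any load above roughly 67 rounds to an interval of 1, which replicates every request.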

It should also be mentioned that if you have multiple LineRate proxies all performing the same function, they can all subscribe to the same Redis server and channel and all take the appropriate action at the same time, in real time.

Of course, we need a way to actually publish new options to the 'options' channel. This could be done in myriad ways. I've included a fully commented Node.js script below, called publish_config.js, that prompts the user for the appropriate options on the command line and then publishes those options to the 'options' channel.
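Before publishing, it can help to range-check the answers and drop blank ones so that subscribers only receive fields that should change. The following is a minimal sketch of such a check, assuming blank answers arrive as empty strings. The validateOptions and isBlank helpers are hypothetical and are not part of the publishing script.

```javascript
// Hypothetical helpers: check the collected answers and keep only the
// fields that were filled in and are within range.
function isBlank(value) {
  return value === undefined || value === null || value === '';
}

function validateOptions(input) {
  var options = {};
  var errors = [];

  if (!isBlank(input.virtual_server_ip)) {
    var octets = String(input.virtual_server_ip).split('.');
    var ipOk = octets.length === 4 && octets.every(function (octet) {
      return /^[0-9]{1,3}$/.test(octet) && parseInt(octet, 10) <= 255;
    });
    if (ipOk) {
      options.virtual_server_ip = input.virtual_server_ip;
    } else {
      errors.push('virtual_server_ip');
    }
  }

  if (!isBlank(input.virtual_server_port)) {
    var port = parseInt(input.virtual_server_port, 10);
    if (/^[0-9]{1,5}$/.test(input.virtual_server_port) && port >= 1 && port <= 65535) {
      options.virtual_server_port = input.virtual_server_port;
    } else {
      errors.push('virtual_server_port');
    }
  }

  if (!isBlank(input.load)) {
    var load = parseInt(input.load, 10);
    if (/^[0-9]{1,3}$/.test(input.load) && load <= 100) {
      options.load = input.load;
    } else {
      errors.push('load');
    }
  }

  return { options: options, errors: errors };
}

var checked = validateOptions({ virtual_server_ip: '', virtual_server_port: '8080', load: '25' });
console.log('Would publish: ' + JSON.stringify(checked.options));
console.log('Rejected fields: ' + JSON.stringify(checked.errors));
```

A publisher could refuse to send anything when the errors array is non-empty. Leaving blank fields out of the message fits the subscriber side, which only updates the values that are present.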

Lastly, here's a sample proxy config snippet for LineRate that you would need:

[...]

virtual-server vsPrimary
  attach vipPrimary default
  attach real-server group ...

virtual-server vsReplicate
  attach vipReplicate default
  attach real-server group ...

virtual-ip vipPrimary
  admin-status online
  ip address 192.0.2.10 80

virtual-ip vipReplicate
  admin-status online
  ip address 127.0.0.1 8080

[...]

sampled_traffic_replication.js

"use strict";

var vsm = require('lrs/virtualServerModule');
var http = require('http');
var redis = require("redis"),
    sub = redis.createClient();

// define initial config options
var replicateOptions = {
  ip: '127.0.0.1',
  port: 8080,
  // Replicate every pickInterval requests
  // by default, don't replicate any traffic
  pickInterval: 0
};

// pubsub error handling
sub.on("error", function(err) {
  console.log("Redis subscribe Error: " + err);
});

// if options already exist in redis, use them
sub.on("ready", function() {
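    sub.get('config_options', function (err, reply) {
        // skip stored options on error or when none have been saved yet
        if (err) {
            console.log("Redis get Error: " + err);
        } else if (reply) {
            process_options(reply);
        }
    });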
    // switch to subscriber mode to
    // listen for any published options
    sub.subscribe('options');
});

function process_options(opts) {
  // ignore messages that are not valid JSON
  try {
    opts = JSON.parse(opts);
  } catch (e) {
    console.log("Ignoring malformed options: " + e.message);
    return;
  }

  // only update config params if not null
  if (opts.virtual_server_ip) {
    replicateOptions.ip = opts.virtual_server_ip;
  }
  if (opts.virtual_server_port) {
    replicateOptions.port = opts.virtual_server_port;
  }

  // convert 'load' to 'pickInterval' for internal use
  if (opts.load) {
    // convert opts.load string to int
    opts.load = parseInt(opts.load, 10);
    if (opts.load === 0) {
      replicateOptions.pickInterval = opts.load;
    } else {
      replicateOptions.pickInterval = Math.round(1 / (opts.load / 100));
    }
  }

  console.log("Updated options: " + JSON.stringify(replicateOptions));
}

var tapServerRequest = function(servReq, servResp, options, onResponse) {

  // There is no built-in method to clone a request, so we must generate a new
  // request based on the existing request and send it to the replicate VIP
  var newReq = http.request({ host: options.ip,
                              port: options.port,
                              method: servReq.method,
                              path: servReq.url},
                              onResponse);
  servReq.bindHeaders(newReq);
  servReq.pipe(newReq);

  return newReq;
}

// Listen for config updates on 'options' channel
sub.on('message', function(channel, message) {
  console.log(process.pid + ' Updating config (' + channel + '): ' + message);
  process_options(message);
});

vsm.on('exist', 'vsPrimary', function(vs) {
  console.log('Replicate traffic script installed on Virtual Server: ' + vs.id);

  var reqCount = 0;

  vs.on('request', function(servReq, servResp, cliReq) {
    var tapStart = Date.now();
    var to;
    var aborted = false;

    reqCount++;

    //
    // decide if request should be replicated; replicate if so
    //
    // be sure to handle "0" load scenario gracefully
    //
    if (replicateOptions.pickInterval && reqCount % replicateOptions.pickInterval == 0) {
      var newReq = tapServerRequest(servReq, servResp, replicateOptions, function(resp) {

        // a close event indicates an improper connection termination
        resp.on('close', function() {
          console.log('Replicated response connection closed before completion');
        });
        // an end event indicates the replicated response finished
        resp.on('end', function() {
          console.log('Replicated response ended; request aborted: ' + aborted);
          clearTimeout(to);
        });
      });

      //
      // if replicated request timer exceeds 500ms, abort the request
      //
      to = setTimeout(function() {
        aborted = true;
        newReq.abort();
      }, 500);
    }

    //
    // also send original request along the normal data path
    //
    servReq.bindHeaders(cliReq);
    servReq.pipe(cliReq);
    cliReq.on('response', function(cliResp) {
      cliResp.bindHeaders(servResp);
      cliResp.fastPipe(servResp);
    });
  });
});

publish_config.js

"use strict";

var prompt = require('prompt');
var redis = require('redis');

// note you might need Redis server connection
// parameters defined here for use in createClient()
// method.

var redis_client = redis.createClient();

var config_channel = 'options';

var schema = {
    properties: {
      virtual_server_ip: {
        description: 'Virtual Server IP',
        pattern: /^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$/,
        message: 'Must be an IP address',
        required: false
      },
      virtual_server_port: {
        description: 'Virtual Server port number',
        pattern: /^[0-9]{1,5}$/,
        message: 'Must be an integer between 1 and 65535',
        required: false
      },
      load: {
        description: 'Percentage of load to replicate',
        pattern: /^[0-9]{1,3}$/,
        message: 'Must be an integer between 0 and 100',
        required: false
      }
    }
};

prompt.start();

function get_info() {

    console.log('Enter values; leave blank to not update');

    prompt.get(schema, function (err, result) {

        if (err) {
            return prompt_error(err);
        }

        //
        // TODO: do some input validation here
        //

        var config = JSON.stringify(result);

        console.log('Sending: ' + config);

        // publish options via pub/sub
        redis_publish_message(config_channel, config);
        // quit() closes the connection after pending commands are sent
        redis_client.quit();

      });
}

function redis_publish_message(channel, message) {
    // publish
    redis_client.publish(channel, message);
    // also store JSON message in redis
    redis_client.set("config_options", message);
}

function prompt_error(err) {
    console.log(err);
    return 1;
}

//main
get_info();

Additional Resources

  1. Download LineRate
  2. LineRate Scripting Developer's Guide
  3. LineRate solution articles
  4. LineRate DevCentral
Updated Jun 06, 2023
Version 2.0
