--[[ ------------------------------------------------------------- Raw Latency --------------------------------------------------------------- $Header --------------------------------------------------------------- Program Name : RawLatency Function : Script executed by Geneos netprobe : to compare tick Time to System Time Author : Richard Gould Language : Lua (5.1) Creation : 01/02/2015 History : 01/02/2015 0.00 -> 1.00 RG Creation 25/06/2015 1.00 -> 2.00 RG Adaption 2015/10/07 2.00 -> 3.01 EAM ReWrite 1) Removed unused functions. 2) Added in monitoring of feed status every sample if not up it will not process ticks. 3) ReWrote Calclatency to convert both inputs as opposed to expecting one to already be converted. Also made it so that it returns nil if something is wrong. 4) ReWrote process ticks to check if there are ticks before doing work. Added in logic to throw away bad latency (nil) 5) Added in really basic debug logging 6) Tried to comment existing code, rename functions 7) Changed Date format to an ISO format 2015/10/14 3.01 -> 3.02 EAM Enhancement 1) Added in a rolling average latency that keeps track of X valid samples of latency and provides rolling average. 2) Finished commenting all sections ]]-- _______________________________________________________________ -- --------------------------------------------------------------- local Program = "RawLatency" local Version = "3" local Revision = "02" local debugScript = false -- _______________________________________________________________ -- --------------------------------------------------------------- local sa = require "geneos.sampler" local md = require "geneos.marketdata" local lt = require "geneos.latency" local he = require "geneos.helpers" local sc = require "geneos.sampler.commands" -- _______________________________________________________________ -- --------------------------------------------------------------- -- Config file is pathed by parameter from Gateway local ConfigDir = sa.params[ 'ConfigDir' ] package.path = package.path .. ";" .. ConfigDir .. "?.lua" local config = require "config" -- _______________________________________________________________ -- --------------------------------------------------------------- -- Parse the gateway command for parameters -- if not found set a default value local feedName = sa.params[ 'feedName' ] or "bloomberg" local viewName = sa.params[ 'viewName' ] or "Raw Latency" local offset = sa.params[ 'offset' ] or "0" local rollingAvgCount = sa.params[ 'rollingAvgCount' ] or 5 -- _______________________________________________________________ -- --------------------------------------------------------------- -- Parse the config file for parameters -- offset is timezone offset, default of 0 -- instruments and fields are within feed pragma config.offset = tonumber( offset ) -- _______________________________________________________________ --[[ ------------------------------------------------------------- function get_timezone Auto calculate TZ offset @@return offset from GMT NOT USED YET ]] local function get_timezone() local now = os.time() return os.difftime(now, os.time(os.date("!*t", now))) end -- _______________________________________________________________ --[[ ------------------------------------------------------------- function fn_feedStatus Returns whether the feed is connected or not OK (Feed is up and running) Not started (Feed has not been started) Warning: Connection initialising (Feed has been started but is not fully connected) Other library errors as well @@param feed The feed object @@return status The status of the feed ]] local function fn_feedStatus(feed) if (debugScript) then sa.logMessage( "DEBUG","ent fn_feedStatus") end -- Currently not processing this status, just passing thru -- Givin diff formats we may need to normalize this local status = feed:getStatus() if (debugScript) then sa.logMessage( "DEBUG","ext fn_feedStatus") end return (status) end -- end fn_feedStatus -- _______________________________________________________________ -- --------------------------------------------------------------- -- Start initializing the feed -- Set the feed name local MyFeed = config[ feedName ] -- Create the feed object local Feed = md.addFeed( feedName, MyFeed ) -- Start the feed Feed:start() -- Assign the feedstatus to a variable local feedStatus = fn_feedStatus( Feed ) -- A table to map instrument names to the last tick seen for each instrument local lastTick = {} -- A counter for all the ticks we have processed local totalTicks = 0 -- _______________________________________________________________ -- --------------------------------------------------------------- -- Create the dataview -- setup the column titles local cols = { "instrument", "minLat", "avgLat", "maxLat", "avgRollingLat","ticksPerSample", "lastUpdate" , "secsSinceLastUpdate"} local view = assert( sa.createView( viewName, cols ) ) -- publish 1st dataview view.headline.feed = feedName view.headline.totalTicks = totalTicks view.headline.connection = feedStatus view.headline.now = he.formatTime( "%Y-%m-%d %H:%M:%S", he.gettimeofday() ) view.headline.offset = config.offset view.headline.version = Version .. "." .. Revision assert(view:publish()) -- _______________________________________________________________ --[[ ------------------------------------------------------------- function fn_makeEpochSeconds Takes in a human readable datestring and converts it to unix epoch seconds @@param dateString @@return Unix Epoch Seconds ]] local function fn_makeEpochSeconds( dateString ) if (debugScript) then sa.logMessage( "DEBUG","ent fn_makeEpochSeconds") end local divider = 1000000 local pattern = "(%d+)-(%d+)-(%d+) (%d+):(%d+):(%d+).(%d+)" local xyear, xmonth, xday, xhour, xminute, xsecond, xmicro = dateString:match( pattern ) if (xsecond == nil) then pattern = "(%d+):(%d+):(%d+)[^%d](%d+)" xhour, xminute, xsecond, xmicro = dateString:match( pattern ) if (dateString:len() < 14) then divider = 1000 end end local now = os.date("!*t", os.time()) if ( xyear ) then now.year = xyear end if ( xmonth ) then now.month = xmonth end if ( xday ) then now.day = xday end if ( xhour ) then now.hour = xhour end if ( xminute ) then now.min = xminute end if ( xsecond ) then now.sec = xsecond end if ( xmicro == nil) then xmicro = 0 end -- converts it to Unix epoch seconds local convertedTimestamp = os.time( now ) if ( xmicro ~= 0 ) then xmicro = xmicro / divider end if (debugScript) then sa.logMessage( "DEBUG","ext fn_makeEpochSeconds") end return convertedTimestamp + xmicro end -- end fn_makeEpochSeconds -- _______________________________________________________________ --[[ ------------------------------------------------------------- function - fn_getMillis @@param timestamp in seconds.microseconds @@return todayMilSecs number of milliseconds since "midnight" ]] local function fn_getMillis( timestamp ) if (debugScript) then sa.logMessage( "DEBUG","ent fn_getMillis" ) end -- Round the Epoch seconds to milliseconds local millis = math.floor( ( timestamp * 1000 ) + 0.5 ) -- Since midnight -- 24 * 60 * 60 * 1000 = 86400000 local todayMilSecs = millis % 86400000 if (debugScript) then sa.logMessage( "DEBUG","ext fn_getMillis" ) end return todayMilSecs end -- end fn_getMillis" -- _______________________________________________________________ --[[ ------------------------------------------------------------- function fn_calcLatency Calculate the difference between timestamp and today time values and convert timestamp into 99.999 format offset = time zone offset from config.lua @@param tsSource The SrcTime from the feed api @@param tsDest The dest timestamp as our library has set it. should be > than tsSource but this is not a guarantee @@return rc nil if either input is bad, otherwise the diff of the times ]] local function fn_calcLatency( tsSource, tsDest ) if (debugScript) then sa.logMessage( "DEBUG","ent fn_calcLatency") end local rc = 0 local mDest = 0 local mSource = 0 if ( tsSource == nil or tsDest == nil ) then -- if either input is nil this is invalid rc = nil else mDest = fn_getMillis( tsDest ) if ( string.find( tsSource, ":" ) ) then -- this is a string, convert it to epoch seconds first mSource = fn_getMillis( fn_makeEpochSeconds( tsSource ) + offset ) rc = ( mDest - mSource ) / 1000 elseif ( type( tsSource ) == "number") then -- this is already in epoch seconds or milliseconds mSource = math.log10( tsSource ) > 8 and fn_getMillis( tsSource ) or tsSource rc = ( mDest - mSource ) / 1000 else -- We don't recognize this format sa.logMessage( "INFO","calcLatency: time format not recognized " .. tsSource ) rc = nil end end --sa.logMessage("INFO","calc today "..today.." ts "..ts) if (debugScript) then sa.logMessage( "DEBUG","ext fn_calcLatency") end return ( rc ) end -- end fn_calcLatency -- _______________________________________________________________ --[[ ------------------------------------------------------------- function - fn_calcRollingAverage @@param instrument Name of instrument @@return rollingAvg The average over the stored latencies ]] local function fn_calcRollingAverage ( instrument ) -- calc rolling latency local rollingTotal=0 local rollingCount=0 if next(lastTick[instrument]['rollingAvgT']) ~= nil then for i, v in ipairs(lastTick[instrument]['rollingAvgT']) do rollingTotal = rollingTotal + v['total'] rollingCount = rollingCount + v['count'] end end local rollingAvg = nil -- Dividing by 0 is bad if (rollingCount ~= nil and rollingCount > 0) then rollingAvg = math.floor( ( 1000 * rollingTotal / rollingCount ) + 0.5 ) / 1000 end return (rollingAvg) end -- end fn_calcRollingAverage -- _______________________________________________________________ --[[ ------------------------------------------------------------- function - fn_processTicks @param instrument The name of the instrument to process @return rowValues Array of rowvalues @return tickCount Tick count as found from the statstable ]] local function fn_processTicks( instrument ) if (debugScript) then sa.logMessage( "DEBUG","ent fn_processTicks") end -- First time through tick will be null -- so create row and populate lastUpdate and generate latency column local tick = lastTick[ instrument ] --or { field = { lastUpdate = "Never" }, latency = 0 } -- init return array local rowValues = {} -- Check the higher level stats first local statsTable = Feed:getStats( instrument ) local tickCount = statsTable.currentTicks -- No new ticks found if (tickCount > 0) then tick.next = Feed:getTicks( instrument ) local lastSrcTime = 0 local latMin = nil local latMax = nil local latAvg = 0 local latTotal = 0 local validTickCount = 0 -- only inc if latncy is returned -- iterate over the ticks while tick.next do tick = tick.next -- if the srctime and time seen are not nil if (tick.field.SrcTime ~= nil and tick.timeFirst ~= nil) then -- Calculate the latency -- It is possible for this to be negative if the -- source and dest times are not in sync latency = fn_calcLatency( tick.field.SrcTime, tick.timeFirst ) if ( latency == nil ) then -- this is not a valid tick as there was error processing latency sa.logMessage( "ERROR","invalid tick" ) else -- this is valid tick -- Set Min Latency if ( latMin == nil or latency < latMin ) then latMin = latency end -- Set Min Latency if ( latMax == nil or latency > latMax ) then latMax = latency end -- Add total and count for average latTotal = latTotal + latency validTickCount = validTickCount + 1 -- Rolling Average Calc if (#lastTick[instrument]['rollingAvgT'] == rollingAvgCount) then -- pop off first element table.remove(lastTick[instrument]['rollingAvgT'],1) end -- add to end of table table.insert(lastTick[instrument]['rollingAvgT'],{ total=latTotal, count=validTickCount }) -- mark off the first time of the last valid tick to be the last update time lastTick[instrument]['lastUpdate'] = he.formatTime( "%H:%M:%S.%6f", tick.timeFirst ) lastTick[instrument]['lastUpdateEpoch'] = tick.timeFirst end end -- end if SrcTime and timeSeen are not nil end -- end while loop -- Calculate secs since last update by reusing latency function lastTick[instrument]['secsSinceLastUpdate'] = math.floor( 10*(he.gettimeofday() - lastTick[instrument]['lastUpdateEpoch']) + .5) / 10 -- call a function to calculate the rolling average local rollingAvg = fn_calcRollingAverage(instrument) if validTickCount > 0 then latAvg = math.floor( (1000 * latTotal / validTickCount ) + .5) / 1000 rowValues = { minLat = latMin, avgLat = latAvg, maxLat = latMax, ticksPerSample = tickCount, lastUpdate = lastTick[instrument]['lastUpdate'], secsSinceLastUpdate = lastTick[instrument]['secsSinceLastUpdate'], avgRollingLat = rollingAvg } else -- do something if no valid ticks found rowValues = { ticksPerSample = tickCount, lastUpdate = lastTick[instrument]['lastUpdate'], secsSinceLastUpdate = lastTick[instrument]['secsSinceLastUpdate'], avgRollingLat = rollingAvg } end else -- No ticks found -- Calculate secs since last update by reusing latency function lastTick[instrument]['secsSinceLastUpdate'] = math.floor( 10*(he.gettimeofday() - lastTick[instrument]['lastUpdateEpoch']) + .5) / 10 -- call a function to calculate the rolling average local rollingAvg = fn_calcRollingAverage(instrument) rowValues = { ticksPerSample = tickCount, lastUpdate = lastTick[instrument]['lastUpdate'], secsSinceLastUpdate = lastTick[instrument]['secsSinceLastUpdate'], avgRollingLat = rollingAvg } end if (debugScript) then sa.logMessage( "DEBUG","ext fn_processTicks") end return rowValues, tickCount end -- end fn_processTicks -- _______________________________________________________________ -- --------------------------------------------------------------- -- Initialize the dataview and the lastTick array for name, _ in pairs ( MyFeed.instruments ) do view.row[name] = { minLat = nil, avgLat = nil, maxLat = nil, ticksPerSample = nil, lastUpdate = 'Never', secsSinceLastUpdate = nil, avgRollingLat = nil } lastTick[name] = { lastUpdate = 'Never', lastUpdateEpoch = he.gettimeofday(), rollingAvgT = {}, } end -- _______________________________________________________________ -- --------------------------------------------------------------- -- Run the sampler sa.doSample = function() if (debugScript) then sa.logMessage( "DEBUG","ent doSample") end -- Query the feed status feedStatus = fn_feedStatus( Feed ) -- Update the headline to the current connection status view.headline.connection = feedStatus -- update the current time view.headline.now = he.formatTime( "%Y-%m-%d %H:%M:%S", he.gettimeofday() ) -- Only process ticks if the feed is connected if (feedStatus == 'OK') then -- foreach instrument for name, _ in pairs( MyFeed.instruments ) do -- process all ticks for this instrument local rowResult, tickCount = fn_processTicks( name ) if (tickCount > 0) then -- if 1 or more ticks returned update entire row view.row[ name ] = rowResult else -- 0 ticks returned -- update only certain stats view.row[name] = { lastUpdate = rowResult['lastUpdate'] , ticksPerSample = tickCount, secsSinceLastUpdate = rowResult['secsSinceLastUpdate'], avgRollingLat = rowResult['avgRollingLat'] } end totalTicks = totalTicks + tickCount end -- update the total tick headline variable view.headline.totalTicks = totalTicks else -- What to do if the feed is not connected? -- TBD end -- publish to gateway assert(view:publish()) if (debugScript) then sa.logMessage( "DEBUG","ext doSample") end end -- end doSample -- _______________________________________________________________