ea-ps_2084-03b/telegram.rb

289 lines
9.2 KiB
Ruby

# encoding: utf-8
# ruby: 1.9
class Telegram
attr_accessor :direction, :cast, :transmission, :node, :object, :data
# transmission types
RESERVED = 0
QUERY = 1
ANSWER = 2
SEND = 3
# object types
OBJECTS = []
OBJECTS[0] = "device type"
OBJECTS[1] = "serial no."
OBJECTS[2] = "nominal voltage (V)"
OBJECTS[3] = "nominal current (A)"
OBJECTS[4] = "nominal power (W)"
OBJECTS[6] = "article no."
OBJECTS[8] = "manufacturer"
OBJECTS[9] = "software version"
OBJECTS[19] = "device class"
OBJECTS[38] = "over voltage protection threshold (%V)"
OBJECTS[39] = "over current protection threshold (%A)"
OBJECTS[50] = "set voltage (%V)"
OBJECTS[51] = "set current (%I)"
OBJECTS[54] = "power supply control"
OBJECTS[71] = "status and actual values"
OBJECTS[72] = "status and set values"
OBJECTS[149] = "switch to bootlaoder?"
OBJECTS[150] = "unlock code 1?"
OBJECTS[151] = "unlock code 2?"
OBJECTS[152] = "write/check memory?"
OBJECTS[255] = "error"
# length of the object type
LENGTHS = []
LENGTHS[0] = 16
LENGTHS[1] = 16
LENGTHS[2] = 4
LENGTHS[3] = 4
LENGTHS[4] = 4
LENGTHS[6] = 16
LENGTHS[8] = 16
LENGTHS[9] = 16
LENGTHS[19] = 2
LENGTHS[38] = 2
LENGTHS[39] = 2
LENGTHS[50] = 2
LENGTHS[51] = 2
LENGTHS[54] = 2
LENGTHS[71] = 6
LENGTHS[72] = 6
LENGTHS[149] = 4
LENGTHS[150] = 4
LENGTHS[151] = 4
LENGTHS[152] = 8
LENGTHS[255] = 1
# possibles errors
ERRORS = []
ERRORS[0] = "no error"
ERRORS[3] = "checksum incorrect"
ERRORS[4] = "start delimiter incorrect"
ERRORS[5] = "wrong address for output"
ERRORS[7] = "object not defined"
ERRORS[8] = "object length incorrect"
ERRORS[9] = "no access permission"
ERRORS[15] = "device in lock state"
ERRORS[48] = "upper limit exceeded"
ERRORS[49] = "lower limit exceeded"
# create a query or send telegram for this object
# query if data is nil, else send data
# direction: true = control unit to device, false = device to control unit
def initialize (object, data=nil, direction=true)
# telegram direction: true = control unit to device, false = device to control unit
@direction = direction
# cast type: true = query, false = answer
@cast = direction
# device node
@node = 0
# set object
@object = object
# verify data
if direction then
if data==nil or data.empty? then
@transmission = QUERY
@data = []
else
raise "wrong data length" if LENGTHS[@object] and data.length!=LENGTHS[@object]
@transmission = SEND
@data = data
end
else
raise "wrong data length" if data.empty? or (LENGTHS[@object] and data.length>LENGTHS[@object])
@transmission = ANSWER
@data = data
end
end
# create a Telegram from the raw telegram data
def Telegram.parse (telegram)
# check there are at least 5 bytes (minimum message size)
return nil if telegram==nil
return nil if telegram.length<5
bytes = telegram.bytes.to_a # get bytes
to_return = new(bytes[2]) # new Telegram
# parse start delimiter (SD)
length = bytes[0]&0x0f # get length
to_return.direction = (bytes[0]&0x10!=0)
to_return.cast = (bytes[0]&0x20!=0)
to_return.transmission = ((bytes[0]>>6)&0x03)
# parse device node (DN)
to_return.node = bytes[1]
# parse object (OBJ)
to_return.object = bytes[2]
# parse data field
to_return.data = bytes[3..-3]
# parse checksum (CS)
checksum = (bytes[-2]<<8)+bytes[-1]
bytes[0..-3].each { |b| checksum -= b }
# run some checks
raise "wrong length. expected #{length}, got #{to_return.data.length-1}" if (to_return.transmission==SEND or to_return.transmission==ANSWER) and length != to_return.data.length-1
raise "wrong checksum. off by #{checksum}" if checksum != 0
return to_return
end
# is the telegram from the control unit to the device
def to_device?
return @direction
end
# is the telegram from the device to the control unit
def to_control?
return !@direction
end
# is the telegram a query
def query?
return @cast
end
# is the telegram an answer
def answer?
return !@cast
end
# pack all except checksum
def pack_data
# make start delimiter
raise "too much data" if data.length-1>0xf
length = nil
if @transmission==ANSWER or @transmission==RESERVED then
raise "wrong data field length. expected #{LENGTHS[@object]}, got #{data.length}" if LENGTHS[@object] and data.length>LENGTHS[@object]
end
if @data and !@data.empty? then
start = data.length-1
elsif LENGTHS[@object] then
start = LENGTHS[@object]-1
else
start = 0x0f
end
start += 1<<4 if @direction
start += 1<<5 if @cast
start += @transmission<<6
# add rest
return [start,@node,@object]+@data
end
# pack telegram as string
def pack
# calculate checksum
data = pack_data
data += [checksum>>8,checksum&0xff]
return data.pack("C*")
end
# calculate checksum
def checksum
# calculate checksum
data = pack_data
cs = 0
data.each { |b| cs += b }
return cs
end
# check packet
def check_direction
raise "wrong direction" if (@direction and !@cast) or (@cast and !(@transmission==QUERY or @transmission==SEND)) or (!@direction and @cast) or (!@cast and !(@transmission==ANSWER or @transmission==RESERVED))
end
def to_s
str = @direction ? "<" : ">"
if OBJECTS[@object] then
if @object==0 and @data.length==2 and @data[0]==0xFF then # bug in the firmware. error should use object FF but uses object 00 and first byte is FF. second byte is error"
str += " error"
else
str += " "+OBJECTS[@object]
end
else
str += " #{@object}"
end
if @data and !@data.empty? then
str += ": "
case @object
when 0 # string or error (that's a bug)
if @data.length==2 and @data[0]==0xFF then # error
str += (ERRORS[@data[1]] or "unknown")
else # string
data = @data.pack("C*")
str_end = data.index("\0")
str_end = data.length unless str_end
str += data[0,str_end]
end
when 1,6,8,9 # strings
data = @data.pack("C*")
str_end = data.index("\0")
str_end = data.length unless str_end
str += data[0,str_end]
when 2,3,4 # float
str += @data.pack("C*").unpack("g")[0].to_s
when 19 # id
str += if @data == [0x00,0x10] then
"PS 2000 B Single"
elsif @data == [0x00,0x18] then
"PS 2000 B Triple"
else
"unknown"
end
when 38,39,50,51 # percentage
str += (@data.pack("C*").unpack("n")[0]/256.0).round(3).to_s
when 54 # changes
changes = []
changes << ((@data[1]&0x01)==0 ? "output off" : "output on")
changes[-1] += " (changed)" if (@data[0]&0x01)!=0
changes << ((@data[1]&0x0A)==0 ? nil : "acknowledge alarm")
changes[-1] += " (changed)" if (@data[0]&0x0A)!=0
changes << ((@data[1]&0x10)==0 ? "manual control" : "remote control")
changes[-1] += " (changed)" if (@data[0]&0x10)!=0
changes << "tracking on" if @data[1]&0xF0 == 0xF0
changes << "tracking off" if @data[1]&0xF0 == 0xF0
changes[-1] += " (changed)" if (@data[0]&0xF0)!=0
str += changes.compact*", "
when 71,72 # status + values
status = []
status << if @data[0]&0x03 == 0x00 then
"free access"
elsif @data[0]&0x03 == 0x01 then
"free access"
else
"unknown access"
end
status << ((@data[1]&(1<<1))==0 ? "output off" : "output on")
status << if @data[1]&0x06 == 0x00 then
"constant voltage"
elsif @data[1]&0x06 == 0x04 then
"constant current"
else
"unknown controller state"
end
status << ((@data[1]&(1<<3))==0 ? "tracking off" : "tracking on")
status << ((@data[1]&(1<<4))==0 ? "over-voltage protection off" : "over-voltage protection on")
status << ((@data[1]&(1<<5))==0 ? "over-current protection off" : "over-current protection on")
status << ((@data[1]&(1<<6))==0 ? "over-power protection off" : "over-power protection on")
status << ((@data[1]&(1<<7))==0 ? "over-temperature protection off" : "over-temperature protection on")
str += status.compact*", "
str += ", voltage %: "+(@data[2,2].pack("C*").unpack("n")[0]/256.0).round(3).to_s
str += ", current01 %: "+(@data[4,2].pack("C*").unpack("n")[0]/256.0).round(3).to_s
when 152 # write memory
case @data[0]
when 0x33
str += "select section #{@data[1]}"
when 0x34
str += "flush?"
when 0x30
str += "end of write?"
else
str += "page #{@data[0]}: "
str += @data[1..-1].collect { |b| sprintf("%02X ",b) }.join
end
when 255
str += (ERRORS[@data[0]] or "unknown")
else
str += @data.collect { |b| sprintf("%02X ",b) }.join if @data and !@data.empty?
end
end
return str
end
end