####################################################################################################################### # Copyright (C) 2024, Aechelon Technology, Inc. All Rights Reserved. # THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF AECHELON TECHNOLOGY, INC. # The copyright notice and/or Restricted Rights Legend below does not evidence any actual or intended publication or # disclosure of this source code, which includes information that is the confidential and/or proprietary to, and is a # trade secret of, Aechelon Technology, Inc. Any use, duplication or disclosure not specifically authorized in writing # by Aechelon Technology is strictly prohibited. # # U.S. GOVERNMENT RESTRICTED RIGHTS LEGEND. Use, duplication or disclosure by the Government is subject to restrictions # as set forth in FAR 52.227.14, DFARS 252.227-7014 and/or in similar or successor clauses in the FAR, DFARS or any FAR # Supplement. Unpublished-- rights reserved under the copyright laws of the United States. Contractor/manufacturer is # Aechelon Technology, Inc., 888 Brannan Street, Suite 210, San Francisco, CA 94103. ####################################################################################################################### # DMS Client Simulator # Implements protocol defined in # //depot/sw_design/runtime/Calibration/VITAL Communication Interface for Alignment Systems.docx import socket import struct from argparse import ArgumentParser, RawDescriptionHelpFormatter # Constants from VITAL communication interface DMS_START = 0x53 DMS_LOAD = 0x1d DMS_SET = 0x24 DMS_GET = 0x23 DMS_TEST_PATTERN = 0x0100 DMS_TP_ILLUMINATION = 0x696c DMS_EDGE_MASK = 0x6562 DMS_EDGE_BLEND = 0x7862 DMS_EYEPOINT = 0x7263 DMS_REFRESH_RENDER = 0x7272 DMS_SYSTEM_STATUS = 0x7374 def initArgParser(parser): parser.add_argument("--server", required=True, help="Server name") parser.add_argument("--port", required=False, type=int, default=8002, help="Server port") def printStatus(reply): if len(reply) == 4: unpacked = struct.unpack('!I', reply) print("Received status %x" % unpacked[0]) elif len(reply) == 12: unpacked = struct.unpack('!f f f', reply) print("red=%f green=%f blue=%f" % unpacked) else: print("Unexpected reply length: %d" % len(reply)) def sendCommand(command, datum, data): if command == DMS_GET: header = (DMS_START, command, datum) packer = struct.Struct('!B B I') packed_header = packer.pack(*header) sock.sendall(packed_header) else: header = (DMS_START, command, datum, len(data)) packer = struct.Struct('!B B I I') packed_header = packer.pack(*header) if isinstance(data, str): sock.sendall((packed_header.decode('utf-8') + data).encode('utf-8')) else: sock.sendall(packed_header + data) replyLength = sock.recv(4) reply = sock.recv(struct.unpack('!I', replyLength)[0]) printStatus(reply) def connectToServer(server, port): global sock sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serverAddress = (server, port) sock.connect(serverAddress) def composeDatum(datum, channel): return channel * 65536 + datum def menu(): print("") print("Choose a DMS command by number:") print("================================") print("1. Load Test Pattern") print("2. Unload Test Pattern") print("3. Set Test Pattern Illumination") print("4. Get Test Pattern Illumination") print("5. Set Edge Mask") print("6. Get Edge Mask State") print("7. Set Edge Blends") print("8. Get Edge Blends State") print("9. Select Eyepoint") print("10. Get Eyepoint Selection") print("11. Refresh Render") print("12. Get System Status") print("13. Quit") print("================================") choice = int(input("Choice: ")) if choice == 13: sock.close() print("Bye.") exit() elif choice < 1 or choice > 13: print("Invalid option") return else: channel = int(input("Enter Channel Number: ")) if choice == 1: payload = input("Enter Test Pattern Name: ") payload += '\0' # Add null-terminator sendCommand(DMS_LOAD, composeDatum(DMS_TEST_PATTERN, channel), payload) elif choice == 2: print("Unloading Test Pattern") sendCommand(DMS_LOAD, composeDatum(DMS_TEST_PATTERN, channel), "") elif choice == 3: red = float(input("Enter Red Illimination: ")) green = float(input("Enter Green Illumination: ")) blue = float(input("Enter Blue Illumination: ")) payload = struct.pack('!f f f', red, green, blue) sendCommand(DMS_SET, composeDatum(DMS_TP_ILLUMINATION, channel), payload) elif choice == 4: print("Querying illumination for channel %d..." % channel) sendCommand(DMS_GET, composeDatum(DMS_TP_ILLUMINATION, channel), "") elif choice == 5: state = int(input("Enter State: ")) payload = struct.pack('!I', state) sendCommand(DMS_SET, composeDatum(DMS_EDGE_MASK, channel), payload) elif choice == 6: print("Querying Mask State for channel %d..." % channel) sendCommand(DMS_GET, composeDatum(DMS_EDGE_MASK, channel), "") elif choice == 7: state = int(input("Enter State: ")) payload = struct.pack('!I', state) sendCommand(DMS_SET, composeDatum(DMS_EDGE_BLEND, channel), payload) elif choice == 8: print("Querying Blend State for channel %d..." % channel) sendCommand(DMS_GET, composeDatum(DMS_EDGE_BLEND, channel), "") elif choice == 9: index = int(input("Enter Selection: ")) payload = struct.pack('!I', index) sendCommand(DMS_SET, composeDatum(DMS_EYEPOINT, channel), payload) elif choice == 10: print("Querying Eyepoint for channel %d..." % channel) sendCommand(DMS_GET, composeDatum(DMS_EYEPOINT, channel), "") elif choice == 11: print("Refreshing Render for channel %d..." % channel) payload = struct.pack('!I', 0x01) sendCommand(DMS_SET, composeDatum(DMS_REFRESH_RENDER, channel), payload) elif choice == 12: print("Querying System Status of channel %d..." % channel) sendCommand(DMS_GET, composeDatum(DMS_SYSTEM_STATUS, channel), "") else: print("Unknown Command.") def main(): print("DMS Client Simulator") parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, description=__doc__) initArgParser(parser) args = parser.parse_args() if args.server is None: parser.error("You must specify --server") print("Connecting to %s:%d" % (args.server, args.port)) connectToServer(args.server, args.port) while True: menu() if __name__ == "__main__": main()