"""Source code for pysisyphus.socket_helper: helpers for sending and receiving messages over a socket."""

import struct

import numpy as np


# Pre-packed payloads of nine native doubles each, ready to be sent as-is.
_NINE_DOUBLES = struct.Struct("9d")
# Nine zeros.
NINE_ZEROS = _NINE_DOUBLES.pack(*(0.0 for _ in range(9)))
# Row-major flattened 3x3 identity matrix.
EYE3 = _NINE_DOUBLES.pack(*np.identity(3).ravel())


def send_closure(sock, hdrlen, fmts, verbose=False):
    """Create a function that sends messages through ``sock``.

    Parameters
    ----------
    sock
        Object providing ``sendall(bytes)``.
    hdrlen
        Width to which string messages are padded.
    fmts
        Mapping from format name to struct format string.
    verbose
        If True, print every message right before it is sent.

    Returns
    -------
    send_msg
        Callable ``send_msg(msg, fmt=None, packed=False)``.
    """

    # Format names whose struct format holds several values; for these the
    # message is a sequence and has to be star-unpacked into struct.pack().
    multi_value_fmts = ("floats", "nine_floats", "floats_sq")

    def send_msg(msg, fmt=None, packed=False):
        """Send a message through the socket.

        If ``packed`` is True the message is assumed to be bytes already and
        is sent as is. Otherwise, if ``fmt`` is given, the message is
        converted to bytes via struct using ``fmts[fmt]``; for the
        multi-value formats ("floats", "nine_floats", "floats_sq") ``msg``
        must be a flat sequence of values. If ``fmt`` is None the message is
        formatted as a string, padded on the right with spaces to a width of
        ``hdrlen`` and encoded as ASCII.
        """
        if not packed:
            if fmt is None:
                msg = f"{msg: <{hdrlen}}".encode("ascii")
            elif fmt in multi_value_fmts:
                msg = struct.pack(fmts[fmt], *msg)
            else:
                msg = struct.pack(fmts[fmt], msg)
        if verbose:
            print(f"SENDING: {msg}")
        sock.sendall(msg)

    return send_msg
"""Vllt. recv_msg so verändern, dass es, wenn ein fmt gegeben ist auch automatisch die korrekte anzahl an bytes erwartet"""
def recv_closure(sock, hdrlen, fmts, verbose=False):
    """Create a function that receives messages from ``sock``.

    Parameters
    ----------
    sock
        Object providing ``recv(nbytes)``.
    hdrlen
        Number of bytes to read when no explicit length is requested.
    fmts
        Mapping from format name to struct format string.
    verbose
        If True, print every received message.

    Returns
    -------
    recv_msg
        Callable ``recv_msg(nbytes=None, fmt=None, expect="")``.
    """

    def recv_msg(nbytes=None, fmt=None, expect=""):
        """Receive a message of given length from the socket.

        If ``nbytes`` is not given, ``hdrlen`` bytes are read. If ``fmt`` is
        given the message is unpacked using struct and the resulting tuple
        is returned; otherwise ASCII bytes are assumed and the decoded,
        whitespace-stripped string is returned. ``expect`` is only used in
        the verbose output.
        """
        if nbytes is None:
            nbytes = hdrlen

        # A single recv() call may return fewer bytes than requested, so keep
        # reading until the whole message has arrived.
        chunks = []
        remaining = nbytes
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                # Nothing more to read (e.g. the peer closed the connection).
                # Carry on with what was received, as a plain recv() would.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        msg = b"".join(chunks)

        if fmt:
            msg = struct.unpack(fmts[fmt], msg)
        else:
            msg = msg.decode("ascii").strip()
        if verbose:
            if expect:
                expect = f", expected '{expect}'"
            print(f"RECEIVED: {msg}{expect}")
        return msg

    return recv_msg
def get_fmts(cartesians):
    """Return the struct format strings, keyed by name.

    ``cartesians`` is the number of values in the "floats" format; the
    "floats_sq" format holds ``cartesians**2`` values. All floating point
    formats are built from doubles ("d").
    """
    double = "d"
    n_floats = cartesians
    n_floats_sq = cartesians**2
    return dict(
        int="i",  # Atom number
        float=double,  # Energy
        nine_floats=double * 9,  # (Inverse) cell vectors, virial, zeros
        floats=double * n_floats,  # Forces
        floats_sq=double * n_floats_sq,  # Hessian
    )