Examples

This section provides examples of how to use tmodbus in various scenarios.

Async TCP Client

 1"""Example of an asynchronous TCP Modbus client using tmodbus."""
 2
 3import asyncio
 4
 5from tmodbus import create_async_tcp_client
 6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
 7
 8
 9async def example_tcp_client() -> None:
10    """Asynchronous TCP Modbus client example."""
11    # Replace with your Modbus server's IP and port
12    host = "127.0.0.1"
13    port = 502
14
15    unit_id = 1  # Modbus unit ID of the target device
16
17    # The create_async_tcp_client function returns an instance of AsyncModbusClient
18    client = create_async_tcp_client(host, port, unit_id=unit_id)
19
20    try:
21        await client.connect()
22        # Read 2 holding registers starting at address 100
23        response = await client.read_holding_registers(start_address=100, quantity=2)
24
25        print("Contents of holding registers 100 and 101: ", response)
26
27        client_for_unit_id_2 = client.for_unit_id(2)
28        response2 = await client_for_unit_id_2.read_holding_registers(start_address=100, quantity=2)
29        print("Contents of holding registers 100 and 101 for unit ID 2: ", response2)
30
31        response6 = await client.for_unit_id(6).read_float(start_address=1)
32        print("Float value at address 1 for unit ID 6: ", response6)
33
34        # Write value 123 to holding register at address 1
35        await client.write_single_register(address=1, value=123)
36        print("Wrote 123 to holding register at address 1")
37
38        # Write values [10, 20, 30] to holding registers starting at address 10
39        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])
40        print("Wrote [10, 20, 30] to holding registers starting at address 10")
41
42    except ModbusResponseError as e:
43        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
44    except InvalidResponseError as e:
45        print(f"Received invalid response: {e}")
46    except ModbusConnectionError as e:
47        print(f"A connection error occurred: {e}")
48    finally:
49        await client.disconnect()
50
51    # Alternatively, you can use the client as an async context manager
52    # which automatically handles connection and disconnection
53    async with create_async_tcp_client(host, port, unit_id=unit_id) as client2:
54        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))
55
56
57if __name__ == "__main__":
58    asyncio.run(example_tcp_client())

Async RTU Client

 1"""Example of an asynchronous RTU Modbus client using tmodbus."""
 2
 3import asyncio
 4
 5from tmodbus import create_async_rtu_client
 6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
 7
 8
 9async def example_rtu_client() -> None:
10    """Asynchronous RTU Modbus client example."""
11    # Replace with your Modbus server's IP and por
12    port = "/dev/ttyUSB0"
13    baudrate = 9600  # Adjust baudrate as needed
14
15    unit_id = 1  # Modbus unit ID of the target device
16
17    # The create_async_rtu_client function returns an instance of AsyncModbusClient
18    client = create_async_rtu_client(port, baudrate=baudrate, unit_id=unit_id)
19
20    try:
21        await client.connect()
22        # Read 2 holding registers starting at address 100
23        coils_response = await client.read_holding_registers(start_address=100, quantity=2)
24
25        print("Contents of holding registers 100 and 101: ", coils_response)
26
27        # Write value 123 to holding register at address 1
28        await client.write_single_register(address=1, value=123)
29
30        # Write values [10, 20, 30] to holding registers starting at address 10
31        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])
32
33    except ModbusResponseError as e:
34        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
35    except InvalidResponseError as e:
36        print(f"Received invalid response: {e}")
37    except ModbusConnectionError as e:
38        print(f"A connection error occurred: {e}")
39    finally:
40        await client.disconnect()
41
42    # Alternatively, you can use the client as an async context manager
43    # which automatically handles connection and disconnection
44    async with create_async_rtu_client(port, baudrate=baudrate, unit_id=unit_id) as client2:
45        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))
46
47
48if __name__ == "__main__":
49    asyncio.run(example_rtu_client())

Async TCP over SSL Client

 1"""Example of an asynchronous TCP Modbus client over an SSL connection using tmodbus."""
 2
 3import asyncio
 4import ssl
 5
 6from tmodbus import create_async_tcp_client
 7from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
 8
 9
10async def example_tcp_client() -> None:
11    """Asynchronous TCP Modbus client example."""
12    # Replace with your Modbus server's IP and port
13    host = "127.0.0.1"
14    port = 502
15
16    unit_id = 1  # Modbus unit ID of the target device
17
18    # You can pass any additional parameters supported by `asyncio.open_connection`
19    # For example, to use an SSL context, you can set `ssl=True` or provide a custom SSLContext
20
21    # The create_async_tcp_client function returns an instance of AsyncModbusClient
22    client = create_async_tcp_client(host, port, unit_id=unit_id, ssl=True)
23
24    try:
25        await client.connect()
26        # Read 2 holding registers starting at address 100
27        coils_response = await client.read_holding_registers(start_address=100, quantity=2)
28
29        print("Contents of holding registers 100 and 101: ", coils_response)
30
31        # Write value 123 to holding register at address 1
32        await client.write_single_register(address=1, value=123)
33
34        # Write values [10, 20, 30] to holding registers starting at address 10
35        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])
36
37    except ModbusResponseError as e:
38        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
39    except InvalidResponseError as e:
40        print(f"Received invalid response: {e}")
41    except ModbusConnectionError as e:
42        print(f"A connection error occurred: {e}")
43    finally:
44        await client.disconnect()
45
46    # Alternatively, you can use the client as an async context manager
47    # which automatically handles connection and disconnection.
48    # We also demonstrate passing of a custom SSL context here.
49
50    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
51    ssl_context.load_default_certs(purpose=ssl.Purpose.SERVER_AUTH)
52    ssl_context.check_hostname = True
53    ssl_context.verify_mode = ssl.CERT_REQUIRED
54
55    async with create_async_tcp_client(host, port, unit_id=unit_id, ssl=ssl_context) as client2:
56        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))
57
58
59if __name__ == "__main__":
60    asyncio.run(example_tcp_client())

Async ASCII Client

 1"""Example of an asynchronous ASCII Modbus client using tmodbus."""
 2
 3import asyncio
 4
 5from tmodbus import create_async_ascii_client
 6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
 7
 8
 9async def example_ascii_client() -> None:
10    """Asynchronous RTU Modbus client example."""
11    # Replace with your Modbus server's IP and por
12    port = "/dev/ttyUSB0"
13    baudrate = 9600  # Adjust baudrate as needed
14
15    unit_id = 1  # Modbus unit ID of the target device
16
17    # The create_async_ascii_client function returns an instance of AsyncModbusClient
18    client = create_async_ascii_client(port, baudrate=baudrate, unit_id=unit_id)
19
20    try:
21        await client.connect()
22        # Read 2 holding registers starting at address 100
23        coils_response = await client.read_holding_registers(start_address=100, quantity=2)
24
25        print("Contents of holding registers 100 and 101: ", coils_response)
26
27        # Write value 123 to holding register at address 1
28        await client.write_single_register(address=1, value=123)
29
30        # Write values [10, 20, 30] to holding registers starting at address 10
31        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])
32
33    except ModbusResponseError as e:
34        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
35    except InvalidResponseError as e:
36        print(f"Received invalid response: {e}")
37    except ModbusConnectionError as e:
38        print(f"A connection error occurred: {e}")
39    finally:
40        await client.disconnect()
41
42    # Alternatively, you can use the client as an async context manager
43    # which automatically handles connection and disconnection
44    async with create_async_ascii_client(port, baudrate=baudrate, unit_id=unit_id) as client2:
45        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))
46
47
48if __name__ == "__main__":
49    asyncio.run(example_ascii_client())

Async RTU over TCP Client

 1"""Example of an asynchronous RTU over TCP Modbus client using tmodbus."""
 2
 3import asyncio
 4
 5from tmodbus import create_async_rtu_over_tcp_client
 6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
 7
 8
 9async def example_rtu_over_tcp_client() -> None:
10    """Asynchronous RTU over TCP Modbus client example."""
11    # Replace with your Modbus server's IP and port
12    host = "127.0.0.1"
13    port = 502
14    unit_id = 1
15
16    unit_id = 1  # Modbus unit ID of the target device
17
18    # The create_async_rtu_client function returns an instance of AsyncModbusClient
19    client = create_async_rtu_over_tcp_client(host, port, unit_id=unit_id)
20
21    try:
22        await client.connect()
23        # Read 2 holding registers starting at address 100
24        coils_response = await client.read_holding_registers(start_address=100, quantity=2)
25
26        print("Contents of holding registers 100 and 101: ", coils_response)
27
28        # Write value 123 to holding register at address 1
29        await client.write_single_register(address=1, value=123)
30
31        # Write values [10, 20, 30] to holding registers starting at address 10
32        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])
33
34    except ModbusResponseError as e:
35        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
36    except InvalidResponseError as e:
37        print(f"Received invalid response: {e}")
38    except ModbusConnectionError as e:
39        print(f"A connection error occurred: {e}")
40    finally:
41        await client.disconnect()
42
43    # Alternatively, you can use the client as an async context manager
44    # which automatically handles connection and disconnection
45    async with create_async_rtu_over_tcp_client(host, port, unit_id=unit_id) as client2:
46        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))
47
48
49if __name__ == "__main__":
50    asyncio.run(example_rtu_over_tcp_client())