Examples
This section provides examples of how to use tmodbus in various scenarios.
Async TCP Client
1"""Example of an asynchronous TCP Modbus client using tmodbus."""
2
3import asyncio
4
5from tmodbus import create_async_tcp_client
6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
7
8
async def example_tcp_client() -> None:
    """Walk through the basic operations of an asynchronous Modbus TCP client.

    The coroutine connects to a server, reads holding registers (for the
    default unit ID and for clients obtained via ``for_unit_id``), writes
    registers, and prints a message for each Modbus error type it catches.
    The client is always disconnected afterwards. Finally a second client is
    used as an async context manager to read coils.
    """
    # Point these at your own Modbus server.
    host = "127.0.0.1"
    port = 502

    # Unit ID of the device the client addresses by default.
    unit_id = 1

    # create_async_tcp_client hands back an AsyncModbusClient instance.
    client = create_async_tcp_client(host, port, unit_id=unit_id)

    try:
        await client.connect()

        # Holding registers 100 and 101 of the default unit.
        registers = await client.read_holding_registers(start_address=100, quantity=2)
        print("Contents of holding registers 100 and 101: ", registers)

        # The same read, issued through a client obtained for unit ID 2.
        unit_2_registers = await client.for_unit_id(2).read_holding_registers(
            start_address=100,
            quantity=2,
        )
        print("Contents of holding registers 100 and 101 for unit ID 2: ", unit_2_registers)

        # read_float on a client obtained for unit ID 6.
        unit_6_float = await client.for_unit_id(6).read_float(start_address=1)
        print("Float value at address 1 for unit ID 6: ", unit_6_float)

        # Single-register write: value 123 to address 1.
        await client.write_single_register(address=1, value=123)
        print("Wrote 123 to holding register at address 1")

        # Multi-register write: [10, 20, 30] starting at address 10.
        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])
        print("Wrote [10, 20, 30] to holding registers starting at address 10")

    except ModbusResponseError as err:
        print(f"The server responded with error code {err.error_code:#04x} for function {err.function_code:#04x}")
    except InvalidResponseError as err:
        print(f"Received invalid response: {err}")
    except ModbusConnectionError as err:
        print(f"A connection error occurred: {err}")
    finally:
        # Runs whether or not the requests above succeeded.
        await client.disconnect()

    # The client also works as an async context manager, which takes care of
    # connecting and disconnecting automatically.
    async with create_async_tcp_client(host, port, unit_id=unit_id) as managed_client:
        print("Status of coils 0-7: ", await managed_client.read_coils(start_address=0, quantity=8))


if __name__ == "__main__":
    asyncio.run(example_tcp_client())
Async RTU Client
1"""Example of an asynchronous RTU Modbus client using tmodbus."""
2
3import asyncio
4
5from tmodbus import create_async_rtu_client
6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
7
8
async def example_rtu_client() -> None:
    """Asynchronous RTU Modbus client example.

    Opens a Modbus RTU client on a serial device, reads two holding
    registers, writes one register and then three registers, and prints a
    message for each Modbus error type it catches. The client is always
    disconnected afterwards. A second client is then used as an async
    context manager to read coils.
    """
    # Replace with the serial device your Modbus adapter is attached to
    port = "/dev/ttyUSB0"
    baudrate = 9600  # Adjust baudrate as needed

    unit_id = 1  # Modbus unit ID of the target device

    # The create_async_rtu_client function returns an instance of AsyncModbusClient
    client = create_async_rtu_client(port, baudrate=baudrate, unit_id=unit_id)

    try:
        await client.connect()
        # Read 2 holding registers starting at address 100
        registers = await client.read_holding_registers(start_address=100, quantity=2)

        print("Contents of holding registers 100 and 101: ", registers)

        # Write value 123 to holding register at address 1
        await client.write_single_register(address=1, value=123)

        # Write values [10, 20, 30] to holding registers starting at address 10
        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])

    except ModbusResponseError as e:
        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
    except InvalidResponseError as e:
        print(f"Received invalid response: {e}")
    except ModbusConnectionError as e:
        print(f"A connection error occurred: {e}")
    finally:
        # Always release the client, even if a request above failed.
        await client.disconnect()

    # Alternatively, you can use the client as an async context manager
    # which automatically handles connection and disconnection
    async with create_async_rtu_client(port, baudrate=baudrate, unit_id=unit_id) as client2:
        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))


if __name__ == "__main__":
    asyncio.run(example_rtu_client())
Async TCP over SSL Client
1"""Example of an asynchronous TCP Modbus client over an SSL connection using tmodbus."""
2
3import asyncio
4import ssl
5
6from tmodbus import create_async_tcp_client
7from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
8
9
async def example_tcp_client() -> None:
    """Asynchronous TCP Modbus client over SSL/TLS example.

    Connects with ``ssl=True``, reads two holding registers, writes one
    register and then three registers, and prints a message for each Modbus
    error type it catches. The client is always disconnected afterwards. A
    second client, configured with an explicit ``ssl.SSLContext``, is then
    used as an async context manager to read coils.
    """
    # Replace with your Modbus server's IP and port
    host = "127.0.0.1"
    port = 502

    unit_id = 1  # Modbus unit ID of the target device

    # You can pass any additional parameters supported by `asyncio.open_connection`
    # For example, to use an SSL context, you can set `ssl=True` or provide a custom SSLContext

    # The create_async_tcp_client function returns an instance of AsyncModbusClient
    client = create_async_tcp_client(host, port, unit_id=unit_id, ssl=True)

    try:
        await client.connect()
        # Read 2 holding registers starting at address 100
        registers = await client.read_holding_registers(start_address=100, quantity=2)

        print("Contents of holding registers 100 and 101: ", registers)

        # Write value 123 to holding register at address 1
        await client.write_single_register(address=1, value=123)

        # Write values [10, 20, 30] to holding registers starting at address 10
        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])

    except ModbusResponseError as e:
        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
    except InvalidResponseError as e:
        print(f"Received invalid response: {e}")
    except ModbusConnectionError as e:
        print(f"A connection error occurred: {e}")
    finally:
        # Always release the client, even if a request above failed.
        await client.disconnect()

    # Alternatively, you can use the client as an async context manager
    # which automatically handles connection and disconnection.
    # We also demonstrate passing of a custom SSL context here.

    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.load_default_certs(purpose=ssl.Purpose.SERVER_AUTH)
    # PROTOCOL_TLS_CLIENT already enables hostname checking and CERT_REQUIRED;
    # the two assignments below only make the verification policy explicit.
    ssl_context.check_hostname = True
    ssl_context.verify_mode = ssl.CERT_REQUIRED

    async with create_async_tcp_client(host, port, unit_id=unit_id, ssl=ssl_context) as client2:
        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))


if __name__ == "__main__":
    asyncio.run(example_tcp_client())
Async ASCII Client
1"""Example of an asynchronous ASCII Modbus client using tmodbus."""
2
3import asyncio
4
5from tmodbus import create_async_ascii_client
6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
7
8
async def example_ascii_client() -> None:
    """Asynchronous ASCII Modbus client example.

    Opens a Modbus ASCII client on a serial device, reads two holding
    registers, writes one register and then three registers, and prints a
    message for each Modbus error type it catches. The client is always
    disconnected afterwards. A second client is then used as an async
    context manager to read coils.
    """
    # Replace with the serial device your Modbus adapter is attached to
    port = "/dev/ttyUSB0"
    baudrate = 9600  # Adjust baudrate as needed

    unit_id = 1  # Modbus unit ID of the target device

    # The create_async_ascii_client function returns an instance of AsyncModbusClient
    client = create_async_ascii_client(port, baudrate=baudrate, unit_id=unit_id)

    try:
        await client.connect()
        # Read 2 holding registers starting at address 100
        registers = await client.read_holding_registers(start_address=100, quantity=2)

        print("Contents of holding registers 100 and 101: ", registers)

        # Write value 123 to holding register at address 1
        await client.write_single_register(address=1, value=123)

        # Write values [10, 20, 30] to holding registers starting at address 10
        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])

    except ModbusResponseError as e:
        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
    except InvalidResponseError as e:
        print(f"Received invalid response: {e}")
    except ModbusConnectionError as e:
        print(f"A connection error occurred: {e}")
    finally:
        # Always release the client, even if a request above failed.
        await client.disconnect()

    # Alternatively, you can use the client as an async context manager
    # which automatically handles connection and disconnection
    async with create_async_ascii_client(port, baudrate=baudrate, unit_id=unit_id) as client2:
        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))


if __name__ == "__main__":
    asyncio.run(example_ascii_client())
Async RTU over TCP Client
1"""Example of an asynchronous RTU over TCP Modbus client using tmodbus."""
2
3import asyncio
4
5from tmodbus import create_async_rtu_over_tcp_client
6from tmodbus.exceptions import InvalidResponseError, ModbusConnectionError, ModbusResponseError
7
8
async def example_rtu_over_tcp_client() -> None:
    """Asynchronous RTU over TCP Modbus client example.

    Connects an RTU-over-TCP client to a server, reads two holding
    registers, writes one register and then three registers, and prints a
    message for each Modbus error type it catches. The client is always
    disconnected afterwards. A second client is then used as an async
    context manager to read coils.
    """
    # Replace with your Modbus server's IP and port
    host = "127.0.0.1"
    port = 502

    unit_id = 1  # Modbus unit ID of the target device

    # The create_async_rtu_over_tcp_client function returns an instance of AsyncModbusClient
    client = create_async_rtu_over_tcp_client(host, port, unit_id=unit_id)

    try:
        await client.connect()
        # Read 2 holding registers starting at address 100
        registers = await client.read_holding_registers(start_address=100, quantity=2)

        print("Contents of holding registers 100 and 101: ", registers)

        # Write value 123 to holding register at address 1
        await client.write_single_register(address=1, value=123)

        # Write values [10, 20, 30] to holding registers starting at address 10
        await client.write_multiple_registers(start_address=10, values=[10, 20, 30])

    except ModbusResponseError as e:
        print(f"The server responded with error code {e.error_code:#04x} for function {e.function_code:#04x}")
    except InvalidResponseError as e:
        print(f"Received invalid response: {e}")
    except ModbusConnectionError as e:
        print(f"A connection error occurred: {e}")
    finally:
        # Always release the client, even if a request above failed.
        await client.disconnect()

    # Alternatively, you can use the client as an async context manager
    # which automatically handles connection and disconnection
    async with create_async_rtu_over_tcp_client(host, port, unit_id=unit_id) as client2:
        print("Status of coils 0-7: ", await client2.read_coils(start_address=0, quantity=8))


if __name__ == "__main__":
    asyncio.run(example_rtu_over_tcp_client())