
I have implemented the gossip algorithm used in distributed systems and would like it reviewed.

Gossip class

import random
import socket
from threading import Thread
import time


class GossipNode:
    # hold infected nodes
    infected_nodes = []

    # initialization method.
    # pass the port of the node and the ports of the nodes connected to it
    def __init__(self, port, connected_nodes):
        # create a new socket instance
        # use SOCK_DGRAM to be able to send data without a connection
        # being established (connectionless protocol)
        self.node = socket.socket(type=socket.SOCK_DGRAM)

        # set the address (hostname and port) of the socket
        self.hostname = socket.gethostname()
        self.port = port

        # bind the address to the socket created
        self.node.bind((self.hostname, self.port))

        # set the ports of the nodes connected to it as susceptible nodes
        self.susceptible_nodes = connected_nodes

        print("Node started on port {0}".format(self.port))
        print("Susceptible nodes =>", self.susceptible_nodes)

        # call the threads to begin the magic
        self.start_threads()

    def input_message(self):
        while True:
            # input message to send to all nodes
            message_to_send = input("Enter a message to send:\n")

            # call send message method and pass the input message.
            # encode the message into ascii
            self.transmit_message(message_to_send.encode('ascii'))

    def receive_message(self):
        while True:
            # since we are using connectionless protocol,
            # we will use 'recvfrom' to receive UDP message
            message_to_forward, address = self.node.recvfrom(1024)

            # remove the port(node), from which the message came from,
            # from the list of susceptible nodes and
            # add it to the list of infected nodes
            self.susceptible_nodes.remove(address[1])
            GossipNode.infected_nodes.append(address[1])

            # sleep for 2 seconds in order to show difference in time
            time.sleep(2)

            # print message with the current time.
            # decode message so as to print it, as it was sent
            print("\nMessage is: '{0}'.\nReceived at [{1}] from [{2}]\n"
                  .format(message_to_forward.decode('ascii'), time.ctime(time.time()), address[1]))

            # call send message to forward the message to other susceptible(connected) nodes
            self.transmit_message(message_to_forward)

    def transmit_message(self, message):
        # loop as long as there are susceptible(connected) ports(nodes) to send to
        while self.susceptible_nodes:
            # select a random port from the list of susceptible nodes
            selected_port = random.choice(self.susceptible_nodes)

            print("\n")
            print("-"*50)
            print("Susceptible nodes =>", self.susceptible_nodes)
            print("Infected nodes =>", GossipNode.infected_nodes)
            print("Port selected is [{0}]".format(selected_port))

            # since we are using connectionless protocol,
            # we will use 'sendto' to transmit the UDP message
            self.node.sendto(message, (self.hostname, selected_port))

            # remove the node which the message has been sent to,
            # from the list of susceptible nodes and
            # add it to the list of infected nodes
            self.susceptible_nodes.remove(selected_port)
            GossipNode.infected_nodes.append(selected_port)

            print("Message: '{0}' sent to [{1}].".format(message.decode('ascii'), selected_port))
            print("Susceptible nodes =>", self.susceptible_nodes)
            print("Infected nodes =>", GossipNode.infected_nodes)
            print("-"*50)
            time.sleep(2)
            print("\n")

    def start_threads(self):
        # two threads for entering and getting a message.
        # it will enable each node to be able to
        # enter a message and still be able to receive a message
        Thread(target=self.input_message).start()
        Thread(target=self.receive_message).start()

Nodes: to be run separately

Node_5000

# import the GossipNode class
from Gossip import GossipNode

# port for this node
port = 5000
# ports for the nodes connected to this node
connected_nodes = [5010, 5020]

node = GossipNode(port, connected_nodes)

Node_5010

# import the GossipNode class
from Gossip import GossipNode

# port for this node
port = 5010
# ports for the nodes connected to this node
connected_nodes = [5000, 5030, 5040]

node = GossipNode(port, connected_nodes)

Node_5020

# import the GossipNode class
from Gossip import GossipNode

# port for this node
port = 5020
# ports for the nodes connected to this node
connected_nodes = [5000]

node = GossipNode(port, connected_nodes)

Node_5030

# import the GossipNode class
from Gossip import GossipNode

# port for this node
port = 5030
# ports for the nodes connected to this node
connected_nodes = [5010]

node = GossipNode(port, connected_nodes)

Node_5040

# import the GossipNode class
from Gossip import GossipNode

# port for this node
port = 5040
# ports for the nodes connected to this node
connected_nodes = [5010]

node = GossipNode(port, connected_nodes)
  • Can you edit the question to explain what your implementation does? "Gossip" refers to a class of related protocols, not to a particular algorithm, so we need you to tell us a bit more about it. Commented Jul 3, 2015 at 10:39

4 Answers


First off, you should be using docstrings to describe your functions and classes, not regular comments. A typical docstring might look something like this:

def my_func(arg1, arg2):
    """
    Describe my_func and its arguments
    in detail here.
    """
    ...
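Applied to the question's code, a docstring could document the select-and-infect bookkeeping from transmit_message. The sketch below pulls that logic into a standalone function so it can be documented and tested without sockets; infect_random_peer and its rng parameter are hypothetical names, not part of the original code.

```python
import random


def infect_random_peer(susceptible_nodes, infected_nodes, rng=random):
    """
    Pick one susceptible port at random and mark it as infected.

    Mirrors the list bookkeeping in GossipNode.transmit_message,
    without the socket I/O (hypothetical helper for illustration).

    Args:
        susceptible_nodes: ports that have not received the message yet;
            the chosen port is removed from this list.
        infected_nodes: ports that have received the message;
            the chosen port is appended to this list.
        rng: any object with a choice() method (defaults to the random module).

    Returns:
        The selected port, or None if no susceptible nodes are left.
    """
    if not susceptible_nodes:
        return None
    selected_port = rng.choice(susceptible_nodes)
    susceptible_nodes.remove(selected_port)
    infected_nodes.append(selected_port)
    return selected_port
```

Unlike a # comment, this text is what help(infect_random_peer) and IDE tooltips display.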

Secondly, some of your comments add nothing, such as # sleep for 2 seconds in order to show difference in time or # call the threads to begin the magic. They only restate what the code already says, so they can be removed.

Other than that, this code is really nice looking!

  • Why the downvote? Commented Jul 3, 2015 at 15:28

I've edited your code so that the nodes keep listening and broadcasting messages after the first one. I don't know whether it was intentional, but your script removes the connected nodes after just the first message, which I find odd. In any case, try the code below and let me know what you think! One thing I have not handled yet: if the same message is sent twice in a row, the nodes ignore it because they think they have already received it. This can be fixed by adding timestamps to the messages, and I will work on that soon!

import random
import socket
from threading import Thread
import time


class GossipNode:
    # pass the port of the node and the ports of the nodes connected to it
    def __init__(self, port, connected_nodes):
        # create a new socket instance
        # use SOCK_DGRAM to be able to send data without a connection
        # being established (connectionless protocol)
        self.node = socket.socket(type=socket.SOCK_DGRAM)
        # recvfrom() returns bytes, so start with an empty bytes object
        self.previous_message = b''

        # set the address (hostname and port) of the socket
        self.hostname = socket.gethostname()
        self.port = port

        # bind the address to the socket created
        self.node.bind((self.hostname, self.port))

        # set the ports of the nodes connected to it as susceptible nodes
        self.susceptible_nodes = connected_nodes

        # call the threads to begin the magic
        self.start_threads()

    def input_message(self):
        while True:
            # input message to send to all nodes
            message_to_send = input("Enter a message to send: ")

            # call send message method and pass the input message.
            # encode the message into ascii
            self.transmit_message(message_to_send.encode('ascii'), 0)

    def receive_message(self):
        while True:
            # since we are using connectionless protocol,
            # we will use 'recvfrom' to receive UDP message
            message_to_forward, address = self.node.recvfrom(1024)

            # skip a message identical to the last one received
            if self.previous_message == message_to_forward:
                continue
            self.previous_message = message_to_forward

            previous_node = address[1]
            print('\nMessage to forward, address: {}, {}'.format(message_to_forward, address[1]))

            # sleep for 1 second in order to show difference in time
            time.sleep(1)

            # decode the message so it prints as it was sent
            print("\nReceived message: '{0}'. From [{1}]"
                  .format(message_to_forward.decode('ascii'), address[1]))

            # forward the message to the other connected nodes
            self.transmit_message(message_to_forward, previous_node)

    def transmit_message(self, message, previous_node=0):
        # send to every connected node except the one the message came from
        for selected_port in self.susceptible_nodes:
            if selected_port == previous_node:
                continue

            # since we are using connectionless protocol,
            # we will use 'sendto' to transmit the UDP message
            self.node.sendto(message, (self.hostname, selected_port))

            time.sleep(1)

    def start_threads(self):
        # two threads for entering and getting a message.
        # it will enable each node to be able to
        # enter a message and still be able to receive a message
        Thread(target=self.input_message).start()
        Thread(target=self.receive_message).start()
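One way to implement the duplicate-message fix mentioned above is to tag each message with a unique ID and remember which IDs a node has already forwarded. This is a sketch under stated assumptions, not the answer's code: it uses a random UUID instead of a timestamp (two nodes could send at the same instant), and the b"<id>|<text>" wire format and the names make_packet, parse_packet and SeenMessages are hypothetical.

```python
import uuid


def make_packet(text):
    """Prefix the payload with a random message ID (hypothetical format: b'<hex id>|<text>')."""
    return f"{uuid.uuid4().hex}|{text}".encode("utf-8")


def parse_packet(data):
    """Split a packet built by make_packet back into (message_id, text)."""
    # The hex ID never contains '|', so splitting on the first '|' is safe
    # even when the text itself contains '|'.
    message_id, _, text = data.decode("utf-8").partition("|")
    return message_id, text


class SeenMessages:
    """Remembers message IDs so each message is forwarded at most once."""

    def __init__(self):
        self._seen = set()

    def is_new(self, message_id):
        # True the first time an ID is seen, False for every repeat.
        if message_id in self._seen:
            return False
        self._seen.add(message_id)
        return True
```

In receive_message you would parse the packet and continue when seen.is_new(message_id) is False; the sender should also call is_new on its own IDs so its message is not re-forwarded if it comes back. The set grows without bound, so a long-running node would need to expire old IDs.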
  • Welcome to Code Review! It appears the narrative does offer a critique of the OP’s code and thus is a valid review. Bear in mind that one can provide fully rewritten code in an answer to a Code Review question, but it is not required. Commented Jan 17, 2023 at 0:38
  • Thanks for the welcome and the tip! The above code is technically a fully rewritten answer, I believe. I will change it to add a bit more explanation about the changes I have made :) Commented Jan 17, 2023 at 9:16

I think you can make your code better by making the following changes:

  • Some of the comments are unnecessary, since the code is largely self-explanatory. For example, the comment in the following piece of code adds nothing:

    # remove the node which the message has been sent to
    # from the list of susceptible nodes and
    # add it to the list of infected nodes
    self.susceptible_nodes.remove(selected_port)
    GossipNode.infected_nodes.append(selected_port)
    

    Here the comments are making the code less readable.

  • The following comment is also unnecessary, since the documentation for sendto already says when it is used.

    # since we are using connectionless protocol,
    # we will use 'sendto' to transmit the UDP message
    
  • Also, try writing shorter comments. You can usually leave out the reasoning behind calling a function, unless that part of the code is important and its purpose is not clear from the function name.

  • Instead of a different script for each machine, use a single script. Save the port information in a separate file and read the port from that file.

    from Gossip import GossipNode
    FILE_NAME = "randomFile.txt"
    port = read_port(FILE_NAME)
    connected_nodes = [5010]
    …
    

I have also left the comments out of this code. It is clear from the code what you are trying to do, and the comments do not add any useful information.
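Following the mapping idea from the comments below, a single script could look up each node's settings by name. Since several nodes share one machine here, this sketch keys the file by a node name rather than an IP. The JSON layout and the load_node_config function are hypothetical; they play the role of the read_port idea above.

```python
import json


def load_node_config(path, node_name):
    """
    Return (port, connected_nodes) for node_name.

    Assumes a hypothetical JSON file shaped like:
        {"node_5000": {"port": 5000, "connected": [5010, 5020]},
         "node_5010": {"port": 5010, "connected": [5000, 5030, 5040]}}
    """
    with open(path, encoding="utf-8") as f:
        config = json.load(f)
    entry = config[node_name]  # raises KeyError if the node is not listed
    return entry["port"], list(entry["connected"])
```

Every node then runs the same script, for example port, connected_nodes = load_node_config("nodes.json", "node_5010") followed by GossipNode(port, connected_nodes), with only the node name (e.g. from the command line) differing.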

  • I'll cut down on the comments. What would I achieve by saving the port information in a separate file? Commented Jul 4, 2015 at 10:39
  • Saving port information in a file ensures that the same code can be used on all the machines. It is always better to have code that follows the DRY (Don't Repeat Yourself) principle. Suppose you need to change something in the repeated code: you would have to make the change in each of those files. If you used the same file on all machines, and they were synced using SVN, you would only need to make the change in one place. Commented Jul 4, 2015 at 10:49
  • Also, if you later decide which port each system should use, you only need to change that separate file (and maybe the read_port function too), and voila, your code will do its job. I hope this helps :) Commented Jul 4, 2015 at 10:49
  • Each node (socket) starts on its own port. I can't start two nodes with the same port number. Commented Jul 4, 2015 at 10:51
  • In that case, the separate file can store a mapping from each machine's IP to its port number; read_port will just return the port number for that machine. Commented Jul 4, 2015 at 10:57

Beyond refining the documentation (docstrings, comments, and type hints), I'd suggest:

Improving input and output

Some of your prints are better suited to a logger (the logging module). For example, print("Susceptible nodes =>", self.susceptible_nodes) could be an info- or debug-level log message, something like logger.debug(f"{self.susceptible_nodes=}").

I'd also switch to f-strings in general for readability, and avoid styling output by hand (e.g. print("\n") and print("-"*50)).
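A sketch of what the logging version of transmit_message's output might look like. log_transmission is a hypothetical helper, and the f"{name=}" form requires Python 3.8 or later. The format string passed to basicConfig takes over the job of the manual "-"*50 banners.

```python
import logging

logger = logging.getLogger("gossip")


def log_transmission(message, selected_port, susceptible_nodes, infected_nodes):
    """Report one send through the logger instead of print() banners (hypothetical helper)."""
    logger.debug(f"{susceptible_nodes=}")  # e.g. "susceptible_nodes=[5020]"
    logger.debug(f"{infected_nodes=}")
    logger.info(f"Message {message.decode('ascii')!r} sent to port {selected_port}")


if __name__ == "__main__":
    # Show DEBUG and above, with a timestamp on every line.
    logging.basicConfig(level=logging.DEBUG,
                        format="%(asctime)s %(levelname)s %(name)s: %(message)s")
    log_transmission(b"hello", 5010, [5020], [5010])
```

Raising the level to logging.INFO then hides the list dumps without touching the node code.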

Create a node manager

Instead of manually keeping track of your nodes and which ports they use, you could create a class that does this. You'd just store a list of GossipNodes; you could then optionally check whether a port is free, and even make port an optional argument (automatically finding an available port when none is passed).
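A minimal sketch of such a manager, assuming each node gets its own bound UDP socket as in GossipNode. NodeManager and its methods are hypothetical; the sketch binds to 127.0.0.1 rather than socket.gethostname() to stay local, and relies on the fact that binding to port 0 makes the operating system pick a free port.

```python
import socket


class NodeManager:
    """Hypothetical registry that hands out UDP ports and tracks which are in use."""

    def __init__(self, host="127.0.0.1"):
        self.host = host
        self.sockets = {}  # port -> bound UDP socket

    def register(self, port=None):
        """
        Bind a UDP socket for a new node and return its port.

        With port=None the OS picks a free port (binding to port 0).
        Raises ValueError if this manager already holds the port, and
        OSError if another process is already bound to it.
        """
        if port is not None and port in self.sockets:
            raise ValueError(f"port {port} is already registered")
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.bind((self.host, port or 0))
        except OSError:
            sock.close()
            raise
        actual_port = sock.getsockname()[1]
        self.sockets[actual_port] = sock
        return actual_port

    def ports(self):
        return sorted(self.sockets)

    def close_all(self):
        for sock in self.sockets.values():
            sock.close()
        self.sockets.clear()
```

To plug this in, GossipNode would need to accept the already-bound socket (or at least the port) from the manager instead of binding its own.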

Offer a way out of the NoReturns (the infinite loops)

Instead of a bare while True, you could write, for example:

try:
    while True:
        ...  # loop body
except KeyboardInterrupt:
    ...  # clean up and exit
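One caveat: Python delivers KeyboardInterrupt only to the main thread, and in the question both loops run in threads started by start_threads, so a try/except inside them would never see Ctrl+C. A sketch that works around this, assuming a threading.Event as the stop signal and a socket timeout so recvfrom wakes up periodically; receive_loop and run_until_interrupted are hypothetical names.

```python
import socket
import threading


def receive_loop(sock, stop_event, handle, poll_interval=0.5):
    """
    Receive UDP datagrams and pass them to handle() until stop_event is set.

    The socket timeout makes recvfrom() return periodically so the loop can
    notice stop_event; a plain blocking recvfrom() would never wake up.
    """
    sock.settimeout(poll_interval)
    while not stop_event.is_set():
        try:
            data, address = sock.recvfrom(1024)
        except socket.timeout:
            continue  # nothing arrived; re-check stop_event
        handle(data, address)


def run_until_interrupted(sock, handle):
    """Run receive_loop in a worker thread; Ctrl+C in the main thread stops it."""
    stop_event = threading.Event()
    worker = threading.Thread(target=receive_loop, args=(sock, stop_event, handle))
    worker.start()
    try:
        # Join with a timeout so the main thread stays responsive to Ctrl+C.
        while worker.is_alive():
            worker.join(timeout=0.5)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        stop_event.set()
        worker.join()
        sock.close()
```

The input thread is harder to stop this way because input() blocks; starting it with daemon=True lets the process exit without waiting for it.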